2021-03-14 11:09:02 +01:00

324 lines
11 KiB
JavaScript

var Class = require('better-js-class');
var cps = require('cps');
var mysql = require('mysql');
var $U = require('underscore');
/**
 * Return the value of the first property that `for...in` enumerates on `o`.
 * Yields `undefined` when nothing is enumerated (e.g. an empty object,
 * `null` or `undefined`).
 */
var getValue = function(o) {
  var firstKey;
  var found = false;
  for (firstKey in o) {
    found = true;
    break;
  }
  return found ? o[firstKey] : undefined;
};
module.exports = function() {
var DB = Class({
_init: function(cfg) {
var transactionOverride = cfg['useTransaction'];
delete cfg['useTransaction'];
var cursorOverride = cfg['useCursor'];
delete cfg['useCursor'];
this._cfg = cfg;
// console.log(this._cfg);
this._pool = mysql.createPool(this._cfg);
if (transactionOverride) {
this._transactionCfg = this._buildCfg(cfg, transactionOverride);
// console.log('transactionCfg:', this._transactionCfg);
this._transactionPool = mysql.createPool(this._transactionCfg);
}
if (cursorOverride) {
this._cursorCfg = this._buildCfg(cfg, cursorOverride);
// console.log('cursorCfg:', this._cursorCfg);
this._cursorPool = mysql.createPool(this._cursorCfg);
}
this._schema = {};
this._prepared = false;
},
_buildCfg: function(cfg, override) {
var res = {};
for (var k in cfg) {
res[k] = cfg[k];
}
$U.extend(res, override);
return res;
},
connect: function(proc, cb) {
var me = this;
cps.seq([
function(_, cb) {
me._prepare(cb);
},
function(_, cb) {
me._connect(me._pool, proc, cb);
}
], cb);
},
_prepare: function(cb) {
if (this._prepared) {
return cb();
}
// console.log('call prepare');
var me = this;
var conn;
this._connect(me._pool, function(conn, cb) {
cps.seq([
function(res, cb) {
conn.query('show tables', cb);
},
function(tables, cb) {
cps.peach(tables, function(table, cb) {
var tableName = getValue(table);
cps.seq([
function(_, cb) {
conn.query('desc ' + tableName, cb);
},
function(columns, cb) {
me._schema[tableName] = $U.map(columns, function(column) {
return column['Field'];
});
me._prepared = true;
cb();
}
], cb);
}, cb);
}
], cb);
}, cb);
},
_connect: function(pool, proc, cb) {
var me = this;
var conn;
cps.seq([
function(_, cb) {
pool.getConnection(cb);
},
function(res, cb) {
conn = res;
cps.rescue({
'try': function(cb) {
proc(conn, cb);
},
'finally': function(cb) {
// console.log('release connection');
conn.release();
cb();
}
}, cb);
}
], cb);
},
/**
 * Run `proc(conn, cb)` inside a MySQL transaction.
 *
 * - Calls `cb` with 'transaction-not-setup-error' when no transaction pool
 *   was configured.
 * - If `conn` is already flagged as a transaction connection (see
 *   `_isTxnConnection`), `proc` is invoked on it directly: no new
 *   START TRANSACTION / COMMIT / ROLLBACK is issued by this call.
 * - Otherwise a connection is taken from the transaction pool, flagged, and
 *   START TRANSACTION -> proc -> COMMIT is run on it. The value produced by
 *   `proc` is passed on after the COMMIT step. When the proc/COMMIT sequence
 *   fails, ROLLBACK is issued and the original error is thrown again inside
 *   the cps sequence (presumably delivered to `cb` by cps — confirm against
 *   the cps documentation).
 *
 * The flag is cleared in the outer `finally`; releasing the connection is
 * left to `_connect`.
 */
transaction: function(conn, proc, cb) {
var me = this;
if (!me._transactionPool) {
cb(new Error('transaction-not-setup-error'));
return;
}
var txnConn;
// Result of `proc`, kept so it can be returned after COMMIT has finished.
var commitRes;
if (me._isTxnConnection(conn)) {
// Already inside a transaction on this connection: just run the procedure.
proc(conn, cb);
} else {
cps.seq([
function(_, cb) {
me._prepare(cb);
},
function(_, cb) {
me._connect(me._transactionPool, function(conn, cb) {
// Flag the connection so nested transaction() calls reuse it.
me._enterTransaction(conn);
txnConn = conn;
cps.rescue({
'try': function(cb) {
cps.seq([
function(_, cb) {
// START TRANSACTION sits outside the inner rescue, so a failure here
// does not trigger the ROLLBACK branch below.
txnConn.query('START TRANSACTION', cb);
},
function(_, cb) {
cps.rescue({
'try': function(cb) {
cps.seq([
function(_, cb) {
proc(txnConn, cb);
},
function(res, cb) {
commitRes = res;
txnConn.query('COMMIT', cb);
},
function(_, cb) {
// Hand back what `proc` produced, not the COMMIT query result.
cb(null, commitRes);
}
], cb);
},
'catch': function(err, cb) {
cps.seq([
function(_, cb) {
txnConn.query('ROLLBACK', cb);
},
function(_, cb) {
// Re-raise the error that caused the rollback.
throw(err);
}
], cb);
}
}, cb);
}
], cb);
},
'finally': function(cb) {
// Only the flag is cleared here; the connection itself is released by
// the `finally` step in _connect.
me._leaveTransaction(txnConn);
cb();
}
}, cb);
}, cb);
}
], cb);
}
},
cursor: function(q, proc, _cb) {
var me = this;
if (!me._cursorPool) {
_cb(new Error('cursor-not-setup-error'));
return;
}
var returned = false;
var cb = function(err, res) {
if (!returned) {
returned = true;
_cb(err, res);
} else {
}
}
var breakCB = cb;
this._cursorPool.getConnection(function(err, conn) {
var query = conn.query(q);
query
.on('error', function(err) {
// console.log('cursor error');
conn.release();
cb(new Error(err));
})
.on('result', function(res) {
// console.log('cursor result');
conn.pause();
var cb = function(err, res) {
if (err) {
conn.release();
breakCB(err);
} else {
conn.resume();
}
};
cps.seq([
function(_, cb) {
// console.log('call row processor');
proc(res, cb);
}
], cb);
})
.on('end', function() {
// console.log('cursor end');
conn.release();
cb();
})
;
});
},
_isTxnConnection: function(conn) {
return conn != null && conn.__transaction__;
},
_enterTransaction: function(conn) {
conn.__transaction__ = true;
},
_leaveTransaction: function(conn) {
conn.__transaction__ = false;
},
end: function() {
this._pool.end();
if (this._transactionPool) {
this._transactionPool.end();
}
if (this._cursorPool) {
this._cursorPool.end();
}
},
getConnection: function(cb) {
var me = this;
cps.seq([
function(_, cb) {
me._prepare(cb);
},
function(_, cb) {
me._pool.getConnection(cb);
}
], cb);
}
});
$U.extend(DB, {
format: function(str, bindings) {
var l = str.split('?')
if (l.length - 1 != bindings.length) {
throw new Error('sql string format error');
}
var res = [];
for (var i = 0; i < bindings.length; i++) {
res.push(l[i]);
res.push(mysql.escape(bindings[i]));
}
res.push(l[l.length - 1]);
return res.join(' ');
}
});
return DB;
}();