324 lines
11 KiB
JavaScript
324 lines
11 KiB
JavaScript
|
|
var Class = require('better-js-class');
|
|
|
|
var cps = require('cps');
|
|
|
|
var mysql = require('mysql');
|
|
|
|
var $U = require('underscore');
|
|
|
|
var getValue = function(o) {
|
|
for (var k in o) {
|
|
return o[k];
|
|
}
|
|
};
|
|
|
|
|
|
module.exports = function() {
|
|
var DB = Class({
|
|
_init: function(cfg) {
|
|
var transactionOverride = cfg['useTransaction'];
|
|
delete cfg['useTransaction'];
|
|
|
|
var cursorOverride = cfg['useCursor'];
|
|
delete cfg['useCursor'];
|
|
|
|
this._cfg = cfg;
|
|
// console.log(this._cfg);
|
|
this._pool = mysql.createPool(this._cfg);
|
|
|
|
if (transactionOverride) {
|
|
this._transactionCfg = this._buildCfg(cfg, transactionOverride);
|
|
// console.log('transactionCfg:', this._transactionCfg);
|
|
this._transactionPool = mysql.createPool(this._transactionCfg);
|
|
}
|
|
|
|
if (cursorOverride) {
|
|
this._cursorCfg = this._buildCfg(cfg, cursorOverride);
|
|
// console.log('cursorCfg:', this._cursorCfg);
|
|
this._cursorPool = mysql.createPool(this._cursorCfg);
|
|
}
|
|
|
|
this._schema = {};
|
|
this._prepared = false;
|
|
},
|
|
|
|
_buildCfg: function(cfg, override) {
|
|
var res = {};
|
|
|
|
for (var k in cfg) {
|
|
res[k] = cfg[k];
|
|
}
|
|
|
|
$U.extend(res, override);
|
|
return res;
|
|
},
|
|
|
|
connect: function(proc, cb) {
|
|
var me = this;
|
|
|
|
cps.seq([
|
|
function(_, cb) {
|
|
me._prepare(cb);
|
|
},
|
|
function(_, cb) {
|
|
me._connect(me._pool, proc, cb);
|
|
}
|
|
], cb);
|
|
},
|
|
|
|
_prepare: function(cb) {
|
|
if (this._prepared) {
|
|
return cb();
|
|
}
|
|
|
|
// console.log('call prepare');
|
|
var me = this;
|
|
var conn;
|
|
|
|
this._connect(me._pool, function(conn, cb) {
|
|
cps.seq([
|
|
function(res, cb) {
|
|
conn.query('show tables', cb);
|
|
},
|
|
function(tables, cb) {
|
|
cps.peach(tables, function(table, cb) {
|
|
var tableName = getValue(table);
|
|
cps.seq([
|
|
function(_, cb) {
|
|
conn.query('desc ' + tableName, cb);
|
|
},
|
|
function(columns, cb) {
|
|
me._schema[tableName] = $U.map(columns, function(column) {
|
|
return column['Field'];
|
|
});
|
|
me._prepared = true;
|
|
cb();
|
|
}
|
|
], cb);
|
|
}, cb);
|
|
}
|
|
], cb);
|
|
}, cb);
|
|
},
|
|
|
|
_connect: function(pool, proc, cb) {
|
|
var me = this;
|
|
var conn;
|
|
cps.seq([
|
|
function(_, cb) {
|
|
pool.getConnection(cb);
|
|
},
|
|
function(res, cb) {
|
|
conn = res;
|
|
cps.rescue({
|
|
'try': function(cb) {
|
|
proc(conn, cb);
|
|
},
|
|
'finally': function(cb) {
|
|
// console.log('release connection');
|
|
conn.release();
|
|
cb();
|
|
}
|
|
}, cb);
|
|
}
|
|
], cb);
|
|
},
|
|
|
|
transaction: function(conn, proc, cb) {
|
|
var me = this;
|
|
|
|
if (!me._transactionPool) {
|
|
cb(new Error('transaction-not-setup-error'));
|
|
return;
|
|
}
|
|
|
|
var txnConn;
|
|
var commitRes;
|
|
|
|
if (me._isTxnConnection(conn)) {
|
|
proc(conn, cb);
|
|
} else {
|
|
cps.seq([
|
|
function(_, cb) {
|
|
me._prepare(cb);
|
|
},
|
|
/*
|
|
function(_, cb) {
|
|
me._getTxnConnection(cb);
|
|
},
|
|
*/
|
|
function(_, cb) {
|
|
me._connect(me._transactionPool, function(conn, cb) {
|
|
me._enterTransaction(conn);
|
|
txnConn = conn;
|
|
cps.rescue({
|
|
'try': function(cb) {
|
|
cps.seq([
|
|
function(_, cb) {
|
|
// console.log('start transaction');
|
|
txnConn.query('START TRANSACTION', cb);
|
|
},
|
|
function(_, cb) {
|
|
cps.rescue({
|
|
'try': function(cb) {
|
|
cps.seq([
|
|
function(_, cb) {
|
|
proc(txnConn, cb);
|
|
},
|
|
function(res, cb) {
|
|
commitRes = res;
|
|
// console.log('committing');
|
|
txnConn.query('COMMIT', cb);
|
|
},
|
|
function(_, cb) {
|
|
// console.log('committed');
|
|
cb(null, commitRes);
|
|
}
|
|
], cb);
|
|
},
|
|
'catch': function(err, cb) {
|
|
cps.seq([
|
|
function(_, cb) {
|
|
// console.log('rolling back ...');
|
|
txnConn.query('ROLLBACK', cb);
|
|
},
|
|
function(_, cb) {
|
|
// console.log('rolled back');
|
|
throw(err);
|
|
}
|
|
], cb);
|
|
}
|
|
}, cb);
|
|
}
|
|
], cb);
|
|
},
|
|
'finally': function(cb) {
|
|
// console.log('txn connection release');
|
|
// txnConn.release();
|
|
me._leaveTransaction(txnConn);
|
|
cb();
|
|
}
|
|
}, cb);
|
|
}, cb);
|
|
}
|
|
], cb);
|
|
}
|
|
},
|
|
|
|
cursor: function(q, proc, _cb) {
|
|
var me = this;
|
|
|
|
if (!me._cursorPool) {
|
|
_cb(new Error('cursor-not-setup-error'));
|
|
return;
|
|
}
|
|
|
|
var returned = false;
|
|
|
|
var cb = function(err, res) {
|
|
if (!returned) {
|
|
returned = true;
|
|
_cb(err, res);
|
|
} else {
|
|
}
|
|
}
|
|
|
|
var breakCB = cb;
|
|
this._cursorPool.getConnection(function(err, conn) {
|
|
var query = conn.query(q);
|
|
query
|
|
.on('error', function(err) {
|
|
// console.log('cursor error');
|
|
conn.release();
|
|
cb(new Error(err));
|
|
})
|
|
.on('result', function(res) {
|
|
// console.log('cursor result');
|
|
conn.pause();
|
|
|
|
var cb = function(err, res) {
|
|
if (err) {
|
|
conn.release();
|
|
breakCB(err);
|
|
} else {
|
|
conn.resume();
|
|
}
|
|
};
|
|
|
|
cps.seq([
|
|
function(_, cb) {
|
|
// console.log('call row processor');
|
|
proc(res, cb);
|
|
}
|
|
], cb);
|
|
})
|
|
.on('end', function() {
|
|
// console.log('cursor end');
|
|
conn.release();
|
|
cb();
|
|
})
|
|
;
|
|
});
|
|
},
|
|
|
|
_isTxnConnection: function(conn) {
|
|
return conn != null && conn.__transaction__;
|
|
},
|
|
|
|
_enterTransaction: function(conn) {
|
|
conn.__transaction__ = true;
|
|
},
|
|
|
|
_leaveTransaction: function(conn) {
|
|
conn.__transaction__ = false;
|
|
},
|
|
|
|
end: function() {
|
|
this._pool.end();
|
|
if (this._transactionPool) {
|
|
this._transactionPool.end();
|
|
}
|
|
if (this._cursorPool) {
|
|
this._cursorPool.end();
|
|
}
|
|
},
|
|
|
|
getConnection: function(cb) {
|
|
var me = this;
|
|
|
|
cps.seq([
|
|
function(_, cb) {
|
|
me._prepare(cb);
|
|
},
|
|
function(_, cb) {
|
|
me._pool.getConnection(cb);
|
|
}
|
|
], cb);
|
|
}
|
|
});
|
|
|
|
$U.extend(DB, {
|
|
format: function(str, bindings) {
|
|
var l = str.split('?')
|
|
|
|
if (l.length - 1 != bindings.length) {
|
|
throw new Error('sql string format error');
|
|
}
|
|
|
|
var res = [];
|
|
|
|
for (var i = 0; i < bindings.length; i++) {
|
|
res.push(l[i]);
|
|
res.push(mysql.escape(bindings[i]));
|
|
}
|
|
|
|
res.push(l[l.length - 1]);
|
|
|
|
return res.join(' ');
|
|
}
|
|
});
|
|
|
|
return DB;
|
|
}();
|