This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to tag 1.3.1 in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git
commit 1c7febdcb59705df76dc209fe7e1d585d5bfdc18 Author: Jason Peterson <jason...@us.ibm.com> AuthorDate: Tue Nov 7 17:32:04 2017 -0500 create fire once action to support firing trigger on a specific date --- .gitignore | 1 + action/alarm.js | 63 +---- action/alarmFeed_package.json | 5 + action/alarmOnce.js | 31 +++ action/alarmOnce_package.json | 5 + action/alarmWebAction.js | 310 +++++-------------------- action/{package.json => alarmWeb_package.json} | 2 +- action/lib/Database.js | 172 ++++++++++++++ action/{alarm.js => lib/common.js} | 75 +++--- installCatalog.sh | 31 ++- package.json | 2 +- provider/lib/utils.js | 89 ++++--- 12 files changed, 410 insertions(+), 376 deletions(-) diff --git a/.gitignore b/.gitignore index ac4bde5..30c1445 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ node_modules/ .env .vscode action/*.zip +action/package.json diff --git a/action/alarm.js b/action/alarm.js index a5c217f..6e72985 100644 --- a/action/alarm.js +++ b/action/alarm.js @@ -1,4 +1,4 @@ -var request = require('request'); +const common = require('./lib/common'); function main(msg) { @@ -14,74 +14,17 @@ function main(msg) { var lifecycleEvent = msg.lifecycleEvent; var endpoint = msg.apihost; - var webparams = createWebParams(msg); + var webparams = common.createWebParams(msg); var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`; if (lifecycleEvent in eventMap) { var method = eventMap[lifecycleEvent]; - return requestHelper(url, webparams, method); + return common.requestHelper(url, webparams, method); } else { return Promise.reject('unsupported lifecycleEvent'); } } -function requestHelper(url, input, method) { - - return new Promise(function(resolve, reject) { - - request({ - method : method, - url : url, - json: input, - rejectUnauthorized: false - }, function(error, response, body) { - - if (!error && response.statusCode === 200) { - resolve(body); - } - else { - if (response) { - console.log('alarm: Error invoking whisk action:', response.statusCode, body); - reject(body); - } - else { - console.log('alarm: Error invoking whisk action:', error); - reject(error); - } - } - }); - }); -} - -function createWebParams(rawParams) { - var namespace = process.env.__OW_NAMESPACE; - var triggerName = '/' + namespace + '/' + parseQName(rawParams.triggerName).name; - - var webparams = Object.assign({}, rawParams); - delete webparams.lifecycleEvent; - delete webparams.apihost; - - webparams.triggerName = triggerName; - - return webparams; -} - -function parseQName(qname) { - var parsed = {}; - var delimiter = '/'; - var defaultNamespace = '_'; - if (qname && qname.charAt(0) === delimiter) { - var parts = qname.split(delimiter); - parsed.namespace = parts[1]; - parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : ''; - } else { - parsed.namespace = defaultNamespace; - parsed.name = qname; - } - return parsed; -} exports.main = main; - - diff --git a/action/alarmFeed_package.json b/action/alarmFeed_package.json new file mode 100644 index 0000000..8ae9040 --- /dev/null +++ b/action/alarmFeed_package.json @@ -0,0 +1,5 @@ +{ + "name": "alarmFeed", + "version": "1.0.0", + "main": "alarm.js" +} diff --git a/action/alarmOnce.js b/action/alarmOnce.js new file mode 100644 index 0000000..38a7ebc --- /dev/null +++ b/action/alarmOnce.js @@ -0,0 +1,31 @@ +const common = require('./lib/common'); + +function main(msg) { + + let eventMap = { + CREATE: 'put', + READ: 'get', + // UPDATE: 'put', + DELETE: 'delete' + }; + // for creation -> CREATE + // for reading -> READ + // for deletion -> DELETE + var lifecycleEvent = msg.lifecycleEvent; + + var endpoint = msg.apihost; + var webparams = common.createWebParams(msg); + webparams.fireOnce = true; + + var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`; + + if (lifecycleEvent in eventMap) { + var method = eventMap[lifecycleEvent]; + return common.requestHelper(url, webparams, method); + } else { + return Promise.reject('unsupported lifecycleEvent'); + } +} + + +exports.main = main; diff --git a/action/alarmOnce_package.json b/action/alarmOnce_package.json new file mode 100644 index 0000000..e48cd14 --- /dev/null +++ b/action/alarmOnce_package.json @@ -0,0 +1,5 @@ +{ + "name": "alarmOnce", + "version": "1.0.0", + "main": "alarmOnce.js" +} diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js index 31ed6fd..41d7917 100644 --- a/action/alarmWebAction.js +++ b/action/alarmWebAction.js @@ -1,38 +1,27 @@ -var request = require('request'); -var CronJob = require('cron').CronJob; +var cronParser = require('cron-parser'); var moment = require('moment'); +const common = require('./lib/common'); +const Database = require('./lib/Database'); + function main(params) { if (!params.authKey) { - return sendError(400, 'no authKey parameter was provided'); + return common.sendError(400, 'no authKey parameter was provided'); } if (!params.triggerName) { - return sendError(400, 'no trigger name parameter was provided'); + return common.sendError(400, 'no trigger name parameter was provided'); } - var triggerParts = parseQName(params.triggerName); + var triggerParts = common.parseQName(params.triggerName); var triggerID = `${params.authKey}/${triggerParts.namespace}/${triggerParts.name}`; - var triggerURL = `https://${params.apihost}/api/v1/namespaces/${triggerParts.namespace}/triggers/${triggerParts.name}`; - var nano = require('nano')(params.DB_URL); - var db = nano.db.use(params.DB_NAME); var workers = params.workers instanceof Array ? params.workers : []; + var db; if (params.__ow_method === "put") { - if (!params.cron) { - return sendError(400, 'alarms trigger feed is missing the cron parameter'); - } - else { - try { - new CronJob(params.cron, function() {}); - } catch(ex) { - return sendError(400, `cron pattern '${params.cron}' is not valid`); - } - } - if (typeof params.trigger_payload === 'string') { params.trigger_payload = {payload: params.trigger_payload}; } @@ -41,24 +30,55 @@ function main(params) { apikey: params.authKey, name: triggerParts.name, namespace: triggerParts.namespace, - cron: params.cron, payload: params.trigger_payload || {}, - maxTriggers: params.maxTriggers || -1, status: { 'active': true, 'dateChanged': Date.now() } }; + if (params.fireOnce) { + if (!params.date) { + return common.sendError(400, 'alarms once trigger feed is missing the date parameter'); + } + else { + var date = new Date(params.date); + if (isNaN(date.getTime())) { + return common.sendError(400, `date parameter '${params.date}' is not a valid Date`); + } + else if (Date.now() >= date.getTime()) { + return common.sendError(400, `date parameter '${params.date}' must be in the future`); + } + else { + newTrigger.date = params.date; + } + } + } + else { + if (!params.cron) { + return common.sendError(400, 'alarms trigger feed is missing the cron parameter'); + } + else { + try { + cronParser.parseExpression(params.cron); + newTrigger.cron = params.cron; + newTrigger.maxTriggers = params.maxTriggers || -1; + } catch(ex) { + return common.sendError(400, `cron pattern '${params.cron}' is not valid`); + } + } + } + return new Promise(function (resolve, reject) { - verifyTriggerAuth(triggerURL, params.authKey, false) + common.verifyTriggerAuth(triggerURL, params.authKey, false) .then(() => { - return getWorkerID(db, workers); + db = new Database(params.DB_URL, params.DB_NAME); + return db.getWorkerID(workers); }) .then((worker) => { console.log('trigger will be assigned to worker ' + worker); newTrigger.worker = worker; - return createTrigger(db, triggerID, newTrigger); + return db.createTrigger(triggerID, newTrigger); }) .then(() => { resolve({ @@ -75,16 +95,16 @@ function main(params) { } else if (params.__ow_method === "get") { return new Promise(function (resolve, reject) { - verifyTriggerAuth(triggerURL, params.authKey, false) + common.verifyTriggerAuth(triggerURL, params.authKey, false) .then(() => { - return getTrigger(db, triggerID); + db = new Database(params.DB_URL, params.DB_NAME); + return db.getTrigger(triggerID); }) .then(doc => { var body = { config: { name: doc.name, namespace: doc.namespace, - cron: doc.cron, payload: doc.payload }, status: { @@ -94,6 +114,12 @@ function main(params) { reason: doc.status.reason } }; + if (doc.date) { + body.config.date = doc.date; + } + else { + body.config.cron = doc.cron; + } resolve({ statusCode: 200, headers: {'Content-Type': 'application/json'}, @@ -108,12 +134,13 @@ function main(params) { else if (params.__ow_method === "delete") { return new Promise(function (resolve, reject) { - verifyTriggerAuth(triggerURL, params.authKey, true) + common.verifyTriggerAuth(triggerURL, params.authKey, true) .then(() => { - return updateTrigger(db, triggerID, 0); + db = new Database(params.DB_URL, params.DB_NAME); + return db.updateTrigger(triggerID, 0); }) .then(id => { - return deleteTrigger(db, id, 0); + return db.deleteTrigger(id, 0); }) .then(() => { resolve({ @@ -128,231 +155,10 @@ function main(params) { }); } else { - return sendError(400, 'unsupported lifecycleEvent'); + return common.sendError(400, 'unsupported lifecycleEvent'); } } -function getWorkerID(db, availabeWorkers) { - - return new Promise((resolve, reject) => { - var workerID = availabeWorkers[0] || 'worker0'; - - if (availabeWorkers.length > 1) { - db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) { - if (!err) { - var triggersByWorker = {}; - - availabeWorkers.forEach(worker => { - triggersByWorker[worker] = 0; - }); - - body.rows.forEach(row => { - if (row.key in triggersByWorker) { - triggersByWorker[row.key] = row.value; - } - }); - - // find which worker has the least number of assigned triggers - for (var worker in triggersByWorker) { - if (triggersByWorker[worker] < triggersByWorker[workerID]) { - workerID = worker; - } - } - resolve(workerID); - } else { - reject(err); - } - }); - } - else { - resolve(workerID); - } - }); -} - -function createTrigger(triggerDB, triggerID, newTrigger) { - - return new Promise(function(resolve, reject) { - - triggerDB.insert(newTrigger, triggerID, function (err) { - if (!err) { - resolve(); - } - else { - reject(sendError(err.statusCode, 'error creating alarm trigger.', err.message)); - } - }); - }); -} - -function getTrigger(triggerDB, triggerID, retry = true) { - - return new Promise(function(resolve, reject) { - - triggerDB.get(triggerID, function (err, existing) { - if (err) { - if (retry) { - var parts = triggerID.split('/'); - var id = parts[0] + '/_/' + parts[2]; - getTrigger(triggerDB, id, false) - .then(doc => { - resolve(doc); - }) - .catch(err => { - reject(err); - }); - } else { - reject(sendError(err.statusCode, 'could not find the trigger in the database')); - } - } else { - resolve(existing); - } - }); - }); -} - -function updateTrigger(triggerDB, triggerID, retryCount) { - - return new Promise(function(resolve, reject) { - - triggerDB.get(triggerID, function (err, existing) { - if (!err) { - var updatedTrigger = existing; - updatedTrigger.status = {'active': false}; - - triggerDB.insert(updatedTrigger, triggerID, function (err) { - if (err) { - if (err.statusCode === 409 && retryCount < 5) { - setTimeout(function () { - updateTrigger(triggerDB, triggerID, (retryCount + 1)) - .then(id => { - resolve(id); - }) - .catch(err => { - reject(err); - }); - }, 1000); - } - else { - reject(sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message)); - } - } - else { - resolve(triggerID); - } - }); - } - else { - //legacy alarms triggers may have been created with _ namespace - if (retryCount === 0) { - var parts = triggerID.split('/'); - var id = parts[0] + '/_/' + parts[2]; - updateTrigger(triggerDB, id, (retryCount + 1)) - .then(id => { - resolve(id); - }) - .catch(err => { - reject(err); - }); - } - else { - reject(sendError(err.statusCode, 'could not find the trigger in the database')); - } - } - }); - }); -} - -function deleteTrigger(triggerDB, triggerID, retryCount) { - - return new Promise(function(resolve, reject) { - - triggerDB.get(triggerID, function (err, existing) { - if (!err) { - triggerDB.destroy(existing._id, existing._rev, function (err) { - if (err) { - if (err.statusCode === 409 && retryCount < 5) { - setTimeout(function () { - deleteTrigger(triggerDB, triggerID, (retryCount + 1)) - .then(resolve) - .catch(err => { - reject(err); - }); - }, 1000); - } - else { - reject(sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message)); - } - } - else { - resolve(); - } - }); - } - else { - reject(sendError(err.statusCode, 'could not find the trigger in the database')); - } - }); - }); -} - -function verifyTriggerAuth(triggerURL, authKey, isDelete) { - var auth = authKey.split(':'); - - return new Promise(function(resolve, reject) { - - request({ - method: 'get', - url: triggerURL, - auth: { - user: auth[0], - pass: auth[1] - }, - rejectUnauthorized: false - }, function(err, response) { - if (err) { - reject(sendError(400, 'Trigger authentication request failed.', err.message)); - } - else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) { - reject(sendError(response.statusCode, 'Trigger authentication request failed.')); - } - else { - resolve(); - } - }); - }); -} - -function sendError(statusCode, error, message) { - var params = {error: error}; - if (message) { - params.message = message; - } - - return { - statusCode: statusCode, - headers: { 'Content-Type': 'application/json' }, - body: new Buffer(JSON.stringify(params)).toString('base64') - }; -} - - -function parseQName(qname) { - var parsed = {}; - var delimiter = '/'; - var defaultNamespace = '_'; - if (qname && qname.charAt(0) === delimiter) { - var parts = qname.split(delimiter); - parsed.namespace = parts[1]; - parsed.name = parts.length > 2 ? parts.slice(2).join(delimiter) : ''; - } else { - parsed.namespace = defaultNamespace; - parsed.name = qname; - } - return parsed; -} - - exports.main = main; diff --git a/action/package.json b/action/alarmWeb_package.json similarity index 79% rename from action/package.json rename to action/alarmWeb_package.json index e80f7af..1a9c164 100644 --- a/action/package.json +++ b/action/alarmWeb_package.json @@ -3,6 +3,6 @@ "version": "1.0.0", "main": "alarmWebAction.js", "dependencies" : { - "cron": "^1.2.1" + "cron-parser": "^2.4.3" } } diff --git a/action/lib/Database.js b/action/lib/Database.js new file mode 100644 index 0000000..e1fdb27 --- /dev/null +++ b/action/lib/Database.js @@ -0,0 +1,172 @@ +const common = require('./common'); + +// constructor for DB object - a thin, promise-loving wrapper around nano +module.exports = function(dbURL, dbName) { + var nano = require('nano')(dbURL); + this.db = nano.db.use(dbName); + var utilsDB = this; + + this.getWorkerID = function(availabeWorkers) { + + return new Promise((resolve, reject) => { + var workerID = availabeWorkers[0] || 'worker0'; + + if (availabeWorkers.length > 1) { + utilsDB.db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) { + if (!err) { + var triggersByWorker = {}; + + availabeWorkers.forEach(worker => { + triggersByWorker[worker] = 0; + }); + + body.rows.forEach(row => { + if (row.key in triggersByWorker) { + triggersByWorker[row.key] = row.value; + } + }); + + // find which worker has the least number of assigned triggers + for (var worker in triggersByWorker) { + if (triggersByWorker[worker] < triggersByWorker[workerID]) { + workerID = worker; + } + } + resolve(workerID); + } else { + reject(err); + } + }); + } + else { + resolve(workerID); + } + }); + }; + + this.createTrigger = function(triggerID, newTrigger) { + + return new Promise(function(resolve, reject) { + + utilsDB.db.insert(newTrigger, triggerID, function (err) { + if (!err) { + resolve(); + } + else { + reject(common.sendError(err.statusCode, 'error creating alarm trigger.', err.message)); + } + }); + }); + }; + + this.getTrigger = function(triggerID, retry = true) { + + return new Promise(function(resolve, reject) { + + utilsDB.db.get(triggerID, function (err, existing) { + if (err) { + if (retry) { + var parts = triggerID.split('/'); + var id = parts[0] + '/_/' + parts[2]; + utilsDB.getTrigger(id, false) + .then(doc => { + resolve(doc); + }) + .catch(err => { + reject(err); + }); + } else { + reject(common.sendError(err.statusCode, 'could not find the trigger in the database')); + } + } else { + resolve(existing); + } + }); + }); + }; + + this.updateTrigger = function(triggerID, retryCount) { + + return new Promise(function(resolve, reject) { + + utilsDB.db.get(triggerID, function (err, existing) { + if (!err) { + var updatedTrigger = existing; + updatedTrigger.status = {'active': false}; + + utilsDB.db.insert(updatedTrigger, triggerID, function (err) { + if (err) { + if (err.statusCode === 409 && retryCount < 5) { + setTimeout(function () { + utilsDB.updateTrigger(triggerID, (retryCount + 1)) + .then(id => { + resolve(id); + }) + .catch(err => { + reject(err); + }); + }, 1000); + } + else { + reject(common.sendError(err.statusCode, 'there was an error while marking the trigger for delete in the database.', err.message)); + } + } + else { + resolve(triggerID); + } + }); + } + else { + //legacy alarms triggers may have been created with _ namespace + if (retryCount === 0) { + var parts = triggerID.split('/'); + var id = parts[0] + '/_/' + parts[2]; + utilsDB.updateTrigger(id, (retryCount + 1)) + .then(id => { + resolve(id); + }) + .catch(err => { + reject(err); + }); + } + else { + reject(common.sendError(err.statusCode, 'could not find the trigger in the database')); + } + } + }); + }); + }; + + this.deleteTrigger = function(triggerID, retryCount) { + + return new Promise(function(resolve, reject) { + + utilsDB.db.get(triggerID, function (err, existing) { + if (!err) { + utilsDB.db.destroy(existing._id, existing._rev, function (err) { + if (err) { + if (err.statusCode === 409 && retryCount < 5) { + setTimeout(function () { + utilsDB.deleteTrigger(triggerID, (retryCount + 1)) + .then(resolve) + .catch(err => { + reject(err); + }); + }, 1000); + } + else { + reject(common.sendError(err.statusCode, 'there was an error while deleting the trigger from the database.', err.message)); + } + } + else { + resolve(); + } + }); + } + else { + reject(common.sendError(err.statusCode, 'could not find the trigger in the database')); + } + }); + }); + }; +}; diff --git a/action/alarm.js b/action/lib/common.js similarity index 53% copy from action/alarm.js copy to action/lib/common.js index a5c217f..a6c1477 100644 --- a/action/alarm.js +++ b/action/lib/common.js @@ -1,30 +1,4 @@ -var request = require('request'); - -function main(msg) { - - let eventMap = { - CREATE: 'put', - READ: 'get', - // UPDATE: 'put', - DELETE: 'delete' - }; - // for creation -> CREATE - // for reading -> READ - // for deletion -> DELETE - var lifecycleEvent = msg.lifecycleEvent; - - var endpoint = msg.apihost; - var webparams = createWebParams(msg); - - var url = `https://${endpoint}/api/v1/web/whisk.system/alarmsWeb/alarmWebAction.http`; - - if (lifecycleEvent in eventMap) { - var method = eventMap[lifecycleEvent]; - return requestHelper(url, webparams, method); - } else { - return Promise.reject('unsupported lifecycleEvent'); - } -} +const request = require('request'); function requestHelper(url, input, method) { @@ -67,6 +41,33 @@ function createWebParams(rawParams) { return webparams; } +function verifyTriggerAuth(triggerURL, authKey, isDelete) { + var auth = authKey.split(':'); + + return new Promise(function(resolve, reject) { + + request({ + method: 'get', + url: triggerURL, + auth: { + user: auth[0], + pass: auth[1] + }, + rejectUnauthorized: false + }, function(err, response) { + if (err) { + reject(sendError(400, 'Trigger authentication request failed.', err.message)); + } + else if(response.statusCode >= 400 && !(isDelete && response.statusCode === 404)) { + reject(sendError(response.statusCode, 'Trigger authentication request failed.')); + } + else { + resolve(); + } + }); + }); +} + function parseQName(qname) { var parsed = {}; var delimiter = '/'; @@ -82,6 +83,24 @@ function parseQName(qname) { return parsed; } -exports.main = main; +function sendError(statusCode, error, message) { + var params = {error: error}; + if (message) { + params.message = message; + } + + return { + statusCode: statusCode, + headers: { 'Content-Type': 'application/json' }, + body: new Buffer(JSON.stringify(params)).toString('base64') + }; +} +module.exports = { + 'requestHelper': requestHelper, + 'createWebParams': createWebParams, + 'verifyTriggerAuth': verifyTriggerAuth, + 'parseQName': parseQName, + 'sendError': sendError +}; diff --git a/installCatalog.sh b/installCatalog.sh index 41fd83b..63fde64 100755 --- a/installCatalog.sh +++ b/installCatalog.sh @@ -55,10 +55,35 @@ $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared yes ala -p cron '' \ -p trigger_payload '' -$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarm.js" \ +# make alarmFeed.zip +cd action + +if [ -e alarmFeed.zip ] +then + rm -rf alarmFeed.zip +fi + +cp -f alarmFeed_package.json package.json +zip -r alarmFeed.zip lib package.json alarm.js + +$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/alarm "$PACKAGE_HOME/action/alarmFeed.zip" \ -a description 'Fire trigger when alarm occurs' \ -a feed true + +# make alarmOnce.zip +if [ -e alarmOnce.zip ] +then + rm -rf alarmOnce.zip +fi + +cp -f alarmOnce_package.json package.json +zip -r alarmOnce.zip lib package.json alarmOnce.js + +$WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarms/once "$PACKAGE_HOME/action/alarmOnce.zip" \ + -a description 'Fire trigger once when alarm occurs' \ + -a feed true + if [ -n "$WORKERS" ]; then $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \ @@ -74,7 +99,7 @@ else fi # make alarmWebAction.zip -cd action +cp -f alarmWeb_package.json package.json npm install if [ -e alarmWebAction.zip ]; @@ -82,7 +107,7 @@ then rm -rf alarmWebAction.zip fi -zip -r alarmWebAction.zip package.json alarmWebAction.js node_modules +zip -r alarmWebAction.zip lib package.json alarmWebAction.js node_modules $WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" alarmsWeb/alarmWebAction "$PACKAGE_HOME/action/alarmWebAction.zip" \ -a description 'Create/Delete a trigger in alarms provider Database' \ diff --git a/package.json b/package.json index 6331109..0b21780 100755 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "license": "ISC", "dependencies": { "body-parser": "^1.15.0", - "cron": "^1.1.0", + "node-schedule": "^1.2.5", "express": "^4.13.4", "lodash": "^4.5.0", "nano": "6.4.2", diff --git a/provider/lib/utils.js b/provider/lib/utils.js index c804cb8..ff4ae05 100644 --- a/provider/lib/utils.js +++ b/provider/lib/utils.js @@ -1,6 +1,6 @@ var _ = require('lodash'); var request = require('request'); -var CronJob = require('cron').CronJob; +var schedule = require('node-schedule'); var HttpStatus = require('http-status-codes'); var constants = require('./constants.js'); @@ -35,34 +35,49 @@ module.exports = function( try { return new Promise(function(resolve, reject) { - var cronHandle = new CronJob(newTrigger.cron, - function onTick() { - if (utils.activeHost === utils.host) { - var triggerHandle = utils.triggers[triggerIdentifier]; - if (triggerHandle && (triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) { - try { - utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey); - } catch (e) { - logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e); - } + var cachedTrigger = { + apikey: newTrigger.apikey, + name: newTrigger.name, + namespace: newTrigger.namespace + }; + + var cron; + if (newTrigger.date) { + cron = new Date(newTrigger.date); + if (cron.getTime() > Date.now()) { + logger.info(method, 'Creating a fire once alarms trigger', triggerIdentifier); + cachedTrigger.date = newTrigger.date; + } + else { + return reject("the fire once date has expired"); + } + } + else { + cron = newTrigger.cron; + var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS; + cachedTrigger.triggersLeft = maxTriggers; + cachedTrigger.maxTriggers = maxTriggers; + cachedTrigger.cron = cron; + } + + var cronHandle = new schedule.Job(function() { + if (utils.activeHost === utils.host) { + var triggerHandle = utils.triggers[triggerIdentifier]; + if (triggerHandle && (!triggerHandle.maxTriggers || triggerHandle.maxTriggers === -1 || triggerHandle.triggersLeft > 0)) { + try { + utils.fireTrigger(newTrigger.namespace, newTrigger.name, newTrigger.payload, newTrigger.apikey); + } catch (e) { + logger.error(method, 'Exception occurred while firing trigger', triggerIdentifier, e); } } } - ); + }); logger.info(method, triggerIdentifier, 'starting cron job'); - cronHandle.start(); + cronHandle.schedule(cron); - var maxTriggers = newTrigger.maxTriggers || constants.DEFAULT_MAX_TRIGGERS; + cachedTrigger.cronHandle = cronHandle; + utils.triggers[triggerIdentifier] = cachedTrigger; - utils.triggers[triggerIdentifier] = { - cron: newTrigger.cron, - cronHandle: cronHandle, - triggersLeft: maxTriggers, - maxTriggers: maxTriggers, - apikey: newTrigger.apikey, - name: newTrigger.name, - namespace: newTrigger.namespace - }; resolve(triggerIdentifier); }); } catch (err) { @@ -83,13 +98,11 @@ module.exports = function( utils.postTrigger(dataTrigger, payload, uri, auth, 0) .then(triggerId => { logger.info(method, 'Trigger', triggerId, 'was successfully fired'); - if (dataTrigger.triggersLeft === 0) { - utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers'); - logger.warn(method, 'no more triggers left, disabled', triggerIdentifier); - } + utils.disableExtinctTriggers(triggerIdentifier, dataTrigger); }) .catch(err => { logger.error(method, err); + utils.disableExtinctTriggers(triggerIdentifier, dataTrigger); }); }; @@ -99,7 +112,7 @@ module.exports = function( return new Promise(function(resolve, reject) { // only manage trigger fires if they are not infinite - if (dataTrigger.maxTriggers !== -1) { + if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) { dataTrigger.triggersLeft--; } @@ -118,7 +131,7 @@ module.exports = function( if (error || response.statusCode >= 400) { // only manage trigger fires if they are not infinite - if (dataTrigger.maxTriggers !== -1) { + if (dataTrigger.maxTriggers && dataTrigger.maxTriggers !== -1) { dataTrigger.triggersLeft++; } logger.error(method, 'there was an error invoking', triggerIdentifier, response ? response.statusCode : error); @@ -145,7 +158,7 @@ module.exports = function( } } } else { - logger.info(method, 'fired', triggerIdentifier, dataTrigger.triggersLeft, 'triggers left'); + logger.info(method, 'fired', triggerIdentifier); resolve(triggerIdentifier); } } @@ -161,6 +174,20 @@ module.exports = function( [HttpStatus.REQUEST_TIMEOUT, HttpStatus.TOO_MANY_REQUESTS].indexOf(statusCode) === -1); }; + this.disableExtinctTriggers = function(triggerIdentifier, dataTrigger) { + var method = 'disableExtinctTriggers'; + + if (dataTrigger.date) { + utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after firing once'); + logger.info(method, 'the fire once date has expired, disabled', triggerIdentifier); + } + else if (dataTrigger.maxTriggers && dataTrigger.triggersLeft === 0) { + utils.disableTrigger(triggerIdentifier, undefined, 'Automatically disabled after reaching max triggers'); + logger.warn(method, 'no more triggers left, disabled', triggerIdentifier); + } + + }; + this.disableTrigger = function(triggerIdentifier, statusCode, message) { var method = 'disableTrigger'; @@ -198,7 +225,7 @@ module.exports = function( if (utils.triggers[triggerIdentifier]) { if (utils.triggers[triggerIdentifier].cronHandle) { - utils.triggers[triggerIdentifier].cronHandle.stop(); + utils.triggers[triggerIdentifier].cronHandle.cancel(); } delete utils.triggers[triggerIdentifier]; logger.info(method, 'trigger', triggerIdentifier, 'successfully deleted from memory');