This is an automated email from the ASF dual-hosted git repository. csantanapr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git
The following commit(s) were added to refs/heads/master by this push: new 6f4b947 Support for multiple workers (#95) 6f4b947 is described below commit 6f4b947bed6593dba91579424a8e224206a6465e Author: Jason Peterson <jason...@us.ibm.com> AuthorDate: Wed Sep 20 13:38:37 2017 -0400 Support for multiple workers (#95) --- action/alarmWebAction.js | 60 +++++++++-- installCatalog.sh | 24 +++-- provider/lib/utils.js | 55 +++++----- .../system/packages/AlarmsMultiWorkersTests.scala | 120 +++++++++++++++++++++ 4 files changed, 216 insertions(+), 43 deletions(-) diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js index bc13019..232d82d 100644 --- a/action/alarmWebAction.js +++ b/action/alarmWebAction.js @@ -17,6 +17,7 @@ function main(params) { var nano = require('nano')(params.DB_URL); var db = nano.db.use(params.DB_NAME); + var workers = params.workers instanceof Array ? params.workers : []; if (params.__ow_method === "put") { @@ -44,20 +45,25 @@ function main(params) { maxTriggers: params.maxTriggers || -1, status: { 'active': true, - 'dateChanged': new Date().toISOString(), + 'dateChanged': new Date().toISOString() } }; return new Promise(function (resolve, reject) { verifyTriggerAuth(triggerURL, params.authKey, false) .then(() => { - return createTrigger(db, triggerID, newTrigger); + return getWorkerID(db, workers); + }) + .then((worker) => { + console.log('trigger will be assigned to worker ' + worker); + newTrigger.worker = worker; + return createTrigger(db, triggerID, newTrigger); }) .then(() => { resolve({ statusCode: 200, headers: {'Content-Type': 'application/json'}, - body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'), + body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64') }); }) .catch(err => { @@ -80,7 +86,7 @@ function main(params) { resolve({ statusCode: 200, headers: {'Content-Type': 'application/json'}, - body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64'), + body: new Buffer(JSON.stringify({'status': 'success'})).toString('base64') }); }) .catch(err => { @@ -93,6 +99,44 @@ function main(params) { } } +function getWorkerID(db, availabeWorkers) { + + return new Promise((resolve, reject) => { + var workerID = availabeWorkers[0] || 'worker0'; + + if (availabeWorkers.length > 1) { + db.view('triggerViews', 'triggers_by_worker', {reduce: true, group: true}, function (err, body) { + if (!err) { + var triggersByWorker = {}; + + availabeWorkers.forEach(worker => { + triggersByWorker[worker] = 0; + }); + + body.rows.forEach(row => { + if (row.key in triggersByWorker) { + triggersByWorker[row.key] = row.value; + } + }); + + // find which worker has the least number of assigned triggers + for (var worker in triggersByWorker) { + if (triggersByWorker[worker] < triggersByWorker[workerID]) { + workerID = worker; + } + } + resolve(workerID); + } else { + reject(err); + } + }); + } + else { + resolve(workerID); + } + }); +} + function createTrigger(triggerDB, triggerID, newTrigger) { return new Promise(function(resolve, reject) { @@ -124,7 +168,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) { updateTrigger(triggerDB, triggerID, (retryCount + 1)) .then(id => { resolve(id); - }).catch(err => { + }) + .catch(err => { reject(err); }); }, 1000); @@ -146,7 +191,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) { updateTrigger(triggerDB, id, (retryCount + 1)) .then(id => { resolve(id); - }).catch(err => { + }) + .catch(err => { reject(err); }); } @@ -227,7 +273,7 @@ function sendError(statusCode, error, message) { return { statusCode: statusCode, headers: { 'Content-Type': 'application/json' }, - body: new Buffer(JSON.stringify(params)).toString('base64'), + body: new Buffer(JSON.stringify(params)).toString('base64') }; } diff --git a/installCatalog.sh b/installCatalog.sh index 96934f6..41fd83b 100755 --- a/installCatalog.sh +++ b/installCatalog.sh @@ -4,7 +4,7 @@ # automatically # # To run this command -# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> +# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers> set -e set -x @@ -14,7 +14,7 @@ WSK_CLI="$OPENWHISK_HOME/bin/wsk" if [ $# -eq 0 ] then -echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>" +echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> <workers>" fi AUTH="$1" @@ -22,6 +22,7 @@ EDGEHOST="$2" DB_URL="$3" DB_NAME="${4}alarmservice" APIHOST="$5" +WORKERS="$6" # If the auth key file exists, read the key in the file. Otherwise, take the # first argument as the key itself. @@ -58,16 +59,25 @@ $WSK_CLI -i --apihost "$EDGEHOST" action update --kind nodejs:6 --auth "$AUTH" a -a description 'Fire trigger when alarm occurs' \ -a feed true -$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \ - -p DB_URL "$DB_URL" \ - -p DB_NAME "$DB_NAME" \ - -p apihost "$APIHOST" +if [ -n "$WORKERS" ]; +then + $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \ + -p DB_URL "$DB_URL" \ + -p DB_NAME "$DB_NAME" \ + -p apihost "$APIHOST" \ + -p workers "$WORKERS" +else + $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no alarmsWeb \ + -p DB_URL "$DB_URL" \ + -p DB_NAME "$DB_NAME" \ + -p apihost "$APIHOST" +fi # make alarmWebAction.zip cd action npm install -if [ -e alarmWebAction.zip ] +if [ -e alarmWebAction.zip ]; then rm -rf alarmWebAction.zip fi diff --git a/provider/lib/utils.js b/provider/lib/utils.js index 83f57d0..5103356 100644 --- a/provider/lib/utils.js +++ b/provider/lib/utils.js @@ -164,36 +164,33 @@ module.exports = function( this.disableTrigger = function(triggerIdentifier, statusCode, message) { var method = 'disableTrigger'; - //only active/master provider should update the database - if (utils.activeHost === utils.host) { - triggerDB.get(triggerIdentifier, function (err, existing) { - if (!err) { - if (!existing.status || existing.status.active === true) { - var updatedTrigger = existing; - var status = { - 'active': false, - 'dateChanged': new Date().toISOString(), - 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message} - }; - updatedTrigger.status = status; - - triggerDB.insert(updatedTrigger, triggerIdentifier, function (err) { - if (err) { - logger.error(method, 'there was an error while disabling', triggerIdentifier, 'in database.', err); - } - else { - logger.info(method, 'trigger', triggerIdentifier, 'successfully disabled in database'); - } - }); - } - } - else { - logger.info(method, 'could not find', triggerIdentifier, 'in database'); - //make sure it is removed from memory as well - utils.deleteTrigger(triggerIdentifier); + triggerDB.get(triggerIdentifier, function (err, existing) { + if (!err) { + if (!existing.status || existing.status.active === true) { + var updatedTrigger = existing; + var status = { + 'active': false, + 'dateChanged': new Date().toISOString(), + 'reason': {'kind': 'AUTO', 'statusCode': statusCode, 'message': message} + }; + updatedTrigger.status = status; + + triggerDB.insert(updatedTrigger, triggerIdentifier, function (err) { + if (err) { + logger.error(method, 'there was an error while disabling', triggerIdentifier, 'in database.', err); + } + else { + logger.info(method, 'trigger', triggerIdentifier, 'successfully disabled in database'); + } + }); } - }); - } + } + else { + logger.info(method, 'could not find', triggerIdentifier, 'in database'); + //make sure it is removed from memory as well + utils.deleteTrigger(triggerIdentifier); + } + }); }; this.deleteTrigger = function(triggerIdentifier) { diff --git a/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala new file mode 100644 index 0000000..ebdcfb8 --- /dev/null +++ b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package system.packages + +import com.jayway.restassured.RestAssured +import com.jayway.restassured.config.SSLConfig +import com.jayway.restassured.http.ContentType +import common._ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{FlatSpec, Matchers} +import spray.json.DefaultJsonProtocol.StringJsonFormat +import spray.json.DefaultJsonProtocol._ +import spray.json.{pimpAny, _} +import whisk.core.database.test.DatabaseScriptTestUtils +import whisk.utils.JsHelpers + + +@RunWith(classOf[JUnitRunner]) +class AlarmsMultiWorkersTests extends FlatSpec + with Matchers + with WskActorSystem + with WskTestHelpers + with StreamLogging + with DatabaseScriptTestUtils { + + val wskprops = WskProps() + val wsk = new Wsk + val auth = WhiskProperties.getBasicAuth + val user = auth.fst + val password = auth.snd + + val webAction = "/whisk.system/alarmsWeb/alarmWebAction" + val webActionURL = s"https://${wskprops.apihost}/api/v1/web${webAction}.http" + + behavior of "Alarms multi workers feed tests" + + it should "create triggers assigned to worker0 and worker1" in withAssetCleaner(WskProps()) { + (wp, assetHelper) => + implicit val wskprops = wp // shadow global props and make implicit + + val worker0Trigger = s"dummyAlarmsTrigger-${System.currentTimeMillis}" + val worker0Params = JsObject( + "triggerName" -> JsString(worker0Trigger), + "authKey" -> JsString(s"$user:$password"), + "cron" -> "* * * * *".toJson, + "workers" -> JsArray(JsString("worker0"))) + + val worker1Trigger = s"dummyAlarmsTrigger-${System.currentTimeMillis}" + val worker1Params = JsObject( + "triggerName" -> JsString(worker1Trigger), + "authKey" -> JsString(s"$user:$password"), + "cron" -> "* * * * *".toJson, + "workers" -> JsArray(JsString("worker0"), JsString("worker1"))) + + try { + wsk.trigger.create(worker0Trigger) + + //create trigger feed and assign to worker0 + makePutCallWithExpectedResult(worker0Params, 200) + + wsk.trigger.create(worker1Trigger) + + //create trigger feed and assign to worker0 or worker1 + //the one with the least assigned triggers will be chosen + makePutCallWithExpectedResult(worker1Params, 200) + + val dbName = s"${dbPrefix}alarmservice" + val documents = getAllDocs(dbName) + + val worker1Doc = documents + .fields("rows") + .convertTo[List[JsObject]] + .filter(_.fields("id").convertTo[String].equals(s"$user:$password/_/$worker1Trigger")) + + JsHelpers.getFieldPath(worker1Doc(0), "doc", "worker") shouldBe Some(JsString("worker1")) + } finally { + //delete triggers + wsk.trigger.delete(worker0Trigger) + wsk.trigger.delete(worker1Trigger) + + makeDeleteCallWithExpectedResult(worker0Params, 200) + makeDeleteCallWithExpectedResult(worker1Params, 200) + } + } + + def makePutCallWithExpectedResult(params: JsObject, expectedCode: Int) = { + val response = RestAssured.given() + .contentType(ContentType.JSON) + .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())) + .body(params.toString()) + .put(webActionURL) + assert(response.statusCode() == expectedCode) + } + + def makeDeleteCallWithExpectedResult(params: JsObject, expectedCode: Int) = { + val response = RestAssured.given() + .contentType(ContentType.JSON) + .config(RestAssured.config().sslConfig(new SSLConfig().relaxedHTTPSValidation())) + .body(params.toString()) + .delete(webActionURL) + assert(response.statusCode() == expectedCode) + } + + +} -- To stop receiving notification emails like this one, please contact ['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].