This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-alarms.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f4b947  Support for multiple workers (#95)
6f4b947 is described below

commit 6f4b947bed6593dba91579424a8e224206a6465e
Author: Jason Peterson <jason...@us.ibm.com>
AuthorDate: Wed Sep 20 13:38:37 2017 -0400

    Support for multiple workers (#95)
---
 action/alarmWebAction.js                           |  60 +++++++++--
 installCatalog.sh                                  |  24 +++--
 provider/lib/utils.js                              |  55 +++++-----
 .../system/packages/AlarmsMultiWorkersTests.scala  | 120 +++++++++++++++++++++
 4 files changed, 216 insertions(+), 43 deletions(-)

diff --git a/action/alarmWebAction.js b/action/alarmWebAction.js
index bc13019..232d82d 100644
--- a/action/alarmWebAction.js
+++ b/action/alarmWebAction.js
@@ -17,6 +17,7 @@ function main(params) {
 
     var nano = require('nano')(params.DB_URL);
     var db = nano.db.use(params.DB_NAME);
+    var workers = params.workers instanceof Array ? params.workers : [];
 
     if (params.__ow_method === "put") {
 
@@ -44,20 +45,25 @@ function main(params) {
             maxTriggers: params.maxTriggers || -1,
             status: {
                 'active': true,
-                'dateChanged': new Date().toISOString(),
+                'dateChanged': new Date().toISOString()
             }
         };
 
         return new Promise(function (resolve, reject) {
             verifyTriggerAuth(triggerURL, params.authKey, false)
             .then(() => {
-                 return createTrigger(db, triggerID, newTrigger);
+                return getWorkerID(db, workers);
+            })
+            .then((worker) => {
+                console.log('trigger will be assigned to worker ' + worker);
+                newTrigger.worker = worker;
+                return createTrigger(db, triggerID, newTrigger);
             })
             .then(() => {
                 resolve({
                     statusCode: 200,
                     headers: {'Content-Type': 'application/json'},
-                    body: new Buffer(JSON.stringify({'status': 
'success'})).toString('base64'),
+                    body: new Buffer(JSON.stringify({'status': 
'success'})).toString('base64')
                 });
             })
             .catch(err => {
@@ -80,7 +86,7 @@ function main(params) {
                 resolve({
                     statusCode: 200,
                     headers: {'Content-Type': 'application/json'},
-                    body: new Buffer(JSON.stringify({'status': 
'success'})).toString('base64'),
+                    body: new Buffer(JSON.stringify({'status': 
'success'})).toString('base64')
                 });
             })
             .catch(err => {
@@ -93,6 +99,44 @@ function main(params) {
     }
 }
 
+function getWorkerID(db, availabeWorkers) {
+
+    return new Promise((resolve, reject) => {
+        var workerID = availabeWorkers[0] || 'worker0';
+
+        if (availabeWorkers.length > 1) {
+            db.view('triggerViews', 'triggers_by_worker', {reduce: true, 
group: true}, function (err, body) {
+                if (!err) {
+                    var triggersByWorker = {};
+
+                    availabeWorkers.forEach(worker => {
+                        triggersByWorker[worker] = 0;
+                    });
+
+                    body.rows.forEach(row => {
+                        if (row.key in triggersByWorker) {
+                            triggersByWorker[row.key] = row.value;
+                        }
+                    });
+
+                    // find which worker has the least number of assigned 
triggers
+                    for (var worker in triggersByWorker) {
+                        if (triggersByWorker[worker] < 
triggersByWorker[workerID]) {
+                            workerID = worker;
+                        }
+                    }
+                    resolve(workerID);
+                } else {
+                    reject(err);
+                }
+            });
+        }
+        else {
+            resolve(workerID);
+        }
+    });
+}
+
 function createTrigger(triggerDB, triggerID, newTrigger) {
 
     return new Promise(function(resolve, reject) {
@@ -124,7 +168,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) {
                                 updateTrigger(triggerDB, triggerID, 
(retryCount + 1))
                                 .then(id => {
                                     resolve(id);
-                                }).catch(err => {
+                                })
+                                .catch(err => {
                                     reject(err);
                                 });
                             }, 1000);
@@ -146,7 +191,8 @@ function updateTrigger(triggerDB, triggerID, retryCount) {
                     updateTrigger(triggerDB, id, (retryCount + 1))
                     .then(id => {
                         resolve(id);
-                    }).catch(err => {
+                    })
+                    .catch(err => {
                         reject(err);
                     });
                 }
@@ -227,7 +273,7 @@ function sendError(statusCode, error, message) {
     return {
         statusCode: statusCode,
         headers: { 'Content-Type': 'application/json' },
-        body: new Buffer(JSON.stringify(params)).toString('base64'),
+        body: new Buffer(JSON.stringify(params)).toString('base64')
     };
 }
 
diff --git a/installCatalog.sh b/installCatalog.sh
index 96934f6..41fd83b 100755
--- a/installCatalog.sh
+++ b/installCatalog.sh
@@ -4,7 +4,7 @@
 # automatically
 #
 # To run this command
-# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost>
+# ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> <apihost> 
<workers>
 
 set -e
 set -x
@@ -14,7 +14,7 @@ WSK_CLI="$OPENWHISK_HOME/bin/wsk"
 
 if [ $# -eq 0 ]
 then
-echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> 
<apihost>"
+echo "Usage: ./installCatalog.sh <authkey> <edgehost> <dburl> <dbprefix> 
<apihost> <workers>"
 fi
 
 AUTH="$1"
@@ -22,6 +22,7 @@ EDGEHOST="$2"
 DB_URL="$3"
 DB_NAME="${4}alarmservice"
 APIHOST="$5"
+WORKERS="$6"
 
 # If the auth key file exists, read the key in the file. Otherwise, take the
 # first argument as the key itself.
@@ -58,16 +59,25 @@ $WSK_CLI -i --apihost "$EDGEHOST" action update --kind 
nodejs:6 --auth "$AUTH" a
      -a description 'Fire trigger when alarm occurs' \
      -a feed true
 
-$WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared no 
alarmsWeb \
-     -p DB_URL "$DB_URL" \
-     -p DB_NAME "$DB_NAME" \
-     -p apihost "$APIHOST"
+if [ -n "$WORKERS" ];
+then
+    $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared 
no alarmsWeb \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME" \
+        -p apihost "$APIHOST" \
+        -p workers "$WORKERS"
+else
+    $WSK_CLI -i --apihost "$EDGEHOST" package update --auth "$AUTH" --shared 
no alarmsWeb \
+        -p DB_URL "$DB_URL" \
+        -p DB_NAME "$DB_NAME" \
+        -p apihost "$APIHOST"
+fi
 
 # make alarmWebAction.zip
 cd action
 npm install
 
-if [ -e alarmWebAction.zip ]
+if [ -e alarmWebAction.zip ];
 then
     rm -rf alarmWebAction.zip
 fi
diff --git a/provider/lib/utils.js b/provider/lib/utils.js
index 83f57d0..5103356 100644
--- a/provider/lib/utils.js
+++ b/provider/lib/utils.js
@@ -164,36 +164,33 @@ module.exports = function(
     this.disableTrigger = function(triggerIdentifier, statusCode, message) {
         var method = 'disableTrigger';
 
-        //only active/master provider should update the database
-        if (utils.activeHost === utils.host) {
-            triggerDB.get(triggerIdentifier, function (err, existing) {
-                if (!err) {
-                    if (!existing.status || existing.status.active === true) {
-                        var updatedTrigger = existing;
-                        var status = {
-                            'active': false,
-                            'dateChanged': new Date().toISOString(),
-                            'reason': {'kind': 'AUTO', 'statusCode': 
statusCode, 'message': message}
-                        };
-                        updatedTrigger.status = status;
-
-                        triggerDB.insert(updatedTrigger, triggerIdentifier, 
function (err) {
-                            if (err) {
-                                logger.error(method, 'there was an error while 
disabling', triggerIdentifier, 'in database.', err);
-                            }
-                            else {
-                                logger.info(method, 'trigger', 
triggerIdentifier, 'successfully disabled in database');
-                            }
-                        });
-                    }
-                }
-                else {
-                    logger.info(method, 'could not find', triggerIdentifier, 
'in database');
-                    //make sure it is removed from memory as well
-                    utils.deleteTrigger(triggerIdentifier);
+        triggerDB.get(triggerIdentifier, function (err, existing) {
+            if (!err) {
+                if (!existing.status || existing.status.active === true) {
+                    var updatedTrigger = existing;
+                    var status = {
+                        'active': false,
+                        'dateChanged': new Date().toISOString(),
+                        'reason': {'kind': 'AUTO', 'statusCode': statusCode, 
'message': message}
+                    };
+                    updatedTrigger.status = status;
+
+                    triggerDB.insert(updatedTrigger, triggerIdentifier, 
function (err) {
+                        if (err) {
+                            logger.error(method, 'there was an error while 
disabling', triggerIdentifier, 'in database.', err);
+                        }
+                        else {
+                            logger.info(method, 'trigger', triggerIdentifier, 
'successfully disabled in database');
+                        }
+                    });
                 }
-            });
-        }
+            }
+            else {
+                logger.info(method, 'could not find', triggerIdentifier, 'in 
database');
+                //make sure it is removed from memory as well
+                utils.deleteTrigger(triggerIdentifier);
+            }
+        });
     };
 
     this.deleteTrigger = function(triggerIdentifier) {
diff --git a/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala 
b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala
new file mode 100644
index 0000000..ebdcfb8
--- /dev/null
+++ b/tests/src/test/scala/system/packages/AlarmsMultiWorkersTests.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package system.packages
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.SSLConfig
+import com.jayway.restassured.http.ContentType
+import common._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
+import spray.json.DefaultJsonProtocol.StringJsonFormat
+import spray.json.DefaultJsonProtocol._
+import spray.json.{pimpAny, _}
+import whisk.core.database.test.DatabaseScriptTestUtils
+import whisk.utils.JsHelpers
+
+
+@RunWith(classOf[JUnitRunner])
+class AlarmsMultiWorkersTests extends FlatSpec
+    with Matchers
+    with WskActorSystem
+    with WskTestHelpers
+    with StreamLogging
+    with DatabaseScriptTestUtils {
+
+    val wskprops = WskProps()
+    val wsk = new Wsk
+    val auth = WhiskProperties.getBasicAuth
+    val user = auth.fst
+    val password = auth.snd
+
+    val webAction = "/whisk.system/alarmsWeb/alarmWebAction"
+    val webActionURL = 
s"https://${wskprops.apihost}/api/v1/web${webAction}.http";
+
+    behavior of "Alarms multi workers feed tests"
+
+    it should "create triggers assigned to worker0 and worker1" in 
withAssetCleaner(WskProps()) {
+        (wp, assetHelper) =>
+            implicit val wskprops = wp // shadow global props and make implicit
+
+            val worker0Trigger = 
s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+            val worker0Params = JsObject(
+                "triggerName" -> JsString(worker0Trigger),
+                "authKey" -> JsString(s"$user:$password"),
+                "cron" -> "* * * * *".toJson,
+                "workers" -> JsArray(JsString("worker0")))
+
+            val worker1Trigger = 
s"dummyAlarmsTrigger-${System.currentTimeMillis}"
+            val worker1Params = JsObject(
+                "triggerName" -> JsString(worker1Trigger),
+                "authKey" -> JsString(s"$user:$password"),
+                "cron" -> "* * * * *".toJson,
+                "workers" -> JsArray(JsString("worker0"), JsString("worker1")))
+
+            try {
+                wsk.trigger.create(worker0Trigger)
+
+                //create trigger feed and assign to worker0
+                makePutCallWithExpectedResult(worker0Params, 200)
+
+                wsk.trigger.create(worker1Trigger)
+
+                //create trigger feed and assign to worker0 or worker1
+                //the one with the least assigned triggers will be chosen
+                makePutCallWithExpectedResult(worker1Params, 200)
+
+                val dbName = s"${dbPrefix}alarmservice"
+                val documents = getAllDocs(dbName)
+
+                val worker1Doc = documents
+                        .fields("rows")
+                        .convertTo[List[JsObject]]
+                        
.filter(_.fields("id").convertTo[String].equals(s"$user:$password/_/$worker1Trigger"))
+
+                JsHelpers.getFieldPath(worker1Doc(0), "doc", "worker") 
shouldBe Some(JsString("worker1"))
+            } finally {
+                //delete triggers
+                wsk.trigger.delete(worker0Trigger)
+                wsk.trigger.delete(worker1Trigger)
+
+                makeDeleteCallWithExpectedResult(worker0Params, 200)
+                makeDeleteCallWithExpectedResult(worker1Params, 200)
+            }
+    }
+
+    def makePutCallWithExpectedResult(params: JsObject, expectedCode: Int) = {
+        val response = RestAssured.given()
+                .contentType(ContentType.JSON)
+                .config(RestAssured.config().sslConfig(new 
SSLConfig().relaxedHTTPSValidation()))
+                .body(params.toString())
+                .put(webActionURL)
+        assert(response.statusCode() == expectedCode)
+    }
+
+    def makeDeleteCallWithExpectedResult(params: JsObject, expectedCode: Int) 
= {
+        val response = RestAssured.given()
+                .contentType(ContentType.JSON)
+                .config(RestAssured.config().sslConfig(new 
SSLConfig().relaxedHTTPSValidation()))
+                .body(params.toString())
+                .delete(webActionURL)
+        assert(response.statusCode() == expectedCode)
+    }
+
+
+}

-- 
To stop receiving notification emails like this one, please contact
"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.

Reply via email to