This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new bb01428  Add owperf as a performance evaluation tool for OpenWhisk 
(#4320)
bb01428 is described below

commit bb01428bca92f1f0dc2b4b8a710be52de1d6ba47
Author: Erez Hadad <er...@il.ibm.com>
AuthorDate: Wed Apr 3 05:08:07 2019 +0300

    Add owperf as a performance evaluation tool for OpenWhisk (#4320)
---
 tools/owperf/README.md       | 101 +++++
 tools/owperf/owperf.js       | 865 +++++++++++++++++++++++++++++++++++++++++++
 tools/owperf/owperf.sh       |  22 ++
 tools/owperf/owperf_data.odg | Bin 0 -> 17721 bytes
 tools/owperf/owperf_data.png | Bin 0 -> 171488 bytes
 tools/owperf/package.json    |  24 ++
 tools/owperf/setup.sh        |  81 ++++
 tools/owperf/testAction.js   |  41 ++
 8 files changed, 1134 insertions(+)

diff --git a/tools/owperf/README.md b/tools/owperf/README.md
new file mode 100644
index 0000000..70f12b7
--- /dev/null
+++ b/tools/owperf/README.md
@@ -0,0 +1,101 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+# :electric_plug: owperf - a performance test tool for Apache OpenWhisk
+
+## General Info
+This test tool benchmarks an OpenWhisk deployment for (warm) latency and 
throughput, with several new capabilities:
+1. Measure performance of rules (trigger-to-action) in addition to actions
+1. Deeper profiling without instrumentation (e.g., Kamino) by leveraging the 
activation records in addition to the client's timing data. This avoids special 
setups, and can help gain performance insights on third-party deployments of 
OpenWhisk.
+1. New tunables that can affect performance:
+   1. Parameter size - controls the size of the parameter passed to the action 
or event
+   1. Actions per iteration (a.k.a. _ratio_) - controls how many rules are 
associated with a trigger [for rules] or how many actions are asynchronously 
invoked (burst size) at each iteration of a test worker [for actions].
+1. "Master apart" mode - Allow the master client to perform latency 
measurements while the worker clients stress OpenWhisk using a specific 
invocation pattern in the background. Useful for measuring latency under load, 
and for comparing latencies of rules and actions under load.
+The tool is written in node.js, using mainly the modules of OpenWhisk client, 
cluster for concurrency, and commander for CLI procssing.
+
+### Operation
+The general operation of a test is simple:
+1. **Setup**: the tool creates the test action, test trigger, and a number of 
rules that matches the ratio tunable above.
+1. **Test**: the tool fires up a specified number of concurrent clients - a 
master and workers.
+   1. Each client wakes up once every _delta_ msec (iteration) and invokes the 
specified activity: either the trigger (for rule testing) or multiple 
concurrent actions - matching the ratio tunable. Action invocations can be 
blocking.
+   1. After each client has completed a number of initial iterations (warmup), 
measurement begins, controlled by the master client, for either a specified 
number of iterations or specified time.
+   1. At the end of the measurement, each client retrieves the activation 
records of its triggers and/or actions, and generates summary data that is sent 
to the master, which generates and prints the final results.
+1. **Teardown**: clean up the OpenWhisk assets created during setup
+
+Final results are written to the standard output stream (so can be redirected 
to a file) as a single highly-detailed CSV record containing all the input 
settings and the output measurements (see below). There is additional control 
information that is written to the standard error stream and can be silenced in 
CLI. The control information also contains the CSV header, so it can be copied 
into a spreadsheet if needed.
+
+It is possible to invoke the tool in "Master apart" mode, where the master 
client is invoking a diffrent activity than the workers, and at possibly a 
different (very likely, much slower) rate. In this mode, latency statsitics are 
computed based solely on the master's data, since the worker's activity is used 
only as background to stress the OpenWhisk deployment. So one experiment can 
have the master client invoke rules and another one can have the master client 
invoke actions, while in b [...]
+
+The tool is highly customizable via CLI options. All the independent test 
variables are controlled via CLI. This includes number of workers, invocation 
pattern, OW client configuration, test action sleep time, etc.
+
+Test setup and teardown can be independently skipped via CLI, and/or directly 
invoked from the external setup script (```setup.sh```), so that setup can be 
shared between multiple tests. More advanced users can replace the test action 
with a custom action in the setup script to benchmark action invocation or 
event-respose throughput and latency of specific applications.
+
+**Clock skew**: OpenWhisk is a distributed system, which means that clock skew 
is expected between the client machine computing invocation timestamps and the 
controllers or invokers that generate the timestamps in the activation records. 
However, this tool assumes that clock skew is bound at few msec range, due to 
having all machines clocks synchronized, typically using NTP. At such a scale, 
clock skew is quite small compared to the measured time periods. Some of the 
time periods are mea [...]
+
+## Initial Setup
+The tool requires very little setup. You need to have node.js (v8+) and the 
wsk CLI client installed (on $PATH). Before the first run, execute ```npm 
install``` in the tool folder to install the dependencies.
+**Throttling**: By default, OW performance is throttled according to some 
[limits](https://github.com/apache/incubator-openwhisk/blob/master/docs/reference.md#system-limits),
 such as maximum number of concurrent requests, or maximum invocations per 
minute. If your benchmark stresses OpenWhisk beyond the limit value, you might 
want to relax those limits. If it's an OpenWhisk deployment that you control, 
you can set the limits to 999999, thereby effectively cancelling the limits. If 
it's a [...]
+
+## Usage
+To use the tool, run ```./owperf.sh <options>``` to perform a test. To see all 
the available options and defaults run ```./owperf.sh -h```.
+
+The default for ratio is 1. If using a different ratio, be sure to specify the 
same ratio value for all steps.
+
+For example, let's perform a test of rule performance with 3 clients, using 
the default delta of 200 msec, for 100 iterations (counted at the master 
client, excluding the warmup), ratio of 4. Each client performs 5 iterations 
per second, each iteration firing a trigger that invokes 4 rules, yielding a 
total of 3x5x4=60 rule invocations per second. The command to run this test: 
```./owperf.sh -a rule -w 3 -i 100 -r 4```
+
+## Measurements
+As explained above, the owperf tool collects both latency and throughput data 
at each experiment.
+
+### Latency
+The following time-stamps are collected for each invocation, of either action, 
or rule (containing an action):
+* **BI** (Before Invocation) - taken by a client immediately before invoking - 
either the trigger fire (for rules), or an action invocation.
+* **TS** (Trigger Start) - taken from the activation record of the trigger 
linked to the rules, so applies only to rule tests. All actions invoked by the 
rules of the same trigger have the same TS value.
+* **AS** (Action Start) - taken from the activation record of the action.
+* **AE** (Action End) - taken from the activation record of the action.
+* **AI** (After Invocation) - taken by the client immmediately after the 
invocation, for blocking action invocation tests only.
+
+Based on these timestamps, the following measurements are taken:
+* **OEA** (Overhead of Entering Action) - OpenWhisk processing overhead from 
sending the action invocation or trigger fire to the beginning of the action 
execution. OEA = AS-BI
+* **D** - the duration of the test action - as reported by the action itself 
in the return value.
+* **AD** - Action Duration - as measured by OpenWhisk invoker. AD = AE - AS. 
Always expect that AD >= D.
+* **OER** (Overhead of Executing Request) - OpenWhisk processing overhead from 
sending the action invocation or trigger fire to the completion of the action 
execution in the OpenWhisk Invoker. OER = AE-BI-D
+* **TA** (Trigger to Answer) - the processing time from the start of the 
trigger process to the start of the action (rule tests only). TA = AS-TS
+* **ORA** (Overhead of Returning from Action) - time from action end till 
being received by the client (blocking action tests only). ORA = AI - AE
+* **RTT** (Round Trip Time) - time at the client from action invocation till 
reply received (blocking action tests only). RTT = AI - BI
+* **ORTT** (Overhead of RTT) - RTT at the client exclugin the net action 
computation time. ORTT = RTT - D
+
+For each measurement, the tool computes average (_avg_), standard deviation 
(_std_), and extremes (_min_ and _max_).
+
+The following chart depicts the relationship between the various measurements 
and the action invocation and rule invocation flows.
+
+![](owperf_data.png)
+
+### Throughput
+Throughput is measured w.r.t. several different counters. During 
post-processing of an experiment, each counter value is divided by the 
measurement time period to compute a respective throughput.
+* **Attempts** - number of invocation attempts performed inside the time frame 
(according to their BI). This is the "arrival rate" of invocations, should be 
close to _clients * ratio / delta_ .
+* **Requests** - number of requests sent to OpenWhisk inside the time frame. 
Each action invocation is one request, and each trigger fire is also one 
request (so a client invoking rules at ratio _k_ generates _k+1_ requests).
+* **Activations** - number of completed activations inside the time frame, 
counting both trigger activations (based on TS), and action activations (based 
on AS and AE).
+* **Invocations** - number of successful invocations of complete rules or 
actions (depending on the activity). This is the "service rate" of invocations 
(assuming errors happen only because OW is overloaded).
+
+For each counter, the tool reports the total counter value (_abs_), total 
throughput per second (_tp_), througput of the worker clients without the 
master (_tpw_) and the master's percentage of throughput relative to workers 
(_tpd_). The last two values are important mostly for master apart mode.
+
+Aside from that, the tool also counts **errors**. Failed invocations - of 
actions, of triggers, or of actions from triggers (via rules) are counted each 
as an error. The tool reports both absolute error count (_abs_) and percent out 
of requests (_percent_).
+
+## Acknowledgements
+The owperf tool has been developed by IBM Research as part of the 
[CLASS](https://class-project.eu/) EU project. CLASS aims to integrate 
OpenWhisk as a foundation for latency-sensitive polyglot event-driven big-data 
analytics platform running on a compute continuum from the cloud to the edge. 
CLASS is funded by the European Union's Horizon 2020 Programme grant agreement 
No. 780622.
+
diff --git a/tools/owperf/owperf.js b/tools/owperf/owperf.js
new file mode 100644
index 0000000..9b529e4
--- /dev/null
+++ b/tools/owperf/owperf.js
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * This is a test tool for measuring the performance of OpenWhisk actions and 
rules.
+ * The full documentation of the tool is available in README.md .
+ */
+
+const fs = require('fs');
+const ini = require('ini');
+const cluster = require('cluster');
+const openwhisk = require('openwhisk');
+const program = require('commander');
+const exec = require('node-exec-promise').exec;
+
+const ACTION = "action";
+const RULE = "rule";
+const RESULT = "result";
+const ACTIVATION = "activation";
+const NONE = "none";
+
+function parseIntDef(strval, defval) {
+    return parseInt(strval);
+}
+
+program
+    .description('Latency and throughput measurement of OpenWhisk actions and 
rules')
+    .version('0.0.1')
+    .option('-a, --activity <action/rule>', "Activity to measure", 
/^(action|rule)$/i, "action")
+    .option('-b, --blocking <result/activation/none>', "For actions, wait 
until result or activation, or don't wait", /^(result|activation|none)$/i, 
"none")
+    .option('-d, --delta <msec>', "Time diff between consequent invocations of 
the same worker, in msec", parseIntDef, 200)
+    .option('-i, --iterations <count>', "Number of measurement iterations", 
parseInt)
+    .option('-p, --period <msec>', "Period of measurement in msec", parseInt)
+    .option('-r, --ratio <count>', "How many actions per iteration (or rules 
per trigger)", parseIntDef, 1)
+    .option('-s, --parameter_size <size>', "Size of string parameter passed to 
trigger or actions", parseIntDef, 1000)
+    .option('-w, --workers <count>', "Total number of concurrent workers incl. 
master", parseIntDef, 1)
+    .option('-A, --master_activity <action/rule>', "Set master activity apart 
from other workerss", /^(action|rule)$/i)
+    .option('-B, --master_blocking <result/activation/none>', "Set master 
blocking apart from other workers", /^(result|activation|none)$/i)
+    .option('-D, --master_delta <msec>', "Set master delta apart from other 
workers", parseInt)
+    .option('-u, --warmup <count>', "How many invocations to perform at each 
worker as warmup", parseIntDef, 5)
+    .option('-l, --delay <msec>', "How many msec to delay at each action", 
parseIntDef, 50)
+    .option('-P --pp_delay <msec>', "Wait for remaining activations to 
finalize before post-processing", parseIntDef, 60000)
+    .option('-G --burst_timing', "For actions, use the same invocation timing 
(BI) for all actions in a burst")
+    .option('-S --no-setup', "Skip test setup (so use previous setup)")
+    .option('-T --no-teardown', "Skip test teardown (to allow setup reuse)")
+    .option('-f --config_file <filepath>', "Specify a wskprops configuration 
file to use", `${process.env.HOME}/.wskprops`)
+    .option('-q, --quiet', "Suppress progress information on stderr");
+
+program.parse(process.argv);
+
+var testRecord = {input: {}, output: {}};    // holds the final test data
+
+for (var opt in program.opts())
+    if (typeof program[opt] != 'function')
+        testRecord.input[opt] = program[opt];
+
+// If neither period nor iterations are set, then period is set by default to 
1000 msec
+if (!testRecord.input.iterations && !testRecord.input.period)
+    testRecord.input.period = 1000;
+
+// If either master_activity, master_blocking or master_delta are set, then 
test is in 'master apart' mode
+testRecord.input.master_apart = ((testRecord.input.master_activity || 
testRecord.input.master_blocking || testRecord.input.master_delta) && true);
+
+mLog("Parameter Configuration:");
+for (var opt in testRecord.input)
+    mLog(`${opt} = ${testRecord.input[opt]}`);
+mLog("-----\n");
+
+mLog("Generating invocation parameters");
+var inputMessage = "A".repeat(testRecord.input.parameter_size);
+var params = {sleep: testRecord.input.delay, message: inputMessage};
+
+mLog("Loading wskprops");
+const config = ini.parse(fs.readFileSync(testRecord.input.config_file, 
"utf-8"));
+mLog("APIHOST = " + config.APIHOST);
+mLog("AUTH = " + config.AUTH);
+mLog("-----\n");
+const wskParams = `--apihost ${config.APIHOST} --auth ${config.AUTH} -i`;    
// to be used when invoking setup and teardown via external wsk
+
+// openwhisk client used for invocations
+const ow = openwhisk({apihost: config.APIHOST, api_key: config.AUTH, 
ignore_certs: true});
+
+// counters for throughput computation (all)
+const tpCounters = {attempts: 0, invocations: 0, activations: 0, requests: 0, 
errors: 0};
+
+// counters for latency computation
+const latCounters = {
+                    ta: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    oea: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    oer: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    d: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    ad: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    ora: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    rtt: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined},
+                    ortt: {sum: undefined, sumSqr: undefined, min: undefined, 
max: undefined}
+};
+
+const measurementTime = {start: -1, stop: -1};
+
+const sampleData = [];    // array of samples (tuples of collected invocation 
data, for rule or for action, depending on the activity)
+
+var loopSleeper;    // used to abort sleep in mainLoop()
+var abort = false;    // used to abort the loop in mainLoop()
+
+// Used only at the master
+var workerData = [];    // holds data for each worker, at [1..#workers]. 
Master's entry is 0.
+
+const activity = ((cluster.isWorker || !testRecord.input.master_activity) ? 
testRecord.input.activity : testRecord.input.master_activity);
+
+if (cluster.isMaster)
+    runMaster();
+else
+    runWorker();
+
+// -------- END OF MAIN -------------
+
+/**
+ * Master operation
+ */
+function runMaster() {
+
+    // Setup OpenWhisk assets for the test
+    testSetup().then(() => {
+
+        // Start workers, configure interaction
+        for(var i = 0; i < testRecord.input.workers; i++) {
+            if (i > 0)        // fork only (workers - 1) times
+                cluster.fork();
+        }
+
+        for (const id in cluster.workers) {
+
+            // Exit handler for each worker
+            cluster.workers[id].on('exit', (code, signal) => {
+                if (signal)
+                    mLog(`Worker ${id} was killed by signal: ${signal}`);
+                else
+                    if (code !== 0)
+                        mLog(`Worker ${id} exited with error code: ${code}`);
+                checkExit();
+            });
+
+            // Message handler for each worker
+            cluster.workers[id].on('message', (msg) => {
+                if (msg.init)
+                    // Initialization barrier for workers. Makes sure they are 
all fully engaged when the measurement start
+                    checkInit();
+
+                if (msg.summary) {
+                    workerData[id] = msg.summary;
+                    checkSummary();
+                }
+            });
+        }
+
+        mainLoop().then(() => {
+
+            // set finish of measurement and notify all other workers
+            measurementTime.stop = new Date().getTime();
+            testRecord.output.measure_time = (measurementTime.stop - 
measurementTime.start) / 1000.0;    // measurement duration converted to seconds
+            mLog(`Stop measurement. Start post-processing after 
${testRecord.input.pp_delay} msec`);
+            mLogSampleHeader();
+            for (const j in cluster.workers)
+                cluster.workers[j].send({abort: measurementTime});
+
+            // The master's post-processing to generate its workerData
+            sleep(testRecord.input.pp_delay)
+                .then(() => {
+                    postProcess()
+                    .then(() => {
+                        // The master's workerData
+                        workerData[0] = {lat: latCounters, tp: tpCounters};
+                        checkSummary();
+                    })
+                    .catch(err => {    // FATAL - shouldn't happen unless BUG
+                        mLog(`Post-process ERROR in MASTER: ${err}`);
+                        throw err;
+                    });
+                });
+        });
+
+    });
+
+}
+
+
+/**
+ * Setup assets before the test depending on configuration
+ */
+async function testSetup() {
+
+    if (!testRecord.input.setup)
+        return;
+
+    const cmd = `./setup.sh s ${testRecord.input.ratio} ${wskParams}`;
+    mLog(`SETUP: ${cmd}`);
+
+    try {
+        await exec(cmd);
+    }
+    catch (error) {
+        mLog(`FATAL: setup failure - ${error}`);
+        process.exit(-2);
+    }
+}
+
+
+/**
+ * Teardown assets after the test depending on configuration
+ */
+async function testTeardown() {
+
+    if (!testRecord.input.teardown)
+        return;
+
+    const cmd = `./setup.sh t ${testRecord.input.ratio} ${wskParams}`;
+    mLog(`TEARDOWN: ${cmd}`);
+
+    try {
+        await exec(cmd);
+    }
+    catch (error) {
+        mLog(`WARNING: teardown error - ${error}`);
+        process.exit(-3);
+    }
+}
+
+
+/**
+ * Print table header for samples to the runtime log on stderr
+ */
+function mLogSampleHeader() {
+    
mLog("bi,\tas,\tae,\tts,\tta,\toea,\toer,\td,\tad,\tai,\tora,\trtt,\tortt");
+}
+
+/**
+ * Worker operation
+ */
+function runWorker() {
+
+    // abort message from master will set the measurement time frame and abort 
the loop
+    process.on('message', (msg) => {
+        if (msg.abort) {
+            // Set the measurement time frame at the worker - required for 
post-processing
+            measurementTime.start = msg.abort.start;
+            measurementTime.stop = msg.abort.stop;
+            abortLoop();
+        }
+    });
+
+    mainLoop().then(() => {
+        sleep(testRecord.input.pp_delay)
+            .then(() => {
+                postProcess()
+                    .then(() => {
+                        process.send({summary:{lat: latCounters, 
tp:tpCounters}});
+                        process.exit();
+                    })
+                    .catch(err => {    // shouldn't happen unless BUG
+                        mLog(`Post-process ERROR in WORKER: ${err}`);
+                        throw err;
+                    });
+                });
+    });
+}
+
+
+// Barrier for checking all workers have initialized and then start measurement
+
+var remainingInits = testRecord.input.workers;
+var remainingIterations = -1;
+
+function checkInit() {
+    remainingInits--;
+    if (remainingInits == 0) {    // all workers are engaged (incl. master) - 
can start measurement
+        mLog("All clients finished warmup. Start measurement.");
+        measurementTime.start = new Date().getTime();
+
+        if (testRecord.input.period)
+            setTimeout(abortLoop, testRecord.input.period);
+
+        if (testRecord.input.iterations)
+            remainingIterations = testRecord.input.iterations;
+    }
+}
+
+// Barrier for checking all workers have finished, generate output and exit
+
+var remainingExits = testRecord.input.workers;
+
+function checkExit() {
+    remainingExits--;
+    if (remainingExits == 0) {
+        mLog("All workers finished - generating output and exiting.");
+        generateOutput();
+        // Cleanup test assets from OW and then exit
+        testTeardown().then(() => {
+            mLog("Done");
+            process.exit();
+        });
+    }
+}
+
+// Barrier for receiving post-processing results from all workers before 
computing final results
+
+var remainingSummaries = testRecord.input.workers;
+
+function checkSummary() {
+    remainingSummaries--;
+    if (remainingSummaries == 0) {
+        mLogSampleHeader();
+        mLog("All clients post-processing completed - computing output.")
+        computeOutputRecord();
+        checkExit();
+    }
+}
+
+
+/**
+ * Main loop for invocations - invoke activity asynchronously once every 
(delta) msec until aborted
+ */
+async function mainLoop() {
+
+    var warmupCounter = testRecord.input.warmup;
+    const delta = ((cluster.isWorker || !testRecord.input.master_delta) ? 
testRecord.input.delta : testRecord.input.master_delta);
+    const blocking = ((cluster.isWorker || !testRecord.input.master_blocking) 
? testRecord.input.blocking : testRecord.input.master_blocking);
+    const doBlocking = (blocking != NONE);
+    const getResult = (blocking == RESULT);
+
+    while (!abort) {
+
+        // ----
+        // Pass init (worker - send message) after <warmup> iterations
+        if (warmupCounter == 0) {
+            if (cluster.isMaster)
+                checkInit();
+            else     // worker - send init
+                process.send({init: 1});
+        }
+
+        if (warmupCounter >= 0)        // take 0 down to -1 to make sure it 
does not trigger another init message
+            warmupCounter--;
+        // ----
+
+        // If iterations limit set, abort loop when finished iterations
+        if (remainingIterations == 0) {
+            abortLoop();
+            continue;
+        }
+
+        if (remainingIterations > 0)
+            remainingIterations--;
+
+        const si = new Date().getTime();    // SI = Start of Iteration 
timestamp
+
+        var samples;
+
+        if (activity == ACTION)
+            samples = await invokeActions(testRecord.input.ratio, doBlocking, 
getResult, si);
+        else
+            samples = await invokeRules(si);
+
+        samples.forEach(sample => {
+            sampleData.push(sample);
+        });
+
+        const ei = new Date().getTime();    // EI = End of Iteration timestamp
+        const duration = ei - si;
+        if (delta > duration) {
+            loopSleeper = sleep(delta - duration);
+            if (!abort)        // check again to avoid race condition on 
loopSleeper
+                await loopSleeper;
+        }
+    }
+}
+
+
+/**
+ * Used to abort the mainLoop() function
+ */
+function abortLoop() {
+    abort = true;
+    if (loopSleeper)
+        loopSleeper.resolve();
+}
+
+
+/**
+ * Invoke the predefined OW action a specified number of times without waiting 
using Promises (burst).
+ * Returns a promise that resolves to an array of {id, isError}.
+ */
+function invokeActions(count, doBlocking, getResult, burst_bi) {
+    return new Promise( function (resolve, reject) {
+        var ipa = [];    // array of invocation promises;
+        for(var i = 0; i< count; i++) {
+            ipa[i] = new Promise((resolve, reject) => {
+                const bi = (testRecord.input.burst_timing ? burst_bi : new 
Date().getTime());  // default is BI per invocation
+                ow.actions.invoke({name: 'testAction', blocking: doBlocking, 
result: getResult, params: params})
+                    // If returnedJSON is full activation or just activation 
ID then activation ID should be in "activationId" field
+                    // If returnedJSON is the result of the test action, then 
"activationId" is part of the returned result of the test action
+                    .then(returnedJSON => {
+                        var ai;    // after invocation
+                        if (doBlocking)
+                            ai = new Date().getTime();     // only for 
blocking invocations, AI is meaningful
+                        resolve({aaid: returnedJSON.activationId, bi: bi, ai: 
ai});
+                    })
+                    .catch(err => {
+                        resolve({aaidError: err});
+                    });
+            });
+        }
+
+        Promise.all(ipa).then(ipArray => {
+            resolve(ipArray);
+        }).catch(err => {    // Impossible to reach since no contained promise 
rejects
+            reject(err);
+        });
+
+    });
+}
+
+
+/**
+ * Invoke the predefined OW rules asynchronously and return a promise of an 
array with a single element of {id, isError}
+ */
+function invokeRules(bi) {
+    return new Promise( function (resolve, reject) {
+        const triggerSamples = [];
+        // Fire trigger to invoke the rule
+        ow.triggers.invoke({name: 'testTrigger', params: params})
+            .then(triggerActivationIdJSON => {
+                const triggerActivationId = 
triggerActivationIdJSON.activationId;
+                triggerSamples.push({taid: triggerActivationId, bi: bi});
+                resolve(triggerSamples);
+            })
+            .catch (err => {
+                triggerSamples.push({taidError: err});
+                resolve(triggerSamples);
+            });
+    });
+}
+
+
+/**
+ * This function processes the sampleData. Each sample is processed as 
following:
+ * 1. A sample with error (TAID or AAID) is processed directly (not much to do 
beyond counting errors)
+ * 2. An action sample - first attempt to retrieve activation, then process 
with it
+ * 3. A rule sample - first convert to set of bound action samples (by 
processing the trigger activation), then process each action sample in step 2 
above
+  */
+async function postProcess() {
+    for(var i in sampleData) {
+        const sample = sampleData[i];
+        if (activity == ACTION) {
+            await processSampleWithAction(sample);
+        }
+        else {        // activity == RULE
+            if (sample.taidError)    // TAID error - no need to retrieve bound 
actions - move to process the sample directly
+                processSample(sample);
+            else {    // have valid TAID - retrieve bound action ids and then 
process
+                const actionSamples = await getActionSamplesOfRules(sample);
+                for(var j in actionSamples)
+                    await processSampleWithAction(actionSamples[j]);
+            }
+        }
+    }
+}
+
+
+/**
+ * Retrieve the activation ids of the actions bound to the trigger activation 
provided by id.
+ * Failure to retrieve trigger activation for a valid activation id is 
considered a fatal error, since the activation must exist.
+ * @param {*} triggerActivation
+ */
+function getActionSamplesOfRules(triggerSample) {
+    return new Promise((resolve, reject) => {
+        ow.activations.get({name: triggerSample.taid})
+            .then(triggerActivation => {
+                triggerSample.ts = triggerActivation.start;
+                var actionSamples = [];
+                for(var i = 0; i < triggerActivation.logs.length; i++) {
+                    const boundActionRecord = 
JSON.parse(triggerActivation.logs[i]);
+                    const actionSample = Object.assign({}, triggerSample);
+                    if (boundActionRecord.success)
+                        actionSample.aaid = boundActionRecord.activationId;
+                    else
+                        actionSample.aaidError = boundActionRecord.error;
+                    actionSamples.push(actionSample);
+                }
+                resolve(actionSamples);
+            })
+            .catch (err =>    {    // FATAL: failed to retrieve trigger 
activation for a valid id
+                mLog(`getActionSamplesOfRules returned ERROR: ${err}`);
+                reject(err);
+            });
+    });
+}
+
+
+/**
+ * Processing each action sample sequentially, i.e., wait until activation is 
retrieved before retrieving the next one.
+ * Otherwise, concurrent retrieval of possibly thousands of activations and 
more, may cause issues.
+ * Failure to retrieve activation record for a valid id is ok, assuming the 
action may have not completed yet.
+ * @param {*} actionSample
+ */
+async function processSampleWithAction(actionSample) {
+    if (actionSample.aaidError)    // no activation, move on to processing 
sample with error
+        processSample(actionSample);
+    else {    // have activation, try to get record
+        var activation;
+        try {
+            activation = await ow.activations.get({name: actionSample.aaid});
+        }
+        catch (err) {
+            mLog(`Failed to retrieve activation for id: ${actionSample.aaid} 
for reason: ${err}`);
+        }
+        processSample(actionSample, activation);
+    }
+}
+
+
+/**
+ * Process a single sample + optional related action activation, updating 
latency and throughput counters
+ * @param {*} sample
+ */
+function processSample(sample, activation) {
+
+    const bi = sample.bi;
+
+    if (bi < measurementTime.start || bi > measurementTime.stop)    {    // BI 
outside time frame. No further processing.
+        mLog(`Sample discarded. BI exceeds measurement time frame`);
+        return;
+    }
+
+    tpCounters.attempts++;    // each sample invoked in the time frame counts 
as one invocation attempt
+
+    if (sample.taidError) {    // trigger activation failed - count one 
request, one error. No further processing.
+        tpCounters.requests++;
+        tpCounters.errors++;
+        mLog(`Sample discarded. Trigger activation error: 
${sample.taidError}`);
+        return;
+    }
+
+    var ts;
+    if (sample.ts) {
+        ts = parseInt(sample.ts);
+
+        if (ts >= measurementTime.start && ts <= measurementTime.stop) {    // 
trigger activation in time frame - count one activation, one request
+            tpCounters.activations++;
+            tpCounters.requests++;
+        }
+    }
+    else
+        ts = undefined;
+
+    if (sample.aaidError) {    // action activation failed - count one 
request, one error. No further processing.
+        tpCounters.requests++;
+        tpCounters.errors++;
+        mLog(`Sample discarded. Action activation error: ${sample.aaidError}`);
+        return;
+    }
+
+    if (!activation) {    // no activation, so assumed incomplete. No further 
processing.
+        mLog(`Sample discarded. Activation was not retrieved.`)
+        return;
+    }
+
+    const as = parseInt(activation.start);
+    const ae = parseInt(activation.end);
+    const d = parseInt(activation.response.result.duration);
+
+    if (as < measurementTime.start || ae > measurementTime.stop) {    // got 
activation, but it exceeds the time frame. No further processing.
+        mLog(`Sample discarded. Action activation exceeded measurement time 
frame.`)
+        return;
+    }
+
+    // Activation is in time frame, so count one activation, one request and 
one full invocation
+    tpCounters.activations++;
+    tpCounters.requests++;
+    tpCounters.invocations++;
+
+    // For full invocations, update latency counters
+
+    const ta = (ts ? as - ts : undefined);
+    const ad = ae - as;
+    const oea = as - bi;
+    const oer = ae - bi - d;
+
+    updateLatSample("d", d);
+    updateLatSample("ta", ta);
+    updateLatSample("ad", ad);
+    updateLatSample("oea", oea);
+    updateLatSample("oer", oer);
+
+    // for blocking action invocations - will be "undefined" otherwise
+    const ai = sample.ai;
+
+    const ora = (ai ? ai - ae : undefined);
+    const rtt = (ai ? ai - bi : undefined);
+    const ortt = (rtt ? rtt - d : undefined);
+
+    updateLatSample("ora", ora);
+    updateLatSample("rtt", rtt);
+    updateLatSample("ortt", ortt);
+
+    
mLog(`${bi},\t${as},\t${ae},\t${ts},\t${ta},\t${oea},\t${oer},\t${d},\t${ad},\t${ai},\t${ora},\t${rtt},\t${ortt}`);
+}
+
+/**
+ * Update counters of one latency statistic of a worker with value data from 
one sample
+ */
+function updateLatSample(statName, value) {
+
+    if (!value)     // value == undefined => skip it
+        return;
+
+    // Update sum for avg
+    if (!latCounters[statName].sum)
+        latCounters[statName].sum = 0;
+    latCounters[statName].sum += value;
+
+    // Update sumSqr for std
+    if (!latCounters[statName].sumSqr)
+        latCounters[statName].sumSqr = 0;
+    latCounters[statName].sumSqr += value * value;
+
+    // Update min value
+    if (!latCounters[statName].min || latCounters[statName].min > value)
+        latCounters[statName].min = value;
+
+    // Update max value
+    if (!latCounters[statName].max || latCounters[statName].max < value)
+        latCounters[statName].max = value;
+}
+
+
+/**
+ * Compute the final output record based on the workerData records.
+ * The output of the program is a single CSV row of data consisting of the 
input parameters,
+ * then latencies computed above - avg (average) and std (std. dev.), then 
throughput.
+ */
+function computeOutputRecord() {
+
+    // Latency stats: avg, std, min, max
+    ["ta", "oea", "oer", "d", "ad", "ora", "rtt", "ortt"].forEach(statName => {
+        testRecord.output[statName] = computeLatStats(statName);
+    });
+
+    // Tp stats: abs, tp, tpw, tpd
+    ["attempts", "invocations", "activations", "requests"].forEach(statName => 
{
+        testRecord.output[statName] = computeTpStats(statName);
+    });
+
+    // Error stats: abs, percent
+    testRecord.output.errors = computErrorStats();
+}
+
+
+/**
+ * Based on workerData, compute average and standard deviation of a given 
latency statistic.
+ * @param {*} statName
+ */
+function computeLatStats(statName) {
+    var totalSum = 0;
+    var totalSumSqr = 0;
+    var totalInvocations = 0;
+    var hasSamples = undefined;    // does the current stat have any samples. 
If not => undefined, not NaN
+    var min = undefined;
+    var max = undefined;
+    if (testRecord.input.master_apart) {    // in master_apart mode, only 
master performs latency measurements
+        totalSum = workerData[0].lat[statName].sum;
+        totalSumSqr = workerData[0].lat[statName].sumSqr;
+        min = workerData[0].lat[statName].min;
+        max = workerData[0].lat[statName].max;
+        totalInvocations = workerData[0].tp.invocations;
+    }
+    else // in regular mode, all workers participate in latency measurements
+        workerData.forEach(wd => {
+            if (wd.lat[statName].sum) {    // If this worker has valid latency 
samples (not undefined)
+                hasSamples = 1;
+                totalSum += wd.lat[statName].sum;
+                totalSumSqr += wd.lat[statName].sumSqr;
+                if (!min || min > wd.lat[statName].min)
+                    min = wd.lat[statName].min;
+                if (!max || max < wd.lat[statName].max)
+                    max = wd.lat[statName].max;
+            }
+            totalInvocations += wd.tp.invocations;
+        });
+
+    const avg = (hasSamples ? totalSum / totalInvocations : undefined);
+    const std = (hasSamples ? Math.sqrt(totalSumSqr / totalInvocations - avg * 
avg) : undefined);
+
+    return ({avg: avg, std: std, min: min, max: max});
+}
+
+
+/**
+ * Based on workerData, compute throughput of a given counter, with (tp) and 
without (tpw) the master, and the percent difference (tpd)
+ * @param {*} statName
+ */
+function computeTpStats(statName) {
+    var masterCount = workerData[0].tp[statName];
+    var totalCount = 0;
+    workerData.forEach(wd => {totalCount += wd.tp[statName];});
+    const tp = totalCount / testRecord.output.measure_time;            // 
throughput
+    const tpw = (totalCount - masterCount) / testRecord.output.measure_time;   
     // throughput without master
+    const tpd = (tp - tpw) * 100.0 / tp;        // percent difference relative 
to TP
+
+    return ({abs: totalCount, tp: tp, tpw: tpw, tpd: tpd});
+}
+
+
+/**
+ * Based on workerData, compute the relative portion of total errors out of 
total requests
+ */
+function computErrorStats() {
+    var totalErrors = 0;
+    var totalRequests = 0;
+
+    workerData.forEach(wd => {
+        totalErrors += wd.tp.errors;
+        totalRequests += wd.tp.requests;
+    });
+
+    const errAbs = totalErrors;
+    const errPer = totalErrors * 100.0 / totalRequests;
+    return ({abs: errAbs, percent: errPer});
+}
+
+
+/**
+ * Generate a properly formatted output record to stdout. The header is also 
printed, but via mDump to stderr and can be
+ * silenced.
+ */
+function generateOutput() {
+    var first = true;
+
+    // First, print header to stderr
+    dfsObject(testRecord, (name, data, isRoot, isObj) => {
+        if (!isObj) {        // print leaf nodes
+            if (!first)
+                mWrite(",\t");
+            first = false;
+            mWrite(`${name}`);
+        }
+    });
+    mWrite("\n");
+
+    first = true;
+
+    // Now, print data to stdout
+    dfsObject(testRecord, (name, data, isRoot, isObj) => {
+        if (!isObj) {        // print leaf nodes
+            if (!first)
+                process.stdout.write(",\t");
+            first = false;
+            if (typeof data == 'number')    // round each number to 3 decimal 
digits
+                data = round(data, 3);
+            process.stdout.write(`${data}`);
+        }
+    });
+    process.stdout.write("\n");
+}
+
+
+/**
+ * Sleep for a given time. Useful mostly with await from an async function
+ * resolve and reject are externalized as properties to allow early abortion
+ * @param {*} ms
+ */
+function sleep(ms) {
+    var res, rej;
+    var p = new Promise((resolve, reject) => {
+        setTimeout(resolve, ms);
+        res = resolve;
+        rej = reject;
+    });
+    p.resolve = res;
+    p.reject = rej;
+
+    return p;
+  }
+
+
+/**
+ * Generate a random integer in the range of [1..max]
+ * @param {*} max
+ */
+function getRandomInt(max) {
+    return Math.floor(Math.random() * Math.floor(max) + 1);
+  }
+
+
+/**
+ * Round a number after specified decimal digits
+ * @param {*} num
+ * @param {*} digits
+ */
+  function round(num, digits = 0) {
+    const factor = Math.pow(10, digits);
+    return Math.round(num * factor) / factor;
+}
+
+
+// If not quiet, emit control messages on stderr (with newline)
+function mLog(text) {
+    if (!testRecord.input.quiet)
+        console.error(`${clientId()}:\t${text}`);
+}
+
+
+/**
+ * Return the id of the client - MASTER-0 or WORKER-k (k=1..w-1)
+ */
+function clientId() {
+    return (cluster.isMaster ? "MASTER-0" : `WORKER-${cluster.worker.id}`);
+}
+
+
+// If not quiet, write strings on stderr (w/o newline)
+function mWrite(text) {
+    if (!testRecord.input.quiet)
+        process.stderr.write(text);
+}
+
+/**
+ * Traverse a (potentially deep) object in DFS, visiting each non-function 
node with function f
+ * @param {*} data
+ * @param {*} func
+ */
+function dfsObject(data, func, allowInherited = false) {
+    var isRoot = true;
+    var rootObj = data;
+    crawlObj("", data, func, allowInherited);
+
+    function crawlObj(name, data, f, allowInherited) {
+        var isObj = (typeof data == 'object');
+        var isFunc = (typeof data == 'function');
+        if (!isFunc)
+            f(name, data, isRoot, isObj);    // visit the current node
+        isRoot = false;
+        if (isObj)
+            for (var child in data) {
+                if (allowInherited || data.hasOwnProperty(child)) {
+                    const childName = (name == "" ? child : name + "." + 
child);
+                    crawlObj(childName, data[child], f, true);    // After 
root level no need to check inheritance
+                }
+            }
+    }
+}
diff --git a/tools/owperf/owperf.sh b/tools/owperf/owperf.sh
new file mode 100755
index 0000000..fa9ffb6
--- /dev/null
+++ b/tools/owperf/owperf.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This is a simple launch script for owperf
+
+node owperf.js $@
diff --git a/tools/owperf/owperf_data.odg b/tools/owperf/owperf_data.odg
new file mode 100644
index 0000000..bada864
Binary files /dev/null and b/tools/owperf/owperf_data.odg differ
diff --git a/tools/owperf/owperf_data.png b/tools/owperf/owperf_data.png
new file mode 100644
index 0000000..754cb51
Binary files /dev/null and b/tools/owperf/owperf_data.png differ
diff --git a/tools/owperf/package.json b/tools/owperf/package.json
new file mode 100644
index 0000000..4e4db61
--- /dev/null
+++ b/tools/owperf/package.json
@@ -0,0 +1,24 @@
+{
+  "name": "owperf",
+  "version": "1.0.0",
+  "description": "owperf - a performance evaluation tool for Apache OpenWhisk",
+  "main": "owperf.js",
+  "scripts": {
+    "test": "echo \"Error: no test specified\" && exit 1"
+  },
+  "author": {
+    "name": "Erez Hadad",
+    "email": "er...@il.ibm.com"
+  },
+  "license": "Apache-2.0",
+  "dependencies": {
+    "btoa": "^1.2.1",
+    "child-process-promise": "^2.2.1",
+    "cluster": "^0.7.7",
+    "commander": "^2.19.0",
+    "ini": "^1.3.5",
+    "node-exec-promise": "^1.0.2",
+    "openwhisk": "^3.18.0",
+    "xmlhttprequest": "^1.8.0"
+  }
+}
diff --git a/tools/owperf/setup.sh b/tools/owperf/setup.sh
new file mode 100755
index 0000000..ee90c68
--- /dev/null
+++ b/tools/owperf/setup.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -x
+
+# Setup for overhead test: create action, create trigger, and create a number 
of rules as required.
+# This script performs both setup and teardown
+# Designed to be an idempotent operation (can be applied repeatedly for the 
same result)
+# Usage: setup.sh [op] [ratio] <wsk global flags>
+# op - MANDATORY. "s" for setup, "t" for teardown
+# ratio - MANDATORY. ratio as defined in README.md
+# wsk global flags - OPTIONAL. Global flags for the wsk command (e.g. for 
specifying non-default wsk API host, auth, etc)
+
+MAXRULES=30    # assume no more than 30 rules per trigger max
+op=$1          # s for setup, t for teardown
+count=$2       # ratio for rules
+delcount=$count # For teardown, delete ratio rules. For setup, delete MAXRULES
+if [ "$op" = "s" ]; then
+       delcount=$MAXRULES
+fi
+
+shift 2
+wskparams="$@" # All other parameters are assumed to be OW-specific
+
+
+function remove_assets() {
+
+       # Delete rules
+       for i in $(seq 1 $delcount); do
+               wsk rule delete testRule_$i $@;
+       done
+
+       # Delete trigger
+       wsk trigger delete testTrigger $@
+
+       # Delete action
+       wsk action delete testAction $@
+
+}
+
+
+function deploy_assets() {
+
+       # Create action
+       wsk action create testAction testAction.js --kind nodejs:8 $@
+
+       # Create trigger after deleting it
+       wsk trigger create testTrigger $@
+
+       # Create rules
+       for i in $(seq 1 $count); do
+               wsk rule create testRule_$i testTrigger testAction $@;
+       done
+
+}
+
+
+# Always start with removal of existing assets
+remove_assets
+
+# If setup requested, deploy new assets
+if [ "$op" = "s" ]; then
+       deploy_assets
+fi
+
diff --git a/tools/owperf/testAction.js b/tools/owperf/testAction.js
new file mode 100644
index 0000000..2ded797
--- /dev/null
+++ b/tools/owperf/testAction.js
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Default test action for owperf. Sleeps specified time.
+ * All test actions should return the invocation parameters (to stress the 
return path), but augmented with the execution duration and the activation id.
+ * Use this code as reference if you want to create a custom test action.
+ */
+
+function sleep(ms) {
+    return new Promise(resolve => setTimeout(resolve, ms));
+}
+
+async function main(params) {
+    var start = new Date().getTime();
+    params.activationId = process.env.__OW_ACTIVATION_ID;
+    await sleep(parseInt(params.sleep));
+    var end = new Date().getTime();
+    params.duration = end - start;
+    return params;
+}
+
+// Invoke main when runnig - only when setting TEST in the env
+if (process.env.TEST)
+    main({sleep:50}).then(params => console.log(params.duration));
+

Reply via email to