This is an automated email from the ASF dual-hosted git repository. rabbah pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new bb01428 Add owperf as a performance evaluation tool for OpenWhisk (#4320) bb01428 is described below commit bb01428bca92f1f0dc2b4b8a710be52de1d6ba47 Author: Erez Hadad <er...@il.ibm.com> AuthorDate: Wed Apr 3 05:08:07 2019 +0300 Add owperf as a performance evaluation tool for OpenWhisk (#4320) --- tools/owperf/README.md | 101 +++++ tools/owperf/owperf.js | 865 +++++++++++++++++++++++++++++++++++++++++++ tools/owperf/owperf.sh | 22 ++ tools/owperf/owperf_data.odg | Bin 0 -> 17721 bytes tools/owperf/owperf_data.png | Bin 0 -> 171488 bytes tools/owperf/package.json | 24 ++ tools/owperf/setup.sh | 81 ++++ tools/owperf/testAction.js | 41 ++ 8 files changed, 1134 insertions(+) diff --git a/tools/owperf/README.md b/tools/owperf/README.md new file mode 100644 index 0000000..70f12b7 --- /dev/null +++ b/tools/owperf/README.md @@ -0,0 +1,101 @@ +<!-- +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +--> +# :electric_plug: owperf - a performance test tool for Apache OpenWhisk + +## General Info +This test tool benchmarks an OpenWhisk deployment for (warm) latency and throughput, with several new capabilities: +1. 
Measure performance of rules (trigger-to-action) in addition to actions +1. Deeper profiling without instrumentation (e.g., Kamon) by leveraging the activation records in addition to the client's timing data. This avoids special setups, and can help gain performance insights on third-party deployments of OpenWhisk. +1. New tunables that can affect performance: + 1. Parameter size - controls the size of the parameter passed to the action or event + 1. Actions per iteration (a.k.a. _ratio_) - controls how many rules are associated with a trigger [for rules] or how many actions are asynchronously invoked (burst size) at each iteration of a test worker [for actions]. +1. "Master apart" mode - Allows the master client to perform latency measurements while the worker clients stress OpenWhisk using a specific invocation pattern in the background. Useful for measuring latency under load, and for comparing latencies of rules and actions under load. +The tool is written in node.js, mainly using the OpenWhisk client module, the cluster module for concurrency, and commander for CLI processing. + +### Operation +The general operation of a test is simple: +1. **Setup**: the tool creates the test action, the test trigger, and as many rules as the ratio tunable above specifies. +1. **Test**: the tool fires up a specified number of concurrent clients - a master and workers. + 1. Each client wakes up once every _delta_ msec (iteration) and invokes the specified activity: either the trigger (for rule testing) or multiple concurrent actions - matching the ratio tunable. Action invocations can be blocking. + 1. After each client has completed a number of initial iterations (warmup), measurement begins, controlled by the master client, for either a specified number of iterations or specified time. + 1.
At the end of the measurement, each client retrieves the activation records of its triggers and/or actions, and generates summary data that is sent to the master, which generates and prints the final results. +1. **Teardown**: clean up the OpenWhisk assets created during setup + +Final results are written to the standard output stream (so they can be redirected to a file) as a single highly-detailed CSV record containing all the input settings and the output measurements (see below). Additional control information is written to the standard error stream; it can be silenced via the CLI (```-q```). The control information also contains the CSV header, so it can be copied into a spreadsheet if needed. + +It is possible to invoke the tool in "Master apart" mode, where the master client invokes a different activity than the workers, possibly at a different (very likely, much slower) rate. In this mode, latency statistics are computed based solely on the master's data, since the workers' activity is used only as background to stress the OpenWhisk deployment. So one experiment can have the master client invoke rules and another one can have the master client invoke actions, while in b [...] + +The tool is highly customizable via CLI options. All the independent test variables are controlled via the CLI. This includes the number of workers, the invocation pattern, the OW client configuration, the test action sleep time, etc. + +Test setup and teardown can be independently skipped via the CLI, and/or directly invoked from the external setup script (```setup.sh```), so that setup can be shared between multiple tests. More advanced users can replace the test action with a custom action in the setup script to benchmark the action invocation or event-response throughput and latency of specific applications.
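As a rough illustration of what such a custom action might need to provide, here is a minimal sketch inferred from how `owperf.js` consumes results: it passes `sleep` and `message` parameters, reads `duration` from the activation result to compute D, and, for result-only blocking invocations, reads `activationId` from the returned result. This is not the bundled `testAction.js`; the timer standing in for real work and the use of the `__OW_ACTIVATION_ID` environment variable (set by the OpenWhisk Node.js runtime) are assumptions.

```javascript
// Hypothetical replacement for the owperf test action (NOT the bundled testAction.js).
// Assumed contract, inferred from owperf.js:
//   input params: `sleep` (msec of simulated work) and `message` (padding payload)
//   result: `duration` (msec, reported as D) and `activationId`
function main(params) {
  const start = Date.now();
  // Parse the requested delay; fall back to 0 if it is missing or not numeric.
  const sleepMs = Math.max(0, parseInt(params.sleep, 10) || 0);

  return new Promise((resolve) => {
    // Stand-in for real application work: just wait `sleepMs` msec.
    setTimeout(() => {
      resolve({
        // Set by the OpenWhisk runtime; undefined when run outside OpenWhisk.
        activationId: process.env.__OW_ACTIVATION_ID,
        // Measured by the action itself, so owperf can compute OER = AE - BI - D.
        duration: Date.now() - start,
      });
    }, sleepMs);
  });
}
```

Since `owperf.js` invokes the action by the name `testAction`, one possible way to try such an action is `wsk action update testAction customAction.js` (the file name is hypothetical), then running the tool with `-S` so that setup is skipped and the replaced action is kept.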
+ +**Clock skew**: OpenWhisk is a distributed system, which means that clock skew is expected between the client machine computing invocation timestamps and the controllers or invokers that generate the timestamps in the activation records. However, this tool assumes that clock skew is bounded to a few msec, since all machine clocks are synchronized, typically using NTP. At such a scale, clock skew is quite small compared to the measured time periods. Some of the time periods are mea [...] + +## Initial Setup +The tool requires very little setup. You need to have node.js (v8+) and the wsk CLI client installed (on $PATH). Before the first run, execute ```npm install``` in the tool folder to install the dependencies. + +**Throttling**: By default, OW performance is throttled according to some [limits](https://github.com/apache/incubator-openwhisk/blob/master/docs/reference.md#system-limits), such as the maximum number of concurrent requests or the maximum number of invocations per minute. If your benchmark stresses OpenWhisk beyond these limits, you might want to relax them. If it's an OpenWhisk deployment that you control, you can set the limits to 999999, thereby effectively cancelling the limits. If it's a [...] + +## Usage +To use the tool, run ```./owperf.sh <options>``` to perform a test. To see all the available options and defaults, run ```./owperf.sh -h```. + +The default for ratio is 1. If using a different ratio, be sure to specify the same ratio value for all steps (setup, test, and teardown). + +For example, let's perform a test of rule performance with 3 clients, using the default delta of 200 msec, for 100 iterations (counted at the master client, excluding the warmup), and a ratio of 4. Each client performs 5 iterations per second, each iteration firing a trigger that invokes 4 rules, yielding a total of 3x5x4=60 rule invocations per second.
The command to run this test: ```./owperf.sh -a rule -w 3 -i 100 -r 4``` + +## Measurements +As explained above, the owperf tool collects both latency and throughput data in each experiment. + +### Latency +The following timestamps are collected for each invocation, of either an action or a rule (containing an action): +* **BI** (Before Invocation) - taken by a client immediately before invoking - either the trigger fire (for rules), or an action invocation. +* **TS** (Trigger Start) - taken from the activation record of the trigger linked to the rules, so it applies only to rule tests. All actions invoked by the rules of the same trigger have the same TS value. +* **AS** (Action Start) - taken from the activation record of the action. +* **AE** (Action End) - taken from the activation record of the action. +* **AI** (After Invocation) - taken by the client immediately after the invocation, for blocking action invocation tests only. + +Based on these timestamps, the following measurements are taken: +* **OEA** (Overhead of Entering Action) - OpenWhisk processing overhead from sending the action invocation or trigger fire to the beginning of the action execution. OEA = AS - BI +* **D** - the duration of the test action - as reported by the action itself in the return value. +* **AD** - Action Duration - as measured by the OpenWhisk invoker. AD = AE - AS. Always expect that AD >= D. +* **OER** (Overhead of Executing Request) - OpenWhisk processing overhead from sending the action invocation or trigger fire to the completion of the action execution in the OpenWhisk invoker, excluding the action's own duration. OER = AE - BI - D +* **TA** (Trigger to Answer) - the processing time from the start of the trigger process to the start of the action (rule tests only). TA = AS - TS +* **ORA** (Overhead of Returning from Action) - time from the end of the action until the result is received by the client (blocking action tests only).
ORA = AI - AE +* **RTT** (Round Trip Time) - time at the client from the action invocation until the reply is received (blocking action tests only). RTT = AI - BI +* **ORTT** (Overhead of RTT) - RTT at the client excluding the net action computation time. ORTT = RTT - D + +For each measurement, the tool computes the average (_avg_), standard deviation (_std_), and extremes (_min_ and _max_). + +The following chart depicts the relationship between the various measurements and the action invocation and rule invocation flows. + +![](owperf_data.png) + +### Throughput +Throughput is measured w.r.t. several different counters. During post-processing of an experiment, each counter value is divided by the measurement time period to compute the respective throughput. +* **Attempts** - number of invocation attempts performed inside the time frame (according to their BI). This is the "arrival rate" of invocations, and should be close to _clients * ratio * 1000 / delta_ per second (delta is in msec). +* **Requests** - number of requests sent to OpenWhisk inside the time frame. Each action invocation is one request, and each trigger fire is also one request (so a client invoking rules at ratio _k_ generates _k+1_ requests per iteration). +* **Activations** - number of completed activations inside the time frame, counting both trigger activations (based on TS), and action activations (based on AS and AE). +* **Invocations** - number of successful invocations of complete rules or actions (depending on the activity). This is the "service rate" of invocations (assuming errors happen only because OW is overloaded). + +For each counter, the tool reports the total counter value (_abs_), total throughput per second (_tp_), throughput of the worker clients without the master (_tpw_) and the master's percentage of throughput relative to workers (_tpd_). The last two values are mostly relevant in "master apart" mode. + +Aside from that, the tool also counts **errors**.
Each failed invocation (of an action, of a trigger, or of an action fired by a trigger via a rule) is counted as one error. The tool reports both the absolute error count (_abs_) and the error percentage out of all requests (_percent_). + +## Acknowledgements +The owperf tool has been developed by IBM Research as part of the [CLASS](https://class-project.eu/) EU project. CLASS aims to integrate OpenWhisk as a foundation for a latency-sensitive, polyglot, event-driven big-data analytics platform running on a compute continuum from the cloud to the edge. CLASS is funded by the European Union's Horizon 2020 Programme under grant agreement No. 780622. + diff --git a/tools/owperf/owperf.js b/tools/owperf/owperf.js new file mode 100644 index 0000000..9b529e4 --- /dev/null +++ b/tools/owperf/owperf.js @@ -0,0 +1,865 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * This is a test tool for measuring the performance of OpenWhisk actions and rules. + * The full documentation of the tool is available in README.md .
+ */ + +const fs = require('fs'); +const ini = require('ini'); +const cluster = require('cluster'); +const openwhisk = require('openwhisk'); +const program = require('commander'); +const exec = require('node-exec-promise').exec; + +const ACTION = "action"; +const RULE = "rule"; +const RESULT = "result"; +const ACTIVATION = "activation"; +const NONE = "none"; + +// Coercion helper for commander, which calls it as (value, previousValue). Wrapping parseInt +// keeps previousValue (the option default) from being passed as parseInt's radix argument. +function parseIntDef(strval, defval) { + return parseInt(strval, 10); +} + +program + .description('Latency and throughput measurement of OpenWhisk actions and rules') + .version('0.0.1') + .option('-a, --activity <action/rule>', "Activity to measure", /^(action|rule)$/i, "action") + .option('-b, --blocking <result/activation/none>', "For actions, wait until result or activation, or don't wait", /^(result|activation|none)$/i, "none") + .option('-d, --delta <msec>', "Time diff between consecutive invocations of the same worker, in msec", parseIntDef, 200) + .option('-i, --iterations <count>', "Number of measurement iterations", parseInt) + .option('-p, --period <msec>', "Period of measurement in msec", parseInt) + .option('-r, --ratio <count>', "How many actions per iteration (or rules per trigger)", parseIntDef, 1) + .option('-s, --parameter_size <size>', "Size of string parameter passed to trigger or actions", parseIntDef, 1000) + .option('-w, --workers <count>', "Total number of concurrent workers incl.
master", parseIntDef, 1) + .option('-A, --master_activity <action/rule>', "Set master activity apart from other workers", /^(action|rule)$/i) + .option('-B, --master_blocking <result/activation/none>', "Set master blocking apart from other workers", /^(result|activation|none)$/i) + .option('-D, --master_delta <msec>', "Set master delta apart from other workers", parseInt) + .option('-u, --warmup <count>', "How many invocations to perform at each worker as warmup", parseIntDef, 5) + .option('-l, --delay <msec>', "How many msec to delay at each action", parseIntDef, 50) + .option('-P --pp_delay <msec>', "Wait for remaining activations to finalize before post-processing", parseIntDef, 60000) + .option('-G --burst_timing', "For actions, use the same invocation timing (BI) for all actions in a burst") + .option('-S --no-setup', "Skip test setup (so use previous setup)") + .option('-T --no-teardown', "Skip test teardown (to allow setup reuse)") + .option('-f --config_file <filepath>', "Specify a wskprops configuration file to use", `${process.env.HOME}/.wskprops`) + .option('-q, --quiet', "Suppress progress information on stderr"); + +program.parse(process.argv); + +var testRecord = {input: {}, output: {}}; // holds the final test data + +for (var opt in program.opts()) + if (typeof program[opt] != 'function') + testRecord.input[opt] = program[opt]; + +// If neither period nor iterations are set, then period is set by default to 1000 msec +if (!testRecord.input.iterations && !testRecord.input.period) + testRecord.input.period = 1000; + +// If either master_activity, master_blocking or master_delta are set, then test is in 'master apart' mode +testRecord.input.master_apart = ((testRecord.input.master_activity || testRecord.input.master_blocking || testRecord.input.master_delta) && true); + +mLog("Parameter Configuration:"); +for (var opt in testRecord.input) + mLog(`${opt} = ${testRecord.input[opt]}`); +mLog("-----\n"); + +mLog("Generating invocation parameters"); +var
inputMessage = "A".repeat(testRecord.input.parameter_size); +var params = {sleep: testRecord.input.delay, message: inputMessage}; + +mLog("Loading wskprops"); +const config = ini.parse(fs.readFileSync(testRecord.input.config_file, "utf-8")); +mLog("APIHOST = " + config.APIHOST); +mLog("AUTH = " + config.AUTH); +mLog("-----\n"); +const wskParams = `--apihost ${config.APIHOST} --auth ${config.AUTH} -i`; // to be used when invoking setup and teardown via external wsk + +// openwhisk client used for invocations +const ow = openwhisk({apihost: config.APIHOST, api_key: config.AUTH, ignore_certs: true}); + +// counters for throughput computation (all) +const tpCounters = {attempts: 0, invocations: 0, activations: 0, requests: 0, errors: 0}; + +// counters for latency computation +const latCounters = { + ta: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + oea: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + oer: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + d: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + ad: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + ora: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + rtt: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined}, + ortt: {sum: undefined, sumSqr: undefined, min: undefined, max: undefined} +}; + +const measurementTime = {start: -1, stop: -1}; + +const sampleData = []; // array of samples (tuples of collected invocation data, for rule or for action, depending on the activity) + +var loopSleeper; // used to abort sleep in mainLoop() +var abort = false; // used to abort the loop in mainLoop() + +// Used only at the master +var workerData = []; // holds data for each worker, at [1..#workers]. Master's entry is 0. + +const activity = ((cluster.isWorker || !testRecord.input.master_activity) ? 
testRecord.input.activity : testRecord.input.master_activity); + +if (cluster.isMaster) + runMaster(); +else + runWorker(); + +// -------- END OF MAIN ------------- + +/** + * Master operation + */ +function runMaster() { + + // Set up OpenWhisk assets for the test + testSetup().then(() => { + + // Start workers, configure interaction + for(var i = 0; i < testRecord.input.workers; i++) { + if (i > 0) // fork only (workers - 1) times + cluster.fork(); + } + + for (const id in cluster.workers) { + + // Exit handler for each worker + cluster.workers[id].on('exit', (code, signal) => { + if (signal) + mLog(`Worker ${id} was killed by signal: ${signal}`); + else + if (code !== 0) + mLog(`Worker ${id} exited with error code: ${code}`); + checkExit(); + }); + + // Message handler for each worker + cluster.workers[id].on('message', (msg) => { + if (msg.init) + // Initialization barrier for workers. Makes sure they are all fully engaged when the measurement starts + checkInit(); + + if (msg.summary) { + workerData[id] = msg.summary; + checkSummary(); + } + }); + } + + mainLoop().then(() => { + + // set finish of measurement and notify all other workers + measurementTime.stop = new Date().getTime(); + testRecord.output.measure_time = (measurementTime.stop - measurementTime.start) / 1000.0; // measurement duration converted to seconds + mLog(`Stop measurement.
Start post-processing after ${testRecord.input.pp_delay} msec`); + mLogSampleHeader(); + for (const j in cluster.workers) + cluster.workers[j].send({abort: measurementTime}); + + // The master's post-processing to generate its workerData + sleep(testRecord.input.pp_delay) + .then(() => { + postProcess() + .then(() => { + // The master's workerData + workerData[0] = {lat: latCounters, tp: tpCounters}; + checkSummary(); + }) + .catch(err => { // FATAL - shouldn't happen unless BUG + mLog(`Post-process ERROR in MASTER: ${err}`); + throw err; + }); + }); + }); + + }); + +} + + +/** + * Setup assets before the test depending on configuration + */ +async function testSetup() { + + if (!testRecord.input.setup) + return; + + const cmd = `./setup.sh s ${testRecord.input.ratio} ${wskParams}`; + mLog(`SETUP: ${cmd}`); + + try { + await exec(cmd); + } + catch (error) { + mLog(`FATAL: setup failure - ${error}`); + process.exit(-2); + } +} + + +/** + * Teardown assets after the test depending on configuration + */ +async function testTeardown() { + + if (!testRecord.input.teardown) + return; + + const cmd = `./setup.sh t ${testRecord.input.ratio} ${wskParams}`; + mLog(`TEARDOWN: ${cmd}`); + + try { + await exec(cmd); + } + catch (error) { + mLog(`WARNING: teardown error - ${error}`); + process.exit(-3); + } +} + + +/** + * Print table header for samples to the runtime log on stderr + */ +function mLogSampleHeader() { + mLog("bi,\tas,\tae,\tts,\tta,\toea,\toer,\td,\tad,\tai,\tora,\trtt,\tortt"); +} + +/** + * Worker operation + */ +function runWorker() { + + // abort message from master will set the measurement time frame and abort the loop + process.on('message', (msg) => { + if (msg.abort) { + // Set the measurement time frame at the worker - required for post-processing + measurementTime.start = msg.abort.start; + measurementTime.stop = msg.abort.stop; + abortLoop(); + } + }); + + mainLoop().then(() => { + sleep(testRecord.input.pp_delay) + .then(() => { + postProcess() + 
.then(() => { + process.send({summary:{lat: latCounters, tp:tpCounters}}); + process.exit(); + }) + .catch(err => { // shouldn't happen unless BUG + mLog(`Post-process ERROR in WORKER: ${err}`); + throw err; + }); + }); + }); +} + + +// Barrier for checking all workers have initialized and then start measurement + +var remainingInits = testRecord.input.workers; +var remainingIterations = -1; + +function checkInit() { + remainingInits--; + if (remainingInits == 0) { // all workers are engaged (incl. master) - can start measurement + mLog("All clients finished warmup. Start measurement."); + measurementTime.start = new Date().getTime(); + + if (testRecord.input.period) + setTimeout(abortLoop, testRecord.input.period); + + if (testRecord.input.iterations) + remainingIterations = testRecord.input.iterations; + } +} + +// Barrier for checking all workers have finished, generate output and exit + +var remainingExits = testRecord.input.workers; + +function checkExit() { + remainingExits--; + if (remainingExits == 0) { + mLog("All workers finished - generating output and exiting."); + generateOutput(); + // Cleanup test assets from OW and then exit + testTeardown().then(() => { + mLog("Done"); + process.exit(); + }); + } +} + +// Barrier for receiving post-processing results from all workers before computing final results + +var remainingSummaries = testRecord.input.workers; + +function checkSummary() { + remainingSummaries--; + if (remainingSummaries == 0) { + mLogSampleHeader(); + mLog("All clients post-processing completed - computing output.") + computeOutputRecord(); + checkExit(); + } +} + + +/** + * Main loop for invocations - invoke activity asynchronously once every (delta) msec until aborted + */ +async function mainLoop() { + + var warmupCounter = testRecord.input.warmup; + const delta = ((cluster.isWorker || !testRecord.input.master_delta) ? 
testRecord.input.delta : testRecord.input.master_delta); + const blocking = ((cluster.isWorker || !testRecord.input.master_blocking) ? testRecord.input.blocking : testRecord.input.master_blocking); + const doBlocking = (blocking != NONE); + const getResult = (blocking == RESULT); + + while (!abort) { + + // ---- + // Pass init (worker - send message) after <warmup> iterations + if (warmupCounter == 0) { + if (cluster.isMaster) + checkInit(); + else // worker - send init + process.send({init: 1}); + } + + if (warmupCounter >= 0) // take 0 down to -1 to make sure it does not trigger another init message + warmupCounter--; + // ---- + + // If iterations limit set, abort loop when finished iterations + if (remainingIterations == 0) { + abortLoop(); + continue; + } + + if (remainingIterations > 0) + remainingIterations--; + + const si = new Date().getTime(); // SI = Start of Iteration timestamp + + var samples; + + if (activity == ACTION) + samples = await invokeActions(testRecord.input.ratio, doBlocking, getResult, si); + else + samples = await invokeRules(si); + + samples.forEach(sample => { + sampleData.push(sample); + }); + + const ei = new Date().getTime(); // EI = End of Iteration timestamp + const duration = ei - si; + if (delta > duration) { + loopSleeper = sleep(delta - duration); + if (!abort) // check again to avoid race condition on loopSleeper + await loopSleeper; + } + } +} + + +/** + * Used to abort the mainLoop() function + */ +function abortLoop() { + abort = true; + if (loopSleeper) + loopSleeper.resolve(); +} + + +/** + * Invoke the predefined OW action a specified number of times without waiting using Promises (burst). + * Returns a promise that resolves to an array of samples, each either {aaid, bi, ai} on success or {aaidError} on failure.
+ */ +function invokeActions(count, doBlocking, getResult, burst_bi) { + return new Promise( function (resolve, reject) { + var ipa = []; // array of invocation promises; + for(var i = 0; i< count; i++) { + ipa[i] = new Promise((resolve, reject) => { + const bi = (testRecord.input.burst_timing ? burst_bi : new Date().getTime()); // default is BI per invocation + ow.actions.invoke({name: 'testAction', blocking: doBlocking, result: getResult, params: params}) + // If returnedJSON is full activation or just activation ID then activation ID should be in "activationId" field + // If returnedJSON is the result of the test action, then "activationId" is part of the returned result of the test action + .then(returnedJSON => { + var ai; // after invocation + if (doBlocking) + ai = new Date().getTime(); // only for blocking invocations, AI is meaningful + resolve({aaid: returnedJSON.activationId, bi: bi, ai: ai}); + }) + .catch(err => { + resolve({aaidError: err}); + }); + }); + } + + Promise.all(ipa).then(ipArray => { + resolve(ipArray); + }).catch(err => { // Impossible to reach since no contained promise rejects + reject(err); + }); + + }); +} + + +/** + * Fire the predefined OW trigger (which invokes the rules) asynchronously and return a promise of an array with a single sample: {taid, bi} on success or {taidError} on failure + */ +function invokeRules(bi) { + return new Promise( function (resolve, reject) { + const triggerSamples = []; + // Fire trigger to invoke the rule + ow.triggers.invoke({name: 'testTrigger', params: params}) + .then(triggerActivationIdJSON => { + const triggerActivationId = triggerActivationIdJSON.activationId; + triggerSamples.push({taid: triggerActivationId, bi: bi}); + resolve(triggerSamples); + }) + .catch (err => { + triggerSamples.push({taidError: err}); + resolve(triggerSamples); + }); + }); +} + + +/** + * This function processes the sampleData. Each sample is processed as follows: + * 1. A sample with error (TAID or AAID) is processed directly (not much to do beyond counting errors) + * 2.
An action sample - first attempt to retrieve activation, then process with it + * 3. A rule sample - first convert to set of bound action samples (by processing the trigger activation), then process each action sample in step 2 above + */ +async function postProcess() { + for(var i in sampleData) { + const sample = sampleData[i]; + if (activity == ACTION) { + await processSampleWithAction(sample); + } + else { // activity == RULE + if (sample.taidError) // TAID error - no need to retrieve bound actions - move to process the sample directly + processSample(sample); + else { // have valid TAID - retrieve bound action ids and then process + const actionSamples = await getActionSamplesOfRules(sample); + for(var j in actionSamples) + await processSampleWithAction(actionSamples[j]); + } + } + } +} + + +/** + * Retrieve the activation ids of the actions bound to the trigger activation provided by id. + * Failure to retrieve trigger activation for a valid activation id is considered a fatal error, since the activation must exist. 
+ * @param {*} triggerSample + */ +function getActionSamplesOfRules(triggerSample) { + return new Promise((resolve, reject) => { + ow.activations.get({name: triggerSample.taid}) + .then(triggerActivation => { + triggerSample.ts = triggerActivation.start; + var actionSamples = []; + for(var i = 0; i < triggerActivation.logs.length; i++) { + const boundActionRecord = JSON.parse(triggerActivation.logs[i]); + const actionSample = Object.assign({}, triggerSample); + if (boundActionRecord.success) + actionSample.aaid = boundActionRecord.activationId; + else + actionSample.aaidError = boundActionRecord.error; + actionSamples.push(actionSample); + } + resolve(actionSamples); + }) + .catch (err => { // FATAL: failed to retrieve trigger activation for a valid id + mLog(`getActionSamplesOfRules returned ERROR: ${err}`); + reject(err); + }); + }); +} + + +/** + * Process each action sample sequentially, i.e., wait until its activation is retrieved before retrieving the next one. + * Otherwise, concurrently retrieving possibly thousands of activations may cause issues. + * Failure to retrieve the activation record for a valid id is ok, assuming the action may not have completed yet.
+ * @param {*} actionSample
+ */
+async function processSampleWithAction(actionSample) {
+    if (actionSample.aaidError) // no activation, move on to processing sample with error
+        processSample(actionSample);
+    else { // have activation, try to get record
+        var activation;
+        try {
+            activation = await ow.activations.get({name: actionSample.aaid});
+        }
+        catch (err) {
+            mLog(`Failed to retrieve activation for id: ${actionSample.aaid} for reason: ${err}`);
+        }
+        processSample(actionSample, activation);
+    }
+}
+
+
+/**
+ * Process a single sample + optional related action activation, updating latency and throughput counters
+ * @param {*} sample
+ */
+function processSample(sample, activation) {
+
+    const bi = sample.bi;
+
+    if (bi < measurementTime.start || bi > measurementTime.stop) { // BI outside time frame. No further processing.
+        mLog(`Sample discarded. BI exceeds measurement time frame`);
+        return;
+    }
+
+    tpCounters.attempts++; // each sample invoked in the time frame counts as one invocation attempt
+
+    if (sample.taidError) { // trigger activation failed - count one request, one error. No further processing.
+        tpCounters.requests++;
+        tpCounters.errors++;
+        mLog(`Sample discarded. Trigger activation error: ${sample.taidError}`);
+        return;
+    }
+
+    var ts;
+    if (sample.ts) {
+        ts = parseInt(sample.ts);
+
+        if (ts >= measurementTime.start && ts <= measurementTime.stop) { // trigger activation in time frame - count one activation, one request
+            tpCounters.activations++;
+            tpCounters.requests++;
+        }
+    }
+    else
+        ts = undefined;
+
+    if (sample.aaidError) { // action activation failed - count one request, one error. No further processing.
+        tpCounters.requests++;
+        tpCounters.errors++;
+        mLog(`Sample discarded. Action activation error: ${sample.aaidError}`);
+        return;
+    }
+
+    if (!activation) { // no activation, so assumed incomplete. No further processing.
+        mLog(`Sample discarded. Activation was not retrieved.`)
+        return;
+    }
+
+    const as = parseInt(activation.start);
+    const ae = parseInt(activation.end);
+    const d = parseInt(activation.response.result.duration);
+
+    if (as < measurementTime.start || ae > measurementTime.stop) { // got activation, but it exceeds the time frame. No further processing.
+        mLog(`Sample discarded. Action activation exceeded measurement time frame.`)
+        return;
+    }
+
+    // Activation is in time frame, so count one activation, one request and one full invocation
+    tpCounters.activations++;
+    tpCounters.requests++;
+    tpCounters.invocations++;
+
+    // For full invocations, update latency counters
+
+    const ta = (ts ? as - ts : undefined);
+    const ad = ae - as;
+    const oea = as - bi;
+    const oer = ae - bi - d;
+
+    updateLatSample("d", d);
+    updateLatSample("ta", ta);
+    updateLatSample("ad", ad);
+    updateLatSample("oea", oea);
+    updateLatSample("oer", oer);
+
+    // for blocking action invocations - will be "undefined" otherwise
+    const ai = sample.ai;
+
+    const ora = (ai ? ai - ae : undefined);
+    const rtt = (ai ? ai - bi : undefined);
+    const ortt = (rtt ? rtt - d : undefined);
+
+    updateLatSample("ora", ora);
+    updateLatSample("rtt", rtt);
+    updateLatSample("ortt", ortt);
+
+    mLog(`${bi},\t${as},\t${ae},\t${ts},\t${ta},\t${oea},\t${oer},\t${d},\t${ad},\t${ai},\t${ora},\t${rtt},\t${ortt}`);
+}
+
+/**
+ * Update counters of one latency statistic of a worker with value data from one sample
+ */
+function updateLatSample(statName, value) {
+
+    if (!value) // value == undefined => skip it
+        return;
+
+    // Update sum for avg
+    if (!latCounters[statName].sum)
+        latCounters[statName].sum = 0;
+    latCounters[statName].sum += value;
+
+    // Update sumSqr for std
+    if (!latCounters[statName].sumSqr)
+        latCounters[statName].sumSqr = 0;
+    latCounters[statName].sumSqr += value * value;
+
+    // Update min value
+    if (!latCounters[statName].min || latCounters[statName].min > value)
+        latCounters[statName].min = value;
+
+    // Update max value
+    if (!latCounters[statName].max || latCounters[statName].max < value)
+        latCounters[statName].max = value;
+}
+
+
+/**
+ * Compute the final output record based on the workerData records.
+ * The output of the program is a single CSV row of data consisting of the input parameters,
+ * then latencies computed above - avg (average) and std (std. dev.), then throughput.
+ */
+function computeOutputRecord() {
+
+    // Latency stats: avg, std, min, max
+    ["ta", "oea", "oer", "d", "ad", "ora", "rtt", "ortt"].forEach(statName => {
+        testRecord.output[statName] = computeLatStats(statName);
+    });
+
+    // Tp stats: abs, tp, tpw, tpd
+    ["attempts", "invocations", "activations", "requests"].forEach(statName => {
+        testRecord.output[statName] = computeTpStats(statName);
+    });
+
+    // Error stats: abs, percent
+    testRecord.output.errors = computErrorStats();
+}
+
+
+/**
+ * Based on workerData, compute average and standard deviation of a given latency statistic.
+ * @param {*} statName
+ */
+function computeLatStats(statName) {
+    var totalSum = 0;
+    var totalSumSqr = 0;
+    var totalInvocations = 0;
+    var hasSamples = undefined; // does the current stat have any samples. If not => undefined, not NaN
+    var min = undefined;
+    var max = undefined;
+    if (testRecord.input.master_apart) { // in master_apart mode, only master performs latency measurements
+        totalSum = workerData[0].lat[statName].sum;
+        totalSumSqr = workerData[0].lat[statName].sumSqr;
+        min = workerData[0].lat[statName].min;
+        max = workerData[0].lat[statName].max;
+        totalInvocations = workerData[0].tp.invocations;
+    }
+    else // in regular mode, all workers participate in latency measurements
+        workerData.forEach(wd => {
+            if (wd.lat[statName].sum) { // If this worker has valid latency samples (not undefined)
+                hasSamples = 1;
+                totalSum += wd.lat[statName].sum;
+                totalSumSqr += wd.lat[statName].sumSqr;
+                if (!min || min > wd.lat[statName].min)
+                    min = wd.lat[statName].min;
+                if (!max || max < wd.lat[statName].max)
+                    max = wd.lat[statName].max;
+            }
+            totalInvocations += wd.tp.invocations;
+        });
+
+    const avg = (hasSamples ? totalSum / totalInvocations : undefined);
+    const std = (hasSamples ? Math.sqrt(totalSumSqr / totalInvocations - avg * avg) : undefined);
+
+    return ({avg: avg, std: std, min: min, max: max});
+}
+
+
+/**
+ * Based on workerData, compute throughput of a given counter, with (tp) and without (tpw) the master, and the percent difference (tpd)
+ * @param {*} statName
+ */
+function computeTpStats(statName) {
+    var masterCount = workerData[0].tp[statName];
+    var totalCount = 0;
+    workerData.forEach(wd => {totalCount += wd.tp[statName];});
+    const tp = totalCount / testRecord.output.measure_time; // throughput
+    const tpw = (totalCount - masterCount) / testRecord.output.measure_time; // throughput without master
+    const tpd = (tp - tpw) * 100.0 / tp; // percent difference relative to TP
+
+    return ({abs: totalCount, tp: tp, tpw: tpw, tpd: tpd});
+}
+
+
+/**
+ * Based on workerData, compute the relative portion of total errors out of total requests
+ */
+function computErrorStats() {
+    var totalErrors = 0;
+    var totalRequests = 0;
+
+    workerData.forEach(wd => {
+        totalErrors += wd.tp.errors;
+        totalRequests += wd.tp.requests;
+    });
+
+    const errAbs = totalErrors;
+    const errPer = totalErrors * 100.0 / totalRequests;
+    return ({abs: errAbs, percent: errPer});
+}
+
+
+/**
+ * Generate a properly formatted output record to stdout. The header is also printed, but via mDump to stderr and can be
+ * silenced.
+ */
+function generateOutput() {
+    var first = true;
+
+    // First, print header to stderr
+    dfsObject(testRecord, (name, data, isRoot, isObj) => {
+        if (!isObj) { // print leaf nodes
+            if (!first)
+                mWrite(",\t");
+            first = false;
+            mWrite(`${name}`);
+        }
+    });
+    mWrite("\n");
+
+    first = true;
+
+    // Now, print data to stdout
+    dfsObject(testRecord, (name, data, isRoot, isObj) => {
+        if (!isObj) { // print leaf nodes
+            if (!first)
+                process.stdout.write(",\t");
+            first = false;
+            if (typeof data == 'number') // round each number to 3 decimal digits
+                data = round(data, 3);
+            process.stdout.write(`${data}`);
+        }
+    });
+    process.stdout.write("\n");
+}
+
+
+/**
+ * Sleep for a given time. Useful mostly with await from an async function
+ * resolve and reject are externalized as properties to allow early abortion
+ * @param {*} ms
+ */
+function sleep(ms) {
+    var res, rej;
+    var p = new Promise((resolve, reject) => {
+        setTimeout(resolve, ms);
+        res = resolve;
+        rej = reject;
+    });
+    p.resolve = res;
+    p.reject = rej;
+
+    return p;
+  }
+
+
+/**
+ * Generate a random integer in the range of [1..max]
+ * @param {*} max
+ */
+function getRandomInt(max) {
+    return Math.floor(Math.random() * Math.floor(max) + 1);
+  }
+
+
+/**
+ * Round a number after specified decimal digits
+ * @param {*} num
+ * @param {*} digits
+ */
+ function round(num, digits = 0) {
+    const factor = Math.pow(10, digits);
+    return Math.round(num * factor) / factor;
+}
+
+
+// If not quiet, emit control messages on stderr (with newline)
+function mLog(text) {
+    if (!testRecord.input.quiet)
+        console.error(`${clientId()}:\t${text}`);
+}
+
+
+/**
+ * Return the id of the client - MASTER-0 or WORKER-k (k=1..w-1)
+ */
+function clientId() {
+    return (cluster.isMaster ? "MASTER-0" : `WORKER-${cluster.worker.id}`);
+}
+
+
+// If not quiet, write strings on stderr (w/o newline)
+function mWrite(text) {
+    if (!testRecord.input.quiet)
+        process.stderr.write(text);
+}
+
+/**
+ * Traverse a (potentially deep) object in DFS, visiting each non-function node with function f
+ * @param {*} data
+ * @param {*} func
+ */
+function dfsObject(data, func, allowInherited = false) {
+    var isRoot = true;
+    var rootObj = data;
+    crawlObj("", data, func, allowInherited);
+
+    function crawlObj(name, data, f, allowInherited) {
+        var isObj = (typeof data == 'object');
+        var isFunc = (typeof data == 'function');
+        if (!isFunc)
+            f(name, data, isRoot, isObj); // visit the current node
+        isRoot = false;
+        if (isObj)
+            for (var child in data) {
+                if (allowInherited || data.hasOwnProperty(child)) {
+                    const childName = (name == "" ? child : name + "." + child);
+                    crawlObj(childName, data[child], f, true); // After root level no need to check inheritance
+                }
+            }
+    }
+}
diff --git a/tools/owperf/owperf.sh b/tools/owperf/owperf.sh
new file mode 100755
index 0000000..fa9ffb6
--- /dev/null
+++ b/tools/owperf/owperf.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This is a simple launch script for owperf
+
+node owperf.js $@
diff --git a/tools/owperf/owperf_data.odg b/tools/owperf/owperf_data.odg
new file mode 100644
index 0000000..bada864
Binary files /dev/null and b/tools/owperf/owperf_data.odg differ
diff --git a/tools/owperf/owperf_data.png b/tools/owperf/owperf_data.png
new file mode 100644
index 0000000..754cb51
Binary files /dev/null and b/tools/owperf/owperf_data.png differ
diff --git a/tools/owperf/package.json b/tools/owperf/package.json
new file mode 100644
index 0000000..4e4db61
--- /dev/null
+++ b/tools/owperf/package.json
@@ -0,0 +1,24 @@
+{
+  "name": "owperf",
+  "version": "1.0.0",
+  "description": "owperf - a performance evaluation tool for Apache OpenWhisk",
+  "main": "owperf.js",
+  "scripts": {
+    "test": "echo \"Error: no test specified\" && exit 1"
+  },
+  "author": {
+    "name": "Erez Hadad",
+    "email": "er...@il.ibm.com"
+  },
+  "license": "Apache-2.0",
+  "dependencies": {
+    "btoa": "^1.2.1",
+    "child-process-promise": "^2.2.1",
+    "cluster": "^0.7.7",
+    "commander": "^2.19.0",
+    "ini": "^1.3.5",
+    "node-exec-promise": "^1.0.2",
+    "openwhisk": "^3.18.0",
+    "xmlhttprequest": "^1.8.0"
+  }
+}
diff --git a/tools/owperf/setup.sh b/tools/owperf/setup.sh
new file mode 100755
index 0000000..ee90c68
--- /dev/null
+++ b/tools/owperf/setup.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -x
+
+# Setup for overhead test: create action, create trigger, and create a number of rules as required.
+# This script performs both setup and teardown
+# Designed to be an idempotent operation (can be applied repeatedly for the same result)
+# Usage: setup.sh [op] [ratio] <wsk global flags>
+# op - MANDATORY. "s" for setup, "t" for teardown
+# ratio - MANDATORY. ratio as defined in README.md
+# wsk global flags - OPTIONAL. Global flags for the wsk command (e.g. for specifying non-default wsk API host, auth, etc)
+
+MAXRULES=30 # assume no more than 30 rules per trigger max
+op=$1 # s for setup, t for teardown
+count=$2 # ratio for rules
+delcount=$count # For teardown, delete ratio rules. For setup, delete MAXRULES
+if [ "$op" = "s" ]; then
+    delcount=$MAXRULES
+fi
+
+shift 2
+wskparams="$@" # All other parameters are assumed to be OW-specific
+
+
+function remove_assets() {
+
+    # Delete rules
+    for i in $(seq 1 $delcount); do
+        wsk rule delete testRule_$i $@;
+    done
+
+    # Delete trigger
+    wsk trigger delete testTrigger $@
+
+    # Delete action
+    wsk action delete testAction $@
+
+}
+
+
+function deploy_assets() {
+
+    # Create action
+    wsk action create testAction testAction.js --kind nodejs:8 $@
+
+    # Create trigger after deleting it
+    wsk trigger create testTrigger $@
+
+    # Create rules
+    for i in $(seq 1 $count); do
+        wsk rule create testRule_$i testTrigger testAction $@;
+    done
+
+}
+
+
+# Always start with removal of existing assets
+remove_assets
+
+# If setup requested, deploy new assets
+if [ "$op" = "s" ]; then
+    deploy_assets
+fi
+
diff --git a/tools/owperf/testAction.js b/tools/owperf/testAction.js
new file mode 100644
index 0000000..2ded797
--- /dev/null
+++ b/tools/owperf/testAction.js
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Default test action for owperf. Sleeps specified time.
+ * All test actions should return the invocation parameters (to stress the return path), but augmented with the execution duration and the activation id.
+ * Use this code as reference if you want to create a custom test action.
+ */
+
+function sleep(ms) {
+    return new Promise(resolve => setTimeout(resolve, ms));
+}
+
+async function main(params) {
+    var start = new Date().getTime();
+    params.activationId = process.env.__OW_ACTIVATION_ID;
+    await sleep(parseInt(params.sleep));
+    var end = new Date().getTime();
+    params.duration = end - start;
+    return params;
+}
+
+// Invoke main when runnig - only when setting TEST in the env
+if (process.env.TEST)
+    main({sleep:50}).then(params => console.log(params.duration));
+
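The latency and throughput arithmetic in `updateLatSample`, `computeLatStats` and `computeTpStats` follows a running-sums pattern. Each worker keeps a sum, a sum of squares, a minimum and a maximum per statistic. The master merges them and derives the standard deviation as sqrt(sumSqr/n - avg²). Below is a minimal standalone sketch of that pattern in plain Node.js. It is not code from owperf, and the helper names `newLatCounter`, `addSample`, `mergeLatCounters` and `throughputStats` are hypothetical. It differs from the committed code in one deliberate way: it keeps a per-statistic sample count `n`. The committed `computeLatStats` instead divides by the total invocation count, and `updateLatSample` skips every falsy value, including a genuine 0 ms latency. As a result, statistics that are only sometimes measured (such as `ta` or `rtt`) end up averaged over invocations that never produced them.

```javascript
// Hypothetical helpers (not part of owperf): running-sum latency counters
// merged across workers, plus throughput derived from per-worker counts.

// Empty counter for one latency statistic (e.g. "d", "oea", "rtt").
function newLatCounter() {
    return { count: 0, sum: 0, sumSqr: 0, min: Infinity, max: -Infinity };
}

// Record one sample. undefined/NaN means "not measured" and is skipped,
// but a genuine 0 ms latency is still counted.
function addSample(counter, value) {
    if (value === undefined || Number.isNaN(value))
        return;
    counter.count++;
    counter.sum += value;
    counter.sumSqr += value * value;
    counter.min = Math.min(counter.min, value);
    counter.max = Math.max(counter.max, value);
}

// Merge per-worker counters, then derive avg and population std. dev.
// using Var(x) = E[x^2] - E[x]^2.
function mergeLatCounters(counters) {
    const total = newLatCounter();
    for (const c of counters) {
        total.count += c.count;
        total.sum += c.sum;
        total.sumSqr += c.sumSqr;
        total.min = Math.min(total.min, c.min);
        total.max = Math.max(total.max, c.max);
    }
    if (total.count === 0) // no samples: report undefined rather than NaN
        return { avg: undefined, std: undefined, min: undefined, max: undefined };
    const avg = total.sum / total.count;
    // Clamp tiny negative values caused by floating-point cancellation.
    const variance = Math.max(0, total.sumSqr / total.count - avg * avg);
    return { avg: avg, std: Math.sqrt(variance), min: total.min, max: total.max };
}

// Throughput with the master (tp), without it (tpw), and the percent
// difference relative to tp (tpd); counts[0] is assumed to be the master.
function throughputStats(counts, measureTime) {
    const total = counts.reduce((acc, n) => acc + n, 0);
    const tp = total / measureTime;
    const tpw = (total - counts[0]) / measureTime;
    const tpd = tp === 0 ? undefined : (tp - tpw) * 100.0 / tp;
    return { abs: total, tp: tp, tpw: tpw, tpd: tpd };
}

// Usage: two workers, one of which has an unmeasured (undefined) sample.
const w1 = newLatCounter();
[10, 20].forEach(v => addSample(w1, v));
const w2 = newLatCounter();
[30, undefined, 40].forEach(v => addSample(w2, v));
console.log(mergeLatCounters([w1, w2])); // avg 25, std ~11.18, min 10, max 40
console.log(throughputStats([10, 30, 20], 10)); // abs 60, tp 6, tpw 5, tpd ~16.67
```

Throughput is just a count divided by the measurement time, so `tp` and `tpw` carry whatever time unit is passed in. In the sketch, `counts[0]` plays the role of `workerData[0]` (the master).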