This is an automated email from the ASF dual-hosted git repository.
dubeejw pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-runtime-go.git
The following commit(s) were added to refs/heads/master by this push:
new c302496 Performance boost (#61)
c302496 is described below
commit c302496f61ddb0e849f9ec821bd03a69e32276b6
Author: Sciabarra.com ltd <[email protected]>
AuthorDate: Thu Nov 29 21:58:22 2018 +0100
Performance boost (#61)
* performance boost for golang
* ops
* fixed makefile for benchmark
* removed invoke.py adding an init.sh
* Added license header
---
examples/benchmark/Makefile | 23 +++
.../_test/hello.sh => examples/benchmark/init.sh | 13 +-
examples/benchmark/main.go | 31 +++
.../_test/hello.sh => examples/benchmark/main.sh | 4 -
openwhisk/_test/hello.sh | 4 -
openwhisk/actionProxy.go | 3 -
openwhisk/actionProxy_test.go | 7 +-
openwhisk/executor.go | 212 +++++----------------
openwhisk/executor_test.go | 55 +-----
openwhisk/initHandler_test.go | 9 +-
openwhisk/runHandler.go | 25 +--
openwhisk/util_test.go | 4 +-
12 files changed, 129 insertions(+), 261 deletions(-)
diff --git a/examples/benchmark/Makefile b/examples/benchmark/Makefile
new file mode 100644
index 0000000..22a03f4
--- /dev/null
+++ b/examples/benchmark/Makefile
@@ -0,0 +1,23 @@
+IMG?=whisk/actionloop-golang-v1.11
+IMG2?=whisk/actionloop
+
+all: golang bash
+
+test.lua:
+ echo 'wrk.method = "POST"'>test.lua
+ echo "wrk.body = '{\"value\":{\"name\":\"Mike\"}}'">>test.lua
+ echo 'wrk.headers["Content-Type"] = "application/json"'>>test.lua
+
+golang: test.lua
+ docker run -d --name under-test --rm -p 8080:8080 $(IMG)
+ bash init.sh main.go
+ wrk -t1 -c1 -stest.lua http://localhost:8080/run
+ docker kill under-test
+
+bash: test.lua
+ docker run -d --name under-test --rm -p 8080:8080 $(IMG2)
+ bash init.sh main.sh
+ wrk -t1 -c1 -stest.lua http://localhost:8080/run
+ docker kill under-test
+
+.PHONY: all golang bash
diff --git a/openwhisk/_test/hello.sh b/examples/benchmark/init.sh
old mode 100755
new mode 100644
similarity index 79%
copy from openwhisk/_test/hello.sh
copy to examples/benchmark/init.sh
index ff21212..d34951f
--- a/openwhisk/_test/hello.sh
+++ b/examples/benchmark/init.sh
@@ -15,13 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-while read line
-do
- name="$(echo $line | jq -r .value.name)"
- if [ "$name" == "*" ]
- then echo "Goodbye!" >&2
- exit 0
- fi
- echo msg="hello $name"
- echo '{"hello": "'$name'"}' >&3
-done
+INIT=${1:?action}
+jq -n --rawfile file $INIT '{ "value": {"main":"main", "code":$file}}'
>$INIT.json
+curl -XPOST -H "Content-Type: application/json" http://localhost:8080/init -d
@$INIT.json
diff --git a/examples/benchmark/main.go b/examples/benchmark/main.go
new file mode 100644
index 0000000..5fdaf5d
--- /dev/null
+++ b/examples/benchmark/main.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import "fmt"
+
+// Main function for the action
+func Main(obj map[string]interface{}) map[string]interface{} {
+ name, ok := obj["name"].(string)
+ if !ok {
+ name = "world"
+ }
+ fmt.Printf("name=%s\n", name)
+ msg := make(map[string]interface{})
+ msg["golang-main-single"] = "Hello, " + name + "!"
+ return msg
+}
diff --git a/openwhisk/_test/hello.sh b/examples/benchmark/main.sh
similarity index 92%
copy from openwhisk/_test/hello.sh
copy to examples/benchmark/main.sh
index ff21212..d8ddb7f 100755
--- a/openwhisk/_test/hello.sh
+++ b/examples/benchmark/main.sh
@@ -18,10 +18,6 @@
while read line
do
name="$(echo $line | jq -r .value.name)"
- if [ "$name" == "*" ]
- then echo "Goodbye!" >&2
- exit 0
- fi
echo msg="hello $name"
echo '{"hello": "'$name'"}' >&3
done
diff --git a/openwhisk/_test/hello.sh b/openwhisk/_test/hello.sh
index ff21212..d8ddb7f 100755
--- a/openwhisk/_test/hello.sh
+++ b/openwhisk/_test/hello.sh
@@ -18,10 +18,6 @@
while read line
do
name="$(echo $line | jq -r .value.name)"
- if [ "$name" == "*" ]
- then echo "Goodbye!" >&2
- exit 0
- fi
echo msg="hello $name"
echo '{"hello": "'$name'"}' >&3
done
diff --git a/openwhisk/actionProxy.go b/openwhisk/actionProxy.go
index c61c1a8..cb6e441 100644
--- a/openwhisk/actionProxy.go
+++ b/openwhisk/actionProxy.go
@@ -24,9 +24,6 @@ import (
"os"
)
-// OutputGuard constant string
-const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-
// ActionProxy is the container of the data specific to a server
type ActionProxy struct {
diff --git a/openwhisk/actionProxy_test.go b/openwhisk/actionProxy_test.go
index 395e811..2d767af 100644
--- a/openwhisk/actionProxy_test.go
+++ b/openwhisk/actionProxy_test.go
@@ -52,8 +52,7 @@ func TestStartLatestAction_emit1(t *testing.T) {
buf := []byte("#!/bin/sh\nwhile read a; do echo 1 >&3 ; done\n")
ap.ExtractAction(&buf, "bin")
ap.StartLatestAction()
- ap.theExecutor.io <- []byte("x")
- res := <-ap.theExecutor.io
+ res, _ := ap.theExecutor.Interact([]byte("x"))
assert.Equal(t, res, []byte("1\n"))
ap.theExecutor.Stop()
}
@@ -77,8 +76,8 @@ func TestStartLatestAction_emit2(t *testing.T) {
buf := []byte("#!/bin/sh\nwhile read a; do echo 2 >&3 ; done\n")
ap.ExtractAction(&buf, "bin")
ap.StartLatestAction()
- ap.theExecutor.io <- []byte("z")
- assert.Equal(t, <-ap.theExecutor.io, []byte("2\n"))
+ res, _ := ap.theExecutor.Interact([]byte("z"))
+ assert.Equal(t, res, []byte("2\n"))
/**/
ap.theExecutor.Stop()
}
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index 1658fe8..06d875c 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -23,29 +23,22 @@ import (
"io"
"os"
"os/exec"
- "runtime"
"time"
)
-// DefaultTimeoutInit to wait for a process to start
-var DefaultTimeoutInit = 5 * time.Millisecond
+// OutputGuard constant string
+const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n"
-// DefaultTimeoutDrain to wait for draining logs
-var DefaultTimeoutDrain = 5 * time.Millisecond
+// DefaultTimeoutStart to wait for a process to start
+var DefaultTimeoutStart = 5 * time.Millisecond
// Executor is the container and the guardian of a child process
// It starts a command, feeds input and output, read logs and control its
termination
type Executor struct {
- io chan []byte
- log chan bool
- exit chan error
- _cmd *exec.Cmd
- _input io.WriteCloser
- _output *bufio.Reader
- _logout *bufio.Reader
- _logerr *bufio.Reader
- _outbuf *bufio.Writer
- _errbuf *bufio.Writer
+ cmd *exec.Cmd
+ input io.WriteCloser
+ output *bufio.Reader
+ exited chan bool
}
// NewExecutor creates a child subprocess using the provided command line,
@@ -53,188 +46,85 @@ type Executor struct {
// You can then start it getting a communication channel
func NewExecutor(logout *os.File, logerr *os.File, command string, args
...string) (proc *Executor) {
cmd := exec.Command(command, args...)
+ cmd.Stdout = logout
+ cmd.Stderr = logerr
cmd.Env = []string{
"__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
}
+ Debug("env: %v", cmd.Env)
if Debugging {
cmd.Env = append(cmd.Env, "OW_DEBUG=/tmp/action.log")
}
- Debug("env: %v", cmd.Env)
-
- stdin, err := cmd.StdinPipe()
- if err != nil {
- return nil
- }
-
- stdout, err := cmd.StdoutPipe()
+ input, err := cmd.StdinPipe()
if err != nil {
return nil
}
-
- stderr, err := cmd.StderrPipe()
- if err != nil {
- return nil
- }
-
pipeOut, pipeIn, err := os.Pipe()
if err != nil {
return nil
}
cmd.ExtraFiles = []*os.File{pipeIn}
-
- pout := bufio.NewReader(pipeOut)
- sout := bufio.NewReader(stdout)
- serr := bufio.NewReader(stderr)
- outbuf := bufio.NewWriter(logout)
- errbuf := bufio.NewWriter(logerr)
-
+ output := bufio.NewReader(pipeOut)
return &Executor{
- make(chan []byte),
- make(chan bool),
- make(chan error),
cmd,
- stdin,
- pout,
- sout,
- serr,
- outbuf,
- errbuf,
- }
-}
-
-// collect log from a stream
-func _collect(ch chan string, reader *bufio.Reader) {
- for {
- buf, err := reader.ReadBytes('\n')
- if err != nil {
- break
- }
- ch <- string(buf)
- }
-}
-
-// loop over the command executing
-// returning when the command exits
-func (proc *Executor) run() {
- Debug("run: start")
- err := proc._cmd.Start()
- if err != nil {
- proc.exit <- err
- Debug("run: early exit")
- proc._cmd = nil // do not kill
- return
- }
- Debug("pid: %d", proc._cmd.Process.Pid)
- // wait for the exit
- proc.exit <- proc._cmd.Wait()
- proc._cmd = nil // do not kill
- Debug("run: end")
-}
-
-func drain(ch chan string, out *bufio.Writer) {
- for loop := true; loop; {
- runtime.Gosched()
- select {
- case buf := <-ch:
- fmt.Fprint(out, buf)
- out.Flush()
- case <-time.After(DefaultTimeoutDrain):
- loop = false
- }
+ input,
+ output,
+ make(chan bool),
}
- fmt.Fprintln(out, OutputGuard)
- out.Flush()
}
-// manage copying stdout and stder in output
-// with log guards
-func (proc *Executor) logger() {
- Debug("logger: start")
- // poll stdout and stderr
- chOut := make(chan string)
- go _collect(chOut, proc._logout)
- chErr := make(chan string)
- go _collect(chErr, proc._logerr)
-
- // loop draining the loop until asked to exit
- for <-proc.log {
- // drain stdout
- Debug("draining stdout")
- drain(chOut, proc._outbuf)
- // drain stderr
- Debug("draining stderr")
- drain(chErr, proc._errbuf)
- proc.log <- true
- }
- Debug("logger: end")
+// Interact interacts with the underlying process
+func (proc *Executor) Interact(in []byte) ([]byte, error) {
+ // input to the subprocess
+ proc.input.Write(in)
+ proc.input.Write([]byte("\n"))
+ out, err := proc.output.ReadBytes('\n')
+ proc.cmd.Stdout.Write([]byte(OutputGuard))
+ proc.cmd.Stderr.Write([]byte(OutputGuard))
+ return out, err
}
-// main service function
-// writing in input
-// and reading in output
-// using the provide channels
-func (proc *Executor) service() {
- Debug("service: start")
- for {
- in := <-proc.io
- if len(in) == 0 {
- Debug("terminated upon request")
- break
- }
- // input to the subprocess
- DebugLimit(">>>", in, 120)
- proc._input.Write(in)
- proc._input.Write([]byte("\n"))
- Debug("done")
-
- // ok now give a chance to run to goroutines
- runtime.Gosched()
-
- // input to the subprocess
- out, err := proc._output.ReadBytes('\n')
- if err != nil {
- break
- }
- DebugLimit("<<<", out, 120)
- proc.io <- out
- if len(out) == 0 {
- Debug("empty input - exiting")
- break
- }
+// Exited checks if the underlying command exited
+func (proc *Executor) Exited() bool {
+ select {
+ case <-proc.exited:
+ return true
+ default:
+ return false
}
- Debug("service: end")
}
// Start execution of the command
+// wait a bit to check if the command exited
// returns an error if the program fails
func (proc *Executor) Start() error {
// start the underlying executable
- // check if died
- go proc.run()
+ Debug("Start:")
+ err := proc.cmd.Start()
+ if err != nil {
+ Debug("run: early exit")
+ proc.cmd = nil // no need to kill
+ return fmt.Errorf("command exited")
+ }
+ Debug("pid: %d", proc.cmd.Process.Pid)
+ go func() {
+ proc.cmd.Wait()
+ proc.exited <- true
+ }()
select {
- case <-proc.exit:
- // oops, it died
+ case <-proc.exited:
return fmt.Errorf("command exited")
- case <-time.After(DefaultTimeoutInit):
- // ok let's process it
- go proc.service()
- go proc.logger()
+ case <-time.After(DefaultTimeoutStart):
+ return nil
}
- return nil
}
// Stop will kill the process
// and close the channels
func (proc *Executor) Stop() {
Debug("stopping")
- if proc._cmd != nil {
- proc.log <- false
- proc.io <- []byte("")
- proc._cmd.Process.Kill()
- <-proc.exit
- proc._cmd = nil
+ if proc.cmd != nil {
+ proc.cmd.Process.Kill()
+ proc.cmd = nil
}
- close(proc.io)
- close(proc.exit)
- close(proc.log)
}
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 8504fd9..6ab5bd7 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -51,24 +51,13 @@ func ExampleNewExecutor_bc() {
proc := NewExecutor(log, log, "_test/bc.sh")
err := proc.Start()
fmt.Println(err)
- proc.io <- []byte("2+2")
- fmt.Printf("%s", <-proc.io)
- proc.log <- true
- <-proc.log
- // and now, exit detection
- proc.io <- []byte("quit")
- select {
- case in := <-proc.io:
- fmt.Printf("%s", in)
- case <-proc.exit:
- fmt.Println("exit")
- }
+ res, _ := proc.Interact([]byte("2+2"))
+ fmt.Printf("%s", res)
proc.Stop()
dump(log)
// Output:
// <nil>
// 4
- // exit
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
}
@@ -78,48 +67,14 @@ func ExampleNewExecutor_hello() {
proc := NewExecutor(log, log, "_test/hello.sh")
err := proc.Start()
fmt.Println(err)
- proc.io <- []byte(`{"value":{"name":"Mike"}}`)
- fmt.Printf("%s", <-proc.io)
- proc.log <- true
- <-proc.log
+ res, _ := proc.Interact([]byte(`{"value":{"name":"Mike"}}`))
+ fmt.Printf("%s", res)
proc.Stop()
- _, ok := <-proc.io
- fmt.Printf("io %v\n", ok)
dump(log)
- // Unordered output:
+ // Output:
// <nil>
// {"hello": "Mike"}
// msg=hello Mike
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- // io false
-}
-
-func ExampleNewExecutor_term() {
- log, _ := ioutil.TempFile("", "log")
- proc := NewExecutor(log, log, "_test/hello.sh")
- err := proc.Start()
- fmt.Println(err)
- proc.io <- []byte(`{"value":{"name":"*"}}`)
- var exited bool
- select {
- case <-proc.io:
- exited = false
- case <-proc.exit:
- exited = true
- }
- proc.log <- true
- <-proc.log
- fmt.Printf("exit %v\n", exited)
- proc.Stop()
- _, ok := <-proc.io
- fmt.Printf("io %v\n", ok)
- dump(log)
- // Unordered output:
- // <nil>
- // exit true
- // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- // Goodbye!
- // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- // io false
}
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index 81f00dc..de71f6d 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -94,20 +94,18 @@ func Example_shell_nocompiler() {
doRun(ts, "")
doInit(ts, initBinary("_test/hello.sh", ""))
doRun(ts, "")
- doRun(ts, `{"name":"*"}`)
- doRun(ts, "")
+ doRun(ts, `{"name":"world"}`)
stopTestServer(ts, cur, log)
// Output:
// 500 {"error":"no action defined yet"}
// 200 {"ok":true}
// 200 {"hello": "Mike"}
- // 400 {"error":"command exited"}
- // 500 {"error":"no action defined yet"}
+ // 200 {"hello": "world"}
// msg=hello Mike
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+ // msg=hello world
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
- // Goodbye!
// XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
}
@@ -226,4 +224,5 @@ func Example_badinit_nocompiler() {
// 400 {"error":"cannot start action: command exited"}
// 400 {"error":"cannot start action: command exited"}
// 500 {"error":"no action defined yet"}
+ // hi
}
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 2463255..3345cb9 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -59,29 +59,20 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r
*http.Request) {
sendError(w, http.StatusInternalServerError, fmt.Sprintf("no
action defined yet"))
return
}
+ // check if the process exited
+ if ap.theExecutor.Exited() {
+ sendError(w, http.StatusInternalServerError,
fmt.Sprintf("command exited"))
+ return
+ }
// remove newlines
body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
// execute the action
- ap.theExecutor.io <- body
+ response, err := ap.theExecutor.Interact(body)
// check for early termination
- var response []byte
- var exited bool
- select {
- case response = <-ap.theExecutor.io:
- exited = false
- case err = <-ap.theExecutor.exit:
- exited = true
- }
-
- // flush the logs sending the activation message at the end
- ap.theExecutor.log <- true
- <-ap.theExecutor.log
-
- // check for early termination
- if exited {
+ if err != nil {
Debug("WARNING! Command exited")
ap.theExecutor = nil
sendError(w, http.StatusBadRequest, fmt.Sprintf("command
exited"))
@@ -106,7 +97,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r
*http.Request) {
f.Flush()
}
- // diagnostic when writing problems
+ // diagnostic when you have writing problems
if err != nil {
sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error
writing response: %v", err))
return
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index b0d8084..1593760 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -185,9 +185,7 @@ func TestMain(m *testing.M) {
}
// increase timeouts for init
- DefaultTimeoutInit = 1000 * time.Millisecond
- // timeout for drain - shoud less (or you can get stuck on stdout
without getting the stderr)
- DefaultTimeoutDrain = 100 * time.Millisecond
+ DefaultTimeoutStart = 1000 * time.Millisecond
// build some test stuff
sys("_test/build.sh")
sys("_test/zips.sh")