This is an automated email from the ASF dual-hosted git repository.

dubeejw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-runtime-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c302496  Performance boost (#61)
c302496 is described below

commit c302496f61ddb0e849f9ec821bd03a69e32276b6
Author: Sciabarra.com ltd <[email protected]>
AuthorDate: Thu Nov 29 21:58:22 2018 +0100

    Performance boost (#61)
    
    * performance boost for golang
    
    * ops
    
    * fixed makefile for benchmark
    
    * removed invoke.py adding an init.sh
    
    * Added license header
---
 examples/benchmark/Makefile                        |  23 +++
 .../_test/hello.sh => examples/benchmark/init.sh   |  13 +-
 examples/benchmark/main.go                         |  31 +++
 .../_test/hello.sh => examples/benchmark/main.sh   |   4 -
 openwhisk/_test/hello.sh                           |   4 -
 openwhisk/actionProxy.go                           |   3 -
 openwhisk/actionProxy_test.go                      |   7 +-
 openwhisk/executor.go                              | 212 +++++----------------
 openwhisk/executor_test.go                         |  55 +-----
 openwhisk/initHandler_test.go                      |   9 +-
 openwhisk/runHandler.go                            |  25 +--
 openwhisk/util_test.go                             |   4 +-
 12 files changed, 129 insertions(+), 261 deletions(-)

diff --git a/examples/benchmark/Makefile b/examples/benchmark/Makefile
new file mode 100644
index 0000000..22a03f4
--- /dev/null
+++ b/examples/benchmark/Makefile
@@ -0,0 +1,23 @@
+IMG?=whisk/actionloop-golang-v1.11
+IMG2?=whisk/actionloop
+
+all: golang bash
+
+test.lua:
+       echo 'wrk.method = "POST"'>test.lua
+       echo "wrk.body = '{\"value\":{\"name\":\"Mike\"}}'">>test.lua
+       echo 'wrk.headers["Content-Type"] = "application/json"'>>test.lua
+
+golang: test.lua
+       docker run -d --name under-test --rm -p 8080:8080 $(IMG)
+       bash init.sh main.go
+       wrk -t1 -c1 -stest.lua http://localhost:8080/run
+       docker kill under-test
+
+bash: test.lua
+       docker run -d --name under-test --rm -p 8080:8080 $(IMG2)
+       bash init.sh main.sh
+       wrk -t1 -c1 -stest.lua http://localhost:8080/run
+       docker kill under-test
+
+.PHONY: all golang bash
diff --git a/openwhisk/_test/hello.sh b/examples/benchmark/init.sh
old mode 100755
new mode 100644
similarity index 79%
copy from openwhisk/_test/hello.sh
copy to examples/benchmark/init.sh
index ff21212..d34951f
--- a/openwhisk/_test/hello.sh
+++ b/examples/benchmark/init.sh
@@ -15,13 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-while read line
-do
-   name="$(echo $line | jq -r .value.name)"
-   if [ "$name" == "*" ]
-   then echo "Goodbye!" >&2
-        exit 0
-   fi
-   echo msg="hello $name"
-   echo '{"hello": "'$name'"}' >&3
-done
+INIT=${1:?action}
+jq -n --rawfile file $INIT '{ "value": {"main":"main", "code":$file}}' >$INIT.json
+curl -XPOST -H "Content-Type: application/json" http://localhost:8080/init -d @$INIT.json
diff --git a/examples/benchmark/main.go b/examples/benchmark/main.go
new file mode 100644
index 0000000..5fdaf5d
--- /dev/null
+++ b/examples/benchmark/main.go
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package main
+
+import "fmt"
+
+// Main function for the action
+func Main(obj map[string]interface{}) map[string]interface{} {
+       name, ok := obj["name"].(string)
+       if !ok {
+               name = "world"
+       }
+       fmt.Printf("name=%s\n", name)
+       msg := make(map[string]interface{})
+       msg["golang-main-single"] = "Hello, " + name + "!"
+       return msg
+}
diff --git a/openwhisk/_test/hello.sh b/examples/benchmark/main.sh
similarity index 92%
copy from openwhisk/_test/hello.sh
copy to examples/benchmark/main.sh
index ff21212..d8ddb7f 100755
--- a/openwhisk/_test/hello.sh
+++ b/examples/benchmark/main.sh
@@ -18,10 +18,6 @@
 while read line
 do
    name="$(echo $line | jq -r .value.name)"
-   if [ "$name" == "*" ]
-   then echo "Goodbye!" >&2
-        exit 0
-   fi
    echo msg="hello $name"
    echo '{"hello": "'$name'"}' >&3
 done
diff --git a/openwhisk/_test/hello.sh b/openwhisk/_test/hello.sh
index ff21212..d8ddb7f 100755
--- a/openwhisk/_test/hello.sh
+++ b/openwhisk/_test/hello.sh
@@ -18,10 +18,6 @@
 while read line
 do
    name="$(echo $line | jq -r .value.name)"
-   if [ "$name" == "*" ]
-   then echo "Goodbye!" >&2
-        exit 0
-   fi
    echo msg="hello $name"
    echo '{"hello": "'$name'"}' >&3
 done
diff --git a/openwhisk/actionProxy.go b/openwhisk/actionProxy.go
index c61c1a8..cb6e441 100644
--- a/openwhisk/actionProxy.go
+++ b/openwhisk/actionProxy.go
@@ -24,9 +24,6 @@ import (
        "os"
 )
 
-// OutputGuard constant string
-const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX"
-
 // ActionProxy is the container of the data specific to a server
 type ActionProxy struct {
 
diff --git a/openwhisk/actionProxy_test.go b/openwhisk/actionProxy_test.go
index 395e811..2d767af 100644
--- a/openwhisk/actionProxy_test.go
+++ b/openwhisk/actionProxy_test.go
@@ -52,8 +52,7 @@ func TestStartLatestAction_emit1(t *testing.T) {
        buf := []byte("#!/bin/sh\nwhile read a; do echo 1 >&3 ; done\n")
        ap.ExtractAction(&buf, "bin")
        ap.StartLatestAction()
-       ap.theExecutor.io <- []byte("x")
-       res := <-ap.theExecutor.io
+       res, _ := ap.theExecutor.Interact([]byte("x"))
        assert.Equal(t, res, []byte("1\n"))
        ap.theExecutor.Stop()
 }
@@ -77,8 +76,8 @@ func TestStartLatestAction_emit2(t *testing.T) {
        buf := []byte("#!/bin/sh\nwhile read a; do echo 2 >&3 ; done\n")
        ap.ExtractAction(&buf, "bin")
        ap.StartLatestAction()
-       ap.theExecutor.io <- []byte("z")
-       assert.Equal(t, <-ap.theExecutor.io, []byte("2\n"))
+       res, _ := ap.theExecutor.Interact([]byte("z"))
+       assert.Equal(t, res, []byte("2\n"))
        /**/
        ap.theExecutor.Stop()
 }
diff --git a/openwhisk/executor.go b/openwhisk/executor.go
index 1658fe8..06d875c 100644
--- a/openwhisk/executor.go
+++ b/openwhisk/executor.go
@@ -23,29 +23,22 @@ import (
        "io"
        "os"
        "os/exec"
-       "runtime"
        "time"
 )
 
-// DefaultTimeoutInit to wait for a process to start
-var DefaultTimeoutInit = 5 * time.Millisecond
+// OutputGuard constant string
+const OutputGuard = "XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX\n"
 
-// DefaultTimeoutDrain to wait for draining logs
-var DefaultTimeoutDrain = 5 * time.Millisecond
+// DefaultTimeoutStart to wait for a process to start
+var DefaultTimeoutStart = 5 * time.Millisecond
 
 // Executor is the container and the guardian  of a child process
 // It starts a command, feeds input and output, read logs and control its termination
 type Executor struct {
-       io      chan []byte
-       log     chan bool
-       exit    chan error
-       _cmd    *exec.Cmd
-       _input  io.WriteCloser
-       _output *bufio.Reader
-       _logout *bufio.Reader
-       _logerr *bufio.Reader
-       _outbuf *bufio.Writer
-       _errbuf *bufio.Writer
+       cmd    *exec.Cmd
+       input  io.WriteCloser
+       output *bufio.Reader
+       exited chan bool
 }
 
 // NewExecutor creates a child subprocess using the provided command line,
@@ -53,188 +46,85 @@ type Executor struct {
 // You can then start it getting a communication channel
 func NewExecutor(logout *os.File, logerr *os.File, command string, args ...string) (proc *Executor) {
        cmd := exec.Command(command, args...)
+       cmd.Stdout = logout
+       cmd.Stderr = logerr
        cmd.Env = []string{
                "__OW_API_HOST=" + os.Getenv("__OW_API_HOST"),
        }
+       Debug("env: %v", cmd.Env)
        if Debugging {
                cmd.Env = append(cmd.Env, "OW_DEBUG=/tmp/action.log")
        }
-       Debug("env: %v", cmd.Env)
-
-       stdin, err := cmd.StdinPipe()
-       if err != nil {
-               return nil
-       }
-
-       stdout, err := cmd.StdoutPipe()
+       input, err := cmd.StdinPipe()
        if err != nil {
                return nil
        }
-
-       stderr, err := cmd.StderrPipe()
-       if err != nil {
-               return nil
-       }
-
        pipeOut, pipeIn, err := os.Pipe()
        if err != nil {
                return nil
        }
        cmd.ExtraFiles = []*os.File{pipeIn}
-
-       pout := bufio.NewReader(pipeOut)
-       sout := bufio.NewReader(stdout)
-       serr := bufio.NewReader(stderr)
-       outbuf := bufio.NewWriter(logout)
-       errbuf := bufio.NewWriter(logerr)
-
+       output := bufio.NewReader(pipeOut)
        return &Executor{
-               make(chan []byte),
-               make(chan bool),
-               make(chan error),
                cmd,
-               stdin,
-               pout,
-               sout,
-               serr,
-               outbuf,
-               errbuf,
-       }
-}
-
-// collect log from a stream
-func _collect(ch chan string, reader *bufio.Reader) {
-       for {
-               buf, err := reader.ReadBytes('\n')
-               if err != nil {
-                       break
-               }
-               ch <- string(buf)
-       }
-}
-
-// loop over the command executing
-// returning when the command exits
-func (proc *Executor) run() {
-       Debug("run: start")
-       err := proc._cmd.Start()
-       if err != nil {
-               proc.exit <- err
-               Debug("run: early exit")
-               proc._cmd = nil // do not kill
-               return
-       }
-       Debug("pid: %d", proc._cmd.Process.Pid)
-       // wait for the exit
-       proc.exit <- proc._cmd.Wait()
-       proc._cmd = nil // do not kill
-       Debug("run: end")
-}
-
-func drain(ch chan string, out *bufio.Writer) {
-       for loop := true; loop; {
-               runtime.Gosched()
-               select {
-               case buf := <-ch:
-                       fmt.Fprint(out, buf)
-                       out.Flush()
-               case <-time.After(DefaultTimeoutDrain):
-                       loop = false
-               }
+               input,
+               output,
+               make(chan bool),
        }
-       fmt.Fprintln(out, OutputGuard)
-       out.Flush()
 }
 
-// manage copying stdout and stder in output
-// with log guards
-func (proc *Executor) logger() {
-       Debug("logger: start")
-       // poll stdout and stderr
-       chOut := make(chan string)
-       go _collect(chOut, proc._logout)
-       chErr := make(chan string)
-       go _collect(chErr, proc._logerr)
-
-       // loop draining the loop until asked to exit
-       for <-proc.log {
-               // drain stdout
-               Debug("draining stdout")
-               drain(chOut, proc._outbuf)
-               // drain stderr
-               Debug("draining stderr")
-               drain(chErr, proc._errbuf)
-               proc.log <- true
-       }
-       Debug("logger: end")
+// Interact interacts with the underlying process
+func (proc *Executor) Interact(in []byte) ([]byte, error) {
+       // input to the subprocess
+       proc.input.Write(in)
+       proc.input.Write([]byte("\n"))
+       out, err := proc.output.ReadBytes('\n')
+       proc.cmd.Stdout.Write([]byte(OutputGuard))
+       proc.cmd.Stderr.Write([]byte(OutputGuard))
+       return out, err
 }
 
-// main service function
-// writing in input
-// and reading in output
-// using the provide channels
-func (proc *Executor) service() {
-       Debug("service: start")
-       for {
-               in := <-proc.io
-               if len(in) == 0 {
-                       Debug("terminated upon request")
-                       break
-               }
-               // input to the subprocess
-               DebugLimit(">>>", in, 120)
-               proc._input.Write(in)
-               proc._input.Write([]byte("\n"))
-               Debug("done")
-
-               // ok now give a chance to run to goroutines
-               runtime.Gosched()
-
-               // input to the subprocess
-               out, err := proc._output.ReadBytes('\n')
-               if err != nil {
-                       break
-               }
-               DebugLimit("<<<", out, 120)
-               proc.io <- out
-               if len(out) == 0 {
-                       Debug("empty input - exiting")
-                       break
-               }
+// Exited checks if the underlying command exited
+func (proc *Executor) Exited() bool {
+       select {
+       case <-proc.exited:
+               return true
+       default:
+               return false
        }
-       Debug("service: end")
 }
 
 // Start execution of the command
+// wait a bit to check if the command exited
 // returns an error if the program fails
 func (proc *Executor) Start() error {
        // start the underlying executable
-       // check if died
-       go proc.run()
+       Debug("Start:")
+       err := proc.cmd.Start()
+       if err != nil {
+               Debug("run: early exit")
+               proc.cmd = nil // no need to kill
+               return fmt.Errorf("command exited")
+       }
+       Debug("pid: %d", proc.cmd.Process.Pid)
+       go func() {
+               proc.cmd.Wait()
+               proc.exited <- true
+       }()
        select {
-       case <-proc.exit:
-               // oops, it died
+       case <-proc.exited:
                return fmt.Errorf("command exited")
-       case <-time.After(DefaultTimeoutInit):
-               // ok let's process it
-               go proc.service()
-               go proc.logger()
+       case <-time.After(DefaultTimeoutStart):
+               return nil
        }
-       return nil
 }
 
 // Stop will kill the process
 // and close the channels
 func (proc *Executor) Stop() {
        Debug("stopping")
-       if proc._cmd != nil {
-               proc.log <- false
-               proc.io <- []byte("")
-               proc._cmd.Process.Kill()
-               <-proc.exit
-               proc._cmd = nil
+       if proc.cmd != nil {
+               proc.cmd.Process.Kill()
+               proc.cmd = nil
        }
-       close(proc.io)
-       close(proc.exit)
-       close(proc.log)
 }
diff --git a/openwhisk/executor_test.go b/openwhisk/executor_test.go
index 8504fd9..6ab5bd7 100644
--- a/openwhisk/executor_test.go
+++ b/openwhisk/executor_test.go
@@ -51,24 +51,13 @@ func ExampleNewExecutor_bc() {
        proc := NewExecutor(log, log, "_test/bc.sh")
        err := proc.Start()
        fmt.Println(err)
-       proc.io <- []byte("2+2")
-       fmt.Printf("%s", <-proc.io)
-       proc.log <- true
-       <-proc.log
-       // and now, exit detection
-       proc.io <- []byte("quit")
-       select {
-       case in := <-proc.io:
-               fmt.Printf("%s", in)
-       case <-proc.exit:
-               fmt.Println("exit")
-       }
+       res, _ := proc.Interact([]byte("2+2"))
+       fmt.Printf("%s", res)
        proc.Stop()
        dump(log)
        // Output:
        // <nil>
        // 4
-       // exit
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
@@ -78,48 +67,14 @@ func ExampleNewExecutor_hello() {
        proc := NewExecutor(log, log, "_test/hello.sh")
        err := proc.Start()
        fmt.Println(err)
-       proc.io <- []byte(`{"value":{"name":"Mike"}}`)
-       fmt.Printf("%s", <-proc.io)
-       proc.log <- true
-       <-proc.log
+       res, _ := proc.Interact([]byte(`{"value":{"name":"Mike"}}`))
+       fmt.Printf("%s", res)
        proc.Stop()
-       _, ok := <-proc.io
-       fmt.Printf("io %v\n", ok)
        dump(log)
-       // Unordered output:
+       // Output:
        // <nil>
        // {"hello": "Mike"}
        // msg=hello Mike
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // io false
-}
-
-func ExampleNewExecutor_term() {
-       log, _ := ioutil.TempFile("", "log")
-       proc := NewExecutor(log, log, "_test/hello.sh")
-       err := proc.Start()
-       fmt.Println(err)
-       proc.io <- []byte(`{"value":{"name":"*"}}`)
-       var exited bool
-       select {
-       case <-proc.io:
-               exited = false
-       case <-proc.exit:
-               exited = true
-       }
-       proc.log <- true
-       <-proc.log
-       fmt.Printf("exit %v\n", exited)
-       proc.Stop()
-       _, ok := <-proc.io
-       fmt.Printf("io %v\n", ok)
-       dump(log)
-       // Unordered output:
-       // <nil>
-       // exit true
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // Goodbye!
-       // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // io false
 }
diff --git a/openwhisk/initHandler_test.go b/openwhisk/initHandler_test.go
index 81f00dc..de71f6d 100644
--- a/openwhisk/initHandler_test.go
+++ b/openwhisk/initHandler_test.go
@@ -94,20 +94,18 @@ func Example_shell_nocompiler() {
        doRun(ts, "")
        doInit(ts, initBinary("_test/hello.sh", ""))
        doRun(ts, "")
-       doRun(ts, `{"name":"*"}`)
-       doRun(ts, "")
+       doRun(ts, `{"name":"world"}`)
        stopTestServer(ts, cur, log)
        // Output:
        // 500 {"error":"no action defined yet"}
        // 200 {"ok":true}
        // 200 {"hello": "Mike"}
-       // 400 {"error":"command exited"}
-       // 500 {"error":"no action defined yet"}
+       // 200 {"hello": "world"}
        // msg=hello Mike
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+       // msg=hello world
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
-       // Goodbye!
        // XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
 }
 
@@ -226,4 +224,5 @@ func Example_badinit_nocompiler() {
        // 400 {"error":"cannot start action: command exited"}
        // 400 {"error":"cannot start action: command exited"}
        // 500 {"error":"no action defined yet"}
+       // hi
 }
diff --git a/openwhisk/runHandler.go b/openwhisk/runHandler.go
index 2463255..3345cb9 100644
--- a/openwhisk/runHandler.go
+++ b/openwhisk/runHandler.go
@@ -59,29 +59,20 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) {
                sendError(w, http.StatusInternalServerError, fmt.Sprintf("no action defined yet"))
                return
        }
+       // check if the process exited
+       if ap.theExecutor.Exited() {
+               sendError(w, http.StatusInternalServerError, fmt.Sprintf("command exited"))
+               return
+       }
 
        // remove newlines
        body = bytes.Replace(body, []byte("\n"), []byte(""), -1)
 
        // execute the action
-       ap.theExecutor.io <- body
+       response, err := ap.theExecutor.Interact(body)
 
        // check for early termination
-       var response []byte
-       var exited bool
-       select {
-       case response = <-ap.theExecutor.io:
-               exited = false
-       case err = <-ap.theExecutor.exit:
-               exited = true
-       }
-
-       // flush the logs sending the activation message at the end
-       ap.theExecutor.log <- true
-       <-ap.theExecutor.log
-
-       // check for early termination
-       if exited {
+       if err != nil {
                Debug("WARNING! Command exited")
                ap.theExecutor = nil
                sendError(w, http.StatusBadRequest, fmt.Sprintf("command exited"))
@@ -106,7 +97,7 @@ func (ap *ActionProxy) runHandler(w http.ResponseWriter, r *http.Request) {
                f.Flush()
        }
 
-       // diagnostic when writing problems
+       // diagnostic when you have writing problems
        if err != nil {
                sendError(w, http.StatusInternalServerError, fmt.Sprintf("Error writing response: %v", err))
                return
diff --git a/openwhisk/util_test.go b/openwhisk/util_test.go
index b0d8084..1593760 100644
--- a/openwhisk/util_test.go
+++ b/openwhisk/util_test.go
@@ -185,9 +185,7 @@ func TestMain(m *testing.M) {
        }
 
        // increase timeouts for init
-       DefaultTimeoutInit = 1000 * time.Millisecond
-       // timeout for drain - shoud less (or you can get stuck on stdout without getting the stderr)
-       DefaultTimeoutDrain = 100 * time.Millisecond
+       DefaultTimeoutStart = 1000 * time.Millisecond
        // build some test stuff
        sys("_test/build.sh")
        sys("_test/zips.sh")

Reply via email to