[ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88560&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88560
 ]

ASF GitHub Bot logged work on BEAM-3355:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Apr/18 18:38
            Start Date: 06/Apr/18 18:38
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, since GitHub would not otherwise display it:

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 5e545c550f9..f8efffece71 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -17,16 +17,14 @@
 package harness
 
 import (
-       "bytes"
        "context"
        "fmt"
        "io"
-       "io/ioutil"
-       "runtime/pprof"
        "sync"
        "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -37,12 +35,13 @@ import (
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a 
plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
-// "pipeline-construction time" -- on each worker. It is a Fn API client and
+// "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.
 func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
-       setupRemoteLogging(ctx, loggingEndpoint)
-       setupDiagnosticRecording()
+       hooks.DeserializeHooksFromOptions()
 
+       hooks.RunInitHooks(ctx)
+       setupRemoteLogging(ctx, loggingEndpoint)
        recordHeader()
 
        // Connect to FnAPI control server. Receive and execute work.
@@ -87,8 +86,6 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                data:   &DataManager{},
        }
 
-       var cpuProfBuf bytes.Buffer
-
        // gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
        // is responsible for managing the network data. All it does is pull 
data from
        // the stream, and hand off the message to a goroutine to actually be 
handled,
@@ -112,18 +109,10 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
                        log.Debugf(ctx, "RECV: %v", 
proto.MarshalTextString(req))
                        recordInstructionRequest(req)
 
-                       if isEnabled("cpu_profiling") {
-                               cpuProfBuf.Reset()
-                               pprof.StartCPUProfile(&cpuProfBuf)
-                       }
+                       hooks.RunRequestHooks(ctx, req)
                        resp := ctrl.handleInstruction(ctx, req)
 
-                       if isEnabled("cpu_profiling") {
-                               pprof.StopCPUProfile()
-                               if err := 
ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), 
cpuProfBuf.Bytes(), 0644); err != nil {
-                                       log.Warnf(ctx, "Failed to write CPU 
profile for instruction %s: %v", req.InstructionId, err)
-                               }
-                       }
+                       hooks.RunResponseHooks(ctx, req, resp)
 
                        recordInstructionResponse(resp)
                        if resp != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go 
b/sdks/go/pkg/beam/core/runtime/harness/session.go
index 67f577f5035..69a1d992786 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session.go
@@ -16,13 +16,14 @@
 package harness
 
 import (
+       "context"
        "fmt"
-       "os"
+       "io"
        "sync"
-       "time"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
        "github.com/golang/protobuf/proto"
 )
@@ -35,12 +36,13 @@ const (
        dataSend
 )
 
+// capture is set by the capture hook below.
+var capture io.WriteCloser
+
 var (
        selectedOptions = make(map[string]bool)
-       // TODO(wcn): add a buffered writer around capture and use it.
-       capture     *os.File
-       sessionLock sync.Mutex
-       bufPool     = sync.Pool{
+       sessionLock     sync.Mutex
+       bufPool         = sync.Pool{
                New: func() interface{} {
                        return proto.NewBuffer(nil)
                },
@@ -49,35 +51,6 @@ var (
        storagePath string
 )
 
-// TODO(wcn): the plan is to make these hooks available in the harness in a 
fashion
-// similar to net/http/httptrace. They are simple function calls now to get 
this
-// code underway.
-func setupDiagnosticRecording() error {
-       // No recording options specified? We're done.
-       if runtime.GlobalOptions.Get("cpu_profiling") == "" && 
runtime.GlobalOptions.Get("session_recording") == "" {
-               return nil
-       }
-
-       var err error
-
-       storagePath = runtime.GlobalOptions.Get("storage_path")
-       // Any form of recording requires the destination directory to exist.
-       if err = os.MkdirAll(storagePath, 0755); err != nil {
-               return fmt.Errorf("Unable to create session directory: %v", err)
-       }
-
-       if !isEnabled("session_recording") {
-               return nil
-       }
-
-       // Set up the session recorder.
-       if capture, err = os.Create(fmt.Sprintf("%s/session-%v", storagePath, 
time.Now().Unix())); err != nil {
-               return fmt.Errorf("Unable to create session file: %v", err)
-       }
-
-       return nil
-}
-
 func isEnabled(option string) bool {
        return runtime.GlobalOptions.Get(option) == "true"
 }
@@ -213,3 +186,57 @@ func recordFooter() error {
                },
        })
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
+
+// CaptureHookFactory produces a CaptureHook from the supplied
+// options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var captureHookRegistry = make(map[string]CaptureHookFactory)
+
+func init() {
+       hf := func(opts []string) hooks.Hook {
+               return hooks.Hook{
+                       Init: func(_ context.Context) error {
+                               if len(opts) > 0 {
+                                       name, opts := hooks.Decode(opts[0])
+                                       capture = 
captureHookRegistry[name](opts)
+                               }
+                               return nil
+                       },
+               }
+       }
+
+       hooks.RegisterHook("session", hf)
+}
+
+// RegisterCaptureHook registers a CaptureHookFactory for the
+// supplied identifier.
+func RegisterCaptureHook(name string, c CaptureHookFactory) {
+       if _, exists := captureHookRegistry[name]; exists {
+               panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered 
twice", name))
+       }
+       captureHookRegistry[name] = c
+}
+
+// EnableCaptureHook is called to request the use of a hook in a pipeline.
+// It records the request as the configuration of the "session" hook.
+func EnableCaptureHook(name string, opts []string) {
+       if _, exists := captureHookRegistry[name]; !exists {
+               panic(fmt.Sprintf("EnableHook: %s not registered", name))
+       }
+       if exists, opts := hooks.IsEnabled("session"); exists {
+               n, _ := hooks.Decode(opts[0])
+               if n != name {
+                       panic(fmt.Sprintf("EnableHook: can't enable hook %s, 
hook %s already enabled", name, n))
+               }
+       }
+
+       hooks.EnableHook("session", hooks.Encode(name, opts))
+}
diff --git a/sdks/go/pkg/beam/core/util/hooks/hooks.go 
b/sdks/go/pkg/beam/core/util/hooks/hooks.go
new file mode 100644
index 00000000000..ea30cc6e50f
--- /dev/null
+++ b/sdks/go/pkg/beam/core/util/hooks/hooks.go
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init().
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+       "bytes"
+       "context"
+       "encoding/csv"
+       "encoding/json"
+       "fmt"
+       "strings"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+       hookRegistry = make(map[string]HookFactory)
+       enabledHooks = make(map[string][]string)
+       activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+       // Init is called once at the startup of the worker prior to
+       // connecting to the FnAPI services.
+       Init InitHook
+       // Req is called each time the worker handles a FnAPI instruction 
request.
+       Req RequestHook
+       // Resp is called each time the worker generates a FnAPI instruction 
response.
+       Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a HookFactory for the supplied identifier,
+// replacing any factory previously registered under that name.
+func RegisterHook(name string, h HookFactory) {
+       hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+       // If an init hook fails to complete, the invariants of the
+       // system are compromised and we can't run a workflow.
+       // The hooks can run in any order. They should not be
+       // interdependent or interfere with each other.
+       for _, h := range activeHooks {
+               if h.Init != nil {
+                       if err := h.Init(ctx); err != nil {
+                               return err
+                       }
+               }
+       }
+       return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+       // The request hooks should not modify the request.
+       for n, h := range activeHooks {
+               if h.Req != nil {
+                       if err := h.Req(ctx, req); err != nil {
+                               log.Infof(ctx, "request hook %s failed: %v", n, 
err)
+                       }
+               }
+       }
+}
+
+// ResponseHook is called when sending a FnAPI instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, 
*fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp 
*fnpb.InstructionResponse) {
+       for n, h := range activeHooks {
+               if h.Resp != nil {
+                       if err := h.Resp(ctx, req, resp); err != nil {
+                               log.Infof(ctx, "response hook %s failed: %v", 
n, err)
+                       }
+               }
+       }
+}
+
+// SerializeHooksToOptions serializes the enabled hooks and their configuration into a JSON string
+// stored in the "hooks" pipeline option, so the worker harness can deserialize it later.
+func SerializeHooksToOptions() {
+       data, err := json.Marshal(enabledHooks)
+       if err != nil {
+               // Shouldn't happen, since all the data is strings.
+               panic(fmt.Sprintf("Couldn't serialize hooks: %v", err))
+       }
+       runtime.GlobalOptions.Set("hooks", string(data))
+}
+
+// DeserializeHooksFromOptions extracts the hook configuration information 
from the options and configures
+// the hooks with the supplied options.
+func DeserializeHooksFromOptions() {
+       cfg := runtime.GlobalOptions.Get("hooks")
+       if err := json.Unmarshal([]byte(cfg), &enabledHooks); err != nil {
+               // Shouldn't happen, since all the data is strings.
+               panic(fmt.Sprintf("DeserializeHooks failed on input %q: %v", 
cfg, err))
+       }
+
+       for h, opts := range enabledHooks {
+               activeHooks[h] = hookRegistry[h](opts)
+       }
+}
+
+// EnableHook enables the hook to be run for the pipeline. The hook will
+// receive the supplied args when the pipeline executes. It is safe to
+// enable the same hook again with different options (the latest args
+// replace earlier ones), as this is necessary if a hook composes behavior.
+func EnableHook(name string, args ...string) error {
+       if _, ok := hookRegistry[name]; !ok {
+               return fmt.Errorf("EnableHook: hook %s not found", name)
+       }
+       enabledHooks[name] = args
+       return nil
+}
+
+// IsEnabled returns true and the registered options if the hook is
+// already enabled.
+func IsEnabled(name string) (bool, []string) {
+       opts, ok := enabledHooks[name]
+       return ok, opts
+}
+
+// Encode encodes a hook name and its arguments into a single string.
+// This is a convenience function for users of this package that are composing
+// hooks.
+func Encode(name string, opts []string) string {
+       var cfg bytes.Buffer
+       w := csv.NewWriter(&cfg)
+       // This should never happen since a bytes.Buffer doesn't fail to write.
+       if err := w.Write(append([]string{name}, opts...)); err != nil {
+               panic(fmt.Sprintf("error encoding arguments: %v", err))
+       }
+       w.Flush()
+       return cfg.String()
+}
+
+// Decode decodes a hook name and its arguments from a single string.
+// This is a convenience function for users of this package that are composing
+// hooks.
+func Decode(in string) (string, []string) {
+       r := csv.NewReader(strings.NewReader(in))
+       s, err := r.Read()
+       if err != nil {
+               panic(fmt.Sprintf("malformed input for decoding: %s %v", in, 
err))
+       }
+       return s[0], s[1:]
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index 247a2d1487f..4a9f4335e34 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "flag"
        "fmt"
+       "io"
        "os"
        "os/user"
        "path"
@@ -31,12 +32,14 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        // Importing to get the side effect of the remote execution hook. See 
init().
        _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/options/gcpopts"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
        "github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
+       "github.com/apache/beam/sdks/go/pkg/beam/x/hooks/perf"
        "github.com/golang/protobuf/proto"
        "golang.org/x/oauth2/google"
        df "google.golang.org/api/dataflow/v1b3"
@@ -55,18 +58,19 @@ var (
        teardownPolicy = flag.String("teardown_policy", "", "Job teardown 
policy (internal only).")
 
        // SDK options
-       cpuProfiling     = flag.String("cpu_profiling", "", "Job records CPU 
profiles")
+       cpuProfiling     = flag.String("cpu_profiling", "", "Job records CPU 
profiles to this GCS location (optional)")
        sessionRecording = flag.String("session_recording", "", "Job records 
session transcripts")
 )
 
 func init() {
        // Note that we also _ import harness/init to setup the remote 
execution hook.
        beam.RegisterRunner("dataflow", Execute)
+
+       perf.RegisterProfCaptureHook("gcs_profile_writer", gcsRecorderHook)
 }
 
 type dataflowOptions struct {
-       Options     map[string]string `json:"options"`
-       PipelineURL string            `json:"pipelineUrl"`
+       PipelineURL string `json:"pipelineUrl"`
 }
 
 // Execute runs the given pipeline on Google Cloud Dataflow. It uses the
@@ -90,15 +94,20 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
 
        if *cpuProfiling != "" {
-               beam.PipelineOptions.Set("cpu_profiling", "true")
-               beam.PipelineOptions.Set("storage_path", 
"/var/opt/google/traces")
+               perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling)
        }
 
        if *sessionRecording != "" {
-               beam.PipelineOptions.Set("session_recording", "true")
-               beam.PipelineOptions.Set("storage_path", 
"/var/opt/google/traces")
+               // TODO(wcn): BEAM-4017
+               // It's a bit inconvenient for GCS because the whole object is 
written in
+               // one pass, whereas the session logs are constantly appended. 
We wouldn't
+               // want to hold all the logs in memory to flush at the end of 
the pipeline
+               // as we'd blow out memory on the worker. The implementation of 
the
+               // CaptureHook should create an internal buffer and write 
chunks out to GCS
+               // once they get to an appropriate size (50M or so?)
        }
 
+       hooks.SerializeHooksToOptions()
        options := beam.PipelineOptions.Export()
 
        // (1) Upload Go binary and model to GCS.
@@ -147,9 +156,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                        SdkPipelineOptions: newMsg(pipelineOptions{
                                DisplayData: findPipelineFlags(),
                                Options: dataflowOptions{
-                                       Options:     options.Options,
                                        PipelineURL: modelURL,
                                },
+                               GoOptions: options,
                        }),
                        WorkerPools: []*df.WorkerPool{{
                                Kind: "harness",
@@ -317,3 +326,18 @@ func printJob(ctx context.Context, job *df.Job) {
        }
        log.Info(ctx, string(str))
 }
+
+func gcsRecorderHook(opts []string) perf.CaptureHook {
+       bucket, prefix, err := gcsx.ParseObject(opts[0])
+       if err != nil {
+               panic(fmt.Sprintf("Invalid hook configuration for 
gcsRecorderHook: %s", opts))
+       }
+
+       return func(ctx context.Context, spec string, r io.Reader) error {
+               client, err := gcsx.NewClient(ctx, 
storage.DevstorageReadWriteScope)
+               if err != nil {
+                       return fmt.Errorf("couldn't establish GCS client: %v", 
err)
+               }
+               return gcsx.WriteObject(client, bucket, path.Join(prefix, 
spec), r)
+       }
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/messages.go 
b/sdks/go/pkg/beam/runners/dataflow/messages.go
index e4abea67ac7..80fcebdadf0 100644
--- a/sdks/go/pkg/beam/runners/dataflow/messages.go
+++ b/sdks/go/pkg/beam/runners/dataflow/messages.go
@@ -19,6 +19,7 @@ import (
        "encoding/json"
        "fmt"
 
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        "google.golang.org/api/googleapi"
 )
@@ -34,8 +35,9 @@ func newMsg(msg interface{}) googleapi.RawMessage {
 
 // pipelineOptions models Job/Environment/SdkPipelineOptions
 type pipelineOptions struct {
-       DisplayData []*displayData `json:"display_data,omitempty"`
-       Options     interface{}    `json:"options,omitempty"`
+       DisplayData []*displayData     `json:"display_data,omitempty"`
+       Options     interface{}        `json:"options,omitempty"`
+       GoOptions   runtime.RawOptions 
`json:"beam:option:go_options:v1,omitempty"`
 }
 
 // NOTE(herohde) 2/9/2017: most of the v1b3 messages are weakly-typed json
diff --git a/sdks/go/pkg/beam/runners/session/session.go 
b/sdks/go/pkg/beam/runners/session/session.go
index b123466dcfc..aa8ae1ec899 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -51,7 +51,7 @@ func init() {
 
 var sessionFile = flag.String("session_file", "", "Session file for the 
runner")
 
-// controlServer manages the Fn API control channel.
+// controlServer manages the FnAPI control channel.
 type controlServer struct {
        filename   string
        wg         *sync.WaitGroup // used to signal when the session is 
completed
@@ -207,7 +207,7 @@ func extractPortSpec(spec *rapi_pb.FunctionSpec) string {
        panic("unable to extract port")
 }
 
-// dataServer manages the Fn API data channel.
+// dataServer manages the FnAPI data channel.
 type dataServer struct {
        ctrl *controlServer
 }
@@ -235,7 +235,7 @@ func (d *dataServer) Data(stream 
fnapi_pb.BeamFnData_DataServer) error {
        }
 }
 
-// loggingServer manages the Fn API logging channel.
+// loggingServer manages the FnAPI logging channel.
 type loggingServer struct{} // no data content
 
 func (l *loggingServer) Logging(stream fnapi_pb.BeamFnLogging_LoggingServer) 
error {
diff --git a/sdks/go/pkg/beam/util/grpcx/hook.go 
b/sdks/go/pkg/beam/util/grpcx/hook.go
new file mode 100644
index 00000000000..6978915aaa6
--- /dev/null
+++ b/sdks/go/pkg/beam/util/grpcx/hook.go
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcx
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+       "google.golang.org/grpc"
+)
+
+// Hook allows a runner to customize various aspects of gRPC
+// communication with the FnAPI harness. Each member of the struct
+// is optional; the default behavior will be used if a value is not
+// supplied.
+type Hook struct {
+       // Dialer allows the runner to customize the gRPC dialing behavior.
+       Dialer func(context.Context, string, time.Duration) (*grpc.ClientConn, 
error)
+       // TODO(wcn): expose other hooks here.
+}
+
+type HookFactory func([]string) Hook
+
+var hookRegistry = make(map[string]HookFactory)
+
+// RegisterHook registers a HookFactory for the
+// supplied identifier. It panics if the same identifier is
+// registered twice.
+func RegisterHook(name string, c HookFactory) {
+       if _, exists := hookRegistry[name]; exists {
+               panic(fmt.Sprintf("grpc.Hook: %s registered twice", name))
+       }
+       hookRegistry[name] = c
+
+       hf := func(opts []string) hooks.Hook {
+               return hooks.Hook{
+                       Init: func(_ context.Context) error {
+                               if len(opts) == 0 {
+                                       return nil
+                               }
+
+                               name, opts := hooks.Decode(opts[0])
+                               grpcHook := hookRegistry[name](opts)
+                               if grpcHook.Dialer != nil {
+                                       Dial = grpcHook.Dialer
+                               }
+                               return nil
+                       },
+               }
+       }
+       hooks.RegisterHook("grpc", hf)
+}
+
+// EnableHook is called to request the use of the gRPC
+// hook in a pipeline.
+func EnableHook(name string, opts ...string) {
+       _, exists := hookRegistry[name]
+       if !exists {
+               panic(fmt.Sprintf("EnableHook: %s not registered", name))
+       }
+       // Only one hook can be enabled. If the pipeline has two conflicting 
views about how to use gRPC
+       // that won't end well.
+       if exists, opts := hooks.IsEnabled("grpc"); exists {
+               n, _ := hooks.Decode(opts[0])
+               if n != name {
+                       panic(fmt.Sprintf("EnableHook: can't enable hook %s, 
hook %s already enabled", name, n))
+               }
+       }
+
+       hooks.EnableHook("grpc", hooks.Encode(name, opts))
+}
diff --git a/sdks/go/pkg/beam/x/hooks/perf/perf.go 
b/sdks/go/pkg/beam/x/hooks/perf/perf.go
new file mode 100644
index 00000000000..93fd6118459
--- /dev/null
+++ b/sdks/go/pkg/beam/x/hooks/perf/perf.go
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package perf adds performance-measuring hooks to a runner, such as CPU or trace profiles.
+package perf
+
+import (
+       "bytes"
+       "context"
+       "fmt"
+       "io"
+       "runtime/pprof"
+       "runtime/trace"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+       fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+// CaptureHook is used by the harness to have the runner
+// persist a trace record with the supplied name and contents.
+// The type of trace can be determined by the prefix of the name.
+//
+// * prof: A profile compatible with profiles produced by runtime/pprof
+// * trace_prof: A trace compatible with traces produced by runtime/trace
+type CaptureHook func(context.Context, string, io.Reader) error
+
+// CaptureHookFactory creates a CaptureHook from the supplied options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var profCaptureHookRegistry = make(map[string]CaptureHookFactory)
+
+var enabledProfCaptureHooks []string
+
+func init() {
+       hf := func(opts []string) hooks.Hook {
+               enabledProfCaptureHooks = opts
+               enabled := len(enabledProfCaptureHooks) > 0
+               var cpuProfBuf bytes.Buffer
+               return hooks.Hook{
+                       Req: func(_ context.Context, _ 
*fnpb.InstructionRequest) error {
+                               if !enabled {
+                                       return nil
+                               }
+                               cpuProfBuf.Reset()
+                               return pprof.StartCPUProfile(&cpuProfBuf)
+                       },
+                       Resp: func(ctx context.Context, req 
*fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error {
+                               if !enabled {
+                                       return nil
+                               }
+                               pprof.StopCPUProfile()
+                               for _, h := range enabledProfCaptureHooks {
+                                       name, opts := hooks.Decode(h)
+                                       if err := 
profCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("prof%s", 
req.InstructionId), &cpuProfBuf); err != nil {
+                                               return err
+                                       }
+                               }
+                               return nil
+                       },
+               }
+       }
+       hooks.RegisterHook("prof", hf)
+
+       hf = func(opts []string) hooks.Hook {
+               var traceProfBuf bytes.Buffer
+               enabledTraceCaptureHooks = opts
+               enabled := len(enabledTraceCaptureHooks) > 0
+               return hooks.Hook{
+                       Req: func(_ context.Context, _ 
*fnpb.InstructionRequest) error {
+                               if !enabled {
+                                       return nil
+                               }
+                               traceProfBuf.Reset()
+                               return trace.Start(&traceProfBuf)
+                       },
+                       Resp: func(ctx context.Context, req 
*fnpb.InstructionRequest, _ *fnpb.InstructionResponse) error {
+                               if !enabled {
+                                       return nil
+                               }
+                               trace.Stop()
+                               for _, h := range enabledTraceCaptureHooks {
+                                       name, opts := hooks.Decode(h)
+                                       if err := 
traceCaptureHookRegistry[name](opts)(ctx, fmt.Sprintf("trace_prof%s", 
req.InstructionId), &traceProfBuf); err != nil {
+                                               return err
+                                       }
+                               }
+                               return nil
+                       },
+               }
+       }
+       hooks.RegisterHook("trace", hf)
+}
+
+// RegisterProfCaptureHook registers a CaptureHookFactory for the
+// supplied identifier. It panics if the same identifier is
+// registered twice.
+func RegisterProfCaptureHook(name string, c CaptureHookFactory) {
+       if _, exists := profCaptureHookRegistry[name]; exists {
+               panic(fmt.Sprintf("RegisterProfCaptureHook: %s registered 
twice", name))
+       }
+       profCaptureHookRegistry[name] = c
+}
+
+// EnableProfCaptureHook activates a registered profile capture hook for a given pipeline.
+func EnableProfCaptureHook(name string, opts ...string) {
+       _, exists := profCaptureHookRegistry[name]
+       if !exists {
+               panic(fmt.Sprintf("EnableProfCaptureHook: %s not registered", 
name))
+       }
+
+       enc := hooks.Encode(name, opts)
+
+       for i, h := range enabledProfCaptureHooks {
+               n, _ := hooks.Decode(h)
+               if h == n {
+                       // Rewrite the registration with the current arguments
+                       enabledProfCaptureHooks[i] = enc
+                       hooks.EnableHook("prof", enabledProfCaptureHooks...)
+                       return
+               }
+       }
+
+       enabledProfCaptureHooks = append(enabledProfCaptureHooks, enc)
+       hooks.EnableHook("prof", enabledProfCaptureHooks...)
+}
+
+var traceCaptureHookRegistry = make(map[string]CaptureHookFactory)
+var enabledTraceCaptureHooks []string
+
+// RegisterTraceCaptureHook registers a CaptureHookFactory for the
+// supplied identifier. It panics if the same identifier is
+// registered twice.
+func RegisterTraceCaptureHook(name string, c CaptureHookFactory) {
+       if _, exists := traceCaptureHookRegistry[name]; exists {
+               panic(fmt.Sprintf("RegisterTraceCaptureHook: %s registered 
twice", name))
+       }
+       traceCaptureHookRegistry[name] = c
+}
+
+// EnableTraceCaptureHook activates a registered trace capture hook for a given pipeline.
+func EnableTraceCaptureHook(name string, opts ...string) {
+       if _, exists := traceCaptureHookRegistry[name]; !exists {
+               panic(fmt.Sprintf("EnableTraceCaptureHook: %s not registered", 
name))
+       }
+
+       enc := hooks.Encode(name, opts)
+       for i, h := range enabledTraceCaptureHooks {
+               n, _ := hooks.Decode(h)
+               if h == n {
+                       // Rewrite the registration with the current arguments
+                       enabledTraceCaptureHooks[i] = enc
+                       hooks.EnableHook("trace", enabledTraceCaptureHooks...)
+                       return
+               }
+       }
+       enabledTraceCaptureHooks = append(enabledTraceCaptureHooks, enc)
+       hooks.EnableHook("trace", enabledTraceCaptureHooks...)
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 88560)
    Time Spent: 9h 50m  (was: 9h 40m)

> Make Go SDK runtime harness hooks pluggable
> -------------------------------------------
>
>                 Key: BEAM-3355
>                 URL: https://issues.apache.org/jira/browse/BEAM-3355
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Bill Neubauer
>            Priority: Minor
>          Time Spent: 9h 50m
>  Remaining Estimate: 0h
>
> We currently hard-code CPU profiling and session recording in the harness. We 
> should make them pluggable instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to