[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88560&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88560
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:38
Start Date: 06/Apr/18 18:38
Worklog Time Spent: 10m 
  Work Description: lukecwik closed pull request #4311: [BEAM-3355] 
Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index 5e545c550f9..f8efffece71 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -17,16 +17,14 @@
 package harness
 
 import (
-   "bytes"
"context"
"fmt"
"io"
-   "io/ioutil"
-   "runtime/pprof"
"sync"
"time"
 
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
"github.com/apache/beam/sdks/go/pkg/beam/log"
fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
"github.com/apache/beam/sdks/go/pkg/beam/util/grpcx"
@@ -37,12 +35,13 @@ import (
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a 
plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
-// "pipeline-construction time" -- on each worker. It is a Fn API client and
+// "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.
 func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
-   setupRemoteLogging(ctx, loggingEndpoint)
-   setupDiagnosticRecording()
+   hooks.DeserializeHooksFromOptions()
 
+   hooks.RunInitHooks(ctx)
+   setupRemoteLogging(ctx, loggingEndpoint)
recordHeader()
 
// Connect to FnAPI control server. Receive and execute work.
@@ -87,8 +86,6 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
data:   {},
}
 
-   var cpuProfBuf bytes.Buffer
-
// gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
// is responsible for managing the network data. All it does is pull 
data from
// the stream, and hand off the message to a goroutine to actually be 
handled,
@@ -112,18 +109,10 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
log.Debugf(ctx, "RECV: %v", 
proto.MarshalTextString(req))
recordInstructionRequest(req)
 
-   if isEnabled("cpu_profiling") {
-   cpuProfBuf.Reset()
-   pprof.StartCPUProfile()
-   }
+   hooks.RunRequestHooks(ctx, req)
resp := ctrl.handleInstruction(ctx, req)
 
-   if isEnabled("cpu_profiling") {
-   pprof.StopCPUProfile()
-   if err := 
ioutil.WriteFile(fmt.Sprintf("%s/cpu_prof%s", storagePath, req.InstructionId), 
cpuProfBuf.Bytes(), 0644); err != nil {
-   log.Warnf(ctx, "Failed to write CPU 
profile for instruction %s: %v", req.InstructionId, err)
-   }
-   }
+   hooks.RunResponseHooks(ctx, req, resp)
 
recordInstructionResponse(resp)
if resp != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go 
b/sdks/go/pkg/beam/core/runtime/harness/session.go
index 67f577f5035..69a1d992786 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session.go
@@ -16,13 +16,14 @@
 package harness
 
 import (
+   "context"
"fmt"
-   "os"
+   "io"
"sync"
-   "time"
 
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/session"
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
"github.com/golang/protobuf/proto"
 )
@@ -35,12 +36,13 @@ const (
dataSend
 )
 
+// capture is set by the capture hook below.
+var capture io.WriteCloser
+
 var (
selectedOptions = make(map[string]bool)
-   // TODO(wcn): add a buffered writer around capture and use it.
- 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88547&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88547
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:25
Start Date: 06/Apr/18 18:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on issue #4311: [BEAM-3355] 
Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#issuecomment-379337077
 
 
   Thanks.
   
   I meant a simple example pipeline I can run on both Dataflow and Flink and 
that illustrates how the hook mechanisms should be used. I'm fine with that 
being added separately so that it doesn't block this PR.
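
For readers of the archive, the register / enable / run flow under discussion can be sketched roughly as follows. This is a standalone illustration, not the example pipeline requested above and not the Beam package itself: the `Hook` struct is reduced to two stages, the instruction is a plain string instead of a FnAPI proto, and `register`, `enable`, `runInit`, `runRequest` and `runDemo` are hypothetical names chosen for the sketch.

```go
package main

import (
	"context"
	"fmt"
)

// Hook mirrors the shape of hooks.Hook from the PR, reduced to an Init
// stage and a request stage that takes a plain instruction ID.
type Hook struct {
	Init func(context.Context) error
	Req  func(ctx context.Context, instructionID string) error
}

// HookFactory builds a Hook from string options, as in the PR.
type HookFactory func(opts []string) Hook

var (
	registry = map[string]HookFactory{}
	active   = map[string]Hook{}
)

// register associates a factory with a name (compare hooks.RegisterHook).
func register(name string, f HookFactory) { registry[name] = f }

// enable instantiates a registered hook. This helper is an assumption;
// the PR's own enabling function is not shown in this thread.
func enable(name string, opts []string) error {
	f, ok := registry[name]
	if !ok {
		return fmt.Errorf("hook %q not registered", name)
	}
	active[name] = f(opts)
	return nil
}

// runInit runs every Init hook and stops at the first failure.
func runInit(ctx context.Context) error {
	for _, h := range active {
		if h.Init != nil {
			if err := h.Init(ctx); err != nil {
				return err
			}
		}
	}
	return nil
}

// runRequest runs every request hook; failures are reported, not fatal.
func runRequest(ctx context.Context, id string) {
	for n, h := range active {
		if h.Req != nil {
			if err := h.Req(ctx, id); err != nil {
				fmt.Printf("request hook %s failed: %v\n", n, err)
			}
		}
	}
}

// runDemo wires up one tracing hook and returns what it observed.
func runDemo() ([]string, error) {
	var seen []string
	register("trace", func(opts []string) Hook {
		return Hook{
			Init: func(context.Context) error {
				seen = append(seen, "init:"+fmt.Sprint(opts))
				return nil
			},
			Req: func(_ context.Context, id string) error {
				seen = append(seen, "req:"+id)
				return nil
			},
		}
	})
	ctx := context.Background()
	if err := enable("trace", []string{"verbose"}); err != nil {
		return nil, err
	}
	if err := runInit(ctx); err != nil {
		return nil, err
	}
	runRequest(ctx, "instr-1")
	return seen, nil
}

func main() {
	seen, err := runDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println(seen)
}
```

The sketch keeps the same asymmetry as the quoted code: an Init failure aborts startup, while a request hook failure is only logged.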


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88547)
Time Spent: 9h 40m  (was: 9.5h)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88545&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88545
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:16
Start Date: 06/Apr/18 18:16
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179836738
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
 
 Review comment:
   I'm just saying that having 2 close layers both use the name Hook is 
somewhat confusing terminology. But I'm fine leaving that for now.




Issue Time Tracking
---

Worklog Id: (was: 88545)
Time Spent: 9.5h  (was: 9h 20m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88538&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88538
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:03
Start Date: 06/Apr/18 18:03
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179833473
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
+
+// CaptureHookFactory produces a CaptureHook from the supplied
+// options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var captureHookRegistry = make(map[string]CaptureHookFactory)
+var enabledCaptureHook string
+
+func init() {
+   hf := func(opts []string) hooks.Hook {
+   return hooks.Hook{
+   Init: func(_ context.Context) error {
+   if len(opts) > 0 {
+   name, opts := hooks.Decode(opts[0])
+   capture = 
captureHookRegistry[name](opts)
+   }
+   return nil
+   },
+   }
+   }
+
+   hooks.RegisterHook("session", hf)
+}
+
+// RegisterCaptureHook registers a CaptureHookFactory for the
+// supplied identifier.
+func RegisterCaptureHook(name string, c CaptureHookFactory) {
+   if _, exists := captureHookRegistry[name]; exists {
+   panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered 
twice", name))
+   }
+   captureHookRegistry[name] = c
+}
+
+// EnableCaptureHook is called to request the use of a hook in a pipeline.
+// It updates the supplied pipelines to capture this request.
+func EnableCaptureHook(name string, opts []string) {
+   if _, exists := captureHookRegistry[name]; !exists {
+   panic(fmt.Sprintf("EnableHook: %s not registered", name))
+   }
+   if enabledCaptureHook != "" {
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 88538)
Time Spent: 9h 10m  (was: 9h)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88539&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88539
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:03
Start Date: 06/Apr/18 18:03
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on issue #4311: [BEAM-3355] Diagnostic 
interfaces
URL: https://github.com/apache/beam/pull/4311#issuecomment-379331297
 
 
   PTAL all comments addressed.




Issue Time Tracking
---

Worklog Id: (was: 88539)
Time Spent: 9h 20m  (was: 9h 10m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88537&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88537
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 18:01
Start Date: 06/Apr/18 18:01
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179833157
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
+
+// CaptureHookFactory produces a CaptureHook from the supplied
+// options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var captureHookRegistry = make(map[string]CaptureHookFactory)
+var enabledCaptureHook string
+
+func init() {
+   hf := func(opts []string) hooks.Hook {
+   return hooks.Hook{
+   Init: func(_ context.Context) error {
+   if len(opts) > 0 {
+   name, opts := hooks.Decode(opts[0])
+   capture = 
captureHookRegistry[name](opts)
+   }
+   return nil
+   },
+   }
+   }
+
+   hooks.RegisterHook("session", hf)
+}
+
+// RegisterCaptureHook registers a CaptureHookFactory for the
+// supplied identifier.
+func RegisterCaptureHook(name string, c CaptureHookFactory) {
+   if _, exists := captureHookRegistry[name]; exists {
+   panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered 
twice", name))
+   }
+   captureHookRegistry[name] = c
+}
+
+// EnableCaptureHook is called to request the use of a hook in a pipeline.
+// It updates the supplied pipelines to capture this request.
+func EnableCaptureHook(name string, opts []string) {
+   if _, exists := captureHookRegistry[name]; !exists {
+   panic(fmt.Sprintf("EnableHook: %s not registered", name))
+   }
+   if enabledCaptureHook != "" {
 
 Review comment:
   Done




Issue Time Tracking
---

Worklog Id: (was: 88537)
Time Spent: 9h  (was: 8h 50m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88533
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:44
Start Date: 06/Apr/18 17:44
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179828479
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction 
request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction 
response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
+   // I'd rather trust users to do the right thing.
+   for n, h := range activeHooks {
+   if h.Req != nil {
+   if err := h.Req(ctx, req); err != nil {
+   log.Infof(ctx, "request hook %s failed: %v", n, 
err)
+   }
+   }
+   }
+}
+
+// ResponseHook is called when sending a FnAPI instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, 
*fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp 
*fnpb.InstructionResponse) {
+   for n, h := range activeHooks {
+   if h.Resp != nil {
+   if err := h.Resp(ctx, req, 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88530&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88530
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:40
Start Date: 06/Apr/18 17:40
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on issue #4311: [BEAM-3355] Diagnostic 
interfaces
URL: https://github.com/apache/beam/pull/4311#issuecomment-379325307
 
 
   There's an example of hooking profiling in the Dataflow runner.
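
The Dataflow runner code referred to here is not included in this thread. As a rough idea of what a profiling hook built on the Req/Resp stages could look like, here is a minimal standalone sketch. The `Hook` type is a simplified stand-in that takes a plain instruction ID, and `newCPUProfileHook`, `profileOne` and the `sink` callback are hypothetical names; only `pprof.StartCPUProfile` and `pprof.StopCPUProfile` are real standard-library calls.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"runtime/pprof"
)

// Hook mirrors the Req/Resp stages of hooks.Hook, with the FnAPI
// protos replaced by a plain instruction ID for illustration.
type Hook struct {
	Req  func(ctx context.Context, instructionID string) error
	Resp func(ctx context.Context, instructionID string) error
}

// newCPUProfileHook returns a hook that profiles each instruction and
// hands the finished profile to sink.
func newCPUProfileHook(sink func(instructionID string, profile []byte) error) Hook {
	var buf bytes.Buffer
	return Hook{
		Req: func(_ context.Context, _ string) error {
			buf.Reset()
			// StartCPUProfile needs a writer and fails if a profile
			// is already running in this process.
			return pprof.StartCPUProfile(&buf)
		},
		Resp: func(_ context.Context, id string) error {
			// StopCPUProfile returns once the profile has been written.
			pprof.StopCPUProfile()
			return sink(id, append([]byte(nil), buf.Bytes()...))
		},
	}
}

// profileOne runs the hook around work and returns the captured profiles.
func profileOne(id string, work func()) (map[string][]byte, error) {
	profiles := map[string][]byte{}
	h := newCPUProfileHook(func(id string, p []byte) error {
		profiles[id] = p
		return nil
	})
	ctx := context.Background()
	if err := h.Req(ctx, id); err != nil {
		return nil, err
	}
	work()
	if err := h.Resp(ctx, id); err != nil {
		return nil, err
	}
	return profiles, nil
}

func main() {
	profiles, err := profileOne("instr-1", func() {
		sum := 0
		for i := 0; i < 1000000; i++ {
			sum += i
		}
		_ = sum
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("captured %d profile(s)\n", len(profiles))
}
```

Because the CPU profiler is process-wide, a hook of this shape only makes sense when instructions are not profiled concurrently; a real implementation would need to decide how to handle overlapping requests.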




Issue Time Tracking
---

Worklog Id: (was: 88530)
Time Spent: 8h 40m  (was: 8.5h)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88529&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88529
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:39
Start Date: 06/Apr/18 17:39
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179827403
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
 
 Review comment:
   It's a hook that happens to be a writer. It's the hook into the capture 
facility, and all the APIs are talking about 'hooks' not 'writers' so this 
suggestion feels incorrectly motivated.
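
To make the "hook that happens to be a writer" point concrete, the sketch below shows a type satisfying the `CaptureHook` definition quoted above. `memCapture` and `record` are hypothetical names used only for illustration; an actual runner would presumably write to a file or remote store rather than memory.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// CaptureHook matches the definition quoted from the PR.
type CaptureHook io.WriteCloser

// memCapture is a hypothetical in-memory capture target.
type memCapture struct {
	buf    bytes.Buffer
	closed bool
}

// Compile-time check that *memCapture satisfies CaptureHook.
var _ CaptureHook = (*memCapture)(nil)

// Write appends to the buffer unless the capture has been closed.
func (m *memCapture) Write(p []byte) (int, error) {
	if m.closed {
		return 0, errors.New("capture closed")
	}
	return m.buf.Write(p)
}

// Close marks the capture as finished; later writes fail.
func (m *memCapture) Close() error {
	m.closed = true
	return nil
}

// record writes one message through the plain writer interface, the
// way calling code can without knowing the concrete hook type.
func record(w io.Writer, msg string) error {
	_, err := io.WriteString(w, msg+"\n")
	return err
}

func main() {
	m := &memCapture{}
	var hook CaptureHook = m
	if err := record(hook, "RECV: instruction"); err != nil {
		panic(err)
	}
	if err := hook.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("captured %d bytes\n", m.buf.Len())
}
```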




Issue Time Tracking
---

Worklog Id: (was: 88529)
Time Spent: 8.5h  (was: 8h 20m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88528&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88528
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:37
Start Date: 06/Apr/18 17:37
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179826856
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 88528)
Time Spent: 8h 20m  (was: 8h 10m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88527&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88527
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:36
Start Date: 06/Apr/18 17:36
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179826558
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction 
request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction 
response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
 
 Review comment:
   OK.




Issue Time Tracking
---

Worklog Id: (was: 88527)
Time Spent: 8h 10m  (was: 8h)


[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88526&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88526
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:36
Start Date: 06/Apr/18 17:36
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179826460
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+       if h.Init != nil {
+           if err := h.Init(ctx); err != nil {
+               return err
+           }
+       }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
+   // I'd rather trust users to do the right thing.
+   for n, h := range activeHooks {
+       if h.Req != nil {
+           if err := h.Req(ctx, req); err != nil {
+               log.Infof(ctx, "request hook %s failed: %v", n, err)
+           }
+       }
+   }
+}
+
+// ResponseHook is called when sending a FnAPI instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
+   for n, h := range activeHooks {
+       if h.Resp != nil {
+           if err := h.Resp(ctx, req, 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88525&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88525
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:35
Start Date: 06/Apr/18 17:35
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179826372
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88524&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88524
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:34
Start Date: 06/Apr/18 17:34
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179826078
 
 

 ##
 File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go
 ##
 @@ -924,7 +924,7 @@ type PTransform struct {
    //
    Outputs map[string]string `protobuf:"bytes,4,rep,name=outputs" json:"outputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
    // (Optional) Static display data for this PTransform application. If
-   // there is none, or it is not relevant (such as use by the Fn API)
 
 Review comment:
   Reverted.




Issue Time Tracking
---

Worklog Id: (was: 88524)
Time Spent: 7h 40m  (was: 7.5h)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 7h 40m
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88520&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88520
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:28
Start Date: 06/Apr/18 17:28
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179824381
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
 ##
 @@ -37,12 +35,13 @@ import (
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
-// "pipeline-construction time" -- on each worker. It is a Fn API client and
+// "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.
 func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
-   setupRemoteLogging(ctx, loggingEndpoint)
-   setupDiagnosticRecording()
+   hooks.DeserializeHooks()
 
+   hooks.RunInitHooks(ctx)
+   setupRemoteLogging(ctx, loggingEndpoint)
    recordHeader()
 
 Review comment:
   No, that needs more work. BEAM-4017




Issue Time Tracking
---

Worklog Id: (was: 88520)
Time Spent: 7.5h  (was: 7h 20m)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 7.5h
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.





[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88518&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88518
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:27
Start Date: 06/Apr/18 17:27
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179824037
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
 ##
 @@ -90,15 +94,20 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
    }
 
    if *cpuProfiling != "" {
-       beam.PipelineOptions.Set("cpu_profiling", "true")
-       beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces")
+       perf.EnableProfCaptureHook("gcs_profile_writer", []string{*cpuProfiling})
    }
 
    if *sessionRecording != "" {
-       beam.PipelineOptions.Set("session_recording", "true")
-       beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces")
+       // TODO(wcn): implement this.
 
 Review comment:
   I actually did; I just didn't fix the comment.




Issue Time Tracking
---

Worklog Id: (was: 88518)
Time Spent: 7h 20m  (was: 7h 10m)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 7h 20m
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.





[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88508&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88508
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179815311
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
 
 Review comment:
   nit: rename it CaptureWriter? I was confused when reading the code because it is also called a hook.




Issue Time Tracking
---

Worklog Id: (was: 88508)
Time Spent: 6h 10m  (was: 6h)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 6h 10m
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.





[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88507&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88507
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179815875
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
+
+// CaptureHookFactory produces a CaptureHook from the supplied
+// options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var captureHookRegistry = make(map[string]CaptureHookFactory)
+var enabledCaptureHook string
+
+func init() {
+   hf := func(opts []string) hooks.Hook {
+       return hooks.Hook{
+           Init: func(_ context.Context) error {
+               if len(opts) > 0 {
+                   name, opts := hooks.Decode(opts[0])
+                   capture = captureHookRegistry[name](opts)
+               }
+               return nil
+           },
+       }
+   }
+
+   hooks.RegisterHook("session", hf)
+}
+
+// RegisterCaptureHook registers a CaptureHookFactory for the
+// supplied identifier.
+func RegisterCaptureHook(name string, c CaptureHookFactory) {
+   if _, exists := captureHookRegistry[name]; exists {
+       panic(fmt.Sprintf("RegisterSessionCaptureHook: %s registered twice", name))
+   }
+   captureHookRegistry[name] = c
+}
+
+// EnableCaptureHook is called to request the use of a hook in a pipeline.
+// It updates the supplied pipelines to capture this request.
+func EnableCaptureHook(name string, opts []string) {
+   if _, exists := captureHookRegistry[name]; !exists {
+       panic(fmt.Sprintf("EnableHook: %s not registered", name))
+   }
+   if enabledCaptureHook != "" {
 
 Review comment:
   Instead of keeping track of it manually, perhaps add an IsEnabled function 
in hooks to allow this check? Other hooks will have the same requirement of 
only allowing one instance.
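
One possible shape for the IsEnabled check suggested here, as a sketch only: the `registry` type and its `Enable` method are hypothetical and stand in for the package-level `enabledHooks` map in hooks.go, so that the single-instance rule lives in one place instead of in each hook's own tracking variable.

```go
package main

import "fmt"

// registry is a hypothetical wrapper around a map like enabledHooks in the
// hooks.go diff: hook name -> options the hook was enabled with.
type registry struct {
	enabled map[string][]string
}

func newRegistry() *registry {
	return &registry{enabled: make(map[string][]string)}
}

// IsEnabled reports whether a hook with the given name has been enabled.
func (r *registry) IsEnabled(name string) bool {
	_, ok := r.enabled[name]
	return ok
}

// Enable records the hook and rejects a second instance of the same name.
func (r *registry) Enable(name string, opts []string) error {
	if r.IsEnabled(name) {
		return fmt.Errorf("hook %q already enabled", name)
	}
	r.enabled[name] = opts
	return nil
}

func main() {
	r := newRegistry()
	fmt.Println(r.Enable("session", []string{"gcs"})) // first enable succeeds
	fmt.Println(r.Enable("session", nil))             // second enable is rejected
	fmt.Println(r.IsEnabled("prof"))                  // never enabled
}
```

With something like this, EnableCaptureHook could ask the hooks package whether "session" is already enabled rather than consulting its own enabledCaptureHook string. Whether a duplicate should return an error or panic, as the code in the diff does, is a separate choice.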




Issue Time Tracking
---

Worklog Id: (was: 88507)
Time Spent: 6h  (was: 5h 50m)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 6h
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.





[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88511&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88511
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179819467
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88512&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88512
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179823479
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88514&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88514
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179819164
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
 
 Review comment:
   I don't think the TODO is needed. We should do what we have here. Hooks are 
already "unsafe" IMO and we may want to allow modifications/filtering at some 
later point.
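
To make the trade-off in this comment concrete, here is a minimal sketch, not the Beam implementation. It assumes a stand-in `request` type in place of fnpb.InstructionRequest and omits the context argument; `runRequestHooks`, `errBoom`, and the hook names are hypothetical. Because each hook receives the same pointer, a hook is able to modify the request, and a failing hook is recorded rather than aborting the others, similar to how RunRequestHooks in the diff logs and continues.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// request is a hypothetical stand-in for fnpb.InstructionRequest.
type request struct {
	InstructionID string
}

// requestHook mirrors the shape of RequestHook in the diff, minus the context.
type requestHook func(*request) error

// errBoom is an illustrative error used by the failing hook below.
var errBoom = errors.New("boom")

// runRequestHooks calls every hook with the same pointer and collects
// failure messages instead of stopping at the first error.
func runRequestHooks(hooks map[string]requestHook, req *request) []string {
	names := make([]string, 0, len(hooks))
	for n := range hooks {
		names = append(names, n)
	}
	sort.Strings(names) // deterministic order, for this example only

	var failures []string
	for _, n := range names {
		if h := hooks[n]; h != nil {
			if err := h(req); err != nil {
				failures = append(failures, fmt.Sprintf("request hook %s failed: %v", n, err))
			}
		}
	}
	return failures
}

func main() {
	hooks := map[string]requestHook{
		"observer": func(r *request) error { return nil },
		"mutator":  func(r *request) error { r.InstructionID = "rewritten"; return nil },
		"broken":   func(r *request) error { return errBoom },
	}
	req := &request{InstructionID: "instr-1"}
	failures := runRequestHooks(hooks, req)
	fmt.Println(req.InstructionID, failures)
}
```

Passing the pointer keeps the call cheap and leaves room for hooks that filter or modify requests later; the cost is that nothing prevents a hook from changing the request today.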


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88514)
Time Spent: 6h 40m  (was: 6.5h)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88510=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88510
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179817281
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
 
 Review comment:
   Perhaps mention explicitly that this is called before we set up logging or 
connect to the server.
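
The ordering described in this comment can be sketched as follows. This is a simplified model under stated assumptions, not the harness code: `initHook` drops the context argument, `startWorker` and the step names in the trace are hypothetical, and hooks are held in a slice here whereas the diff keeps them in a map. The point is that init hooks run before logging is set up or the control connection is made, and the first failing hook stops startup.

```go
package main

import (
	"errors"
	"fmt"
)

// initHook is a simplified stand-in for hooks.InitHook (context omitted).
type initHook func() error

// errInit is an illustrative failure returned by a hook in main.
var errInit = errors.New("init failed")

// startWorker runs the init hooks first and only then performs the later
// startup steps. The step names are illustrative, not Beam's functions.
func startWorker(inits []initHook) ([]string, error) {
	var trace []string
	for i, h := range inits {
		if h == nil {
			continue
		}
		if err := h(); err != nil {
			// Remote logging is not available yet, so the error is
			// returned to the caller instead of being logged.
			return trace, fmt.Errorf("init hook %d: %w", i, err)
		}
		trace = append(trace, fmt.Sprintf("init hook %d", i))
	}
	trace = append(trace, "setup logging", "connect to control server")
	return trace, nil
}

func main() {
	trace, err := startWorker([]initHook{func() error { return nil }})
	fmt.Println(trace, err)

	_, err = startWorker([]initHook{func() error { return errInit }})
	fmt.Println(err)
}
```

A hook written against this ordering should not assume that remote logging or the control channel exists while its Init function runs.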




Issue Time Tracking
---

Worklog Id: (was: 88510)
Time Spent: 6.5h  (was: 6h 20m)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-go
>Reporter: Henning Rohde
>Assignee: Bill Neubauer
>Priority: Minor
>  Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> We currently hardcode cpu profiling and session recording in the harness. We 
> should make it pluggable instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88509=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88509
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179816955
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/harness.go
 ##
 @@ -37,12 +35,13 @@ import (
 // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).
 
 // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
-// "pipeline-construction time" -- on each worker. It is a Fn API client and
+// "pipeline-construction time" -- on each worker. It is a FnAPI client and
 // ultimately responsible for correctly executing user code.
 func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
-   setupRemoteLogging(ctx, loggingEndpoint)
-   setupDiagnosticRecording()
+   hooks.DeserializeHooks()
 
+   hooks.RunInitHooks(ctx)
+   setupRemoteLogging(ctx, loggingEndpoint)
recordHeader()
 
 Review comment:
   Question: is session not being converted to a hook at this time?




Issue Time Tracking
---

Worklog Id: (was: 88509)
Time Spent: 6h 20m  (was: 6h 10m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88517=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88517
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179819835
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package hooks allows runners to tailor execution of the worker harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a FnAPI instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
+   // I'd rather trust users to do the right thing.
+   for n, h := range activeHooks {
+   if h.Req != nil {
+   if err := h.Req(ctx, req); err != nil {
+   log.Infof(ctx, "request hook %s failed: %v", n, err)
+   }
+   }
+   }
+}
+
+// ResponseHook is called when sending a FnAPI instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
+   for n, h := range activeHooks {
+   if h.Resp != nil {
+   if err := h.Resp(ctx, 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88513=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88513
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179820369
 
 

 ##
 File path: sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go
 ##
 @@ -924,7 +924,7 @@ type PTransform struct {
//
Outputs map[string]string `protobuf:"bytes,4,rep,name=outputs" json:"outputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
// (Optional) Static display data for this PTransform application. If
-   // there is none, or it is not relevant (such as use by the Fn API)
 
 Review comment:
   Is this from a new go generate or a search'n'replace? If the latter, we 
should make the change to the proto file or revert.




Issue Time Tracking
---

Worklog Id: (was: 88513)
Time Spent: 6h 40m  (was: 6.5h)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88515=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88515
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179821103
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
 ##
 @@ -90,15 +94,20 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
}
 
if *cpuProfiling != "" {
-   beam.PipelineOptions.Set("cpu_profiling", "true")
-   beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces")
+   perf.EnableProfCaptureHook("gcs_profile_writer", []string{*cpuProfiling})
}
 
if *sessionRecording != "" {
-   beam.PipelineOptions.Set("session_recording", "true")
-   beam.PipelineOptions.Set("storage_path", "/var/opt/google/traces")
+   // TODO(wcn): implement this.
 
 Review comment:
   Please open a JIRA.




Issue Time Tracking
---

Worklog Id: (was: 88515)
Time Spent: 6h 50m  (was: 6h 40m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-06 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88516=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88516
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 06/Apr/18 17:25
Start Date: 06/Apr/18 17:25
Worklog Time Spent: 10m 
  Work Description: herohde commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179820829
 
 

 ##
 File path: sdks/go/pkg/beam/runners/dataflow/dataflow.go
 ##
 @@ -147,9 +156,9 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
SdkPipelineOptions: newMsg(pipelineOptions{
DisplayData: findPipelineFlags(),
Options: dataflowOptions{
-   Options: options.Options,
PipelineURL: modelURL,
},
+   GoOptions: options,
 
 Review comment:
   Clever! Thanks for figuring out how to make this work.




Issue Time Tracking
---

Worklog Id: (was: 88516)
Time Spent: 7h  (was: 6h 50m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88249=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88249
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 22:07
Start Date: 05/Apr/18 22:07
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179614299
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
+// of features used by the harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a Fn API instruction.
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 88249)
Time Spent: 5.5h  (was: 5h 20m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88250=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88250
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 22:07
Start Date: 05/Apr/18 22:07
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179614424
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
 
 Review comment:
   Done.




Issue Time Tracking
---

Worklog Id: (was: 88250)
Time Spent: 5h 40m  (was: 5.5h)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88247=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88247
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 22:05
Start Date: 05/Apr/18 22:05
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179614042
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
+// of features used by the harness.
+//
+// Examples of customization:
+//
+// gRPC integration
+// session recording
+// profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init()
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+   if h.Init != nil {
+   if err := h.Init(ctx); err != nil {
+   return err
+   }
+   }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a Fn API instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
+   // I'd rather trust users to do the right thing.
+   for n, h := range activeHooks {
+   if h.Req != nil {
+   if err := h.Req(ctx, req); err != nil {
+   log.Infof(ctx, "request hook %s failed: %v", n, err)
+   }
+   }
+   }
+}
+
+// ResponseHook is called when sending a Fn API instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
+   for n, h := range activeHooks {
+   if h.Resp != nil {

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ 
https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88194=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88194
 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:46
Start Date: 05/Apr/18 18:46
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #4311: 
[BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179564270
 
 

 ##
 File path: sdks/go/pkg/beam/core/runtime/harness/session.go
 ##
 @@ -213,3 +186,59 @@ func recordFooter() error {
},
})
 }
+
+// CaptureHook writes the messaging content consumed and
+// produced by the worker, allowing the data to be used as
+// an input for the session runner. Since workers can exist
+// in a variety of environments, this allows the runner
+// to tailor the behavior best for its particular needs.
+type CaptureHook io.WriteCloser
+
+// CaptureHookFactory produces a CaptureHook from the supplied
+// options.
+type CaptureHookFactory func([]string) CaptureHook
+
+var captureHookRegistry = make(map[string]CaptureHookFactory)
+var enabledCaptureHook string
+
+func init() {
+   hf := func(opts []string) hooks.Hook {
+   return hooks.Hook{
+   Init: func(_ context.Context) error {
 
 Review comment:
   
   As implemented, all capture hooks run at worker initialization time.
   
   Is there any further implementation needed?
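
For readers following the thread, here is a minimal sketch of the lifecycle under discussion: factories are registered by name, an enabled hook is built from its options, Init hooks fail fast, and request hooks only report errors. It is not the PR's code. The `Registry` type, `Enable`, and `demo` are hypothetical names, and the request is a plain string so the sketch builds without Beam.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Hook mirrors the shape in the diff, reduced to two stages. The request is
// a plain string so the sketch carries no Beam dependency (an assumption).
type Hook struct {
	Init func(context.Context) error
	Req  func(context.Context, string) error
}

// HookFactory produces a Hook from the supplied options.
type HookFactory func(opts []string) Hook

// Registry is a hypothetical stand-in for the package-level maps in hooks.go.
type Registry struct {
	factories map[string]HookFactory
	active    map[string]Hook
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{factories: map[string]HookFactory{}, active: map[string]Hook{}}
}

// Register makes a factory available under name.
func (r *Registry) Register(name string, f HookFactory) { r.factories[name] = f }

// Enable instantiates a registered hook with its options.
func (r *Registry) Enable(name string, opts []string) error {
	f, ok := r.factories[name]
	if !ok {
		return fmt.Errorf("hook %q is not registered", name)
	}
	r.active[name] = f(opts)
	return nil
}

// RunInit runs every Init hook and stops at the first failure, since a
// failed init hook leaves the worker in an unknown state.
func (r *Registry) RunInit(ctx context.Context) error {
	for name, h := range r.active {
		if h.Init == nil {
			continue
		}
		if err := h.Init(ctx); err != nil {
			return fmt.Errorf("init hook %s: %w", name, err)
		}
	}
	return nil
}

// RunReq runs every Req hook, reports failures, and keeps going.
// It returns the number of hooks that failed.
func (r *Registry) RunReq(ctx context.Context, req string) int {
	failed := 0
	for name, h := range r.active {
		if h.Req == nil {
			continue
		}
		if err := h.Req(ctx, req); err != nil {
			fmt.Printf("request hook %s failed: %v\n", name, err)
			failed++
		}
	}
	return failed
}

// demo wires up one hook that counts Init calls and rejects empty requests.
func demo() (inits, failures int, err error) {
	r := NewRegistry()
	r.Register("capture", func(opts []string) Hook {
		return Hook{
			Init: func(context.Context) error { inits++; return nil },
			Req: func(_ context.Context, req string) error {
				if req == "" {
					return errors.New("empty request")
				}
				return nil
			},
		}
	})
	if err = r.Enable("capture", []string{"out.bin"}); err != nil {
		return
	}
	ctx := context.Background()
	if err = r.RunInit(ctx); err != nil {
		return
	}
	failures = r.RunReq(ctx, "") + r.RunReq(ctx, "process_bundle")
	return
}

func main() {
	inits, failures, err := demo()
	fmt.Println(inits, failures, err)
}
```

In this arrangement a hook that should not do work at worker startup can leave `Init` nil and act only in `Req`.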


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
---

Worklog Id: (was: 88194)
Time Spent: 5h  (was: 4h 50m)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Bill Neubauer
> Priority: Minor
> Time Spent: 5h
> Remaining Estimate: 0h
>
> We currently hardcode CPU profiling and session recording in the harness. We should make them pluggable instead.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88191&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88191 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:46
Start Date: 05/Apr/18 18:46
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179564264
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
+// of features used by the harness.
+//
+// Examples of customization:
+//
+//   gRPC integration
+//   session recording
+//   profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init().
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+       if h.Init != nil {
+           if err := h.Init(ctx); err != nil {
+               return err
+           }
+       }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a Fn API instruction.
+type RequestHook func(context.Context, *fnpb.InstructionRequest) error
+
+// RunRequestHooks runs the hooks that handle a FnAPI request.
+func RunRequestHooks(ctx context.Context, req *fnpb.InstructionRequest) {
+   // The request hooks should not modify the request.
+   // TODO(wcn): pass the request by value to enforce? That's a perf hit.
+   // I'd rather trust users to do the right thing.
+   for n, h := range activeHooks {
+       if h.Req != nil {
+           if err := h.Req(ctx, req); err != nil {
+               log.Infof(ctx, "request hook %s failed: %v", n, err)
+           }
+       }
+   }
+}
+
+// ResponseHook is called when sending a Fn API instruction response.
+type ResponseHook func(context.Context, *fnpb.InstructionRequest, *fnpb.InstructionResponse) error
+
+// RunResponseHooks runs the hooks that handle a FnAPI response.
+func RunResponseHooks(ctx context.Context, req *fnpb.InstructionRequest, resp *fnpb.InstructionResponse) {
+   for n, h := range activeHooks {
+       if h.Resp != 

[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88192&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88192 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:46
Start Date: 05/Apr/18 18:46
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179564267
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
 
 Review comment:
   
   Consider removing the blank line between this comment block and the package declaration, and changing this first sentence to:
   
   Package hooks allows runners to tailor execution of the worker harness.
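
The reason for the blank-line remark can be checked with the standard library: `go/parser` only treats a comment as package documentation when it ends on the line directly above the `package` clause. A small sketch follows. The two source strings are shortened stand-ins for the file under review, and `packageDoc` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"go/parser"
	"go/token"
	"strings"
)

// packageDoc parses src and returns the text that go/parser attaches to the
// package clause as documentation ("" when nothing is attached).
func packageDoc(src string) (string, error) {
	fset := token.NewFileSet()
	mode := parser.ParseComments | parser.PackageClauseOnly
	f, err := parser.ParseFile(fset, "hooks.go", src, mode)
	if err != nil {
		return "", err
	}
	if f.Doc == nil {
		return "", nil
	}
	return strings.TrimSpace(f.Doc.Text()), nil
}

// detached has a blank line between the comment and the package clause,
// so the comment is not picked up as package documentation.
const detached = `// Hooks allow runners to tailor execution of the worker.

package hooks
`

// attached follows the convention suggested in the review.
const attached = `// Package hooks allows runners to tailor execution of the worker harness.
package hooks
`

func main() {
	for _, src := range []string{detached, attached} {
		doc, err := packageDoc(src)
		fmt.Printf("doc=%q err=%v\n", doc, err)
	}
}
```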




Issue Time Tracking
---

Worklog Id: (was: 88192)
Time Spent: 4h 50m  (was: 4h 40m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88195&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88195 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:46
Start Date: 05/Apr/18 18:46
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179564273
 
 

 ##
 File path: sdks/go/pkg/beam/util/grpcx/hook.go
 ##
 @@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcx
+
+import (
+   "context"
+   "fmt"
+   "time"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/util/hooks"
+   "google.golang.org/grpc"
+)
+
+// Hook allows a runner to customize various aspects of gRPC
+// communication with the FnAPI harness. Each member of the struct
+// is optional; the default behavior will be used if a value is not
+// supplied.
+type Hook struct {
+   // Dialer allows the runner to customize the gRPC dialing behavior.
+   Dialer func(context.Context, string, time.Duration) (*grpc.ClientConn, error)
+   // TODO(wcn): expose other hooks here.
+}
+
+type HookFactory func([]string) Hook
 
 Review comment:
   
   Exported types should have a comment.
   
   // HookFactory configures a hook with the provided arguments.
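
As a rough illustration of the "optional member, default otherwise" contract described in the struct comment, together with a documented `HookFactory`, here is a self-contained sketch. It is not the grpcx code: `Conn` is a hypothetical stand-in for `*grpc.ClientConn` so the example builds with the standard library alone, and `defaultDial`, `Dial`, `proxyFactory`, and `via` are invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Conn is a hypothetical stand-in for *grpc.ClientConn.
type Conn struct{ Target, Via string }

// Hook lets a runner customize gRPC communication. Every field is optional.
type Hook struct {
	// Dialer overrides how the harness connects to an endpoint.
	Dialer func(ctx context.Context, endpoint string, timeout time.Duration) (*Conn, error)
}

// HookFactory configures a Hook with the provided arguments.
type HookFactory func(args []string) Hook

// defaultDial stands in for the harness's built-in dialing behavior.
func defaultDial(_ context.Context, endpoint string, _ time.Duration) (*Conn, error) {
	return &Conn{Target: endpoint, Via: "default"}, nil
}

// Dial uses the hook's Dialer when one is supplied and falls back to the
// default behavior otherwise.
func Dial(ctx context.Context, h Hook, endpoint string, timeout time.Duration) (*Conn, error) {
	if h.Dialer != nil {
		return h.Dialer(ctx, endpoint, timeout)
	}
	return defaultDial(ctx, endpoint, timeout)
}

// proxyFactory is an example HookFactory: its first argument names a proxy.
// With no arguments it returns the zero Hook, which requests no customization.
func proxyFactory(args []string) Hook {
	if len(args) == 0 {
		return Hook{}
	}
	proxy := args[0]
	return Hook{
		Dialer: func(_ context.Context, endpoint string, _ time.Duration) (*Conn, error) {
			return &Conn{Target: endpoint, Via: proxy}, nil
		},
	}
}

// via reports which dialing path was taken for the given factory arguments.
func via(args []string) string {
	var factory HookFactory = proxyFactory
	c, err := Dial(context.Background(), factory(args), "localhost:12345", time.Second)
	if err != nil {
		return "error: " + err.Error()
	}
	return c.Via
}

func main() {
	fmt.Println(via(nil))
	fmt.Println(via([]string{"proxy.example:443"}))
}
```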




Issue Time Tracking
---

Worklog Id: (was: 88195)
Time Spent: 5h 10m  (was: 5h)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88193&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88193 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 18:46
Start Date: 05/Apr/18 18:46
Worklog Time Spent: 10m 
  Work Description: lostluck commented on a change in pull request #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#discussion_r179564266
 
 

 ##
 File path: sdks/go/pkg/beam/core/util/hooks/hooks.go
 ##
 @@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Hooks allow runners to tailor execution of the worker to allow for customization
+// of features used by the harness.
+//
+// Examples of customization:
+//
+//   gRPC integration
+//   session recording
+//   profile recording
+//
+// Registration methods for hooks must be called prior to calling beam.Init().
+// Request methods for hooks must be called as part of building the pipeline
+// request for the runner's Execute method.
+
+package hooks
+
+import (
+   "bytes"
+   "context"
+   "encoding/csv"
+   "encoding/json"
+   "fmt"
+   "os"
+   "strings"
+
+   "github.com/apache/beam/sdks/go/pkg/beam/core/runtime"
+   "github.com/apache/beam/sdks/go/pkg/beam/log"
+   fnpb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+var (
+   hookRegistry = make(map[string]HookFactory)
+   enabledHooks = make(map[string][]string)
+   activeHooks  = make(map[string]Hook)
+)
+
+// A Hook is a set of hooks to run at various stages of executing a
+// pipeline.
+type Hook struct {
+   // Init is called once at the startup of the worker.
+   Init InitHook
+   // Req is called each time the worker handles a FnAPI instruction request.
+   Req RequestHook
+   // Resp is called each time the worker generates a FnAPI instruction response.
+   Resp ResponseHook
+}
+
+// InitHook is a hook that is called when the harness
+// initializes.
+type InitHook func(context.Context) error
+
+// HookFactory is a function that produces a Hook from the supplied arguments.
+type HookFactory func([]string) Hook
+
+// RegisterHook registers a Hook for the
+// supplied identifier.
+func RegisterHook(name string, h HookFactory) {
+   hookRegistry[name] = h
+}
+
+// RunInitHooks runs the init hooks.
+func RunInitHooks(ctx context.Context) error {
+   // If an init hook fails to complete, the invariants of the
+   // system are compromised and we can't run a workflow.
+   // The hooks can run in any order. They should not be
+   // interdependent or interfere with each other.
+   for _, h := range activeHooks {
+       if h.Init != nil {
+           if err := h.Init(ctx); err != nil {
+               return err
+           }
+       }
+   }
+   return nil
+}
+
+// RequestHook is called when handling a Fn API instruction.
 
 Review comment:
   
   Fn API -> FnAPI and the same throughout.




Issue Time Tracking
---

Worklog Id: (was: 88193)

> Make Go SDK runtime harness hooks pluggable
> ---
>
> Key: BEAM-3355
> URL: https://issues.apache.org/jira/browse/BEAM-3355
> Project: Beam
> Issue Type: Improvement
> Components: sdk-go
> Reporter: Henning Rohde
> Assignee: Bill Neubauer
> Priority: Minor
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> We currently hardcode CPU profiling and session recording in the harness. We should make them pluggable instead.





[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-04-05 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=88080&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-88080 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 05/Apr/18 15:17
Start Date: 05/Apr/18 15:17
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on issue #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#issuecomment-378972753
 
 
   PTAL, this is ready for review.
   
   This PR also fixes the Dataflow job submission process to work with the options changes made to support the Flink runner. I've verified that CPU profiling works on Cloud Dataflow. The session runner interface needs to be reconsidered, as one monolithic output file is not a good match for most cloud storage systems. Changing the output to chunked files is the most likely improvement, but it requires changes to the session runner. I've opened BEAM-4015 to track this development, so the session hook is TBD in Dataflow for now.
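
To make the chunked-output idea concrete, here is one possible shape for it: an `io.WriteCloser` that rolls over to a new destination after a fixed number of bytes. This is only a sketch of the general technique, not the design tracked in BEAM-4015. `chunkWriter`, `newChunkWriter`, the `open` callback, and the in-memory `memChunk` destination are all hypothetical.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// chunkWriter is an io.WriteCloser that splits its input into chunks of at
// most limit bytes. open is a hypothetical callback returning the destination
// for chunk n (a local file, a cloud storage object, and so on).
type chunkWriter struct {
	limit   int
	open    func(n int) (io.WriteCloser, error)
	cur     io.WriteCloser
	written int // bytes written to cur
	next    int // index of the next chunk to open
}

func newChunkWriter(limit int, open func(int) (io.WriteCloser, error)) (*chunkWriter, error) {
	if limit <= 0 {
		return nil, fmt.Errorf("chunk limit must be positive, got %d", limit)
	}
	return &chunkWriter{limit: limit, open: open}, nil
}

// rotate closes the current chunk, if any, and opens the next one.
func (w *chunkWriter) rotate() error {
	if err := w.Close(); err != nil {
		return err
	}
	c, err := w.open(w.next)
	if err != nil {
		return err
	}
	w.cur, w.written = c, 0
	w.next++
	return nil
}

// Write fills the current chunk and rolls over whenever it is full.
func (w *chunkWriter) Write(p []byte) (int, error) {
	total := 0
	for len(p) > 0 {
		if w.cur == nil || w.written == w.limit {
			if err := w.rotate(); err != nil {
				return total, err
			}
		}
		n := w.limit - w.written
		if n > len(p) {
			n = len(p)
		}
		m, err := w.cur.Write(p[:n])
		w.written += m
		total += m
		if err != nil {
			return total, err
		}
		p = p[m:]
	}
	return total, nil
}

// Close closes the chunk currently being written, if there is one.
func (w *chunkWriter) Close() error {
	if w.cur == nil {
		return nil
	}
	err := w.cur.Close()
	w.cur = nil
	return err
}

// memChunk is an in-memory chunk destination used only for the demo.
type memChunk struct{ bytes.Buffer }

func (*memChunk) Close() error { return nil }

// split writes data through a chunkWriter and returns the chunk contents.
func split(data string, limit int) ([]string, error) {
	var chunks []*memChunk
	w, err := newChunkWriter(limit, func(int) (io.WriteCloser, error) {
		c := &memChunk{}
		chunks = append(chunks, c)
		return c, nil
	})
	if err != nil {
		return nil, err
	}
	if _, err := io.WriteString(w, data); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil {
		return nil, err
	}
	out := make([]string, len(chunks))
	for i, c := range chunks {
		out[i] = c.String()
	}
	return out, nil
}

func main() {
	chunks, err := split("abcdefgh", 3)
	fmt.Println(chunks, err)
}
```

Because the type satisfies `io.WriteCloser`, it could in principle back a `CaptureHook` as declared in session.go, though a reader that reassembles the chunks would still be needed on the session runner side.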




Issue Time Tracking
---

Worklog Id: (was: 88080)
Time Spent: 4.5h  (was: 4h 20m)



[jira] [Work logged] (BEAM-3355) Make Go SDK runtime harness hooks pluggable

2018-03-23 Thread ASF GitHub Bot (JIRA)

 [ https://issues.apache.org/jira/browse/BEAM-3355?focusedWorklogId=83855&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-83855 ]

ASF GitHub Bot logged work on BEAM-3355:


Author: ASF GitHub Bot
Created on: 23/Mar/18 22:42
Start Date: 23/Mar/18 22:42
Worklog Time Spent: 10m 
  Work Description: wcn3 commented on issue #4311: [BEAM-3355] Diagnostic interfaces
URL: https://github.com/apache/beam/pull/4311#issuecomment-375817227
 
 
   PTAL with regard to the user API surface.
   
   I haven't actually tested these changes; there are still some internal inconsistencies and likely bugs. I just wanted to make sure we're on the same page with regard to the high-level API per our previous discussions.




Issue Time Tracking
---

Worklog Id: (was: 83855)
Time Spent: 4h 20m  (was: 4h 10m)
