This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 687ed79f1f7 [#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184) 687ed79f1f7 is described below commit 687ed79f1f781e42730560e8544a48de5b096a11 Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Mon Aug 28 13:18:16 2023 -0700 [#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184) * [prism] worker shutdown, cleanup, log fail * Increase prism server receive size to max. --- sdks/go/pkg/beam/runners/prism/internal/execute.go | 10 ++++++++++ .../pkg/beam/runners/prism/internal/jobservices/job.go | 1 + .../beam/runners/prism/internal/jobservices/server.go | 3 ++- sdks/go/pkg/beam/runners/prism/internal/preprocess.go | 2 +- .../pkg/beam/runners/prism/internal/worker/worker.go | 18 +++++++++++++++--- 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go index 49676710343..42327a0209d 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/execute.go +++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "sort" + "time" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec" @@ -55,6 +56,15 @@ func RunPipeline(j *jobservices.Job) { env, _ := getOnlyPair(envs) wk := worker.New(env) // Cheating by having the worker id match the environment id. go wk.Serve() + timeout := time.Minute + time.AfterFunc(timeout, func() { + if wk.Connected() { + return + } + err := fmt.Errorf("prism %v didn't get control connection after %v", wk, timeout) + j.Failed(err) + j.CancelFn(err) + }) // When this function exits, we cancel the context to clear // any related job resources. 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go index ed0984323e2..fe4f18bd38e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go @@ -160,6 +160,7 @@ func (j *Job) Done() { // Failed indicates that the job completed unsuccessfully. func (j *Job) Failed(err error) { + slog.Error("job failed", slog.Any("job", j), slog.Any("error", err)) j.failureErr = err j.sendState(jobpb.JobState_FAILED) j.CancelFn(err) diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go index aaff03047f6..e3fb7766b51 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go @@ -70,7 +70,8 @@ func (s *Server) getJob(id string) *Job { } func (s *Server) Endpoint() string { - return s.lis.Addr().String() + _, port, _ := net.SplitHostPort(s.lis.Addr().String()) + return fmt.Sprintf("localhost:%v", port) } // Serve serves on the started listener. Blocks. 
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go index 96c5f5549b0..bca40709626 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go +++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go @@ -139,7 +139,7 @@ func (p *preprocessor) preProcessGraph(comps *pipepb.Components) []*stage { keptLeaves := maps.Keys(leaves) sort.Strings(keptLeaves) topological := pipelinex.TopologicalSort(ts, keptLeaves) - slog.Debug("topological transform ordering", topological) + slog.Debug("topological transform ordering", slog.Any("topological", topological)) // Basic Fusion Behavior // diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go index dab831c20af..405c1e812a4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go +++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go @@ -22,6 +22,7 @@ import ( "context" "fmt" "io" + "math" "net" "strconv" "strings" @@ -62,6 +63,7 @@ type W struct { // These are the ID sources inst, bund uint64 + connected atomic.Bool InstReqs chan *fnpb.InstructionRequest DataReqs chan *fnpb.Elements @@ -83,7 +85,9 @@ func New(id string) *W { if err != nil { panic(fmt.Sprintf("failed to listen: %v", err)) } - var opts []grpc.ServerOption + opts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(math.MaxInt32), + } wk := &W{ ID: id, lis: lis, @@ -106,7 +110,8 @@ func New(id string) *W { } func (wk *W) Endpoint() string { - return wk.lis.Addr().String() + _, port, _ := net.SplitHostPort(wk.lis.Addr().String()) + return fmt.Sprintf("localhost:%v", port) } // Serve serves on the started listener. Blocks. 
@@ -200,7 +205,7 @@ func (wk *W) Logging(stream fnpb.BeamFnLogging_LoggingServer) error { file = file[:i] } - slog.LogAttrs(context.TODO(), toSlogSev(l.GetSeverity()), l.GetMessage(), + slog.LogAttrs(stream.Context(), toSlogSev(l.GetSeverity()), l.GetMessage(), slog.Any(slog.SourceKey, &slog.Source{ File: file, Line: line, @@ -241,10 +246,15 @@ func (wk *W) GetProcessBundleDescriptor(ctx context.Context, req *fnpb.GetProces return desc, nil } +func (wk *W) Connected() bool { + return wk.connected.Load() +} + // Control relays instructions to SDKs and back again, coordinated via unique instructionIDs. // // Requests come from the runner, and are sent to the client in the SDK. func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { + wk.connected.Store(true) done := make(chan struct{}) go func() { for { @@ -281,10 +291,12 @@ func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error { case req := <-wk.InstReqs: err := ctrl.Send(req) if err != nil { + go func() { <-done }() return err } case <-ctrl.Context().Done(): slog.Debug("Control context canceled") + go func() { <-done }() return ctrl.Context().Err() case <-done: slog.Debug("Control done")