This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 687ed79f1f7 [#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184)
687ed79f1f7 is described below

commit 687ed79f1f781e42730560e8544a48de5b096a11
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Mon Aug 28 13:18:16 2023 -0700

    [#28187][prism] worker shutdown, cleanup, log fail, port spec, grpc recv size (#28184)
    
    * [prism] worker shutdown, cleanup, log fail
    
    * Increase prism server receive size to max.
---
 sdks/go/pkg/beam/runners/prism/internal/execute.go     | 10 ++++++++++
 .../pkg/beam/runners/prism/internal/jobservices/job.go |  1 +
 .../beam/runners/prism/internal/jobservices/server.go  |  3 ++-
 sdks/go/pkg/beam/runners/prism/internal/preprocess.go  |  2 +-
 .../pkg/beam/runners/prism/internal/worker/worker.go   | 18 +++++++++++++++---
 5 files changed, 29 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 49676710343..42327a0209d 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -20,6 +20,7 @@ import (
        "fmt"
        "io"
        "sort"
+       "time"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
@@ -55,6 +56,15 @@ func RunPipeline(j *jobservices.Job) {
        env, _ := getOnlyPair(envs)
        wk := worker.New(env) // Cheating by having the worker id match the 
environment id.
        go wk.Serve()
+       timeout := time.Minute
+       time.AfterFunc(timeout, func() {
+               if wk.Connected() {
+                       return
+               }
+               err := fmt.Errorf("prism %v didn't get control connection after 
%v", wk, timeout)
+               j.Failed(err)
+               j.CancelFn(err)
+       })
 
        // When this function exits, we cancel the context to clear
        // any related job resources.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
index ed0984323e2..fe4f18bd38e 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
@@ -160,6 +160,7 @@ func (j *Job) Done() {
 
 // Failed indicates that the job completed unsuccessfully.
 func (j *Job) Failed(err error) {
+       slog.Error("job failed", slog.Any("job", j), slog.Any("error", err))
        j.failureErr = err
        j.sendState(jobpb.JobState_FAILED)
        j.CancelFn(err)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index aaff03047f6..e3fb7766b51 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -70,7 +70,8 @@ func (s *Server) getJob(id string) *Job {
 }
 
 func (s *Server) Endpoint() string {
-       return s.lis.Addr().String()
+       _, port, _ := net.SplitHostPort(s.lis.Addr().String())
+       return fmt.Sprintf("localhost:%v", port)
 }
 
 // Serve serves on the started listener. Blocks.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go 
b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
index 96c5f5549b0..bca40709626 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/preprocess.go
@@ -139,7 +139,7 @@ func (p *preprocessor) preProcessGraph(comps 
*pipepb.Components) []*stage {
        keptLeaves := maps.Keys(leaves)
        sort.Strings(keptLeaves)
        topological := pipelinex.TopologicalSort(ts, keptLeaves)
-       slog.Debug("topological transform ordering", topological)
+       slog.Debug("topological transform ordering", slog.Any("topological", 
topological))
 
        // Basic Fusion Behavior
        //
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
index dab831c20af..405c1e812a4 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker.go
@@ -22,6 +22,7 @@ import (
        "context"
        "fmt"
        "io"
+       "math"
        "net"
        "strconv"
        "strings"
@@ -62,6 +63,7 @@ type W struct {
 
        // These are the ID sources
        inst, bund uint64
+       connected  atomic.Bool
 
        InstReqs chan *fnpb.InstructionRequest
        DataReqs chan *fnpb.Elements
@@ -83,7 +85,9 @@ func New(id string) *W {
        if err != nil {
                panic(fmt.Sprintf("failed to listen: %v", err))
        }
-       var opts []grpc.ServerOption
+       opts := []grpc.ServerOption{
+               grpc.MaxRecvMsgSize(math.MaxInt32),
+       }
        wk := &W{
                ID:     id,
                lis:    lis,
@@ -106,7 +110,8 @@ func New(id string) *W {
 }
 
 func (wk *W) Endpoint() string {
-       return wk.lis.Addr().String()
+       _, port, _ := net.SplitHostPort(wk.lis.Addr().String())
+       return fmt.Sprintf("localhost:%v", port)
 }
 
 // Serve serves on the started listener. Blocks.
@@ -200,7 +205,7 @@ func (wk *W) Logging(stream 
fnpb.BeamFnLogging_LoggingServer) error {
                                        file = file[:i]
                                }
 
-                               slog.LogAttrs(context.TODO(), 
toSlogSev(l.GetSeverity()), l.GetMessage(),
+                               slog.LogAttrs(stream.Context(), 
toSlogSev(l.GetSeverity()), l.GetMessage(),
                                        slog.Any(slog.SourceKey, &slog.Source{
                                                File: file,
                                                Line: line,
@@ -241,10 +246,15 @@ func (wk *W) GetProcessBundleDescriptor(ctx 
context.Context, req *fnpb.GetProces
        return desc, nil
 }
 
+func (wk *W) Connected() bool {
+       return wk.connected.Load()
+}
+
 // Control relays instructions to SDKs and back again, coordinated via unique 
instructionIDs.
 //
 // Requests come from the runner, and are sent to the client in the SDK.
 func (wk *W) Control(ctrl fnpb.BeamFnControl_ControlServer) error {
+       wk.connected.Store(true)
        done := make(chan struct{})
        go func() {
                for {
@@ -281,10 +291,12 @@ func (wk *W) Control(ctrl 
fnpb.BeamFnControl_ControlServer) error {
                case req := <-wk.InstReqs:
                        err := ctrl.Send(req)
                        if err != nil {
+                               go func() { <-done }()
                                return err
                        }
                case <-ctrl.Context().Done():
                        slog.Debug("Control context canceled")
+                       go func() { <-done }()
                        return ctrl.Context().Err()
                case <-done:
                        slog.Debug("Control done")

Reply via email to