This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 92087f2daa7 [prism] Catch panics in primary execution goroutines. 
(#32210)
92087f2daa7 is described below

commit 92087f2daa7d1fdfa041c99b9047cdd195a46924
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Thu Aug 15 15:13:56 2024 -0700

    [prism] Catch panics in primary execution goroutines. (#32210)
---
 sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 6 ++++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go                 | 8 +++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go 
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 0781efd5ff0..ed9c0ddc0f8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -349,6 +349,12 @@ func (em *ElementManager) Bundles(ctx context.Context, 
upstreamCancelFn context.
        }()
        // Watermark evaluation goroutine.
        go func() {
+               defer func() {
+                       // In case of panics in bundle generation, fail and 
cancel the job.
+                       if e := recover(); e != nil {
+                               upstreamCancelFn(fmt.Errorf("panic in 
ElementManager.Bundles watermark evaluation goroutine: %v", e))
+                       }
+               }()
                defer close(runStageCh)
 
                // If we have a test stream, clear out existing refreshes, so 
the test stream can
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 1a62f2f6f42..3d1e506f5e3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -76,7 +76,13 @@ type stage struct {
        OutputsToCoders   map[string]engine.PColInfo
 }
 
-func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, 
comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error 
{
+func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, 
comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err 
error) {
+       defer func() {
+               // Convert execution panics to errors to fail the bundle.
+               if e := recover(); e != nil {
+                       err = fmt.Errorf("panic in stage.Execute bundle 
processing goroutine: %v, stage: %+v", e, s)
+               }
+       }()
        slog.Debug("Execute: starting bundle", "bundle", rb)
 
        var b *worker.B

Reply via email to