This is an automated email from the ASF dual-hosted git repository. lostluck pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 92087f2daa7 [prism] Catch panics in primary execution goroutines. (#32210)
92087f2daa7 is described below

commit 92087f2daa7d1fdfa041c99b9047cdd195a46924
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Thu Aug 15 15:13:56 2024 -0700

    [prism] Catch panics in primary execution goroutines. (#32210)
---
 sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go | 6 ++++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go | 8 +++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 0781efd5ff0..ed9c0ddc0f8 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -349,6 +349,12 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
 	}()
 	// Watermark evaluation goroutine.
 	go func() {
+		defer func() {
+			// In case of panics in bundle generation, fail and cancel the job.
+			if e := recover(); e != nil {
+				upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v", e))
+			}
+		}()
 		defer close(runStageCh)
 
 		// If we have a test stream, clear out existing refreshes, so the test stream can
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 1a62f2f6f42..3d1e506f5e3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -76,7 +76,13 @@ type stage struct {
 	OutputsToCoders map[string]engine.PColInfo
 }
 
-func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) error {
+func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
+	defer func() {
+		// Convert execution panics to errors to fail the bundle.
+		if e := recover(); e != nil {
+			err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v", e, s)
+		}
+	}()
 	slog.Debug("Execute: starting bundle", "bundle", rb)
 
 	var b *worker.B