This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2203c8d4469 [Prism] Fix DEADLINE_EXCEEDED errors caused by worker
failures (#38523)
2203c8d4469 is described below
commit 2203c8d4469ed216275d09858c461f8b9b4daf95
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 21 17:24:28 2026 -0400
[Prism] Fix DEADLINE_EXCEEDED errors caused by worker failures (#38523)
* Set b.BundleErr correctly when dynamic split happens.
* Fix deadlock on worker failures by introducing a bundle-level Done signal
The cancellation signal `b.Done` aborts bundle data streaming immediately
if the worker fails or completes the bundle early.
Without this signal, the following deadlock can occur:
- The runner sends elements through the data channel.
- The SDK worker reads and processes the first element, raises an exception,
and sends an InstructionResponse through the control channel. Meanwhile,
the runner keeps sending data and blocks until the SDK worker reads more
data, which the failed worker never does.
- The two existing signals are not sufficient to break out immediately:
- ctx.Done() is closed when the pipeline completes or times out. Given
the bundle failure, only the timeout can fire, and by then it is too late.
- wk.StoppedChan() is closed when the worker is stopped by the worker
pool, which also does not happen while the runner is waiting to
send data.
* Fix prism logs not being relayed correctly.
* Trigger more python tests.
* Add a mutex and safe getter/setter methods for bundleErr
* Rename b.Done to b.DataAbort.
* Set bundleErr before closing DataAbort to ensure the error is propagated
correctly.
* Refactor Respond to prevent race condition on BundleErr setting
* Remove debug msg
* Formatting.
* Address reviewer comments.
---
.../beam_PostCommit_Python_Versions.json | 2 +-
.../beam/runners/prism/internal/execute_test.go | 21 +++++++
sdks/go/pkg/beam/runners/prism/internal/stage.go | 17 ++++--
.../beam/runners/prism/internal/testdofns_test.go | 34 +++++++++++
.../beam/runners/prism/internal/worker/bundle.go | 58 ++++++++++++++++++-
.../runners/prism/internal/worker/worker_test.go | 67 ++++++++++++++++++++++
.../apache_beam/runners/portability/job_server.py | 5 ++
.../runners/portability/prism_runner.py | 10 +++-
.../runners/portability/prism_runner_test.py | 11 ++++
9 files changed, 213 insertions(+), 12 deletions(-)
diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json
b/.github/trigger_files/beam_PostCommit_Python_Versions.json
index a975cd1cd10..541dc4ea8e8 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Versions.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run",
- "revision": 1
+ "revision": 2
}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
index 2bb73f20e20..96c90678be3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -541,6 +541,27 @@ func TestFailureHang(t *testing.T) {
}
}
+func TestFailure_SplitError(t *testing.T) {
+ initRunner(t)
+
+ p, s := beam.NewPipelineWithRoot()
+ configs := beam.Create(s, SourceConfig{NumElements: 100, InitialSplits:
1})
+ col := beam.ParDo(s, &slowFailSDF{}, configs)
+ beam.ParDo(s, &int64Check{Name: "sdf_fail", Want: []int{}}, col)
+
+ // Set a short timeout so the test fails quickly if the pipeline hangs
due to split error handling bug
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ _, err := executeWithT(ctx, t, p)
+ if err == nil {
+ t.Fatalf("expected pipeline failure, but got a success")
+ }
+ if want := "intentional split error from tracker";
!strings.Contains(err.Error(), want) {
+ t.Fatalf("expected pipeline failure with %q, but was %v", want,
err)
+ }
+}
+
func TestRunner_Passert(t *testing.T) {
initRunner(t)
tests := []struct {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index c4758984af8..b46c9c2fd5b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -207,8 +207,9 @@ progress:
return context.Cause(ctx)
case resp = <-b.Resp:
bundleFinished = true
- if b.BundleErr != nil {
- return b.BundleErr
+ if err := b.GetErr(); err != nil {
+ slog.Error("stage.Execute aborting due to
bundle error", "stage", s.ID, "bundle", rb.BundleID)
+ return err
}
if dataFinished && bundleFinished {
break progress // exit progress loop on close.
@@ -240,10 +241,16 @@ progress:
sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
if err != nil {
slog.Warn("SDK Error from split,
aborting splits and failing bundle", "bundle", rb, "error", err.Error())
- if b.BundleErr != nil {
- b.BundleErr = err
+ // Safely set the split error if no
primary bundle error has been set yet.
+ // Both SetErr and GetErr are
synchronized under the same mutex, guaranteeing
+ // no memory races occur. The
write-read inconsistency is explicitly guarded
+ // by the nil/null check inside
SetErr(): if a concurrent primary error (e.g.,
+ // worker crash) was set first, it will
not be overwritten and b.GetErr() will
+ // correctly preserve and return it
instead of the secondary split error.
+ if !b.SetErr(err) {
+ slog.Debug("Error for bundle
already set, logging dropped split error", "bundle", rb, "error", err)
}
- return b.BundleErr
+ return b.GetErr()
}
if sr.GetChannelSplits() == nil {
slog.Debug("SDK returned no splits",
"bundle", rb)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
index d21ccd53afd..903805cf659 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
@@ -64,6 +64,7 @@ func init() {
register.Function2x1(combineIntSum)
register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func(int64),
error]((*intRangeFn)(nil))
+ register.DoFn4x1[context.Context, *sdf.LockRTracker, SourceConfig,
func(int64), error]((*slowFailSDF)(nil))
register.Emitter1[int64]()
register.Emitter2[int64, int64]()
}
@@ -404,3 +405,36 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt
*sdf.LockRTracker, _ []byte,
}
}
}
+
+type errorSplitTracker struct {
+ *offsetrange.Tracker
+}
+
+func (t *errorSplitTracker) TrySplit(fraction float64) (any, any, error) {
+ return nil, nil, fmt.Errorf("intentional split error from tracker")
+}
+
+type slowFailSDF struct{}
+
+func (fn *slowFailSDF) CreateInitialRestriction(config SourceConfig)
offsetrange.Restriction {
+ return offsetrange.Restriction{Start: 0, End: config.NumElements}
+}
+
+func (fn *slowFailSDF) SplitRestriction(config SourceConfig, rest
offsetrange.Restriction) []offsetrange.Restriction {
+ return rest.EvenSplits(config.InitialSplits)
+}
+
+func (fn *slowFailSDF) RestrictionSize(_ SourceConfig, rest
offsetrange.Restriction) float64 {
+ return rest.Size()
+}
+
+func (fn *slowFailSDF) CreateTracker(rest offsetrange.Restriction)
*sdf.LockRTracker {
+ return sdf.NewLockRTracker(&errorSplitTracker{Tracker:
offsetrange.NewTracker(rest)})
+}
+
+func (fn *slowFailSDF) ProcessElement(ctx context.Context, rt
*sdf.LockRTracker, config SourceConfig, emit func(int64)) error {
+ for i := rt.GetRestriction().(offsetrange.Restriction).Start;
rt.TryClaim(i); i++ {
+ <-ctx.Done()
+ }
+ return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 15023a1b0bd..11d59d3c8d0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"log/slog"
+ "sync"
"sync/atomic"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
@@ -61,8 +62,19 @@ type B struct {
dataSema atomic.Int32
OutputData engine.TentativeData
- Resp chan *fnpb.ProcessBundleResponse
- BundleErr error
+ Resp chan *fnpb.ProcessBundleResponse
+ // DataAbort is closed when the worker responds to the bundle
instruction
+ // (with success or failure), signaling ProcessOn to stop streaming
data.
+ //
+ // This prevents a deadlock where a worker fails mid-bundle and stops
reading
+ // from the data channel while the runner blocks indefinitely
attempting to
+ // write remaining elements. Other signals are insufficient to abort
immediately:
+ // - ctx.Done() only triggers on global timeouts/cancellations, which
is too late.
+ // - wk.StoppedChan is only closed when tearing down the worker pool,
which does
+ // not happen while the runner is waiting on the current bundle to
finish.
+ DataAbort chan struct{}
+ mu sync.Mutex
+ bundleErr error
responded bool
SinkToPCollection map[string]string
@@ -80,6 +92,7 @@ func (b *B) Init() {
close(b.DataWait) // Can happen if there are no outputs for the
bundle.
}
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
+ b.DataAbort = make(chan struct{})
}
// DataOrTimerDone indicates a final element has been received from a Data or
Timer output.
@@ -96,14 +109,40 @@ func (b *B) LogValue() slog.Value {
slog.String("stage", b.PBDID))
}
+// SetErr sets the bundle error if it is not already set, returning true if it
was set, and false otherwise.
+func (b *B) SetErr(err error) bool {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ if b.bundleErr == nil {
+ b.bundleErr = err
+ return true
+ }
+ return false
+}
+
+// GetErr gets the current bundle error.
+func (b *B) GetErr() error {
+ b.mu.Lock()
+ defer b.mu.Unlock()
+ return b.bundleErr
+}
+
func (b *B) Respond(resp *fnpb.InstructionResponse) {
if b.responded {
slog.Warn("additional bundle response", "bundle", b, "resp",
resp)
return
}
b.responded = true
+ if b.DataAbort != nil {
+ // Defer closing DataAbort to guarantee that the abort signal
is sent
+ // when this function returns. This ensures it is always
executed after
+ // any error has been safely written and synchronized via
b.SetErr() or,
+ // in the happy path, after the successful response is sent to
b.Resp.
+ defer close(b.DataAbort)
+ }
if resp.GetError() != "" {
- b.BundleErr = fmt.Errorf("bundle %v %v failed:%v",
resp.GetInstructionId(), b.PBDID, resp.GetError())
+ slog.Error("Prism received bundle error from worker response",
"bundle", resp.GetInstructionId())
+ b.SetErr(fmt.Errorf("bundle %v %v failed:%v",
resp.GetInstructionId(), b.PBDID, resp.GetError()))
close(b.Resp)
return
}
@@ -143,6 +182,13 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan
struct{} {
b.DataOrTimerDone()
}
return b.DataWait
+ case <-b.DataAbort:
+ // The bundle completed/failed before req was sent.
+ outCap := b.OutputCount + len(b.HasTimers)
+ for i := 0; i < outCap; i++ {
+ b.DataOrTimerDone()
+ }
+ return b.DataWait
case wk.InstReqs <- req:
// desired outcome
}
@@ -181,6 +227,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan
struct{} {
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
+ case <-b.DataAbort:
+ b.DataOrTimerDone()
+ return b.DataWait
case wk.DataReqs <- elms:
}
}
@@ -202,6 +251,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan
struct{} {
case <-ctx.Done():
b.DataOrTimerDone()
return b.DataWait
+ case <-b.DataAbort:
+ b.DataOrTimerDone()
+ return b.DataWait
case wk.DataReqs <- &fnpb.Elements{
Timers: timers,
Data: []*fnpb.Elements_Data{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 76a05563ec3..055a003e276 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -522,6 +522,73 @@ func TestWorker_State_MultimapSideInput(t *testing.T) {
}
}
+// TestBundle_ProcessOn_WorkerFailure verifies that the runner does not
deadlock when
+// a worker fails mid-bundle and stops reading elements from the Data plane
stream.
+func TestBundle_ProcessOn_WorkerFailure(t *testing.T) {
+ ctx, wk, clientConn := serveTestWorker(t)
+
+ dataCli := fnpb.NewBeamFnDataClient(clientConn)
+ dataStream, err := dataCli.Data(ctx)
+ if err != nil {
+ t.Fatal("couldn't create data client:", err)
+ }
+
+ instID := wk.NextInst()
+
+ // Create 15 large input blocks (512 KB each) to saturate the 10-slot
channel buffer
+ // and the gRPC flow control window, forcing the Data sender inside
worker.go to block.
+ largeBytes := make([]byte, 512*1024)
+ var inputBlocks []*engine.Block
+ for i := 0; i < 15; i++ {
+ inputBlocks = append(inputBlocks, &engine.Block{
+ Kind: engine.BlockData,
+ Bytes: [][]byte{largeBytes},
+ })
+ }
+
+ b := &B{
+ InstID: instID,
+ PBDID: "teststageID",
+ Input: inputBlocks,
+ OutputCount: 1,
+ }
+ b.Init()
+ wk.activeInstructions[instID] = b
+
+ processOnDone := make(chan struct{})
+ go func() {
+ b.ProcessOn(ctx, wk)
+ close(processOnDone)
+ }()
+
+ // Send the initial process bundle request trigger.
+ wk.InstReqs <- &fnpb.InstructionRequest{
+ InstructionId: instID,
+ }
+
+ // Read only the first block to simulate worker processing start.
+ _, err = dataStream.Recv()
+ if err != nil {
+ t.Fatal("couldn't receive first data element:", err)
+ }
+
+ // Simulate worker failure by responding with an error on the Control
channel.
+ // Without the fix, ProcessOn's background goroutine deadlocks at
`wk.DataReqs <- elms`
+ // because the client stopped reading and the buffer/flow-control is
saturated.
+ wk.activeInstructions[instID].Respond(&fnpb.InstructionResponse{
+ InstructionId: instID,
+ Error: "Intentional worker failure",
+ })
+
+ // Verify that ProcessOn exits cleanly and does not deadlock/hang.
+ select {
+ case <-processOnDone:
+ // Test passed: ProcessOn exited successfully!
+ case <-time.After(10 * time.Second):
+ t.Fatal("ProcessOn deadlocked / hung after worker failure!")
+ }
+}
+
func newWorker() *W {
mw := &MultiplexW{
pool: map[string]*W{},
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py
b/sdks/python/apache_beam/runners/portability/job_server.py
index 53688e0be95..8bdad6b70fe 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -121,6 +121,11 @@ class SubprocessJobServer(JobServer):
logger = logging.getLogger(f"{self.__class__.__name__}")
if self._log_filter is not None:
logger.addFilter(self._log_filter)
+ # Explicitly set logger level to INFO so logger.info(...) calls in
+ # SubprocessServer._really_start_process's log_stdout pass the initial
+ # isEnabledFor check, allowing the filter to dynamically elevate log
+ # levels to WARNING or ERROR.
+ logger.setLevel(logging.INFO)
self._server = subprocess_server.SubprocessServer(
beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port, logger=logger)
return self._server.start()
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 0458898b5e9..105c27fcb8c 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -154,9 +154,13 @@ class PrismRunnerLogFilter(logging.Filter):
record.msg = json_record["sdk"]["msg"]
else:
record.name = "PrismRunner"
- record.msg = (
- f"{json_record['msg']} "
- f"({', '.join(f'{k}={v!r}' for k, v in extras.items())})")
+ formatted_extras = []
+ for k, v in extras.items():
+ if isinstance(v, str) and '\n' in v:
+ formatted_extras.append(f"\n{k}:\n{v}")
+ else:
+ formatted_extras.append(f"{k}={v!r}")
+ record.msg = f"{json_record['msg']} ({', '.join(formatted_extras)})"
except (json.JSONDecodeError,
KeyError,
ValueError,
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index 9c1620603fd..bb6b7bb04da 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -300,6 +300,17 @@ class
PrismRunnerTest(portable_runner_test.PortableRunnerTest):
('B-3', {10, 15, 16}),
])))
+ def test_failing_dofn(self):
+ # Prism interprets all bundle failures as RuntimeError
+ with self.assertRaisesRegex(
+ RuntimeError, "Intentional DoFn failure for prism logging test"):
+ with self.create_pipeline() as p:
+
+ def failing_fn(x):
+ raise ValueError("Intentional DoFn failure for prism logging test")
+
+ _ = p | beam.Create([1, 2, 3]) | beam.Map(failing_fn)
+
class PrismJobServerTest(unittest.TestCase):
def setUp(self) -> None: