This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2203c8d4469 [Prism] Fix DEADLINE_EXCEEDED errors caused by worker 
failures (#38523)
2203c8d4469 is described below

commit 2203c8d4469ed216275d09858c461f8b9b4daf95
Author: Shunping Huang <[email protected]>
AuthorDate: Thu May 21 17:24:28 2026 -0400

    [Prism] Fix DEADLINE_EXCEEDED errors caused by worker failures (#38523)
    
    * Set b.BundleErr correctly when dynamic split happens.
    
    * Fix deadlock on worker failures by introducing a bundle-level Done signal
    
    The cancellation signal `b.Done` is used to abort bundle data streaming
    immediately if the worker fails or completes the bundle early.
    
    Without this signal, a deadlock scenario can occur.
    - Runner sends elements through data channel
    - SDK worker reads and processes the first element, raises an exception
      and sends InstructionResponse through control channel. However,
      at the same time, Runner keeps sending data and is blocked until
      SDK worker reads more data.
    - The two existing signals are not sufficient to break out immediately:
      - ctx.Done() is closed when the pipeline is done or times out. Given
        the bundle failure, the timeout is the only option, but it is too late.
      - wk.StoppedChan() is closed when the worker is stopped by the worker
        pool, which is also not happening when the runner is waiting to
        send data.
    
    * Fix a problem where prism logs were not relayed correctly.
    
    * Trigger more python tests.
    
    * Add a mutex and safe getter/setter methods for bundleErr
    
    * Rename b.Done to b.DataAbort.
    
    * Set bundleErr before closing DataAbort to ensure the error is propagated
      correctly.
    
    * Refactor Respond to prevent race condition on BundleErr setting
    
    * Remove debug msg
    
    * Formatting.
    
    * Address reviewer comments.
---
 .../beam_PostCommit_Python_Versions.json           |  2 +-
 .../beam/runners/prism/internal/execute_test.go    | 21 +++++++
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 17 ++++--
 .../beam/runners/prism/internal/testdofns_test.go  | 34 +++++++++++
 .../beam/runners/prism/internal/worker/bundle.go   | 58 ++++++++++++++++++-
 .../runners/prism/internal/worker/worker_test.go   | 67 ++++++++++++++++++++++
 .../apache_beam/runners/portability/job_server.py  |  5 ++
 .../runners/portability/prism_runner.py            | 10 +++-
 .../runners/portability/prism_runner_test.py       | 11 ++++
 9 files changed, 213 insertions(+), 12 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python_Versions.json 
b/.github/trigger_files/beam_PostCommit_Python_Versions.json
index a975cd1cd10..541dc4ea8e8 100644
--- a/.github/trigger_files/beam_PostCommit_Python_Versions.json
+++ b/.github/trigger_files/beam_PostCommit_Python_Versions.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-  "revision": 1
+  "revision": 2
 }
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
index 2bb73f20e20..96c90678be3 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute_test.go
@@ -541,6 +541,27 @@ func TestFailureHang(t *testing.T) {
        }
 }
 
+func TestFailure_SplitError(t *testing.T) {
+       initRunner(t)
+
+       p, s := beam.NewPipelineWithRoot()
+       configs := beam.Create(s, SourceConfig{NumElements: 100, InitialSplits: 
1})
+       col := beam.ParDo(s, &slowFailSDF{}, configs)
+       beam.ParDo(s, &int64Check{Name: "sdf_fail", Want: []int{}}, col)
+
+       // Set a short timeout so the test fails quickly if the pipeline hangs 
due to split error handling bug
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+
+       _, err := executeWithT(ctx, t, p)
+       if err == nil {
+               t.Fatalf("expected pipeline failure, but got a success")
+       }
+       if want := "intentional split error from tracker"; 
!strings.Contains(err.Error(), want) {
+               t.Fatalf("expected pipeline failure with %q, but was %v", want, 
err)
+       }
+}
+
 func TestRunner_Passert(t *testing.T) {
        initRunner(t)
        tests := []struct {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go 
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index c4758984af8..b46c9c2fd5b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -207,8 +207,9 @@ progress:
                        return context.Cause(ctx)
                case resp = <-b.Resp:
                        bundleFinished = true
-                       if b.BundleErr != nil {
-                               return b.BundleErr
+                       if err := b.GetErr(); err != nil {
+                               slog.Error("stage.Execute aborting due to 
bundle error", "stage", s.ID, "bundle", rb.BundleID)
+                               return err
                        }
                        if dataFinished && bundleFinished {
                                break progress // exit progress loop on close.
@@ -240,10 +241,16 @@ progress:
                                sr, err := b.Split(ctx, wk, 0.5 /* fraction of 
remainder */, nil /* allowed splits */)
                                if err != nil {
                                        slog.Warn("SDK Error from split, 
aborting splits and failing bundle", "bundle", rb, "error", err.Error())
-                                       if b.BundleErr != nil {
-                                               b.BundleErr = err
+                                       // Safely set the split error if no 
primary bundle error has been set yet.
+                                       // Both SetErr and GetErr are 
synchronized under the same mutex, guaranteeing
+                                       // no memory races occur. The 
write-read inconsistency is explicitly guarded
+                                       // by the nil/null check inside 
SetErr(): if a concurrent primary error (e.g.,
+                                       // worker crash) was set first, it will 
not be overwritten and b.GetErr() will
+                                       // correctly preserve and return it 
instead of the secondary split error.
+                                       if !b.SetErr(err) {
+                                               slog.Debug("Error for bundle 
already set, logging dropped split error", "bundle", rb, "error", err)
                                        }
-                                       return b.BundleErr
+                                       return b.GetErr()
                                }
                                if sr.GetChannelSplits() == nil {
                                        slog.Debug("SDK returned no splits", 
"bundle", rb)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
index d21ccd53afd..903805cf659 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go
@@ -64,6 +64,7 @@ func init() {
        register.Function2x1(combineIntSum)
 
        register.DoFn3x1[*sdf.LockRTracker, SourceConfig, func(int64), 
error]((*intRangeFn)(nil))
+       register.DoFn4x1[context.Context, *sdf.LockRTracker, SourceConfig, 
func(int64), error]((*slowFailSDF)(nil))
        register.Emitter1[int64]()
        register.Emitter2[int64, int64]()
 }
@@ -404,3 +405,36 @@ func (fn *selfCheckpointingDoFn) ProcessElement(rt 
*sdf.LockRTracker, _ []byte,
                }
        }
 }
+
+type errorSplitTracker struct {
+       *offsetrange.Tracker
+}
+
+func (t *errorSplitTracker) TrySplit(fraction float64) (any, any, error) {
+       return nil, nil, fmt.Errorf("intentional split error from tracker")
+}
+
+type slowFailSDF struct{}
+
+func (fn *slowFailSDF) CreateInitialRestriction(config SourceConfig) 
offsetrange.Restriction {
+       return offsetrange.Restriction{Start: 0, End: config.NumElements}
+}
+
+func (fn *slowFailSDF) SplitRestriction(config SourceConfig, rest 
offsetrange.Restriction) []offsetrange.Restriction {
+       return rest.EvenSplits(config.InitialSplits)
+}
+
+func (fn *slowFailSDF) RestrictionSize(_ SourceConfig, rest 
offsetrange.Restriction) float64 {
+       return rest.Size()
+}
+
+func (fn *slowFailSDF) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
+       return sdf.NewLockRTracker(&errorSplitTracker{Tracker: 
offsetrange.NewTracker(rest)})
+}
+
+func (fn *slowFailSDF) ProcessElement(ctx context.Context, rt 
*sdf.LockRTracker, config SourceConfig, emit func(int64)) error {
+       for i := rt.GetRestriction().(offsetrange.Restriction).Start; 
rt.TryClaim(i); i++ {
+               <-ctx.Done()
+       }
+       return nil
+}
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
index 15023a1b0bd..11d59d3c8d0 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
@@ -20,6 +20,7 @@ import (
        "context"
        "fmt"
        "log/slog"
+       "sync"
        "sync/atomic"
 
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
@@ -61,8 +62,19 @@ type B struct {
        dataSema   atomic.Int32
        OutputData engine.TentativeData
 
-       Resp      chan *fnpb.ProcessBundleResponse
-       BundleErr error
+       Resp chan *fnpb.ProcessBundleResponse
+       // DataAbort is closed when the worker responds to the bundle 
instruction
+       // (with success or failure), signaling ProcessOn to stop streaming 
data.
+       //
+       // This prevents a deadlock where a worker fails mid-bundle and stops 
reading
+       // from the data channel while the runner blocks indefinitely 
attempting to
+       // write remaining elements. Other signals are insufficient to abort 
immediately:
+       // - ctx.Done() only triggers on global timeouts/cancellations, which 
is too late.
+       // - wk.StoppedChan is only closed when tearing down the worker pool, 
which does
+       //   not happen while the runner is waiting on the current bundle to 
finish.
+       DataAbort chan struct{}
+       mu        sync.Mutex
+       bundleErr error
        responded bool
 
        SinkToPCollection map[string]string
@@ -80,6 +92,7 @@ func (b *B) Init() {
                close(b.DataWait) // Can happen if there are no outputs for the 
bundle.
        }
        b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
+       b.DataAbort = make(chan struct{})
 }
 
 // DataOrTimerDone indicates a final element has been received from a Data or 
Timer output.
@@ -96,14 +109,40 @@ func (b *B) LogValue() slog.Value {
                slog.String("stage", b.PBDID))
 }
 
+// SetErr sets the bundle error if it is not already set, returning true if it 
was set, and false otherwise.
+func (b *B) SetErr(err error) bool {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+       if b.bundleErr == nil {
+               b.bundleErr = err
+               return true
+       }
+       return false
+}
+
+// GetErr gets the current bundle error.
+func (b *B) GetErr() error {
+       b.mu.Lock()
+       defer b.mu.Unlock()
+       return b.bundleErr
+}
+
 func (b *B) Respond(resp *fnpb.InstructionResponse) {
        if b.responded {
                slog.Warn("additional bundle response", "bundle", b, "resp", 
resp)
                return
        }
        b.responded = true
+       if b.DataAbort != nil {
+               // Defer closing DataAbort to guarantee that the abort signal 
is sent
+               // when this function returns. This ensures it is always 
executed after
+               // any error has been safely written and synchronized via 
b.SetErr() or,
+               // in the happy path, after the successful response is sent to 
b.Resp.
+               defer close(b.DataAbort)
+       }
        if resp.GetError() != "" {
-               b.BundleErr = fmt.Errorf("bundle %v %v failed:%v", 
resp.GetInstructionId(), b.PBDID, resp.GetError())
+               slog.Error("Prism received bundle error from worker response", 
"bundle", resp.GetInstructionId())
+               b.SetErr(fmt.Errorf("bundle %v %v failed:%v", 
resp.GetInstructionId(), b.PBDID, resp.GetError()))
                close(b.Resp)
                return
        }
@@ -143,6 +182,13 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan 
struct{} {
                        b.DataOrTimerDone()
                }
                return b.DataWait
+       case <-b.DataAbort:
+               // The bundle completed/failed before req was sent.
+               outCap := b.OutputCount + len(b.HasTimers)
+               for i := 0; i < outCap; i++ {
+                       b.DataOrTimerDone()
+               }
+               return b.DataWait
        case wk.InstReqs <- req:
                // desired outcome
        }
@@ -181,6 +227,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan 
struct{} {
                case <-ctx.Done():
                        b.DataOrTimerDone()
                        return b.DataWait
+               case <-b.DataAbort:
+                       b.DataOrTimerDone()
+                       return b.DataWait
                case wk.DataReqs <- elms:
                }
        }
@@ -202,6 +251,9 @@ func (b *B) ProcessOn(ctx context.Context, wk *W) <-chan 
struct{} {
        case <-ctx.Done():
                b.DataOrTimerDone()
                return b.DataWait
+       case <-b.DataAbort:
+               b.DataOrTimerDone()
+               return b.DataWait
        case wk.DataReqs <- &fnpb.Elements{
                Timers: timers,
                Data: []*fnpb.Elements_Data{
diff --git a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go 
b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
index 76a05563ec3..055a003e276 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/worker/worker_test.go
@@ -522,6 +522,73 @@ func TestWorker_State_MultimapSideInput(t *testing.T) {
        }
 }
 
+// TestBundle_ProcessOn_WorkerFailure verifies that the runner does not 
deadlock when
+// a worker fails mid-bundle and stops reading elements from the Data plane 
stream.
+func TestBundle_ProcessOn_WorkerFailure(t *testing.T) {
+       ctx, wk, clientConn := serveTestWorker(t)
+
+       dataCli := fnpb.NewBeamFnDataClient(clientConn)
+       dataStream, err := dataCli.Data(ctx)
+       if err != nil {
+               t.Fatal("couldn't create data client:", err)
+       }
+
+       instID := wk.NextInst()
+
+       // Create 15 large input blocks (512 KB each) to saturate the 10-slot 
channel buffer
+       // and the gRPC flow control window, forcing the Data sender inside 
worker.go to block.
+       largeBytes := make([]byte, 512*1024)
+       var inputBlocks []*engine.Block
+       for i := 0; i < 15; i++ {
+               inputBlocks = append(inputBlocks, &engine.Block{
+                       Kind:  engine.BlockData,
+                       Bytes: [][]byte{largeBytes},
+               })
+       }
+
+       b := &B{
+               InstID:      instID,
+               PBDID:       "teststageID",
+               Input:       inputBlocks,
+               OutputCount: 1,
+       }
+       b.Init()
+       wk.activeInstructions[instID] = b
+
+       processOnDone := make(chan struct{})
+       go func() {
+               b.ProcessOn(ctx, wk)
+               close(processOnDone)
+       }()
+
+       // Send the initial process bundle request trigger.
+       wk.InstReqs <- &fnpb.InstructionRequest{
+               InstructionId: instID,
+       }
+
+       // Read only the first block to simulate worker processing start.
+       _, err = dataStream.Recv()
+       if err != nil {
+               t.Fatal("couldn't receive first data element:", err)
+       }
+
+       // Simulate worker failure by responding with an error on the Control 
channel.
+       // Without the fix, ProcessOn's background goroutine deadlocks at 
`wk.DataReqs <- elms`
+       // because the client stopped reading and the buffer/flow-control is 
saturated.
+       wk.activeInstructions[instID].Respond(&fnpb.InstructionResponse{
+               InstructionId: instID,
+               Error:         "Intentional worker failure",
+       })
+
+       // Verify that ProcessOn exits cleanly and does not deadlock/hang.
+       select {
+       case <-processOnDone:
+               // Test passed: ProcessOn exited successfully!
+       case <-time.After(10 * time.Second):
+               t.Fatal("ProcessOn deadlocked / hung after worker failure!")
+       }
+}
+
 func newWorker() *W {
        mw := &MultiplexW{
                pool: map[string]*W{},
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py 
b/sdks/python/apache_beam/runners/portability/job_server.py
index 53688e0be95..8bdad6b70fe 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -121,6 +121,11 @@ class SubprocessJobServer(JobServer):
       logger = logging.getLogger(f"{self.__class__.__name__}")
       if self._log_filter is not None:
         logger.addFilter(self._log_filter)
+        # Explicitly set logger level to INFO so logger.info(...) calls in
+        # SubprocessServer._really_start_process's log_stdout pass the initial
+        # isEnabledFor check, allowing the filter to dynamically elevate log
+        # levels to WARNING or ERROR.
+        logger.setLevel(logging.INFO)
       self._server = subprocess_server.SubprocessServer(
           beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port, logger=logger)
     return self._server.start()
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py 
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 0458898b5e9..105c27fcb8c 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -154,9 +154,13 @@ class PrismRunnerLogFilter(logging.Filter):
           record.msg = json_record["sdk"]["msg"]
         else:
           record.name = "PrismRunner"
-          record.msg = (
-              f"{json_record['msg']} "
-              f"({', '.join(f'{k}={v!r}' for k, v in extras.items())})")
+          formatted_extras = []
+          for k, v in extras.items():
+            if isinstance(v, str) and '\n' in v:
+              formatted_extras.append(f"\n{k}:\n{v}")
+            else:
+              formatted_extras.append(f"{k}={v!r}")
+          record.msg = f"{json_record['msg']} ({', '.join(formatted_extras)})"
       except (json.JSONDecodeError,
               KeyError,
               ValueError,
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner_test.py 
b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
index 9c1620603fd..bb6b7bb04da 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner_test.py
@@ -300,6 +300,17 @@ class 
PrismRunnerTest(portable_runner_test.PortableRunnerTest):
                   ('B-3', {10, 15, 16}),
               ])))
 
+  def test_failing_dofn(self):
+    # Prism interprets all bundle failures as RuntimeError
+    with self.assertRaisesRegex(
+        RuntimeError, "Intentional DoFn failure for prism logging test"):
+      with self.create_pipeline() as p:
+
+        def failing_fn(x):
+          raise ValueError("Intentional DoFn failure for prism logging test")
+
+        _ = p | beam.Create([1, 2, 3]) | beam.Map(failing_fn)
+
 
 class PrismJobServerTest(unittest.TestCase):
   def setUp(self) -> None:

Reply via email to