lostluck commented on code in PR #38523:
URL: https://github.com/apache/beam/pull/38523#discussion_r3282519889
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -61,8 +62,18 @@ type B struct {
dataSema atomic.Int32
OutputData engine.TentativeData
- Resp chan *fnpb.ProcessBundleResponse
- Done chan struct{}
+ Resp chan *fnpb.ProcessBundleResponse
+ // DataAbort is closed when the worker responds to the bundle instruction
+ // (with success or failure), signaling ProcessOn to stop streaming data.
+ //
+ // This prevents a deadlock where a worker fails mid-bundle and stops reading
+ // from the data channel while the runner blocks indefinitely attempting to
+ // write remaining elements. Other signals are insufficient to abort immediately:
+ // - ctx.Done() only triggers on global timeouts/cancellations, which is too late.
+ // - wk.StoppedChan is only closed when tearing down the worker pool, which does
+ // not happen while the runner is waiting on the current bundle to finish.
+ DataAbort chan struct{}
+ mu sync.Mutex
BundleErr error
Review Comment:
We should rename this to bundleErr to soft-indicate that callers should go through SetErr and GetErr. The rename also forces the compiler to catch any locations we missed.
##########
sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go:
##########
@@ -98,18 +109,38 @@ func (b *B) LogValue() slog.Value {
slog.String("stage", b.PBDID))
}
+// SetErr sets the bundle error if it is not already set.
+func (b *B) SetErr(err error) {
Review Comment:
Consider having this return a boolean indicating whether the value was set. That way the caller can optionally log the error (e.g. "Error for bundle %v already set, logging dropped followup error: %v").
I can't imagine we would need to log at every location where this happens.