lostluck commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072731387
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
}
slog.Debug("progress report", "bundle", rb, "index",
index, "prevIndex", previousIndex)
+ var fraction float64
+
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
+ checkpointReady := checkpointTickCount >=
checkpointTickCutoff
if slow && unsplit {
- slog.Debug("splitting report", "bundle", rb,
"index", index)
- sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
- if err != nil {
- slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
- break progress
- }
- if sr.GetChannelSplits() == nil {
- slog.Debug("SDK returned no splits",
"bundle", rb)
- unsplit = false
- continue progress
- }
+ fraction = 0.5
+ } else if checkpointReady && unsplit {
Review Comment:
My intuition is telling me that we should not add it for the bounded
case. But the essence of Beam is to unify batch and streaming, so it's probably
fine.
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
}
slog.Debug("progress report", "bundle", rb, "index",
index, "prevIndex", previousIndex)
+ var fraction float64
+
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
+ checkpointReady := checkpointTickCount >=
checkpointTickCutoff
if slow && unsplit {
- slog.Debug("splitting report", "bundle", rb,
"index", index)
- sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
- if err != nil {
- slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
- break progress
- }
- if sr.GetChannelSplits() == nil {
- slog.Debug("SDK returned no splits",
"bundle", rb)
- unsplit = false
- continue progress
- }
+ fraction = 0.5
+ } else if checkpointReady && unsplit {
+ // splitting on 0.0 fraction to make a
checkpoint
+ fraction = 0.0
+ // reset tickCount after scheduling a checkpoint
+ checkpointTickCount = 0
+ } else {
+ previousIndex = index["index"]
+ previousTotalCount = index["totalCount"]
+ continue progress
+ }
- // TODO sort out rescheduling primary Roots on
bundle failure.
- var residuals []engine.Residual
- for _, rr := range sr.GetResidualRoots() {
- ba := rr.GetApplication()
- residuals = append(residuals,
engine.Residual{Element: ba.GetElement()})
- if len(ba.GetElement()) == 0 {
- slog.LogAttrs(context.TODO(),
slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
- panic("sdk returned empty
residual application")
- }
- // TODO what happens to output
watermarks on splits?
- }
- if len(sr.GetChannelSplits()) != 1 {
- slog.Warn("received non-single channel
split", "bundle", rb)
- }
- cs := sr.GetChannelSplits()[0]
- fr := cs.GetFirstResidualElement()
- // The first residual can be after the end of
data, so filter out those cases.
- if b.EstimatedInputElements >= int(fr) {
- b.EstimatedInputElements = int(fr) //
Update the estimate for the next split.
- // Split Residuals are returned right
away for rescheduling.
- em.ReturnResiduals(rb, int(fr),
s.inputInfo, engine.Residuals{
- Data: residuals,
- })
+ // Do the split (fraction > 0) or checkpoint (fraction
== 0)
+ slog.Debug("splitting report", "bundle", rb, "index",
index)
+ sr, err := b.Split(ctx, wk, fraction /* fraction of
remainder */, nil /* allowed splits */)
+ if err != nil {
+ slog.Warn("SDK Error from split, aborting
splits", "bundle", rb, "error", err.Error())
+ break progress
+ }
+ if sr.GetChannelSplits() == nil {
+ slog.Debug("SDK returned no splits", "bundle",
rb)
+ unsplit = false
+ continue progress
+ }
+ // Save residual roots for checkpoint. After
checkpointing is successful,
+ // the bundle will be marked as finished and no
residual roots will be
Review Comment:
It is expected and by design.
Were you seeing errors due to returning the residuals early in the split
response for the checkpointing case?
There are a few different cases to think about, but they're aligned with the
two FnAPI calls in question.
1. Normal ProcessBundleResponse: Returns when the primary is completed,
there are no residuals to worry about.
2. Split Response + ProcessBundleResponse:
The Split Response contains the confirmation of the primary (what the
bundle will finish processing), and the residual that needs to be processed
later. ProcessBundleResponse will not contain any residuals at this time, since
they were already persisted by the split response (per the above).
3. Self-checkpointed ProcessBundleResponse: This is when the DoFn itself
returns a process continuation for a specific element (e.g., resume in 10s or
similar). The primary is by definition completed, but there may be residuals to
process later. Those residuals are what's returned and scheduled.
You're seeing case 2 here. We shouldn't need to do any additional
residual handling and processing after the bundle is finished. I'd be a
bit concerned that there is a data duplication risk when doing it this way (the
same residuals getting "returned" twice).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]