shunping commented on code in PR #34828:
URL: https://github.com/apache/beam/pull/34828#discussion_r2072317179
##########
sdks/go/pkg/beam/runners/prism/internal/stage.go:
##########
@@ -218,58 +223,75 @@ progress:
}
slog.Debug("progress report", "bundle", rb, "index",
index, "prevIndex", previousIndex)
+ var fraction float64
+
// Check if there has been any measurable progress by
the input, or all output pcollections since last report.
slow := previousIndex == index["index"] &&
previousTotalCount == index["totalCount"]
+ checkpointReady := checkpointTickCount >=
checkpointTickCutoff
if slow && unsplit {
- slog.Debug("splitting report", "bundle", rb,
"index", index)
- sr, err := b.Split(ctx, wk, 0.5 /* fraction of
remainder */, nil /* allowed splits */)
- if err != nil {
- slog.Warn("SDK Error from split,
aborting splits", "bundle", rb, "error", err.Error())
- break progress
- }
- if sr.GetChannelSplits() == nil {
- slog.Debug("SDK returned no splits",
"bundle", rb)
- unsplit = false
- continue progress
- }
+ fraction = 0.5
+ } else if checkpointReady && unsplit {
+ // splitting on 0.0 fraction to make a
checkpoint
+ fraction = 0.0
+ // reset tickCount after scheduling a checkpoint
+ checkpointTickCount = 0
+ } else {
+ previousIndex = index["index"]
+ previousTotalCount = index["totalCount"]
+ continue progress
+ }
- // TODO sort out rescheduling primary Roots on
bundle failure.
- var residuals []engine.Residual
- for _, rr := range sr.GetResidualRoots() {
- ba := rr.GetApplication()
- residuals = append(residuals,
engine.Residual{Element: ba.GetElement()})
- if len(ba.GetElement()) == 0 {
- slog.LogAttrs(context.TODO(),
slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
- panic("sdk returned empty
residual application")
- }
- // TODO what happens to output
watermarks on splits?
- }
- if len(sr.GetChannelSplits()) != 1 {
- slog.Warn("received non-single channel
split", "bundle", rb)
- }
- cs := sr.GetChannelSplits()[0]
- fr := cs.GetFirstResidualElement()
- // The first residual can be after the end of
data, so filter out those cases.
- if b.EstimatedInputElements >= int(fr) {
- b.EstimatedInputElements = int(fr) //
Update the estimate for the next split.
- // Split Residuals are returned right
away for rescheduling.
- em.ReturnResiduals(rb, int(fr),
s.inputInfo, engine.Residuals{
- Data: residuals,
- })
+ // Do the split (fraction > 0) or checkpoint (fraction
== 0)
+ slog.Debug("splitting report", "bundle", rb, "index",
index)
+ sr, err := b.Split(ctx, wk, fraction /* fraction of
remainder */, nil /* allowed splits */)
+ if err != nil {
+ slog.Warn("SDK Error from split, aborting
splits", "bundle", rb, "error", err.Error())
+ break progress
+ }
+ if sr.GetChannelSplits() == nil {
+ slog.Debug("SDK returned no splits", "bundle",
rb)
+ unsplit = false
+ continue progress
+ }
+ // Save residual roots for checkpoint. After
checkpointing is successful,
+ // the bundle will be marked as finished and no
residual roots will be
Review Comment:
This is somewhat surprising. When a bundle finishes because of a split with a 0.0
fraction, there are no residual roots in the response. Is this by design?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]