This is an automated email from the ASF dual-hosted git repository. jrmccluskey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 0e734fe1c38 [Go SDK] Make it clearer that timers and data don't interact negatively in element batches. (#31319) 0e734fe1c38 is described below commit 0e734fe1c383b09096988b80551843374776c18f Author: Robert Burke <lostl...@users.noreply.github.com> AuthorDate: Tue May 21 07:11:07 2024 -0700 [Go SDK] Make it clearer that timers and data don't interact negatively in element batches. (#31319) * Don't swallow data errors on timer errors. * reset error in case of timer handling * Don't drop timers on splits. --------- Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com> --- sdks/go/pkg/beam/core/runtime/exec/datasource.go | 33 +++++++++++------------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go b/sdks/go/pkg/beam/core/runtime/exec/datasource.go index 674de44cf35..156d9565379 100644 --- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go +++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go @@ -107,15 +107,10 @@ func (n *DataSource) StartBundle(ctx context.Context, id string, data DataContex return n.Out.StartBundle(ctx, id, data) } -// errSplitSuccess is a marker error to indicate we've reached the split index. -// Akin to io.EOF. -var errSplitSuccess = errors.New("split index reached") - // process handles converting elements from the data source to timers. // // The data and timer callback functions must return an io.EOF if the reader terminates to signal that an additional -// buffer is desired. On successful splits, [splitSuccess] must be returned to indicate that the -// PTransform is done processing data for this instruction. +// buffer is desired. 
func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, ptransformID, timerFamilyID string) error) error { // The SID contains this instruction's expected data processing transform (this one). elms, err := n.source.OpenElementChan(ctx, n.SID, maps.Keys(n.OnTimerTransforms)) @@ -129,7 +124,6 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader var byteCount int bcr := byteCountReader{reader: &r, count: &byteCount} - splitPrimaryComplete := map[string]bool{} for { var err error select { @@ -138,24 +132,21 @@ func (n *DataSource) process(ctx context.Context, data func(bcr *byteCountReader if !ok { return nil } - if splitPrimaryComplete[e.PtransformID] { - continue - } if len(e.Data) > 0 { r.Reset(e.Data) err = data(&bcr, e.PtransformID) } + if err != nil && err != io.EOF { + return errors.Wrapf(err, "source failed processing data") + } + // Process any simultaneously sent timers. + // These are still handled even if the data channel has split. if len(e.Timers) > 0 { r.Reset(e.Timers) err = timer(&bcr, e.PtransformID, e.TimerFamilyID) } - - if err == errSplitSuccess { - // Returning splitSuccess means we've split, and aren't consuming the remaining buffer. - // We mark the PTransform done to ignore further data. - splitPrimaryComplete[e.PtransformID] = true - } else if err != nil && err != io.EOF { - return errors.Wrap(err, "source failed") + if err != nil && err != io.EOF { + return errors.Wrap(err, "source failed processing timers") } // io.EOF means the reader successfully drained. // We're ready for a new buffer. 
@@ -210,8 +201,13 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { cp = MakeElementDecoder(c) } + hasSplit := map[string]bool{} var checkpoints []*Checkpoint err := n.process(ctx, func(bcr *byteCountReader, ptransformID string) error { + // Check if this transform has already successfully split, and if so, skip reading and decoding of the elements in the buffer. + if hasSplit[ptransformID] { + return nil + } for { // TODO(lostluck) 2020/02/22: Should we include window headers or just count the element sizes? ws, t, pn, err := DecodeWindowedValueHeader(wc, bcr.reader) @@ -258,7 +254,8 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) { } // We've finished processing an element, check if we have finished a split. if n.incrementIndexAndCheckSplit() { - return errSplitSuccess + hasSplit[ptransformID] = true + return nil } } },