This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e734fe1c38 [Go SDK] Make it clearer that timers and data don't 
interact negatively in element batches. (#31319)
0e734fe1c38 is described below

commit 0e734fe1c383b09096988b80551843374776c18f
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Tue May 21 07:11:07 2024 -0700

    [Go SDK] Make it clearer that timers and data don't interact negatively in 
element batches. (#31319)
    
    * Don't swallow data errors on timer errors.
    
    * reset error in case of timer handling
    
    * Don't drop timers on splits.
    
    ---------
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 sdks/go/pkg/beam/core/runtime/exec/datasource.go | 33 +++++++++++-------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index 674de44cf35..156d9565379 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -107,15 +107,10 @@ func (n *DataSource) StartBundle(ctx context.Context, id 
string, data DataContex
        return n.Out.StartBundle(ctx, id, data)
 }
 
-// errSplitSuccess is a marker error to indicate we've reached the split index.
-// Akin to io.EOF.
-var errSplitSuccess = errors.New("split index reached")
-
 // process handles converting elements from the data source to timers.
 //
 // The data and timer callback functions must return an io.EOF if the reader 
terminates to signal that an additional
-// buffer is desired. On successful splits, [splitSuccess] must be returned to 
indicate that the
-// PTransform is done processing data for this instruction.
+// buffer is desired.
 func (n *DataSource) process(ctx context.Context, data func(bcr 
*byteCountReader, ptransformID string) error, timer func(bcr *byteCountReader, 
ptransformID, timerFamilyID string) error) error {
        // The SID contains this instruction's expected data processing 
transform (this one).
        elms, err := n.source.OpenElementChan(ctx, n.SID, 
maps.Keys(n.OnTimerTransforms))
@@ -129,7 +124,6 @@ func (n *DataSource) process(ctx context.Context, data 
func(bcr *byteCountReader
        var byteCount int
        bcr := byteCountReader{reader: &r, count: &byteCount}
 
-       splitPrimaryComplete := map[string]bool{}
        for {
                var err error
                select {
@@ -138,24 +132,21 @@ func (n *DataSource) process(ctx context.Context, data 
func(bcr *byteCountReader
                        if !ok {
                                return nil
                        }
-                       if splitPrimaryComplete[e.PtransformID] {
-                               continue
-                       }
                        if len(e.Data) > 0 {
                                r.Reset(e.Data)
                                err = data(&bcr, e.PtransformID)
                        }
+                       if err != nil && err != io.EOF {
+                               return errors.Wrapf(err, "source failed 
processing data")
+                       }
+                       // Process any simultaneously sent timers.
+                       // If the data channel has split though
                        if len(e.Timers) > 0 {
                                r.Reset(e.Timers)
                                err = timer(&bcr, e.PtransformID, 
e.TimerFamilyID)
                        }
-
-                       if err == errSplitSuccess {
-                               // Returning splitSuccess means we've split, 
and aren't consuming the remaining buffer.
-                               // We mark the PTransform done to ignore 
further data.
-                               splitPrimaryComplete[e.PtransformID] = true
-                       } else if err != nil && err != io.EOF {
-                               return errors.Wrap(err, "source failed")
+                       if err != nil && err != io.EOF {
+                               return errors.Wrap(err, "source failed 
processing timers")
                        }
                        // io.EOF means the reader successfully drained.
                        // We're ready for a new buffer.
@@ -210,8 +201,13 @@ func (n *DataSource) Process(ctx context.Context) 
([]*Checkpoint, error) {
                cp = MakeElementDecoder(c)
        }
 
+       hasSplit := map[string]bool{}
        var checkpoints []*Checkpoint
        err := n.process(ctx, func(bcr *byteCountReader, ptransformID string) 
error {
+               // Check if this transform has already successfully, and if so, 
skip reading and decoding of the elements in the buffer.
+               if hasSplit[ptransformID] {
+                       return nil
+               }
                for {
                        // TODO(lostluck) 2020/02/22: Should we include window 
headers or just count the element sizes?
                        ws, t, pn, err := DecodeWindowedValueHeader(wc, 
bcr.reader)
@@ -258,7 +254,8 @@ func (n *DataSource) Process(ctx context.Context) 
([]*Checkpoint, error) {
                        }
                        //      We've finished processing an element, check if 
we have finished a split.
                        if n.incrementIndexAndCheckSplit() {
-                               return errSplitSuccess
+                               hasSplit[ptransformID] = true
+                               return nil
                        }
                }
        },

Reply via email to