lostluck commented on code in PR #26782:
URL: https://github.com/apache/beam/pull/26782#discussion_r1213620890


##########
sdks/go/pkg/beam/core/runtime/exec/pardo.go:
##########
@@ -362,47 +370,128 @@ func (n *ParDo) invokeDataFn(ctx context.Context, pn 
typex.PaneInfo, ws []typex.
        return val, nil
 }
 
-func (n *ParDo) InvokeTimerFn(ctx context.Context, fn *funcx.Fn, timerFamilyID 
string, bcr *byteCountReader) (*FullValue, error) {
-       timerAdapter, ok := n.Timer.(*userTimerAdapter)
-       if !ok {
-               return nil, fmt.Errorf("userTimerAdapter empty for ParDo: %v", 
n.GetPID())
+// decodeBundleTimers is a helper to decode a batch of timers for a bundle, 
handling the io.EOF from the reader.
+func decodeBundleTimers(spec timerFamilySpec, r io.Reader) ([]TimerRecv, 
error) {
+       var bundleTimers []TimerRecv
+       for {
+               tmap, err := decodeTimer(spec.KeyDecoder, spec.WinDecoder, r)
+               if err != nil {
+                       if goerrors.Is(err, io.EOF) {
+                               break
+                       }
+                       return nil, errors.WithContext(err, "error decoding 
received timer callback")
+               }
+               bundleTimers = append(bundleTimers, tmap)
        }
-       tmap, err := decodeTimer(timerAdapter.dc, timerAdapter.wc, bcr)
+       return bundleTimers, nil
+}
+
+// ProcessTimers processes all timers in firing order from the runner for a 
timer family ID.
+//
+// A timer refers to a specific combination of Key+Window + Family + Tag. They 
also
+// have a fireing time, and a data watermark hold time. The SDK doesn't 
determine
+// if a timer is ready to fire or not, that's up to the runner.
+//
+// This method fires timers in the order from the runner. During this process, 
the user
+// code may set additional firings for one or more timers, which may overwrite 
orderings
+// from the runner.
+//
+// In particular, if runner sent timer produces a new firing that is earlier 
than a 2nd runner sent timer,
+// then it is processed before that 2nd timer. This will override any 
subsequent firing of the same timer,
+// and as a result, must add a clear to the set of timer modifications.
+func (n *ParDo) ProcessTimers(timerFamilyID string, r io.Reader) (err error) {
+       // Lookup actual domain for family here.
+       spec := n.TimerTracker.familyToSpec[timerFamilyID]
+
+       bundleTimers, err := decodeBundleTimers(spec, r)
        if err != nil {
-               return nil, errors.WithContext(err, "error decoding received 
timer callback")
+               return err
        }
+       for _, tmap := range bundleTimers {
+               n.TimerTracker.SetCurrentKeyString(tmap.KeyString)
+               for i, w := range tmap.Windows {
+                       ws := tmap.Windows[i : i+1]

Review Comment:
   Alas, it is not. `w` is an individual `typex.Window`, but we need a 
`[]typex.Window` instead for other reasons. So we reuse the backing array 
of the slice decoded from the timer, which avoids an allocation.
   
   Since the windows are not writable (they're copied by value anyway), this 
is safe for the downstream calls.
   
   There's probably some further cleanup we could do for the way windows are 
passed around in this file (among other cleanups for the file), but this does 
the correct thing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to