lostluck commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116597717


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1340,23 @@ func (*statefulStageKind) addPending(ss *stageState, em 
*ElementManager, newPend
                heap.Push(&dnt.elements, e)
 
                if e.IsTimer() {
-                       if lastSet, ok := dnt.timers[timerKey{family: e.family, 
tag: e.tag, window: e.window}]; ok {
-                               // existing timer!
-                               // don't increase the count this time, as 
"this" timer is already pending.
-                               count--
-                               // clear out the existing hold for accounting 
purposes.
-                               ss.watermarkHolds.Drop(lastSet.hold, 1)
-                       }
-                       // Update the last set time on the timer.
-                       dnt.timers[timerKey{family: e.family, tag: e.tag, 
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+                       if e.sequence > 0 {
+                               if lastSet, ok := dnt.timers[timerKey{family: 
e.family, tag: e.tag, window: e.window}]; ok {
+                                       // existing timer!
+                                       // don't increase the count this time, 
as "this" timer is already pending.
+                                       count--
+                                       // clear out the existing hold for 
accounting purposes.
+                                       ss.watermarkHolds.Drop(lastSet.hold, 1)
+                               }
+                               // Update the last set time on the timer.
+                               dnt.timers[timerKey{family: e.family, tag: 
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: 
e.holdTimestamp}
 
-                       // Mark the hold in the heap.
-                       ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                               // Mark the hold in the heap.
+                               ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                       } else {
+                               // timer is to be cleared
+                               delete(dnt.timers, timerKey{family: e.family, 
tag: e.tag, window: e.window})
+                       }

Review Comment:
   We do need to ensure the count is adjusted properly here, as it's required 
so the ElementManager has a global view of the amount of pending work.
   
   As it stands:
   
   We increment "count" for the clearing element. (line 1321) So we need to 
decrement it here, since it's a very fake element. 
   Further, if there's an existing timer, we also need to decrement the count 
*again* and clear the existing hold. (eg. the last set code.)
   
   So I'd move the `if e.sequence > 0 {` part to after the lastSet check, and 
only wrap the  "update and mark the hold" part in that.
   
   
   



##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -86,7 +86,18 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder 
WinCoderType, raw [
                        clear := d.Bool()
                        hold := mtime.MaxTimestamp
                        if clear {
-                               if !yield(timerRet{keyBytes, tag, nil, ws}) {
+                               var elms []element
+                               for _, w := range ws {
+                                       elms = append(elms, element{
+                                               tag:      tag,
+                                               elmBytes: nil, // indicates 
this is a timer.
+                                               keyBytes: keyBytes,
+                                               window:   w,
+                                               sequence: -1,

Review Comment:
   Add a comment here saying "indicates this timer is being cleared".  Also go 
to definition for the `element` struct definition, so we can update the 
documentation for the `sequence` about this usage of it: a value of -1 means 
this is a timer that should be cleared... etc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to