shunping commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116887371


##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1340,23 @@ func (*statefulStageKind) addPending(ss *stageState, em 
*ElementManager, newPend
                heap.Push(&dnt.elements, e)
 
                if e.IsTimer() {
-                       if lastSet, ok := dnt.timers[timerKey{family: e.family, 
tag: e.tag, window: e.window}]; ok {
-                               // existing timer!
-                               // don't increase the count this time, as 
"this" timer is already pending.
-                               count--
-                               // clear out the existing hold for accounting 
purposes.
-                               ss.watermarkHolds.Drop(lastSet.hold, 1)
-                       }
-                       // Update the last set time on the timer.
-                       dnt.timers[timerKey{family: e.family, tag: e.tag, 
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+                       if e.sequence > 0 {
+                               if lastSet, ok := dnt.timers[timerKey{family: 
e.family, tag: e.tag, window: e.window}]; ok {
+                                       // existing timer!
+                                       // don't increase the count this time, 
as "this" timer is already pending.
+                                       count--
+                                       // clear out the existing hold for 
accounting purposes.
+                                       ss.watermarkHolds.Drop(lastSet.hold, 1)
+                               }
+                               // Update the last set time on the timer.
+                               dnt.timers[timerKey{family: e.family, tag: 
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: 
e.holdTimestamp}
 
-                       // Mark the hold in the heap.
-                       ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                               // Mark the hold in the heap.
+                               ss.watermarkHolds.Add(e.holdTimestamp, 1)
+                       } else {
+                               // timer is to be cleared
+                               delete(dnt.timers, timerKey{family: e.family, 
tag: e.tag, window: e.window})
+                       }

Review Comment:
   Yes. I noticed an error reporting that there were still pending elements at 
the end of a pipeline. It is fixed now with the latest commit.



##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -86,7 +86,18 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder 
WinCoderType, raw [
                        clear := d.Bool()
                        hold := mtime.MaxTimestamp
                        if clear {
-                               if !yield(timerRet{keyBytes, tag, nil, ws}) {
+                               var elms []element
+                               for _, w := range ws {
+                                       elms = append(elms, element{
+                                               tag:      tag,
+                                               elmBytes: nil, // indicates 
this is a timer.
+                                               keyBytes: keyBytes,
+                                               window:   w,
+                                               sequence: -1,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to