lostluck commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116936343
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1341,28 @@ func (*statefulStageKind) addPending(ss *stageState, em
*ElementManager, newPend
heap.Push(&dnt.elements, e)
if e.IsTimer() {
- if lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]; ok {
+ lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]
+ if ok {
// existing timer!
// don't increase the count this time, as
"this" timer is already pending.
count--
// clear out the existing hold for accounting
purposes.
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
- // Update the last set time on the timer.
- dnt.timers[timerKey{family: e.family, tag: e.tag,
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+ if e.sequence >= 0 {
+ // Update the last set time on the timer.
+ dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
- // Mark the hold in the heap.
- ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ // Mark the hold in the heap.
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ } else {
+ // we need to decrement the pending count only
if the timer to be cleared is in the pending list
+ if ok {
+ count--
+ }
Review Comment:
I don't think this needs to be conditional? But I may be wrong that it
needed to be decremented twice...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]