lostluck commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116597717
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1340,23 @@ func (*statefulStageKind) addPending(ss *stageState, em
*ElementManager, newPend
heap.Push(&dnt.elements, e)
if e.IsTimer() {
- if lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]; ok {
- // existing timer!
- // don't increase the count this time, as
"this" timer is already pending.
- count--
- // clear out the existing hold for accounting
purposes.
- ss.watermarkHolds.Drop(lastSet.hold, 1)
- }
- // Update the last set time on the timer.
- dnt.timers[timerKey{family: e.family, tag: e.tag,
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+ if e.sequence > 0 {
+ if lastSet, ok := dnt.timers[timerKey{family:
e.family, tag: e.tag, window: e.window}]; ok {
+ // existing timer!
+ // don't increase the count this time,
as "this" timer is already pending.
+ count--
+ // clear out the existing hold for
accounting purposes.
+ ss.watermarkHolds.Drop(lastSet.hold, 1)
+ }
+ // Update the last set time on the timer.
+ dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
- // Mark the hold in the heap.
- ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ // Mark the hold in the heap.
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ } else {
+ // timer is to be cleared
+ delete(dnt.timers, timerKey{family: e.family,
tag: e.tag, window: e.window})
+ }
Review Comment:
We do need to ensure the count is adjusted properly here, as it's required
so the ElementManager has a global view of the amount of pending work.
As it stands:
We increment "count" for the clearing element. (line 1321) So we need to
decrement it here, since it's a very fake element.
Further, if there's an existing timer, we also need to decrement the count
*again* and clear the existing hold. (eg. the last set code.)
So I'd move the `if e.sequence > 0 {` part to after the lastSet check, and
only wrap the "update and mark the hold" part in that.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -86,7 +86,18 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder
WinCoderType, raw [
clear := d.Bool()
hold := mtime.MaxTimestamp
if clear {
- if !yield(timerRet{keyBytes, tag, nil, ws}) {
+ var elms []element
+ for _, w := range ws {
+ elms = append(elms, element{
+ tag: tag,
+ elmBytes: nil, // indicates
this is a timer.
+ keyBytes: keyBytes,
+ window: w,
+ sequence: -1,
Review Comment:
Add a comment here saying "indicates this timer is being cleared". Also go
to definition for the `element` struct definition, so we can update the
documentation for the `sequence` about this usage of it: a value of -1 means
this is a timer that should be cleared... etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]