shunping commented on code in PR #35106:
URL: https://github.com/apache/beam/pull/35106#discussion_r2116887371
##########
sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go:
##########
@@ -1345,18 +1340,23 @@ func (*statefulStageKind) addPending(ss *stageState, em
*ElementManager, newPend
heap.Push(&dnt.elements, e)
if e.IsTimer() {
- if lastSet, ok := dnt.timers[timerKey{family: e.family,
tag: e.tag, window: e.window}]; ok {
- // existing timer!
- // don't increase the count this time, as
"this" timer is already pending.
- count--
- // clear out the existing hold for accounting
purposes.
- ss.watermarkHolds.Drop(lastSet.hold, 1)
- }
- // Update the last set time on the timer.
- dnt.timers[timerKey{family: e.family, tag: e.tag,
window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
+ if e.sequence > 0 {
+ if lastSet, ok := dnt.timers[timerKey{family:
e.family, tag: e.tag, window: e.window}]; ok {
+ // existing timer!
+ // don't increase the count this time,
as "this" timer is already pending.
+ count--
+ // clear out the existing hold for
accounting purposes.
+ ss.watermarkHolds.Drop(lastSet.hold, 1)
+ }
+ // Update the last set time on the timer.
+ dnt.timers[timerKey{family: e.family, tag:
e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold:
e.holdTimestamp}
- // Mark the hold in the heap.
- ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ // Mark the hold in the heap.
+ ss.watermarkHolds.Add(e.holdTimestamp, 1)
+ } else {
+ // timer is to be cleared
+ delete(dnt.timers, timerKey{family: e.family,
tag: e.tag, window: e.window})
+ }
Review Comment:
Yes. I noticed that I got an error about there were pending elements at the
end of a pipeline. It is fixed now with the latest commit.
##########
sdks/go/pkg/beam/runners/prism/internal/engine/timers.go:
##########
@@ -86,7 +86,18 @@ func decodeTimerIter(keyDec func(io.Reader) []byte, winCoder
WinCoderType, raw [
clear := d.Bool()
hold := mtime.MaxTimestamp
if clear {
- if !yield(timerRet{keyBytes, tag, nil, ws}) {
+ var elms []element
+ for _, w := range ws {
+ elms = append(elms, element{
+ tag: tag,
+ elmBytes: nil, // indicates
this is a timer.
+ keyBytes: keyBytes,
+ window: w,
+ sequence: -1,
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]