shunping commented on code in PR #36126:
URL: https://github.com/apache/beam/pull/36126#discussion_r2342796607
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -573,4 +574,105 @@ func (t *TriggerDefault) String() string {
return "Default"
}
-// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime
+// TimestampTransform is the engine's representation of a processing time transform.
+type TimestampTransform struct {
+ Delay time.Duration
+ AlignToPeriod time.Duration
+ AlignToOffset time.Duration
+}
+
+// TriggerAfterProcessingTime fires once after a specified amount of processing time
+// has passed since an element was first seen.
+// Uses the extra state field to track the processing time of the first element.
+type TriggerAfterProcessingTime struct {
+ Transforms []TimestampTransform
+}
+
+type afterProcessingTimeState struct {
+ emNow mtime.Time
+ firingTime mtime.Time
+ endOfWindowReached bool
Review Comment:
Alternatively, we can pass `triggerInput` as an argument to `shouldFire()`
and `reset()`. Then we don't need to store `emNow` and `endOfWindowReached`
in `ts.extra`.
I am fine with either approach, but would like to hear whether you have an
opinion on this, @lostluck.
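
To make the alternative concrete, here is a minimal, standalone sketch of what passing the input into `shouldFire()` and `reset()` could look like. It is not the PR's code: `Time`, `triggerState`, `afterProcessingTime`, and the exact method signatures are simplified, hypothetical stand-ins, and the way `endOfWindowReached` is used in `reset()` is only an assumption for illustration. The point is that `ts.extra` then only has to hold `firingTime`.

```go
package main

import "fmt"

// Time is a hypothetical stand-in for mtime.Time (milliseconds).
type Time int64

// triggerInput is a simplified stand-in for the engine's input struct.
// emNow and endOfWindowReached travel with each call instead of being
// cached in the trigger's extra state.
type triggerInput struct {
	emNow              Time
	endOfWindowReached bool
}

// triggerState is a simplified stand-in for the per-trigger state ("ts").
type triggerState struct {
	finished bool
	extra    any
}

// afterProcessingTimeState now holds only what must persist between calls.
type afterProcessingTimeState struct {
	firingTime Time
}

// afterProcessingTime is a hypothetical, single-delay version of the trigger.
type afterProcessingTime struct {
	delay Time
}

// onElement records the firing time when the first element is seen.
func (t *afterProcessingTime) onElement(in triggerInput, ts *triggerState) {
	if ts.finished {
		return
	}
	if _, ok := ts.extra.(afterProcessingTimeState); !ok {
		ts.extra = afterProcessingTimeState{firingTime: in.emNow + t.delay}
	}
}

// shouldFire reads the current processing time from the input argument
// rather than from ts.extra.
func (t *afterProcessingTime) shouldFire(in triggerInput, ts *triggerState) bool {
	if ts.finished {
		return false
	}
	s, ok := ts.extra.(afterProcessingTimeState)
	return ok && in.emNow >= s.firingTime
}

// reset clears the stored firing time. Marking the trigger finished once the
// window has ended is an assumption made for this sketch only.
func (t *afterProcessingTime) reset(in triggerInput, ts *triggerState) {
	ts.extra = nil
	if in.endOfWindowReached {
		ts.finished = true
	}
}

func main() {
	trig := &afterProcessingTime{delay: 5000}
	ts := &triggerState{}

	trig.onElement(triggerInput{emNow: 1000}, ts)
	fmt.Println(trig.shouldFire(triggerInput{emNow: 3000}, ts)) // before the firing time
	fmt.Println(trig.shouldFire(triggerInput{emNow: 6000}, ts)) // at the firing time

	trig.reset(triggerInput{emNow: 6000}, ts)
	fmt.Println(ts.extra == nil)
}
```

The trade-off is a wider method signature for every trigger type in exchange for smaller per-trigger state, so whether this is worthwhile depends on how many triggers would actually use the extra argument.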