lostluck commented on code in PR #36126:
URL: https://github.com/apache/beam/pull/36126#discussion_r2360630907
##########
sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go:
##########
@@ -573,4 +574,105 @@ func (t *TriggerDefault) String() string {
return "Default"
}
-// TODO https://github.com/apache/beam/issues/31438 Handle TriggerAfterProcessingTime
+// TimestampTransform is the engine's representation of a processing time transform.
+type TimestampTransform struct {
+ Delay time.Duration
+ AlignToPeriod time.Duration
+ AlignToOffset time.Duration
+}
+
+// TriggerAfterProcessingTime fires once after a specified amount of processing time
+// has passed since an element was first seen.
+// Uses the extra state field to track the processing time of the first element.
+type TriggerAfterProcessingTime struct {
+ Transforms []TimestampTransform
+}
+
+type afterProcessingTimeState struct {
+ emNow mtime.Time
+ firingTime mtime.Time
+ endOfWindowReached bool
Review Comment:
It's handy to have emNow in the state, since it helps track how the state
changes over time, and it feels consistent with the other fields. That's partly
why those methods don't take the triggerInput at present: the state machine is
harder to reason about if the state isn't in the machine.