tarun-google commented on code in PR #36285:
URL: https://github.com/apache/beam/pull/36285#discussion_r2399525867
##########
sdks/python/apache_beam/transforms/trigger.py:
##########
@@ -933,6 +980,13 @@ def to_runner_api(self, context):
   def has_ontime_pane(self):
     return any(t.has_ontime_pane() for t in self.triggers)
+  def get_continuation_trigger(self):
+    return Repeatedly(
Review Comment:
@shunping With the continuation trigger concept, we inject a new trigger
after the GroupBy, and it is evaluated every time the first trigger releases
a new pane of data. For example, if the initial trigger is
AfterProcessingTime(5), which fires only once after 5 seconds, the trigger we
add after the GroupBy acts as a pass-through layer when that initial trigger
fires. Many of our triggers are one-time triggers.
AfterEach(condition1, condition2, ...), however, is not a one-time trigger:
it fires every time one of its conditions is met. So if we simply used
AfterAny() as the continuation trigger, it would fire only once. We want the
continuation trigger for AfterEach to fire every time a condition is met,
not just once.
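To illustrate the difference, here is a minimal toy sketch in plain Python. It
does not use the Beam API: `CountTrigger`, `AnyOf`, `Repeat`, and
`count_firings` are hypothetical stand-ins made up for this example, and it
assumes the continuation wraps an AfterAny-style trigger in a repeating
wrapper, which is what the truncated `Repeatedly(` in the diff suggests.

```python
# Toy model only: hypothetical stand-ins, not Beam's trigger classes.


class CountTrigger:
    """Fires once the number of panes seen reaches `n`."""

    def __init__(self, n):
        self.n = n
        self.seen = 0

    def on_pane(self):
        self.seen += 1
        return self.seen >= self.n

    def reset(self):
        self.seen = 0


class AnyOf:
    """One-shot: fires the first time any sub-trigger fires, then finishes."""

    def __init__(self, *triggers):
        self.triggers = triggers
        self.finished = False

    def on_pane(self):
        if self.finished:
            return False
        # Evaluate every sub-trigger so each one observes the pane.
        fired = [t.on_pane() for t in self.triggers]
        if any(fired):
            self.finished = True
            return True
        return False

    def reset(self):
        self.finished = False
        for t in self.triggers:
            t.reset()


class Repeat:
    """Re-arms the wrapped trigger each time it fires, so it never finishes."""

    def __init__(self, trigger):
        self.trigger = trigger

    def on_pane(self):
        if self.trigger.on_pane():
            self.trigger.reset()
            return True
        return False


def count_firings(trigger, num_panes):
    """Feeds `num_panes` upstream panes to `trigger` and counts its firings."""
    return sum(1 for _ in range(num_panes) if trigger.on_pane())


# Five upstream panes arrive after the GroupBy.
once = count_firings(AnyOf(CountTrigger(1), CountTrigger(3)), 5)
repeated = count_firings(Repeat(AnyOf(CountTrigger(1), CountTrigger(3))), 5)
```

In this toy model the bare `AnyOf` fires for the first pane and then stays
finished, while the `Repeat`-wrapped version fires for every pane, which is
the behavior we want from the AfterEach continuation trigger.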
Reference:
1. [Java impl for AfterEach continuation trigger and its
definition](https://github.com/tarun-google/beam/blob/14a601921aa1f5068d890e8e7bf945af1eb41ad0/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java#L74)
2. [Definition of
AfterAny](https://github.com/tarun-google/beam/blob/14a601921aa1f5068d890e8e7bf945af1eb41ad0/sdks/python/apache_beam/transforms/trigger.py#L882)