damccorm commented on code in PR #35412:
URL: https://github.com/apache/beam/pull/35412#discussion_r2167020441


##########
sdks/python/apache_beam/transforms/periodicsequence.py:
##########
@@ -91,7 +94,39 @@ class ImpulseSeqGenDoFn(beam.DoFn):
   ImpulseSeqGenDoFn can't guarantee that each element is output at exact time.
   ImpulseSeqGenDoFn guarantees that elements would not be output prior to
   given runtime timestamp.
+
+  The output mode of the DoFn is based on the input `data`:
+
+    - **None**: If `data` is None (by default), the output element will be the

Review Comment:
   This is a good point. You could get around this by moving most of this logic 
into the `map` transform instead, but that isn't necessary unless you find it 
cleaner (I won't block on this).



##########
sdks/python/apache_beam/transforms/periodicsequence.py:
##########
@@ -114,24 +149,31 @@ def process(
     assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
 
     current_output_index = restriction_tracker.current_restriction().start
-    current_output_timestamp = start + interval * current_output_index
-    current_time = time.time()
-    watermark_estimator.set_watermark(
-        timestamp.Timestamp(current_output_timestamp))
-
-    while current_output_timestamp <= current_time:
-      if restriction_tracker.try_claim(current_output_index):
-        yield current_output_timestamp
-        current_output_index += 1
-        current_output_timestamp = start + interval * current_output_index
-        current_time = time.time()
-        watermark_estimator.set_watermark(
+
+    while True:
+      current_output_timestamp = start + interval * current_output_index
+
+      if current_output_timestamp > time.time():
+        # we are too ahead of time, let's wait.
+        restriction_tracker.defer_remainder(
             timestamp.Timestamp(current_output_timestamp))
-      else:
         return
 
-    restriction_tracker.defer_remainder(
-        timestamp.Timestamp(current_output_timestamp))
+      if not restriction_tracker.try_claim(current_output_index):
+        # nothing to claim, just stop
+        return
+
+      output, output_ts = self._get_output(current_output_index,
+                                           current_output_timestamp)
+
+      current_watermark = watermark_estimator.current_watermark()
+      if current_watermark is None or output_ts > current_watermark:
+        # ensure watermark is monotonic
+        watermark_estimator.set_watermark(output_ts)

Review Comment:
   I think this watermark estimation won't work if the data has provided event 
times.
   
   For example, imagine we have the following data:
   
   ```
   (data=foo, event_time=10)
   (data=foo, event_time=15)
   (data=foo, event_time=1)
   ```
   
   When we see the first element, we'd advance the watermark to 10. But that 
would immediately make the third element late.
   
   The problem is even more severe when you consider repeating data. If you 
have repeating data, the only valid watermark you can advance to is 
`min(all_event_times)`.
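   
   To make the failure concrete, here is a minimal sketch in plain Python (no Beam APIs). The `(data, event_time)` tuple layout and the variable names are assumptions for illustration only. It mirrors the "only advance the watermark" check from the diff and records which elements arrive behind the watermark:

```python
# Hypothetical (data, event_time) pairs, taken from the example above.
elements = [("foo", 10), ("foo", 15), ("foo", 1)]

watermark = None
late = []
for value, event_time in elements:
    # An element is late if its event time is behind the current watermark.
    if watermark is not None and event_time < watermark:
        late.append((value, event_time))
    # Same rule as the diff: only ever move the watermark forward.
    if watermark is None or event_time > watermark:
        watermark = event_time

print(late)  # [('foo', 1)]
```

   The monotonic check keeps the watermark valid as a sequence, but it does nothing for elements that have not been emitted yet.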
   
   I'd propose the following changes:
   
   1) If you repeat data, that data cannot have event times associated with it 
(otherwise watermark estimation is basically impossible). This would be a 
validation step at transform construction.
   2) If you don't repeat data, but there are associated event times, then when 
emitting `data[i]`, set the watermark to 
`min(d.event_time for d in data[i+1:])`. That ensures the watermark never 
passes the event time of an element that has yet to be emitted.
   
   Note that this would allow you to move the logic into the `map` transform if 
you want to, resolving the problems described in 
https://github.com/apache/beam/pull/35412/files#r2164781118
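   
   A rough sketch of both proposals, in plain Python rather than against the actual transform. The function names `validate_config` and `watermarks_after_each_emit` are hypothetical, and what to do after the last element (returned as `None` here) is left open:

```python
from typing import List, Optional, Sequence


def validate_config(repeat: bool, has_event_times: bool) -> None:
    """Proposal 1: reject repeated data that carries event times."""
    if repeat and has_event_times:
        raise ValueError(
            "Repeated data cannot carry event times; "
            "no useful watermark can be estimated.")


def watermarks_after_each_emit(
        event_times: Sequence[float]) -> List[Optional[float]]:
    """Proposal 2: entry i is min(event_times[i+1:]), or None for the last.

    A single backwards pass computes the suffix minimum for every index,
    avoiding an O(n) min() call per emitted element.
    """
    result: List[Optional[float]] = [None] * len(event_times)
    running_min: Optional[float] = None
    for i in range(len(event_times) - 1, -1, -1):
        result[i] = running_min
        t = event_times[i]
        running_min = t if running_min is None else min(running_min, t)
    return result


print(watermarks_after_each_emit([10, 15, 1]))  # [1, 1, None]
```

   Because each suffix is a subset of the previous one, these values never decrease as `i` grows, so the watermark stays monotonic without a separate check.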



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
