shunping commented on code in PR #35412:
URL: https://github.com/apache/beam/pull/35412#discussion_r2164835161


##########
sdks/python/apache_beam/transforms/periodicsequence.py:
##########
@@ -202,3 +244,79 @@ def expand(self, pbegin):
       result = result | 'ApplyWindowing' >> beam.WindowInto(
           window.FixedWindows(self.interval))
     return result
+
+
+class PeriodicStream(beam.PTransform):
+  """A PTransform that generates a periodic stream of elements from a sequence.
+
+  This transform creates a `PCollection` by emitting elements from a provided
+  Python sequence at a specified time interval. It is designed for use in
+  streaming pipelines to simulate a live, continuous source of data.
+
+  The transform can be configured to:
+  - Emit the sequence only once.
+  - Repeat the sequence indefinitely or for a maximum duration.
+  - Control the time interval between elements.
+
+  To ensure that the stream does not emit a burst of elements immediately at
+  pipeline startup, a fixed warmup period is added before the first element
+  is generated.
+
+  Args:
+      data: The sequence of elements to emit into the PCollection. The elements
+        can be raw values or pre-timestamped tuples in the format
+        `(apache_beam.utils.timestamp.Timestamp, value)`.
+      max_duration: The maximum total duration in seconds for the stream
+        generation. If `None` (the default) and `repeat` is `True`, the
+        stream is effectively infinite. If `repeat` is `False`, the stream's
+        duration is the shorter of this value and the time required to emit
+        the sequence once.
+      interval: The delay in seconds between consecutive elements.
+        Defaults to 0.1.
+      repeat: If `True`, the input `data` sequence is emitted repeatedly.
+        If `False` (the default), the sequence is emitted only once.
+      warmup_time: The extra wait time for the impulse element
+        (start, end, interval) to reach `ImpulseSeqGenDoFn`. It is used to
+        avoid the events clustering at the beginning.
+  """
+  def __init__(
+      self,
+      data: Sequence[Any],
+      max_duration: Optional[float] = None,
+      interval: float = 0.1,
+      repeat: bool = False,
+      warmup_time: float = 2.0):

Review Comment:
   I find the time domain of `start_timestamp`, `stop_timestamp` and
`fire_interval` in `PeriodicImpulse` a bit confusing.
   
   `start_timestamp` and `stop_timestamp` appear to be event-time values. When
`start_timestamp < now`, the current behavior is to catch event time up to the
current time, which is very unintuitive. If someone sets `start_timestamp` to
0, most CPU time will be spent emitting billions of events.
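
   To put a rough number on that catch-up, here is a minimal sketch. It
assumes firings land at `start_timestamp + k * fire_interval`, as described
above; `backlog_firings` is a hypothetical helper for illustration, not part of
Beam.

```python
import math


def backlog_firings(
    start_timestamp: float, now: float, fire_interval: float) -> int:
  """Counts firings whose event time is already in the past at startup.

  Hypothetical helper: assumes firings at start_timestamp + k * fire_interval
  for k = 0, 1, 2, ... and counts those strictly earlier than `now`.
  """
  if fire_interval <= 0:
    raise ValueError('fire_interval must be positive')
  if start_timestamp >= now:
    return 0
  return math.ceil((now - start_timestamp) / fire_interval)
```

   With `start_timestamp = 0`, a one-second interval, and `now` around
1.75e9 seconds since the epoch, the backlog is on the order of 1.75 billion
firings before the first "live" element.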
   
   After the event-time counter catches up to `now`, `fire_interval` takes
effect, but at that point it behaves like a processing-time interval.
   
   Given this inconsistency, my thought is to hide
`start_timestamp`/`stop_timestamp` from the usage. By setting `start_timestamp
= Timestamp.now()` (plus some warmup time), we can make it behave as close to
processing time as possible. If users want to inject events with old
timestamps, they can specify the event-time timestamps in the `data` array. For
example, they can pass `data = [(Timestamp(1), 1), (Timestamp(2), 2)]`, which
simulates two events from 1970 that take 55 years to arrive and be processed at
the current time.
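
   As a rough illustration of that split between pre-timestamped and raw
elements, the sketch below pairs each element with an event time. Everything
in it is an assumption for illustration: `Ts` is a stand-in for
`apache_beam.utils.timestamp.Timestamp` so the snippet runs without Beam, and
`resolve_event_times` is a hypothetical helper, not how the PR implements it.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence, Tuple


@dataclass(frozen=True)
class Ts:
  """Stand-in for apache_beam.utils.timestamp.Timestamp (epoch seconds)."""
  seconds: float


def resolve_event_times(
    data: Sequence[Any], start: float,
    interval: float) -> List[Tuple[float, Any]]:
  """Pairs each element with an event time in seconds.

  Pre-timestamped (Ts, value) tuples keep their own event time. Raw values
  get the assumed emit time start + index * interval.
  """
  resolved = []
  for index, element in enumerate(data):
    is_timestamped = (
        isinstance(element, tuple) and len(element) == 2 and
        isinstance(element[0], Ts))
    if is_timestamped:
      resolved.append((element[0].seconds, element[1]))
    else:
      resolved.append((start + index * interval, element))
  return resolved
```

   Under these assumptions, `[(Ts(1), 1), (Ts(2), 2)]` keeps event times 1 and
2 regardless of `start`, while raw values follow the emit schedule.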



