shunping commented on code in PR #35412:
URL: https://github.com/apache/beam/pull/35412#discussion_r2164835161
##########
sdks/python/apache_beam/transforms/periodicsequence.py:
##########
@@ -202,3 +244,79 @@ def expand(self, pbegin):
       result = result | 'ApplyWindowing' >> beam.WindowInto(
           window.FixedWindows(self.interval))
     return result
+
+
+class PeriodicStream(beam.PTransform):
+  """A PTransform that generates a periodic stream of elements from a sequence.
+
+  This transform creates a `PCollection` by emitting elements from a provided
+  Python sequence at a specified time interval. It is designed for use in
+  streaming pipelines to simulate a live, continuous source of data.
+
+  The transform can be configured to:
+    - Emit the sequence only once.
+    - Repeat the sequence indefinitely or for a maximum duration.
+    - Control the time interval between elements.
+
+  To ensure that the stream does not emit a burst of elements immediately at
+  pipeline startup, a fixed warmup period is added before the first element
+  is generated.
+
+  Args:
+    data: The sequence of elements to emit into the PCollection. The elements
+      can be raw values or pre-timestamped tuples in the format
+      `(apache_beam.utils.timestamp.Timestamp, value)`.
+    max_duration: The maximum total duration in seconds for the stream
+      generation. If `None` (the default) and `repeat` is `True`, the
+      stream is effectively infinite. If `repeat` is `False`, the stream's
+      duration is the shorter of this value and the time required to emit
+      the sequence once.
+    interval: The delay in seconds between consecutive elements.
+      Defaults to 0.1.
+    repeat: If `True`, the input `data` sequence is emitted repeatedly.
+      If `False` (the default), the sequence is emitted only once.
+    warmup_time: The extra wait time for the impulse element
+      (start, end, interval) to reach `ImpulseSeqGenDoFn`. It is used to
+      avoid the events clustering at the beginning.
+  """
+  def __init__(
+      self,
+      data: Sequence[Any],
+      max_duration: Optional[float] = None,
+      interval: float = 0.1,
+      repeat: bool = False,
+      warmup_time: float = 2.0):

Review Comment:

I find the time domain of `start_timestamp`, `stop_timestamp` and `fire_interval` in `PeriodicImpulse` a bit confusing. `start_timestamp` and `stop_timestamp` appear to be event times. When `start_timestamp < now`, the current behavior is to keep emitting events until the event-time counter catches up to the current time, which is very unintuitive. If someone sets `start_timestamp` to 0, most of the CPU time will be spent emitting billions of events. Only after the counter catches up to `now` does `fire_interval` take effect, and at that point it behaves like a processing-time interval.

Given this inconsistency, my thought is to hide `start_timestamp`/`stop_timestamp` from the user-facing API. By setting `start_timestamp = Timestamp.now()` (plus some warmup time), we can make it behave as closely to processing time as possible. Users who want to inject events with old timestamps can specify the event-time timestamps in the `data` sequence. For example, `data = [(Timestamp(1), 1), (Timestamp(2), 2)]` simulates two events from 1970 that take about 55 years to arrive and be processed at the current time.
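To make the contrast in the comment concrete, here is a minimal, Beam-free sketch. It assumes timestamps are plain floats in seconds since the Unix epoch, and `FakeTimestamp`, `catch_up_event_count` and `plan_emissions` are hypothetical helpers for illustration only, not Beam APIs and not the PR's implementation. `catch_up_event_count` sizes the backlog created when an event-time counter starts behind `now`; `plan_emissions` follows the proposed alternative, anchoring fire times at `now + warmup_time` in processing time while letting pre-timestamped `data` entries keep their own event times.

```python
import time
from typing import Any, List, Optional, Sequence, Tuple


class FakeTimestamp(float):
  """Hypothetical stand-in for apache_beam.utils.timestamp.Timestamp.

  Holds seconds since the Unix epoch so the sketch runs without Beam.
  """


def catch_up_event_count(
    start_timestamp: float, now: float, fire_interval: float) -> int:
  """Events emitted before an event-time counter starting at start_timestamp
  reaches now, i.e. the backlog produced by the catch-up behavior."""
  if start_timestamp >= now:
    return 0
  return int((now - start_timestamp) // fire_interval)


def plan_emissions(
    data: Sequence[Any],
    interval: float = 0.1,
    warmup_time: float = 2.0,
    now: Optional[float] = None) -> List[Tuple[float, float, Any]]:
  """Returns (fire_time, event_time, value) for one pass over data.

  Fire times are anchored in processing time at now + warmup_time, as the
  comment proposes. Elements given as (FakeTimestamp, value) keep their own
  event time; raw values use their fire time as event time.
  """
  if now is None:
    now = time.time()
  start = now + warmup_time
  plan = []
  for i, element in enumerate(data):
    fire_time = start + i * interval
    if (isinstance(element, tuple) and len(element) == 2 and
        isinstance(element[0], FakeTimestamp)):
      event_time, value = element
    else:
      event_time, value = fire_time, element
    plan.append((fire_time, float(event_time), value))
  return plan


# With start_timestamp = 0 and a 1 s fire_interval, the counter must emit
# about 1.75 billion events before reaching a "now" of 1.75e9 s (mid-2025).
print(catch_up_event_count(0.0, 1.75e9, 1.0))

# Two events stamped in 1970, scheduled to fire shortly after "now".
for fire_time, event_time, value in plan_emissions(
    [(FakeTimestamp(1), 1), (FakeTimestamp(2), 2)], now=1000.0):
  print(fire_time, event_time, value)
```

The idea illustrated here is to keep the two time domains separate: the schedule (when an element fires) always lives in processing time, while event time comes from the data itself, so an old timestamp never forces a burst of catch-up emissions.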