shunping commented on code in PR #35412: URL: https://github.com/apache/beam/pull/35412#discussion_r2164787795
########## sdks/python/apache_beam/transforms/periodicsequence.py: ########## @@ -202,3 +244,79 @@ def expand(self, pbegin): result = result | 'ApplyWindowing' >> beam.WindowInto( window.FixedWindows(self.interval)) return result + + +class PeriodicStream(beam.PTransform): + """A PTransform that generates a periodic stream of elements from a sequence. + + This transform creates a `PCollection` by emitting elements from a provided + Python sequence at a specified time interval. It is designed for use in + streaming pipelines to simulate a live, continuous source of data. + + The transform can be configured to: + - Emit the sequence only once. + - Repeat the sequence indefinitely or for a maximum duration. + - Control the time interval between elements. + + To ensure that the stream does not emit a burst of elements immediately at + pipeline startup, a fixed warmup period is added before the first element + is generated. + + Args: + data: The sequence of elements to emit into the PCollection. The elements + can be raw values or pre-timestamped tuples in the format + `(apache_beam.utils.timestamp.Timestamp, value)`. + max_duration: The maximum total duration in seconds for the stream + generation. If `None` (the default) and `repeat` is `True`, the + stream is effectively infinite. If `repeat` is `False`, the stream's + duration is the shorter of this value and the time required to emit + the sequence once. + interval: The delay in seconds between consecutive elements. + Defaults to 0.1. + repeat: If `True`, the input `data` sequence is emitted repeatedly. + If `False` (the default), the sequence is emitted only once. + warmup_time: The extra wait time for the impulse element + (start, end, interval) to reach `ImpulseSeqGenDoFn`. It is used to + avoid the events clustering at the beginning. + """ + def __init__( + self, + data: Sequence[Any], + max_duration: Optional[float] = None, + interval: float = 0.1, Review Comment: Sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org