This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new facdc4e0e73 Add PeriodicStream in the new time series folder. (#35300)
facdc4e0e73 is described below
commit facdc4e0e73fd938d5ca5c6ebbbe46b534e8f05a
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 16 13:08:06 2025 -0400
Add PeriodicStream in the new time series folder. (#35300)
* Add PeriodicStream in the new time series folder.
* Add some more docstrings and minor fix on test name.
* Fix lints and docs.
---
sdks/python/apache_beam/ml/ts/__init__.py | 16 +++
sdks/python/apache_beam/ml/ts/util.py | 181 +++++++++++++++++++++++++++++
sdks/python/apache_beam/ml/ts/util_test.py | 82 +++++++++++++
3 files changed, 279 insertions(+)
diff --git a/sdks/python/apache_beam/ml/ts/__init__.py b/sdks/python/apache_beam/ml/ts/__init__.py
new file mode 100644
index 00000000000..cce3acad34a
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py
new file mode 100644
index 00000000000..4005f57e004
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/util.py
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+import time
+from typing import Any
+from typing import Optional
+from typing import Sequence
+
+import apache_beam as beam
+from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
+from apache_beam.runners import sdf_utils
+from apache_beam.transforms.periodicsequence import
ImpulseSeqGenRestrictionProvider # pylint:disable=line-too-long
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+
+
+class ImpulseStreamGenDoFn(beam.DoFn):
+  """
+  Generates a periodic, unbounded stream of elements from a provided sequence.
+
+  (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence)
+
+  This Splittable DoFn (SDF) is designed to simulate a continuous stream of
+  data for testing or demonstration purposes. It takes a Python sequence (e.g.,
+  a list) and emits its elements one by one in a loop, assigning a timestamp
+  to each.
+
+  The DoFn operates in two modes based on the structure of the input `data`:
+
+  - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values
+    (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each
+    emitted element. The timestamps are calculated by starting at a given
+    `start_time` and incrementing by a fixed `interval`.
+  - **Pre-Timestamped Data**: If `data` is a sequence of 2-tuples, where each
+    tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn
+    will use the provided timestamp for the emitted element. The mode is
+    decided by inspecting the first element of `data` only.
+
+  The rate of emission is controlled by wall-clock time. Every element has a
+  scheduled time of `start + interval * index`, and it is emitted only once
+  that scheduled time is no longer in the future compared to the current
+  system time. This pacing uses the scheduled time in both modes; for
+  pre-timestamped data the provided timestamp only determines the event time
+  attached to the emitted element. When the DoFn "catches up" to the present,
+  it stops and defers the remainder of the work, passing the next scheduled
+  time to `defer_remainder`.
+
+  Args:
+    data: The sequence of elements to emit into the PCollection. The elements
+      can be raw values or pre-timestamped tuples in the format
+      `(apache_beam.utils.timestamp.Timestamp, value)`.
+  """
+  def __init__(self, data: Sequence[Any]):
+    super().__init__()
+    self._data = data
+    self._len = len(data)
+    self._is_timestamped_value = self._has_timestamped_shape(data)
+
+  @staticmethod
+  def _has_timestamped_shape(data: Sequence[Any]) -> bool:
+    """Returns True if the first item of `data` is a (Timestamp, value) pair."""
+    if len(data) == 0:
+      return False
+    first = data[0]
+    # Insist on exactly two items: an empty tuple must not be indexed, and a
+    # longer tuple could not be unpacked as (event_time, value) later on, so
+    # such tuples are emitted as raw values instead.
+    return (
+        isinstance(first, tuple) and len(first) == 2 and
+        isinstance(first[0], timestamp.Timestamp))
+
+  def _get_timestamped_value(self, index, current_output_timestamp):
+    """Builds the TimestampedValue to emit for the given output index.
+
+    Args:
+      index: The output index; it is wrapped around the length of the data so
+        the sequence repeats.
+      current_output_timestamp: The timestamp to attach when the data is not
+        pre-timestamped.
+
+    Returns:
+      A `TimestampedValue` carrying either the provided event time or
+      `current_output_timestamp`.
+    """
+    if self._is_timestamped_value:
+      event_time, value = self._data[index % self._len]
+      return TimestampedValue(value, event_time)
+    else:
+      value = self._data[index % self._len]
+      return TimestampedValue(value, current_output_timestamp)
+
+  @beam.DoFn.unbounded_per_element()
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider()),
+      watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
+          ManualWatermarkEstimator.default_provider())):
+    """Emits the elements whose scheduled time has been reached.
+
+    Args:
+      element: A 3-tuple whose first item is the start time (a `Timestamp` or
+        a number of seconds) and whose third item is the interval in seconds.
+        The second item is not used here.
+      restriction_tracker: Tracker over the output indices to claim.
+      watermark_estimator: Manual estimator advanced to the timestamp of each
+        emitted element when that moves the watermark forward.
+
+    Yields:
+      One `TimestampedValue` per claimed output index.
+    """
+    start, _, interval = element
+
+    if isinstance(start, timestamp.Timestamp):
+      # Work in float seconds so the schedule is comparable with time.time().
+      start = start.micros / 1000000
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    current_output_index = restriction_tracker.current_restriction().start
+
+    while True:
+      current_output_timestamp = start + interval * current_output_index
+
+      if current_output_timestamp > time.time():
+        # we are too ahead of time, let's wait.
+        restriction_tracker.defer_remainder(
+            timestamp.Timestamp(current_output_timestamp))
+        return
+
+      if not restriction_tracker.try_claim(current_output_index):
+        # nothing to claim, just stop
+        return
+
+      output = self._get_timestamped_value(
+          current_output_index, current_output_timestamp)
+
+      current_watermark = watermark_estimator.current_watermark()
+      if current_watermark is None or output.timestamp > current_watermark:
+        # ensure watermark is monotonic
+        watermark_estimator.set_watermark(output.timestamp)
+
+      yield output
+
+      current_output_index += 1
+
+
+class PeriodicStream(beam.PTransform):
+  """A PTransform that generates a periodic stream of elements from a sequence.
+
+  This transform creates a `PCollection` by emitting elements from a provided
+  Python sequence at a specified time interval. It is designed for use in
+  streaming pipelines to simulate a live, continuous source of data.
+
+  The transform can be configured to:
+  - Emit the sequence only once.
+  - Repeat the sequence indefinitely or for a maximum duration.
+  - Control the time interval between elements.
+
+  To ensure that the stream does not emit a burst of elements immediately at
+  pipeline startup, a fixed warmup period (`WARMUP_TIME` seconds) is added
+  before the first element is generated. The start time is taken when the
+  transform is expanded.
+
+  Args:
+    data: The sequence of elements to emit into the PCollection. The elements
+      can be raw values or pre-timestamped tuples in the format
+      `(apache_beam.utils.timestamp.Timestamp, value)`.
+    max_duration: The maximum total duration in seconds for the stream
+      generation. If `None` (the default) and `repeat` is `True`, the
+      stream is effectively infinite. If `repeat` is `False`, the stream's
+      duration is the shorter of this value and the time required to emit
+      the sequence once.
+    interval: The delay in seconds between consecutive elements.
+      Defaults to 0.1.
+    repeat: If `True`, the input `data` sequence is emitted repeatedly.
+      If `False` (the default), the sequence is emitted only once.
+
+  Raises:
+    ValueError: If `interval` is not positive, if `max_duration` is negative,
+      or if `repeat` is `True` while `data` is empty.
+  """
+
+  WARMUP_TIME = 2
+
+  def __init__(
+      self,
+      data: Sequence[Any],
+      max_duration: Optional[float] = None,
+      interval: float = 0.1,
+      repeat: bool = False):
+    # Fail fast at construction time instead of deep inside a running
+    # pipeline, where these mistakes are much harder to trace back.
+    if interval <= 0:
+      raise ValueError(f'interval must be positive, got {interval!r}')
+    if max_duration is not None and max_duration < 0:
+      raise ValueError(
+          f'max_duration must be non-negative, got {max_duration!r}')
+    if repeat and len(data) == 0:
+      # Repeating wraps the output index with a modulo over len(data).
+      raise ValueError('data must not be empty when repeat is True')
+
+    super().__init__()
+    self._data = data
+    self._interval = interval
+    self._repeat = repeat
+    # Time needed to emit the whole sequence exactly once.
+    self._duration = len(self._data) * interval
+    self._max_duration = max_duration if max_duration is not None else float(
+        "inf")
+
+  def expand(self, pbegin):
+    # Give the runner some time to start up so the events will not cluster
+    # at the beginning.
+    start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME
+
+    if not self._repeat:
+      # A single pass never runs longer than the sequence itself.
+      stop = start + min(self._duration, self._max_duration)
+    elif math.isinf(self._max_duration):
+      stop = timestamp.MAX_TIMESTAMP
+    else:
+      stop = start + self._max_duration
+
+    result = (
+        pbegin
+        | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)])
+        | 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data)))
+    return result
diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py
new file mode 100644
index 00000000000..ac2bc6ea701
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/util_test.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.ml.ts.util import PeriodicStream
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.utils.timestamp import Timestamp
+
+
+class PeriodicStreamTest(unittest.TestCase):
+  """Runs PeriodicStream end to end and checks its output and its pacing."""
+  def _check_stream(self, stream, expected):
+    """Runs `stream` through a windowed GroupByKey and verifies the result.
+
+    Args:
+      stream: The PeriodicStream transform under test.
+      expected: The expected `(key, values)` groups, one per 0.5s window.
+    """
+    options = PipelineOptions()
+    began = Timestamp.now()
+    with beam.Pipeline(options=options) as pipeline:
+      grouped = (
+          pipeline
+          | stream
+          | beam.WindowInto(FixedWindows(0.5))
+          | beam.WithKeys(0)
+          | beam.GroupByKey())
+      assert_that(grouped, equal_to(expected))
+    elapsed = Timestamp.now() - began
+    # NOTE(review): the bounds presumably bracket the warmup period plus the
+    # emission time of the stream - confirm before tightening them.
+    self.assertGreaterEqual(elapsed, 3)
+    self.assertLessEqual(elapsed, 7)
+
+  def test_interval(self):
+    self._check_stream(
+        PeriodicStream([1, 2, 3, 4], interval=0.5),
+        [(0, [1]), (0, [2]), (0, [3]), (0, [4])])
+
+  def test_repeat(self):
+    self._check_stream(
+        PeriodicStream([1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True),
+        [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])])
+
+  def test_timestamped_value(self):
+    pre_timestamped = [
+        (Timestamp(1), 1),
+        (Timestamp(3), 2),
+        (Timestamp(2), 3),
+        (Timestamp(1), 4),
+    ]
+    # Values 1 and 4 share an event time, so they land in the same window.
+    self._check_stream(
+        PeriodicStream(pre_timestamped, interval=0.5),
+        [(0, [1, 4]), (0, [2]), (0, [3])])
+
+
+if __name__ == '__main__':
+  # Only let warnings and above through the root logger while the tests run.
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.WARNING)
+  unittest.main()