This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new facdc4e0e73 Add PeriodicStream in the new time series folder. (#35300)
facdc4e0e73 is described below

commit facdc4e0e73fd938d5ca5c6ebbbe46b534e8f05a
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Jun 16 13:08:06 2025 -0400

    Add PeriodicStream in the new time series folder. (#35300)
    
    * Add PeriodicStream in the new time series folder.
    
    * Add some more docstrings and minor fix on test name.
    
    * Fix lints and docs.
---
 sdks/python/apache_beam/ml/ts/__init__.py  |  16 +++
 sdks/python/apache_beam/ml/ts/util.py      | 181 +++++++++++++++++++++++++++++
 sdks/python/apache_beam/ml/ts/util_test.py |  82 +++++++++++++
 3 files changed, 279 insertions(+)

diff --git a/sdks/python/apache_beam/ml/ts/__init__.py b/sdks/python/apache_beam/ml/ts/__init__.py
new file mode 100644
index 00000000000..cce3acad34a
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/sdks/python/apache_beam/ml/ts/util.py b/sdks/python/apache_beam/ml/ts/util.py
new file mode 100644
index 00000000000..4005f57e004
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/util.py
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import math
+import time
+from typing import Any
+from typing import Optional
+from typing import Sequence
+
+import apache_beam as beam
+from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
+from apache_beam.runners import sdf_utils
+from apache_beam.transforms.periodicsequence import ImpulseSeqGenRestrictionProvider  # pylint:disable=line-too-long
+from apache_beam.transforms.window import TimestampedValue
+from apache_beam.utils import timestamp
+
+
+class ImpulseStreamGenDoFn(beam.DoFn):
+  """
+  Generates a periodic, unbounded stream of elements from a provided sequence.
+
+  (Similar to ImpulseSeqGenDoFn in apache_beam.transforms.periodicsequence)
+
+  This Splittable DoFn (SDF) is designed to simulate a continuous stream of
+  data for testing or demonstration purposes. It takes a Python sequence (e.g.,
+  a list) and emits its elements one by one in a loop, assigning a timestamp
+  to each.
+
+  The DoFn operates in two modes based on the structure of the input `data`:
+
+    - **Non-Timestamped Data**: If `data` is a sequence of arbitrary values
+      (e.g., `[v1, v2, ...]`), the DoFn will assign a new timestamp to each
+      emitted element. The timestamps are calculated by starting at a given
+      `start_time` and incrementing by a fixed `interval`.
+    - **Pre-Timestamped Data**: If `data` is a sequence of tuples, where each
+      tuple is `(apache_beam.utils.timestamp.Timestamp, value)`, the DoFn
+      will use the provided timestamp for the emitted element.
+
+  The rate of emission is controlled by wall-clock time. The DoFn will only
+  emit elements whose timestamp (either calculated or provided) is in the past
+  compared to the current system time. When it "catches up" to the present,
+  it will pause and defer the remainder of the work.
+
+  Args:
+    data: The sequence of elements to emit into the PCollection. The elements
+      can be raw values or pre-timestamped tuples in the format
+      `(apache_beam.utils.timestamp.Timestamp, value)`.
+  """
+  def __init__(self, data: Sequence[Any]):
+    self._data = data
+    self._len = len(data)
+    self._is_timestamped_value = len(data) > 0 and isinstance(
+        data[0], tuple) and isinstance(data[0][0], timestamp.Timestamp)
+
+  def _get_timestamped_value(self, index, current_output_timestamp):
+    if self._is_timestamped_value:
+      event_time, value = self._data[index % self._len]
+      return TimestampedValue(value, event_time)
+    else:
+      value = self._data[index % self._len]
+      return TimestampedValue(value, current_output_timestamp)
+
+  @beam.DoFn.unbounded_per_element()
+  def process(
+      self,
+      element,
+      restriction_tracker=beam.DoFn.RestrictionParam(
+          ImpulseSeqGenRestrictionProvider()),
+      watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
+          ManualWatermarkEstimator.default_provider())):
+    start, _, interval = element
+
+    if isinstance(start, timestamp.Timestamp):
+      start = start.micros / 1000000
+
+    assert isinstance(restriction_tracker, sdf_utils.RestrictionTrackerView)
+
+    current_output_index = restriction_tracker.current_restriction().start
+
+    while True:
+      current_output_timestamp = start + interval * current_output_index
+
+      if current_output_timestamp > time.time():
+        # we are too ahead of time, let's wait.
+        restriction_tracker.defer_remainder(
+            timestamp.Timestamp(current_output_timestamp))
+        return
+
+      if not restriction_tracker.try_claim(current_output_index):
+        # nothing to claim, just stop
+        return
+
+      output = self._get_timestamped_value(
+          current_output_index, current_output_timestamp)
+
+      current_watermark = watermark_estimator.current_watermark()
+      if current_watermark is None or output.timestamp > current_watermark:
+        # ensure watermark is monotonic
+        watermark_estimator.set_watermark(output.timestamp)
+
+      yield output
+
+      current_output_index += 1
+
+
+class PeriodicStream(beam.PTransform):
+  """A PTransform that generates a periodic stream of elements from a sequence.
+
+  This transform creates a `PCollection` by emitting elements from a provided
+  Python sequence at a specified time interval. It is designed for use in
+  streaming pipelines to simulate a live, continuous source of data.
+
+  The transform can be configured to:
+  - Emit the sequence only once.
+  - Repeat the sequence indefinitely or for a maximum duration.
+  - Control the time interval between elements.
+
+  To ensure that the stream does not emit a burst of elements immediately at
+  pipeline startup, a fixed warmup period is added before the first element
+  is generated.
+
+  Args:
+      data: The sequence of elements to emit into the PCollection. The elements
+        can be raw values or pre-timestamped tuples in the format
+        `(apache_beam.utils.timestamp.Timestamp, value)`.
+      max_duration: The maximum total duration in seconds for the stream
+        generation. If `None` (the default) and `repeat` is `True`, the
+        stream is effectively infinite. If `repeat` is `False`, the stream's
+        duration is the shorter of this value and the time required to emit
+        the sequence once.
+      interval: The delay in seconds between consecutive elements.
+        Defaults to 0.1.
+      repeat: If `True`, the input `data` sequence is emitted repeatedly.
+        If `False` (the default), the sequence is emitted only once.
+  """
+
+  WARMUP_TIME = 2
+
+  def __init__(
+      self,
+      data: Sequence[Any],
+      max_duration: Optional[float] = None,
+      interval: float = 0.1,
+      repeat: bool = False):
+    self._data = data
+    self._interval = interval
+    self._repeat = repeat
+    self._duration = len(self._data) * interval
+    self._max_duration = max_duration if max_duration is not None else float(
+        "inf")
+
+  def expand(self, pbegin):
+    # Give the runner some time to start up so the events will not cluster
+    # at the beginning.
+    start = timestamp.Timestamp.now() + PeriodicStream.WARMUP_TIME
+
+    if not self._repeat:
+      stop = start + min(self._duration, self._max_duration)
+    else:
+      stop = timestamp.MAX_TIMESTAMP if math.isinf(
+          self._max_duration) else start + self._max_duration
+
+    result = (
+        pbegin
+        | 'ImpulseElement' >> beam.Create([(start, stop, self._interval)])
+        | 'GenStream' >> beam.ParDo(ImpulseStreamGenDoFn(self._data)))
+    return result
diff --git a/sdks/python/apache_beam/ml/ts/util_test.py b/sdks/python/apache_beam/ml/ts/util_test.py
new file mode 100644
index 00000000000..ac2bc6ea701
--- /dev/null
+++ b/sdks/python/apache_beam/ml/ts/util_test.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import logging
+import unittest
+
+import apache_beam as beam
+from apache_beam.ml.ts.util import PeriodicStream
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.window import FixedWindows
+from apache_beam.utils.timestamp import Timestamp
+
+
+class PeriodicStreamTest(unittest.TestCase):
+  def test_interval(self):
+    options = PipelineOptions()
+    start = Timestamp.now()
+    with beam.Pipeline(options=options) as p:
+      ret = (
+          p | PeriodicStream([1, 2, 3, 4], interval=0.5)
+          | beam.WindowInto(FixedWindows(0.5))
+          | beam.WithKeys(0)
+          | beam.GroupByKey())
+      expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4])]
+      assert_that(ret, equal_to(expected))
+    end = Timestamp.now()
+    self.assertGreaterEqual(end - start, 3)
+    self.assertLessEqual(end - start, 7)
+
+  def test_repeat(self):
+    options = PipelineOptions()
+    start = Timestamp.now()
+    with beam.Pipeline(options=options) as p:
+      ret = (
+          p | PeriodicStream(
+              [1, 2, 3, 4], interval=0.5, max_duration=3, repeat=True)
+          | beam.WindowInto(FixedWindows(0.5))
+          | beam.WithKeys(0)
+          | beam.GroupByKey())
+      expected = [(0, [1]), (0, [2]), (0, [3]), (0, [4]), (0, [1]), (0, [2])]
+      assert_that(ret, equal_to(expected))
+    end = Timestamp.now()
+    self.assertGreaterEqual(end - start, 3)
+    self.assertLessEqual(end - start, 7)
+
+  def test_timestamped_value(self):
+    options = PipelineOptions()
+    start = Timestamp.now()
+    with beam.Pipeline(options=options) as p:
+      ret = (
+          p | PeriodicStream([(Timestamp(1), 1), (Timestamp(3), 2),
+                              (Timestamp(2), 3), (Timestamp(1), 4)],
+                             interval=0.5)
+          | beam.WindowInto(FixedWindows(0.5))
+          | beam.WithKeys(0)
+          | beam.GroupByKey())
+      expected = [(0, [1, 4]), (0, [2]), (0, [3])]
+      assert_that(ret, equal_to(expected))
+    end = Timestamp.now()
+    self.assertGreaterEqual(end - start, 3)
+    self.assertLessEqual(end - start, 7)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.WARNING)
+  unittest.main()

Reply via email to