This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 79b6e5b  [BEAM-8824] Add support to allow specify window allowed_lateness in python sdk
     new fb0353e  Merge pull request #10216 from y1chi/allowed_lateness
79b6e5b is described below

commit 79b6e5bb863c8cf96b690d1a96c8988d8bec72be
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Tue Nov 19 10:16:38 2019 -0800

    [BEAM-8824] Add support to allow specify window allowed_lateness in python sdk
---
 .../apache_beam/examples/snippets/snippets_test.py |  2 ++
 .../testing/data/trigger_transcripts.yaml          | 22 ++++++++++++
 .../python/apache_beam/testing/test_stream_test.py |  2 +-
 sdks/python/apache_beam/transforms/core.py         | 39 +++++++++++++++++-----
 sdks/python/apache_beam/transforms/trigger.py      |  5 ++-
 sdks/python/apache_beam/transforms/trigger_test.py | 29 ++++++++++------
 6 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f0f53e2..38bcb88 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -958,6 +958,7 @@ class SnippetsTest(unittest.TestCase):
                 | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
                 | WindowInto(FixedWindows(15),
                              trigger=trigger,
+                             allowed_lateness=20,
                              accumulation_mode=AccumulationMode.DISCARDING)
                 | 'group' >> beam.GroupByKey()
                 | 'count' >> beam.Map(
@@ -1014,6 +1015,7 @@ class SnippetsTest(unittest.TestCase):
               FixedWindows(1 * 60),
               trigger=AfterWatermark(
                   late=AfterProcessingTime(10 * 60)),
+              allowed_lateness=10,
               accumulation_mode=AccumulationMode.DISCARDING)
           # [END model_composite_triggers]
           | 'group' >> beam.GroupByKey()
diff --git a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
index cac0c74..b2d4e9a 100644
--- a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
+++ b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
@@ -30,6 +30,7 @@ name: fixed_default_late_data
 window_fn: FixedWindows(10)
 trigger_fn: Default
 timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 100
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
@@ -42,6 +43,26 @@ transcript:
       - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 9, late: true}
 
 ---
+name: fixed_drop_late_data_after_allowed_lateness
+window_fn: FixedWindows(10)
+trigger_fn: AfterWatermark(early=AfterCount(3), late=AfterCount(1))
+timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 20
+accumulation_mode: accumulating
+transcript:
+  - input: [1, 2, 10, 11, 80, 81]
+  - watermark: 100
+  - expect:
+      - {window: [0, 9], values: [1, 2], timestamp: 9, final: false}
+      - {window: [10, 19], values: [10, 11], timestamp: 19}
+      - {window: [80, 89], values: [80, 81], timestamp: 89, late: false}
+  - input: [7, 8] # no output
+  - input: [17, 18] # no output
+  - input: [82]
+  - expect:
+      - {window: [80, 89], values: [80, 81, 82], timestamp: 89, late: true}
+
+---
 name: timestamp_combiner_earliest
 window_fn: FixedWindows(10)
 trigger_fn: Default
@@ -118,6 +139,7 @@ broken_on:
   - SwitchingDirectRunner
 window_fn: Sessions(10)
 trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
+allowed_lateness: 100
 timestamp_combiner: OUTPUT_AT_EOW
 transcript:
     - input: [1, 2, 3]
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 26b54bd..bfadb5e 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -275,7 +275,7 @@ class TestStreamTest(unittest.TestCase):
     p = TestPipeline(options=options)
     records = (p
                | test_stream
-               | beam.WindowInto(FixedWindows(15))
+               | beam.WindowInto(FixedWindows(15), allowed_lateness=300)
                | beam.Map(lambda x: ('k', x))
                | beam.GroupByKey())
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3cd5472..25cc91f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -64,6 +64,7 @@ from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.typehints.typehints import is_consistent_with
 from apache_beam.utils import timestamp
 from apache_beam.utils import urns
+from apache_beam.utils.timestamp import Duration
 
 if typing.TYPE_CHECKING:
   from google.protobuf import message  # pylint: disable=ungrouped-imports
@@ -2269,7 +2270,21 @@ class Windowing(object):
                triggerfn=None,  # type: typing.Optional[TriggerFn]
                accumulation_mode=None,  # typing.Optional[beam_runner_api_pb2.AccumulationMode]
                timestamp_combiner=None,  # typing.Optional[beam_runner_api_pb2.OutputTime]
-              ):
+               allowed_lateness=0, # type: typing.Union[int, float]
+               ):
+    """Class representing the window strategy.
+
+    Args:
+      windowfn: Window assign function.
+      triggerfn: Trigger function.
+      accumulation_mode: a AccumulationMode, controls what to do with data
+        when a trigger fires multiple times.
+      timestamp_combiner: a TimestampCombiner, determines how output
+        timestamps of grouping operations are assigned.
+      allowed_lateness: Maximum delay in seconds after end of window
+        allowed for any late data to be processed without being discarded
+        directly.
+    """
     global AccumulationMode, DefaultTrigger  # pylint: disable=global-variable-not-assigned
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
@@ -2289,13 +2304,15 @@ class Windowing(object):
     self.windowfn = windowfn
     self.triggerfn = triggerfn
     self.accumulation_mode = accumulation_mode
+    self.allowed_lateness = Duration.of(allowed_lateness)
     self.timestamp_combiner = (
         timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW)
     self._is_default = (
         self.windowfn == GlobalWindows() and
         self.triggerfn == DefaultTrigger() and
         self.accumulation_mode == AccumulationMode.DISCARDING and
-        self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW)
+        self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW and
+        self.allowed_lateness == 0)
 
   def __repr__(self):
     return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
@@ -2310,7 +2327,8 @@ class Windowing(object):
           self.windowfn == other.windowfn
           and self.triggerfn == other.triggerfn
           and self.accumulation_mode == other.accumulation_mode
-          and self.timestamp_combiner == other.timestamp_combiner)
+          and self.timestamp_combiner == other.timestamp_combiner
+          and self.allowed_lateness == other.allowed_lateness)
     return False
 
   def __ne__(self, other):
@@ -2318,7 +2336,8 @@ class Windowing(object):
     return not self == other
 
   def __hash__(self):
-    return hash((self.windowfn, self.accumulation_mode,
+    return hash((self.windowfn, self.triggerfn, self.accumulation_mode,
+                 self.allowed_lateness,
                  self.timestamp_combiner))
 
   def is_default(self):
@@ -2340,7 +2359,7 @@ class Windowing(object):
         # TODO(robertwb): Support EMIT_IF_NONEMPTY
         closing_behavior=beam_runner_api_pb2.ClosingBehavior.EMIT_ALWAYS,
         OnTimeBehavior=beam_runner_api_pb2.OnTimeBehavior.FIRE_ALWAYS,
-        allowed_lateness=0,
+        allowed_lateness=self.allowed_lateness.micros // 1000,
         environment_id=context.default_environment_id())
 
   @staticmethod
@@ -2351,7 +2370,8 @@ class Windowing(object):
         windowfn=WindowFn.from_runner_api(proto.window_fn, context),
         triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
         accumulation_mode=proto.accumulation_mode,
-        timestamp_combiner=proto.output_time)
+        timestamp_combiner=proto.output_time,
+        allowed_lateness=Duration(micros=proto.allowed_lateness * 1000))
 
 
 @typehints.with_input_types(T)
@@ -2383,8 +2403,8 @@ class WindowInto(ParDo):
                windowfn,  # type: typing.Union[Windowing, WindowFn]
                trigger=None,  # type: typing.Optional[TriggerFn]
                accumulation_mode=None,
-               timestamp_combiner=None
-              ):
+               timestamp_combiner=None,
+               allowed_lateness=0):
     """Initializes a WindowInto transform.
 
     Args:
@@ -2406,7 +2426,8 @@ class WindowInto(ParDo):
       timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner
 
     self.windowing = Windowing(
-        windowfn, trigger, accumulation_mode, timestamp_combiner)
+        windowfn, trigger, accumulation_mode, timestamp_combiner,
+        allowed_lateness)
     super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))
 
   def get_windowing(self, unused_inputs):
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6106c26..65bd4c7 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1128,6 +1128,7 @@ class GeneralTriggerDriver(TriggerDriver):
 
   def __init__(self, windowing, clock):
     self.clock = clock
+    self.allowed_lateness = windowing.allowed_lateness
     self.window_fn = windowing.windowfn
     self.timestamp_combiner_impl = TimestampCombiner.get_impl(
         windowing.timestamp_combiner, self.window_fn)
@@ -1147,6 +1148,9 @@ class GeneralTriggerDriver(TriggerDriver):
     windows_to_elements = collections.defaultdict(list)
     for wv in windowed_values:
       for window in wv.windows:
+        # ignore expired windows
+        if input_watermark > window.end + self.allowed_lateness:
+          continue
         windows_to_elements[window].append((wv.value, wv.timestamp))
 
     # First handle merging.
@@ -1241,7 +1245,6 @@ class GeneralTriggerDriver(TriggerDriver):
         nonspeculative_index = state.get_state(
             window, self.NONSPECULATIVE_INDEX)
         state.add_state(window, self.NONSPECULATIVE_INDEX, 1)
-        windowed_value.PaneInfoTiming.LATE
         _LOGGER.warning('Watermark moved backwards in time '
                         'or late data moved window end forward.')
     else:
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d67611c..bdc8e37 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -36,6 +36,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.portability import common_urns
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.direct.clock import TestClock
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,6 +67,7 @@ from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.windowed_value import PaneInfoTiming
 
 
@@ -118,8 +120,11 @@ class TriggerTest(unittest.TestCase):
                   bundles, late_bundles,
                   expected_panes):
     actual_panes = collections.defaultdict(list)
+    allowed_lateness = Duration(micros=int(
+        common_urns.constants.MAX_TIMESTAMP_MILLIS.constant)*1000)
     driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode), TestClock())
+        Windowing(window_fn, trigger_fn, accumulation_mode,
+                  allowed_lateness=allowed_lateness), TestClock())
     state = InMemoryUnmergedState()
 
     for bundle in bundles:
@@ -590,6 +595,7 @@ class TranscriptTest(unittest.TestCase):
     timestamp_combiner = getattr(
         TimestampCombiner,
         spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper())
+    allowed_lateness = spec.get('allowed_lateness', 0.000)
 
     def only_element(xs):
       x, = list(xs)
@@ -599,7 +605,7 @@ class TranscriptTest(unittest.TestCase):
 
     self._execute(
         window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-        transcript, spec)
+        allowed_lateness, transcript, spec)
 
 
 def _windowed_value_info(windowed_value):
@@ -676,11 +682,11 @@ class TriggerDriverTranscriptTest(TranscriptTest):
 
   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, unused_spec):
+      allowed_lateness, transcript, unused_spec):
 
     driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode, timestamp_combiner),
-        TestClock())
+        Windowing(window_fn, trigger_fn, accumulation_mode,
+                  timestamp_combiner, allowed_lateness), TestClock())
     state = InMemoryUnmergedState()
     output = []
     watermark = MIN_TIMESTAMP
@@ -708,7 +714,8 @@ class TriggerDriverTranscriptTest(TranscriptTest):
             for t in params]
         output = [
             _windowed_value_info(wv)
-            for wv in driver.process_elements(state, bundle, watermark)]
+            for wv in driver.process_elements(state, bundle, watermark,
+                                              watermark)]
         fire_timers()
 
       elif action == 'watermark':
@@ -742,7 +749,7 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
 
   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, spec):
+      allowed_lateness, transcript, spec):
 
     runner_name = TestPipeline().runner.__class__.__name__
     if runner_name in spec.get('broken_on', ()):
@@ -881,7 +888,8 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
               window_fn,
               trigger=trigger_fn,
               accumulation_mode=accumulation_mode,
-              timestamp_combiner=timestamp_combiner)
+              timestamp_combiner=timestamp_combiner,
+              allowed_lateness=allowed_lateness)
           | aggregation
           | beam.MapTuple(_windowed_value_info_map_fn)
           # Place outputs back into the global window to allow flattening
@@ -921,7 +929,7 @@ class BatchTranscriptTest(TranscriptTest):
 
   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, spec):
+      allowed_lateness, transcript, spec):
     if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED:
       self.skipTest(
           'Non-fnapi timestamp combiner: %s' % spec.get('timestamp_combiner'))
@@ -971,7 +979,8 @@ class BatchTranscriptTest(TranscriptTest):
               window_fn,
               trigger=trigger_fn,
               accumulation_mode=accumulation_mode,
-              timestamp_combiner=timestamp_combiner))
+              timestamp_combiner=timestamp_combiner,
+              allowed_lateness=allowed_lateness))
 
       grouped = input_pc | 'Grouped' >> (
           beam.GroupByKey()

Reply via email to