This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

The following commit(s) were added to refs/heads/master by this push:
     new 79b6e5b  [BEAM-8824] Add support to allow specify window allowed_lateness in python sdk
     new fb0353e  Merge pull request #10216 from y1chi/allowed_lateness
79b6e5b is described below

commit 79b6e5bb863c8cf96b690d1a96c8988d8bec72be
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Tue Nov 19 10:16:38 2019 -0800

    [BEAM-8824] Add support to allow specify window allowed_lateness in python sdk
---
 .../apache_beam/examples/snippets/snippets_test.py |  2 ++
 .../testing/data/trigger_transcripts.yaml          | 22 ++++++++++++
 .../python/apache_beam/testing/test_stream_test.py |  2 +-
 sdks/python/apache_beam/transforms/core.py         | 39 +++++++++++++++++-----
 sdks/python/apache_beam/transforms/trigger.py      |  5 ++-
 sdks/python/apache_beam/transforms/trigger_test.py | 29 ++++++++++------
 6 files changed, 78 insertions(+), 21 deletions(-)

diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index f0f53e2..38bcb88 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -958,6 +958,7 @@ class SnippetsTest(unittest.TestCase):
          | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
          | WindowInto(FixedWindows(15),
                       trigger=trigger,
+                      allowed_lateness=20,
                       accumulation_mode=AccumulationMode.DISCARDING)
          | 'group' >> beam.GroupByKey()
          | 'count' >> beam.Map(
@@ -1014,6 +1015,7 @@ class SnippetsTest(unittest.TestCase):
              FixedWindows(1 * 60),
              trigger=AfterWatermark(
                  late=AfterProcessingTime(10 * 60)),
+             allowed_lateness=10,
              accumulation_mode=AccumulationMode.DISCARDING)
          # [END model_composite_triggers]
          | 'group' >> beam.GroupByKey()
diff --git a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
index cac0c74..b2d4e9a 100644
--- a/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
+++ b/sdks/python/apache_beam/testing/data/trigger_transcripts.yaml
@@ -30,6 +30,7 @@ name: fixed_default_late_data
 window_fn: FixedWindows(10)
 trigger_fn: Default
 timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 100
 transcript:
   - input: [1, 2, 3, 10, 11, 25]
   - watermark: 100
   - expect:
@@ -42,6 +43,26 @@ transcript:
       - {window: [0, 9], values: [1, 2, 3, 7], timestamp: 9, late: true}

 ---
+name: fixed_drop_late_data_after_allowed_lateness
+window_fn: FixedWindows(10)
+trigger_fn: AfterWatermark(early=AfterCount(3), late=AfterCount(1))
+timestamp_combiner: OUTPUT_AT_EOW
+allowed_lateness: 20
+accumulation_mode: accumulating
+transcript:
+  - input: [1, 2, 10, 11, 80, 81]
+  - watermark: 100
+  - expect:
+      - {window: [0, 9], values: [1, 2], timestamp: 9, final: false}
+      - {window: [10, 19], values: [10, 11], timestamp: 19}
+      - {window: [80, 89], values: [80, 81], timestamp: 89, late: false}
+  - input: [7, 8]    # no output
+  - input: [17, 18]  # no output
+  - input: [82]
+  - expect:
+      - {window: [80, 89], values: [80, 81, 82], timestamp: 89, late: true}
+
+---
 name: timestamp_combiner_earliest
 window_fn: FixedWindows(10)
 trigger_fn: Default
@@ -118,6 +139,7 @@ broken_on:
   - SwitchingDirectRunner
 window_fn: Sessions(10)
 trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
+allowed_lateness: 100
 timestamp_combiner: OUTPUT_AT_EOW
 transcript:
   - input: [1, 2, 3]
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 26b54bd..bfadb5e 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -275,7 +275,7 @@ class TestStreamTest(unittest.TestCase):
     p = TestPipeline(options=options)
     records = (p
                | test_stream
-               | beam.WindowInto(FixedWindows(15))
+               | beam.WindowInto(FixedWindows(15), allowed_lateness=300)
                | beam.Map(lambda x: ('k', x))
                | beam.GroupByKey())

diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 3cd5472..25cc91f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -64,6 +64,7 @@ from apache_beam.typehints.trivial_inference import element_type
 from apache_beam.typehints.typehints import is_consistent_with
 from apache_beam.utils import timestamp
 from apache_beam.utils import urns
+from apache_beam.utils.timestamp import Duration

 if typing.TYPE_CHECKING:
   from google.protobuf import message  # pylint: disable=ungrouped-imports
@@ -2269,7 +2270,21 @@ class Windowing(object):
                triggerfn=None,  # type: typing.Optional[TriggerFn]
                accumulation_mode=None,  # typing.Optional[beam_runner_api_pb2.AccumulationMode]
                timestamp_combiner=None,  # typing.Optional[beam_runner_api_pb2.OutputTime]
-              ):
+               allowed_lateness=0,  # type: typing.Union[int, float]
+              ):
+    """Class representing the window strategy.
+
+    Args:
+      windowfn: Window assign function.
+      triggerfn: Trigger function.
+      accumulation_mode: a AccumulationMode, controls what to do with data
+        when a trigger fires multiple times.
+      timestamp_combiner: a TimestampCombiner, determines how output
+        timestamps of grouping operations are assigned.
+      allowed_lateness: Maximum delay in seconds after end of window
+        allowed for any late data to be processed without being discarded
+        directly.
+    """
     global AccumulationMode, DefaultTrigger  # pylint: disable=global-variable-not-assigned
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.trigger import AccumulationMode, DefaultTrigger
@@ -2289,13 +2304,15 @@ class Windowing(object):
     self.windowfn = windowfn
     self.triggerfn = triggerfn
     self.accumulation_mode = accumulation_mode
+    self.allowed_lateness = Duration.of(allowed_lateness)
     self.timestamp_combiner = (
         timestamp_combiner or TimestampCombiner.OUTPUT_AT_EOW)
     self._is_default = (
         self.windowfn == GlobalWindows() and
         self.triggerfn == DefaultTrigger() and
         self.accumulation_mode == AccumulationMode.DISCARDING and
-        self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW)
+        self.timestamp_combiner == TimestampCombiner.OUTPUT_AT_EOW and
+        self.allowed_lateness == 0)

   def __repr__(self):
     return "Windowing(%s, %s, %s, %s)" % (self.windowfn, self.triggerfn,
@@ -2310,7 +2327,8 @@
           self.windowfn == other.windowfn
           and self.triggerfn == other.triggerfn
           and self.accumulation_mode == other.accumulation_mode
-          and self.timestamp_combiner == other.timestamp_combiner)
+          and self.timestamp_combiner == other.timestamp_combiner
+          and self.allowed_lateness == other.allowed_lateness)
     return False

   def __ne__(self, other):
@@ -2318,7 +2336,8 @@
     return not self == other

   def __hash__(self):
-    return hash((self.windowfn, self.accumulation_mode,
+    return hash((self.windowfn, self.triggerfn, self.accumulation_mode,
+                 self.allowed_lateness,
                  self.timestamp_combiner))

   def is_default(self):
@@ -2340,7 +2359,7 @@
         # TODO(robertwb): Support EMIT_IF_NONEMPTY
         closing_behavior=beam_runner_api_pb2.ClosingBehavior.EMIT_ALWAYS,
         OnTimeBehavior=beam_runner_api_pb2.OnTimeBehavior.FIRE_ALWAYS,
-        allowed_lateness=0,
+        allowed_lateness=self.allowed_lateness.micros // 1000,
         environment_id=context.default_environment_id())

   @staticmethod
@@ -2351,7 +2370,8 @@
         windowfn=WindowFn.from_runner_api(proto.window_fn, context),
         triggerfn=TriggerFn.from_runner_api(proto.trigger, context),
         accumulation_mode=proto.accumulation_mode,
-        timestamp_combiner=proto.output_time)
+        timestamp_combiner=proto.output_time,
+        allowed_lateness=Duration(micros=proto.allowed_lateness * 1000))


 @typehints.with_input_types(T)
@@ -2383,8 +2403,8 @@ class WindowInto(ParDo):
                windowfn,  # type: typing.Union[Windowing, WindowFn]
                trigger=None,  # type: typing.Optional[TriggerFn]
                accumulation_mode=None,
-               timestamp_combiner=None
-              ):
+               timestamp_combiner=None,
+               allowed_lateness=0):
     """Initializes a WindowInto transform.

     Args:
@@ -2406,7 +2426,8 @@ class WindowInto(ParDo):
       timestamp_combiner = timestamp_combiner or windowing.timestamp_combiner

     self.windowing = Windowing(
-        windowfn, trigger, accumulation_mode, timestamp_combiner)
+        windowfn, trigger, accumulation_mode, timestamp_combiner,
+        allowed_lateness)
     super(WindowInto, self).__init__(self.WindowIntoFn(self.windowing))

   def get_windowing(self, unused_inputs):
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6106c26..65bd4c7 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -1128,6 +1128,7 @@ class GeneralTriggerDriver(TriggerDriver):

   def __init__(self, windowing, clock):
     self.clock = clock
+    self.allowed_lateness = windowing.allowed_lateness
     self.window_fn = windowing.windowfn
     self.timestamp_combiner_impl = TimestampCombiner.get_impl(
         windowing.timestamp_combiner, self.window_fn)
@@ -1147,6 +1148,9 @@ class GeneralTriggerDriver(TriggerDriver):
     windows_to_elements = collections.defaultdict(list)
     for wv in windowed_values:
       for window in wv.windows:
+        # ignore expired windows
+        if input_watermark > window.end + self.allowed_lateness:
+          continue
         windows_to_elements[window].append((wv.value, wv.timestamp))

     # First handle merging.
@@ -1241,7 +1245,6 @@ class GeneralTriggerDriver(TriggerDriver):
           nonspeculative_index = state.get_state(
               window, self.NONSPECULATIVE_INDEX)
           state.add_state(window, self.NONSPECULATIVE_INDEX, 1)
-          windowed_value.PaneInfoTiming.LATE
           _LOGGER.warning('Watermark moved backwards in time '
                           'or late data moved window end forward.')
         else:
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index d67611c..bdc8e37 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -36,6 +36,7 @@ import apache_beam as beam
 from apache_beam import coders
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.portability import common_urns
 from apache_beam.runners import pipeline_context
 from apache_beam.runners.direct.clock import TestClock
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,6 +67,7 @@ from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import Duration
 from apache_beam.utils.windowed_value import PaneInfoTiming


@@ -118,8 +120,11 @@ class TriggerTest(unittest.TestCase):
                           bundles, late_bundles,
                           expected_panes):
     actual_panes = collections.defaultdict(list)
+    allowed_lateness = Duration(micros=int(
+        common_urns.constants.MAX_TIMESTAMP_MILLIS.constant)*1000)
     driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode), TestClock())
+        Windowing(window_fn, trigger_fn, accumulation_mode,
+                  allowed_lateness=allowed_lateness), TestClock())
     state = InMemoryUnmergedState()

     for bundle in bundles:
@@ -590,6 +595,7 @@ class TranscriptTest(unittest.TestCase):
     timestamp_combiner = getattr(
         TimestampCombiner,
         spec.get('timestamp_combiner', 'OUTPUT_AT_EOW').upper())
+    allowed_lateness = spec.get('allowed_lateness', 0.000)

     def only_element(xs):
       x, = list(xs)
@@ -599,7 +605,7 @@ class TranscriptTest(unittest.TestCase):

     self._execute(
         window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-        transcript, spec)
+        allowed_lateness, transcript, spec)


 def _windowed_value_info(windowed_value):
@@ -676,11 +682,11 @@ class TriggerDriverTranscriptTest(TranscriptTest):

   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, unused_spec):
+      allowed_lateness, transcript, unused_spec):

     driver = GeneralTriggerDriver(
-        Windowing(window_fn, trigger_fn, accumulation_mode, timestamp_combiner),
-        TestClock())
+        Windowing(window_fn, trigger_fn, accumulation_mode,
+                  timestamp_combiner, allowed_lateness), TestClock())
     state = InMemoryUnmergedState()
     output = []
     watermark = MIN_TIMESTAMP
@@ -708,7 +714,8 @@ class TriggerDriverTranscriptTest(TranscriptTest):
                   for t in params]
         output = [
             _windowed_value_info(wv)
-            for wv in driver.process_elements(state, bundle, watermark)]
+            for wv in driver.process_elements(state, bundle, watermark,
+                                              watermark)]
         fire_timers()

       elif action == 'watermark':
@@ -742,7 +749,7 @@ class BaseTestStreamTranscriptTest(TranscriptTest):

   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, spec):
+      allowed_lateness, transcript, spec):

     runner_name = TestPipeline().runner.__class__.__name__
     if runner_name in spec.get('broken_on', ()):
@@ -881,7 +888,8 @@ class BaseTestStreamTranscriptTest(TranscriptTest):
               window_fn,
               trigger=trigger_fn,
               accumulation_mode=accumulation_mode,
-              timestamp_combiner=timestamp_combiner)
+              timestamp_combiner=timestamp_combiner,
+              allowed_lateness=allowed_lateness)
           | aggregation
           | beam.MapTuple(_windowed_value_info_map_fn)
           # Place outputs back into the global window to allow flattening
@@ -921,7 +929,7 @@ class BatchTranscriptTest(TranscriptTest):

   def _execute(
       self, window_fn, trigger_fn, accumulation_mode, timestamp_combiner,
-      transcript, spec):
+      allowed_lateness, transcript, spec):
     if timestamp_combiner == TimestampCombiner.OUTPUT_AT_EARLIEST_TRANSFORMED:
       self.skipTest(
           'Non-fnapi timestamp combiner: %s' % spec.get('timestamp_combiner'))
@@ -971,7 +979,8 @@ class BatchTranscriptTest(TranscriptTest):
               window_fn,
               trigger=trigger_fn,
               accumulation_mode=accumulation_mode,
-              timestamp_combiner=timestamp_combiner))
+              timestamp_combiner=timestamp_combiner,
+              allowed_lateness=allowed_lateness))
     grouped = input_pc | 'Grouped' >> (
         beam.GroupByKey()