Refine Python DirectRunner watermark advancement behavior. This change helps prepare for streaming pipeline execution.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e049020 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e049020 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e049020 Branch: refs/heads/gearpump-runner Commit: 3e04902008b410269b23179dc2146623ff1fbd0a Parents: d81ed21 Author: Charles Chen <c...@google.com> Authored: Wed Jun 7 17:46:36 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Thu Jun 8 10:55:44 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/watermark_manager.py | 20 +++++++++++++++++--- sdks/python/apache_beam/utils/timestamp.py | 5 +++++ 2 files changed, 22 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/runners/direct/watermark_manager.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py index 3a13539..0d7cd4f 100644 --- a/sdks/python/apache_beam/runners/direct/watermark_manager.py +++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py @@ -25,6 +25,7 @@ from apache_beam import pipeline from apache_beam import pvalue from apache_beam.utils.timestamp import MAX_TIMESTAMP from apache_beam.utils.timestamp import MIN_TIMESTAMP +from apache_beam.utils.timestamp import TIME_GRANULARITY class WatermarkManager(object): @@ -193,9 +194,22 @@ class _TransformWatermarks(object): def refresh(self): with self._lock: - pending_holder = (WatermarkManager.WATERMARK_NEG_INF - if self._pending else - WatermarkManager.WATERMARK_POS_INF) + min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF + has_pending_elements = False + for input_bundle in self._pending: + # TODO(ccy): we can have the Bundle class keep track of the 
minimum + # timestamp so we don't have to do an iteration here. + for wv in input_bundle.get_elements_iterable(): + has_pending_elements = True + if wv.timestamp < min_pending_timestamp: + min_pending_timestamp = wv.timestamp + + # If there is a pending element with a certain timestamp, we can at most + # advance our watermark to the maximum timestamp less than that + # timestamp. + pending_holder = WatermarkManager.WATERMARK_POS_INF + if has_pending_elements: + pending_holder = min_pending_timestamp - TIME_GRANULARITY input_watermarks = [ tw.output_watermark for tw in self._input_transform_watermarks] http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/utils/timestamp.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py index 5d1b48c..b3e840e 100644 --- a/sdks/python/apache_beam/utils/timestamp.py +++ b/sdks/python/apache_beam/utils/timestamp.py @@ -208,3 +208,8 @@ class Duration(object): def __mod__(self, other): other = Duration.of(other) return Duration(micros=self.micros % other.micros) + + +# The minimum granularity / interval expressible in a Timestamp / Duration +# object. +TIME_GRANULARITY = Duration(micros=1)