Refine Python DirectRunner watermark advancement behavior

This change helps prepare for streaming pipeline execution.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3e049020
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3e049020
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3e049020

Branch: refs/heads/gearpump-runner
Commit: 3e04902008b410269b23179dc2146623ff1fbd0a
Parents: d81ed21
Author: Charles Chen <c...@google.com>
Authored: Wed Jun 7 17:46:36 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 8 10:55:44 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/watermark_manager.py         | 20 +++++++++++++++++---
 sdks/python/apache_beam/utils/timestamp.py      |  5 +++++
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/runners/direct/watermark_manager.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/watermark_manager.py b/sdks/python/apache_beam/runners/direct/watermark_manager.py
index 3a13539..0d7cd4f 100644
--- a/sdks/python/apache_beam/runners/direct/watermark_manager.py
+++ b/sdks/python/apache_beam/runners/direct/watermark_manager.py
@@ -25,6 +25,7 @@ from apache_beam import pipeline
 from apache_beam import pvalue
 from apache_beam.utils.timestamp import MAX_TIMESTAMP
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
+from apache_beam.utils.timestamp import TIME_GRANULARITY
 
 
 class WatermarkManager(object):
@@ -193,9 +194,22 @@ class _TransformWatermarks(object):
 
   def refresh(self):
     with self._lock:
-      pending_holder = (WatermarkManager.WATERMARK_NEG_INF
-                        if self._pending else
-                        WatermarkManager.WATERMARK_POS_INF)
+      min_pending_timestamp = WatermarkManager.WATERMARK_POS_INF
+      has_pending_elements = False
+      for input_bundle in self._pending:
+        # TODO(ccy): we can have the Bundle class keep track of the minimum
+        # timestamp so we don't have to do an iteration here.
+        for wv in input_bundle.get_elements_iterable():
+          has_pending_elements = True
+          if wv.timestamp < min_pending_timestamp:
+            min_pending_timestamp = wv.timestamp
+
+      # If there is a pending element with a certain timestamp, we can at most
+      # advance our watermark to the maximum timestamp less than that
+      # timestamp.
+      pending_holder = WatermarkManager.WATERMARK_POS_INF
+      if has_pending_elements:
+        pending_holder = min_pending_timestamp - TIME_GRANULARITY
 
       input_watermarks = [
           tw.output_watermark for tw in self._input_transform_watermarks]

http://git-wip-us.apache.org/repos/asf/beam/blob/3e049020/sdks/python/apache_beam/utils/timestamp.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/timestamp.py b/sdks/python/apache_beam/utils/timestamp.py
index 5d1b48c..b3e840e 100644
--- a/sdks/python/apache_beam/utils/timestamp.py
+++ b/sdks/python/apache_beam/utils/timestamp.py
@@ -208,3 +208,8 @@ class Duration(object):
   def __mod__(self, other):
     other = Duration.of(other)
     return Duration(micros=self.micros % other.micros)
+
+
+# The minimum granularity / interval expressible in a Timestamp / Duration
+# object.
+TIME_GRANULARITY = Duration(micros=1)

Reply via email to