This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9f980  Temporary workaround for [BEAM-7473] (#9023)
8e9f980 is described below

commit 8e9f980ea6ef256349131702ec9bbd015213b113
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue Jul 23 03:05:20 2019 -0700

    Temporary workaround for [BEAM-7473] (#9023)
---
 model/fn-execution/src/main/proto/beam_fn_api.proto        | 3 +++
 sdks/python/apache_beam/io/iobase.py                       | 7 +++++++
 sdks/python/apache_beam/runners/common.py                  | 6 +++++-
 sdks/python/apache_beam/runners/worker/bundle_processor.py | 4 ++--
 4 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index b8c91fd..df2bf86 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -193,6 +193,9 @@ message BundleApplication {
   // value represents a lower bound on the timestamps of elements that
   // are produced by this PTransform into each of its output PCollections
   // when invoked with this application.
+  //
+  // If the RestrictionTracker reports no watermark, the runner uses
+  // MIN_TIMESTAMP by default.
   map<string, google.protobuf.Timestamp> output_watermarks = 4;
 
   // (Required) Whether this application potentially produces an unbounded
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 79c3706..bb7c03c 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1137,6 +1137,13 @@ class RestrictionTracker(object):
     """
     raise NotImplementedError
 
+  def current_watermark(self):
+    """Returns the current watermark. By default, no watermark is reported.
+
+    TODO(BEAM-7473): Provide synchronization guarantee by using a wrapper.
+    """
+    return None
+
   def checkpoint(self):
     """Performs a checkpoint of the current restriction.
 
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 89c2cb7..ef2ec65 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -697,6 +697,10 @@ class PerWindowInvoker(DoFnInvoker):
     restriction_tracker = self.restriction_tracker
     current_windowed_value = self.current_windowed_value
     if restriction_tracker and current_windowed_value:
+      # Temporary workaround for [BEAM-7473]: read current_watermark before
+      # splitting, in case the watermark advances before the split results are
+      # obtained. In the worst case, current_watermark is stale, which is ok.
+      current_watermark = restriction_tracker.current_watermark()
       split = restriction_tracker.try_split(fraction)
       if split:
         primary, residual = split
@@ -710,7 +714,7 @@ class PerWindowInvoker(DoFnInvoker):
              None),
             (self.current_windowed_value.with_value(
                 ((element, residual), residual_size)),
-             restriction_tracker.current_watermark()))
+             current_watermark))
 
   def current_element_progress(self):
     restriction_tracker = self.restriction_tracker
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 8d788fc..a85e4da 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -32,7 +32,7 @@ from builtins import next
 from builtins import object
 
 from future.utils import itervalues
-from google import protobuf
+from google.protobuf import timestamp_pb2
 
 import apache_beam as beam
 from apache_beam import coders
@@ -648,7 +648,7 @@ class BundleProcessor(object):
     # TODO(SDF): For non-root nodes, need main_input_coder + residual_coder.
     element_and_restriction, watermark = deferred_remainder
     if watermark:
-      proto_watermark = protobuf.Timestamp()
+      proto_watermark = timestamp_pb2.Timestamp()
       proto_watermark.FromMicroseconds(watermark.micros)
       output_watermarks = {output: proto_watermark for output in outputs}
     else:

Reply via email to