This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9f980 Temporary workaround for [BEAM-7473] (#9023)
8e9f980 is described below
commit 8e9f980ea6ef256349131702ec9bbd015213b113
Author: Boyuan Zhang <[email protected]>
AuthorDate: Tue Jul 23 03:05:20 2019 -0700
Temporary workaround for [BEAM-7473] (#9023)
---
model/fn-execution/src/main/proto/beam_fn_api.proto | 3 +++
sdks/python/apache_beam/io/iobase.py | 7 +++++++
sdks/python/apache_beam/runners/common.py | 6 +++++-
sdks/python/apache_beam/runners/worker/bundle_processor.py | 4 ++--
4 files changed, 17 insertions(+), 3 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index b8c91fd..df2bf86 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -193,6 +193,9 @@ message BundleApplication {
// value represents a lower bound on the timestamps of elements that
// are produced by this PTransform into each of its output PCollections
// when invoked with this application.
+ //
+ // If there is no watermark reported from RestrictionTracker, the runner will
+ // use MIN_TIMESTAMP by default.
map<string, google.protobuf.Timestamp> output_watermarks = 4;
// (Required) Whether this application potentially produces an unbounded
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index 79c3706..bb7c03c 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -1137,6 +1137,13 @@ class RestrictionTracker(object):
"""
raise NotImplementedError
+ def current_watermark(self):
+ """Returns current watermark. By default, not report watermark.
+
+ TODO(BEAM-7473): Provide synchronization guarantee by using a wrapper.
+ """
+ return None
+
def checkpoint(self):
"""Performs a checkpoint of the current restriction.
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 89c2cb7..ef2ec65 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -697,6 +697,10 @@ class PerWindowInvoker(DoFnInvoker):
restriction_tracker = self.restriction_tracker
current_windowed_value = self.current_windowed_value
if restriction_tracker and current_windowed_value:
+ # Temporary workaround for [BEAM-7473]: get current_watermark before
+ # split, in case watermark gets advanced before getting split results.
+ # In worst case, current_watermark is always stale, which is ok.
+ current_watermark = restriction_tracker.current_watermark()
split = restriction_tracker.try_split(fraction)
if split:
primary, residual = split
@@ -710,7 +714,7 @@ class PerWindowInvoker(DoFnInvoker):
None),
(self.current_windowed_value.with_value(
((element, residual), residual_size)),
- restriction_tracker.current_watermark()))
+ current_watermark))
def current_element_progress(self):
restriction_tracker = self.restriction_tracker
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 8d788fc..a85e4da 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -32,7 +32,7 @@ from builtins import next
from builtins import object
from future.utils import itervalues
-from google import protobuf
+from google.protobuf import timestamp_pb2
import apache_beam as beam
from apache_beam import coders
@@ -648,7 +648,7 @@ class BundleProcessor(object):
# TODO(SDF): For non-root nodes, need main_input_coder + residual_coder.
element_and_restriction, watermark = deferred_remainder
if watermark:
- proto_watermark = protobuf.Timestamp()
+ proto_watermark = timestamp_pb2.Timestamp()
proto_watermark.FromMicroseconds(watermark.micros)
output_watermarks = {output: proto_watermark for output in outputs}
else: