This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72714c2  [BEAM-2939] Add split_and_size method for SDFs.
     new 0869790  Merge pull request #8235 [BEAM-2939] Add split_and_size method for SDFs.
72714c2 is described below

commit 72714c2a4bbb399eb03bf038bc628df0ad3b4a02
Author: Robert Bradshaw <rober...@google.com>
AuthorDate: Fri Apr 5 13:25:52 2019 +0200

    [BEAM-2939] Add split_and_size method for SDFs.
---
 sdks/python/apache_beam/runners/worker/bundle_processor.py |  6 +++---
 sdks/python/apache_beam/transforms/core.py                 | 14 +++++++++-----
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 1a3667d..583f1bb 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -960,9 +960,9 @@ def create(*args):
 
     def process(self, element_restriction, *args, **kwargs):
       element, restriction = element_restriction
-      for part in self.restriction_provider.split(element, restriction):
-        yield ((element, part),
-               self.restriction_provider.restriction_size(element, part))
+      for part, size in self.restriction_provider.split_and_size(
+          element, restriction):
+        yield ((element, part), size)
 
   return _create_sdf_operation(SplitAndSizeRestrictions, *args)
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 4639f7c..0ee672f 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -196,15 +196,16 @@ class RestrictionProvider(object):
   an instance of ``RestrictionProvider``.
 
   The provided ``RestrictionProvider`` instance must provide suitable overrides
-  for the following methods.
+  for the following methods:
   * create_tracker()
   * initial_restriction()
 
   Optionally, ``RestrictionProvider`` may override default implementations of
-  following methods.
+  following methods:
   * restriction_coder()
   * restriction_size()
   * split()
+  * split_and_size()
 
   ** Pausing and resuming processing of an element **
 
@@ -254,9 +255,6 @@ class RestrictionProvider(object):
     reading input element for each of the returned restrictions should be the
     same as the total set of elements produced by reading the input element for
     the input restriction.
-
-    TODO(chamikara): give suitable hints for performing splitting, for example
-    number of parts or size in bytes.
     """
     yield restriction
 
@@ -279,6 +277,12 @@ class RestrictionProvider(object):
     """
     return self.create_tracker(restriction).default_size()
 
+  def split_and_size(self, element, restriction):
+    """Like split, but also does sizing, returning (restriction, size) pairs.
+    """
+    for part in self.split(element, restriction):
+      yield part, self.restriction_size(element, part)
+
 
 def get_function_arguments(obj, func):
   """Return the function arguments based on the name provided. If they have

Reply via email to