This is an automated email from the ASF dual-hosted git repository. robertwb pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 72714c2 [BEAM-2939] Add split_and_size method for SDFs. new 0869790 Merge pull request #8235 [BEAM-2939] Add split_and_size method for SDFs. 72714c2 is described below commit 72714c2a4bbb399eb03bf038bc628df0ad3b4a02 Author: Robert Bradshaw <rober...@google.com> AuthorDate: Fri Apr 5 13:25:52 2019 +0200 [BEAM-2939] Add split_and_size method for SDFs. --- sdks/python/apache_beam/runners/worker/bundle_processor.py | 6 +++--- sdks/python/apache_beam/transforms/core.py | 14 +++++++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 1a3667d..583f1bb 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -960,9 +960,9 @@ def create(*args): def process(self, element_restriction, *args, **kwargs): element, restriction = element_restriction - for part in self.restriction_provider.split(element, restriction): - yield ((element, part), - self.restriction_provider.restriction_size(element, part)) + for part, size in self.restriction_provider.split_and_size( + element, restriction): + yield ((element, part), size) return _create_sdf_operation(SplitAndSizeRestrictions, *args) diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 4639f7c..0ee672f 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -196,15 +196,16 @@ class RestrictionProvider(object): an instance of ``RestrictionProvider``. The provided ``RestrictionProvider`` instance must provide suitable overrides - for the following methods. + for the following methods: * create_tracker() * initial_restriction() Optionally, ``RestrictionProvider`` may override default implementations of - following methods. + following methods: * restriction_coder() * restriction_size() * split() + * split_and_size() ** Pausing and resuming processing of an element ** @@ -254,9 +255,6 @@ class RestrictionProvider(object): reading input element for each of the returned restrictions should be the same as the total set of elements produced by reading the input element for the input restriction. - - TODO(chamikara): give suitable hints for performing splitting, for example - number of parts or size in bytes. """ yield restriction @@ -279,6 +277,12 @@ class RestrictionProvider(object): """ return self.create_tracker(restriction).default_size() + def split_and_size(self, element, restriction): + """Like split, but also does sizing, returning (restriction, size) pairs. + """ + for part in self.split(element, restriction): + yield part, self.restriction_size(element, part) + def get_function_arguments(obj, func): """Return the function arguments based on the name provided. If they have