This is an automated email from the ASF dual-hosted git repository. boyuanz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b3b14c Fix PairWithRestrictionFn process no attribute signature error
     new 7973f41 Merge pull request #12640 from y1chi/bundle_runner_sdf
8b3b14c is described below

commit 8b3b14cd8d28302be9c9bf9d0280ac4419ea4743
Author: Yichi Zhang <zyi...@google.com>
AuthorDate: Wed Aug 19 16:07:59 2020 -0700

    Fix PairWithRestrictionFn process no attribute signature error
---
 sdks/python/apache_beam/runners/direct/sdf_direct_runner.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
index ac27d57..7d605b2 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner.py
@@ -117,19 +117,18 @@ class ElementAndRestriction(object):
 class PairWithRestrictionFn(beam.DoFn):
   """A transform that pairs each element with a restriction."""
   def __init__(self, do_fn):
-    self._do_fn = do_fn
+    self._signature = DoFnSignature(do_fn)
 
   def start_bundle(self):
-    signature = DoFnSignature(self._do_fn)
     self._invoker = DoFnInvoker.create_invoker(
-        signature,
+        self._signature,
         output_processor=_NoneShallPassOutputProcessor(),
         process_invocation=False)
 
   def process(self, element, window=beam.DoFn.WindowParam, *args, **kwargs):
     initial_restriction = self._invoker.invoke_initial_restriction(element)
     watermark_estimator_state = (
-        self.signature.process_method.watermark_estimator_provider.
+        self._signature.process_method.watermark_estimator_provider.
         initial_estimator_state(element, initial_restriction))
     yield ElementAndRestriction(
         element, initial_restriction, watermark_estimator_state)