Repository: beam Updated Branches: refs/heads/master 24fb4cda6 -> 4084f71a1
A couple of worker fixes. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cb40dc Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cb40dc Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cb40dc Branch: refs/heads/master Commit: e9cb40dc37683effb2ded1bfaab6dfc04ade698b Parents: 24fb4cd Author: Robert Bradshaw <rober...@google.com> Authored: Wed May 17 14:25:08 2017 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed May 24 15:09:12 2017 -0700 ---------------------------------------------------------------------- .../python/apache_beam/runners/worker/operation_specs.py | 11 ++++++----- sdks/python/apache_beam/runners/worker/operations.py | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e9cb40dc/sdks/python/apache_beam/runners/worker/operation_specs.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py index c03d9a2..db5eb76 100644 --- a/sdks/python/apache_beam/runners/worker/operation_specs.py +++ b/sdks/python/apache_beam/runners/worker/operation_specs.py @@ -305,7 +305,8 @@ def get_coder_from_spec(coder_spec): assert coder_spec is not None # Ignore the wrappers in these encodings. - # TODO(silviuc): Make sure with all the renamings that names below are ok. + ignored_wrappers = ( + 'com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder') if coder_spec['@type'] in ignored_wrappers: assert len(coder_spec['component_encodings']) == 1 coder_spec = coder_spec['component_encodings'][0] @@ -328,20 +329,20 @@ def get_coder_from_spec(coder_spec): return coders.WindowedValueCoder(value_coder, window_coder=window_coder) elif coder_spec['@type'] == 'kind:interval_window': assert ('component_encodings' not in coder_spec - or len(coder_spec['component_encodings'] == 0)) + or not coder_spec['component_encodings']) return coders.IntervalWindowCoder() elif coder_spec['@type'] == 'kind:global_window': assert ('component_encodings' not in coder_spec or not coder_spec['component_encodings']) - return coders.GlobalWindowCoder() + return coders.coders.GlobalWindowCoder() elif coder_spec['@type'] == 'kind:length_prefix': assert len(coder_spec['component_encodings']) == 1 - return coders.LengthPrefixCoder( + return coders.coders.LengthPrefixCoder( get_coder_from_spec(coder_spec['component_encodings'][0])) # We pass coders in the form "<coder_name>$<pickled_data>" to make the job # description JSON more readable. - return coders.deserialize_coder(coder_spec['@type']) + return coders.coders.deserialize_coder(coder_spec['@type']) class MapTask(object): http://git-wip-us.apache.org/repos/asf/beam/blob/e9cb40dc/sdks/python/apache_beam/runners/worker/operations.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py index 5dbe57e..a44561d 100644 --- a/sdks/python/apache_beam/runners/worker/operations.py +++ b/sdks/python/apache_beam/runners/worker/operations.py @@ -281,7 +281,7 @@ class DoOperation(Operation): # Backwards compatibility for pre BEAM-733 SDKs. if isinstance(view_options, tuple): - if view_class == pvalue.SingletonPCollectionView: + if view_class == pvalue.AsSingleton: has_default, default = view_options view_options = {'default': default} if has_default else {} else: