Repository: beam
Updated Branches:
  refs/heads/master 24fb4cda6 -> 4084f71a1


A couple of worker fixes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e9cb40dc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e9cb40dc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e9cb40dc

Branch: refs/heads/master
Commit: e9cb40dc37683effb2ded1bfaab6dfc04ade698b
Parents: 24fb4cd
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed May 17 14:25:08 2017 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Wed May 24 15:09:12 2017 -0700

----------------------------------------------------------------------
 .../python/apache_beam/runners/worker/operation_specs.py | 11 ++++++-----
 sdks/python/apache_beam/runners/worker/operations.py     |  2 +-
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e9cb40dc/sdks/python/apache_beam/runners/worker/operation_specs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py 
b/sdks/python/apache_beam/runners/worker/operation_specs.py
index c03d9a2..db5eb76 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -305,7 +305,8 @@ def get_coder_from_spec(coder_spec):
   assert coder_spec is not None
 
   # Ignore the wrappers in these encodings.
-  # TODO(silviuc): Make sure with all the renamings that names below are ok.
+  ignored_wrappers = (
+      'com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder')
   if coder_spec['@type'] in ignored_wrappers:
     assert len(coder_spec['component_encodings']) == 1
     coder_spec = coder_spec['component_encodings'][0]
@@ -328,20 +329,20 @@ def get_coder_from_spec(coder_spec):
     return coders.WindowedValueCoder(value_coder, window_coder=window_coder)
   elif coder_spec['@type'] == 'kind:interval_window':
     assert ('component_encodings' not in coder_spec
-            or len(coder_spec['component_encodings'] == 0))
+            or not coder_spec['component_encodings'])
     return coders.IntervalWindowCoder()
   elif coder_spec['@type'] == 'kind:global_window':
     assert ('component_encodings' not in coder_spec
             or not coder_spec['component_encodings'])
-    return coders.GlobalWindowCoder()
+    return coders.coders.GlobalWindowCoder()
   elif coder_spec['@type'] == 'kind:length_prefix':
     assert len(coder_spec['component_encodings']) == 1
-    return coders.LengthPrefixCoder(
+    return coders.coders.LengthPrefixCoder(
         get_coder_from_spec(coder_spec['component_encodings'][0]))
 
   # We pass coders in the form "<coder_name>$<pickled_data>" to make the job
   # description JSON more readable.
-  return coders.deserialize_coder(coder_spec['@type'])
+  return coders.coders.deserialize_coder(coder_spec['@type'])
 
 
 class MapTask(object):

http://git-wip-us.apache.org/repos/asf/beam/blob/e9cb40dc/sdks/python/apache_beam/runners/worker/operations.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/operations.py 
b/sdks/python/apache_beam/runners/worker/operations.py
index 5dbe57e..a44561d 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -281,7 +281,7 @@ class DoOperation(Operation):
 
       # Backwards compatibility for pre BEAM-733 SDKs.
       if isinstance(view_options, tuple):
-        if view_class == pvalue.SingletonPCollectionView:
+        if view_class == pvalue.AsSingleton:
           has_default, default = view_options
           view_options = {'default': default} if has_default else {}
         else:

Reply via email to