Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 1305c108a -> b8e3a7b52


Make step encodings consistently use WindowedValueCoders

This change is needed for size-estimation aggregators.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d8434ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d8434ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d8434ec

Branch: refs/heads/python-sdk
Commit: 3d8434ec95327d4bb374d38196e883a49f2a76d4
Parents: 1305c10
Author: Charles Chen <c...@google.com>
Authored: Fri Jul 15 14:37:43 2016 -0700
Committer: Charles Chen <c...@google.com>
Committed: Fri Jul 15 14:37:43 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py | 12 +++++++++++-
 sdks/python/apache_beam/utils/dependency.py        |  6 +++++-
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d8434ec/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 45bfb6e..4fafc9f 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -530,6 +530,12 @@ class DataflowPipelineRunner(PipelineRunner):
     else:
       coder = transform.source.coder
 
+    # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a
+    # step should be the type of value outputted by each step.  Read steps
+    # automatically wrap output values in a WindowedValue wrapper, if necessary.
+    # This is also necessary for proper encoding for size estimation.
+    coder = coders.WindowedValueCoder(coder)
+
     step.encoding = self._get_cloud_encoding(coder)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
@@ -595,7 +601,11 @@ class DataflowPipelineRunner(PipelineRunner):
           'Sink %r has unexpected format %s.' % (
               transform.sink, transform.sink.format))
     step.add_property(PropertyNames.FORMAT, transform.sink.format)
-    step.encoding = self._get_cloud_encoding(transform.sink.coder)
+
+    # Wrap coder in WindowedValueCoder: this is necessary for proper encoding
+    # for size estimation.
+    coder = coders.WindowedValueCoder(transform.sink.coder)
+    step.encoding = self._get_cloud_encoding(coder)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d8434ec/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index b809cf2..bd0983c 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -434,7 +434,11 @@ def get_required_container_version():
   except pkg.DistributionNotFound:
     # This case covers Apache Beam end-to-end testing scenarios. All these tests
     # will run with a special container version.
-    return 'beamhead'
+    #
+    # TODO(ccy): change this back to 'beamhead' when worker support for the
+    # recent change to use WindowedValueCoders in Sources and Sinks is rolled
+    # out.
+    return 'beamhead-02'
 
 
 def _download_pypi_sdk_package(temp_dir):

Reply via email to