Repository: incubator-beam Updated Branches: refs/heads/python-sdk 1305c108a -> b8e3a7b52
Make step encodings consistently use WindowedValueCoders. This change is needed for size-estimation aggregators. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d8434ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d8434ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d8434ec Branch: refs/heads/python-sdk Commit: 3d8434ec95327d4bb374d38196e883a49f2a76d4 Parents: 1305c10 Author: Charles Chen <c...@google.com> Authored: Fri Jul 15 14:37:43 2016 -0700 Committer: Charles Chen <c...@google.com> Committed: Fri Jul 15 14:37:43 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/dataflow_runner.py | 12 +++++++++++- sdks/python/apache_beam/utils/dependency.py | 6 +++++- 2 files changed, 16 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d8434ec/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 45bfb6e..4fafc9f 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -530,6 +530,12 @@ class DataflowPipelineRunner(PipelineRunner): else: coder = transform.source.coder + # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a + # step should be the type of value outputted by each step. Read steps + # automatically wrap output values in a WindowedValue wrapper, if necessary. + # This is also necessary for proper encoding for size estimation. 
+ coder = coders.WindowedValueCoder(coder) + step.encoding = self._get_cloud_encoding(coder) step.add_property( PropertyNames.OUTPUT_INFO, @@ -595,7 +601,11 @@ class DataflowPipelineRunner(PipelineRunner): 'Sink %r has unexpected format %s.' % ( transform.sink, transform.sink.format)) step.add_property(PropertyNames.FORMAT, transform.sink.format) - step.encoding = self._get_cloud_encoding(transform.sink.coder) + + # Wrap coder in WindowedValueCoder: this is necessary for proper encoding + # for size estimation. + coder = coders.WindowedValueCoder(transform.sink.coder) + step.encoding = self._get_cloud_encoding(coder) step.add_property(PropertyNames.ENCODING, step.encoding) step.add_property( PropertyNames.PARALLEL_INPUT, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d8434ec/sdks/python/apache_beam/utils/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py index b809cf2..bd0983c 100644 --- a/sdks/python/apache_beam/utils/dependency.py +++ b/sdks/python/apache_beam/utils/dependency.py @@ -434,7 +434,11 @@ def get_required_container_version(): except pkg.DistributionNotFound: # This case covers Apache Beam end-to-end testing scenarios. All these tests # will run with a special container version. - return 'beamhead' + # + # TODO(ccy): change this back to 'beamhead' when worker support for the + # recent change to use WindowedValueCoders in Sources and Sinks is rolled + # out. + return 'beamhead-02' def _download_pypi_sdk_package(temp_dir):