This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
commit 9fc641ac8f7c1385c82e71e35d55c1d4d77a1147
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Nov 27 12:56:59 2019 -0800

    [BEAM-2929] Ensure that the Beam Python SDK sends the property "use_indexed_format" to Dataflow for side inputs which use a multimap materialization.
---
 .../runners/dataflow/dataflow_runner.py            | 22 ++++++++++++++--------
 .../apache_beam/runners/dataflow/internal/names.py |  1 +
 2 files changed, 15 insertions(+), 8 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d07abc4..718ab61 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -607,7 +607,8 @@ class DataflowRunner(PipelineRunner):
     return step
 
   def _add_singleton_step(
-      self, label, full_label, tag, input_step, windowing_strategy):
+      self, label, full_label, tag, input_step, windowing_strategy,
+      access_pattern):
     """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
     # Import here to avoid adding the dependency for local running scenarios.
     from apache_beam.runners.dataflow.internal import apiclient
@@ -620,12 +621,16 @@ class DataflowRunner(PipelineRunner):
          PropertyNames.STEP_NAME: input_step.proto.name,
          PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
     step.encoding = self._get_side_input_encoding(input_step.encoding)
-    step.add_property(
-        PropertyNames.OUTPUT_INFO,
-        [{PropertyNames.USER_NAME: (
-            '%s.%s' % (full_label, PropertyNames.OUTPUT)),
-          PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
+
+    output_info = {
+        PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
+        PropertyNames.ENCODING: step.encoding,
+        PropertyNames.OUTPUT_NAME: PropertyNames.OUT
+    }
+    if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
+      output_info[PropertyNames.USE_INDEXED_FORMAT] = True
+    step.add_property(PropertyNames.OUTPUT_INFO, [output_info])
+
     step.add_property(
         PropertyNames.WINDOWING_STRATEGY,
         self.serialize_windowing_strategy(windowing_strategy))
@@ -820,7 +825,8 @@ class DataflowRunner(PipelineRunner):
       self._add_singleton_step(
           step_name, si_full_label, side_pval.pvalue.tag,
           self._cache.get_pvalue(side_pval.pvalue),
-          side_pval.pvalue.windowing)
+          side_pval.pvalue.windowing,
+          side_pval._side_input_data().access_pattern)
       si_dict[si_label] = {
           '@type': 'OutputReference',
           PropertyNames.STEP_NAME: step_name,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 5b2dd89..fdce49b 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -117,6 +117,7 @@ class PropertyNames(object):
   SOURCE_STEP_INPUT = 'custom_source_step_input'
   SERIALIZED_TEST_STREAM = 'serialized_test_stream'
   STEP_NAME = 'step_name'
+  USE_INDEXED_FORMAT = 'use_indexed_format'
   USER_FN = 'user_fn'
   USER_NAME = 'user_name'
   VALIDATE_SINK = 'validate_sink'