Fix side inputs on dataflow runner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07daf3a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07daf3a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07daf3a5

Branch: refs/heads/master
Commit: 07daf3a54544ce842165ffe15264e43ebced28ba
Parents: 60901f8
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Mar 31 14:57:44 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Mar 31 21:45:16 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow/dataflow_runner.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/07daf3a5/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index db433df..fe9f8c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -287,7 +287,7 @@ class DataflowRunner(PipelineRunner):
   def _add_singleton_step(self, label, full_label, tag, input_step):
     """Creates a CollectionToSingleton step used to handle ParDo side inputs."""
     # Import here to avoid adding the dependency for local running scenarios.
-    from google.cloud.dataflow.internal import apiclient
+    from apache_beam.runners.dataflow.internal import apiclient
     step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
     self.job.proto.steps.append(step.proto)
     step.add_property(PropertyNames.USER_NAME, full_label)
@@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner):
         [{PropertyNames.USER_NAME: (
             '%s.%s' % (full_label, PropertyNames.OUTPUT)),
           PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     return step
 
   def run_Flatten(self, transform_node):
@@ -374,12 +374,10 @@ class DataflowRunner(PipelineRunner):
     si_dict = {}
     # We must call self._cache.get_pvalue exactly once due to refcounting.
     si_labels = {}
-    for side_pval in transform_node.side_inputs:
-      si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
     lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
       assert isinstance(side_pval, AsSideInput)
-      si_label = self._get_unique_step_name()
+      si_label = 'SideInput-' + self._get_unique_step_name()
       si_full_label = '%s/%s' % (transform_node.full_label, si_label)
       self._add_singleton_step(
           si_label, si_full_label, side_pval.pvalue.tag,
@@ -388,10 +386,13 @@ class DataflowRunner(PipelineRunner):
           '@type': 'OutputReference',
           PropertyNames.STEP_NAME: si_label,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
+      si_labels[side_pval] = si_label
 
     # Now create the step for the ParDo transform being handled.
     step = self._add_step(
-        TransformNames.DO, transform_node.full_label, transform_node,
+        TransformNames.DO,
+        transform_node.full_label + '/Do' if transform_node.side_inputs else '',
+        transform_node,
         transform_node.transform.side_output_tags)
     fn_data = self._pardo_fn_data(transform_node, lookup_label)
     step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))
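
For context, the translation path patched above serves ParDo-based transforms
that consume side inputs: each side input becomes a CollectionToSingleton step
(now labeled with a 'SideInput-' prefix and recorded in si_labels only after
the step is created) feeding the ParDo's serialized fn. The sketch below is a
minimal, hypothetical pipeline exercising that path through the public
apache_beam API; the step labels and data values are invented for
illustration, not taken from this commit.

    # Illustrative only: a Filter (a ParDo under the hood) reading a
    # singleton side input, the pattern this commit's translation fixes.
    import apache_beam as beam

    with beam.Pipeline() as p:
      words = p | 'Words' >> beam.Create(['a', 'bb', 'ccc'])
      # The mean word length becomes a singleton side input; on the Dataflow
      # runner it flows through a CollectionToSingleton step whose output is
      # now correctly named PropertyNames.OUT.
      avg_len = (words
                 | 'Lengths' >> beam.Map(len)
                 | 'Mean' >> beam.CombineGlobally(
                     beam.combiners.MeanCombineFn()))
      (words
       | 'LongerThanMean' >> beam.Filter(
           lambda w, avg: len(w) > avg, avg=beam.pvalue.AsSingleton(avg_len))
       | 'Print' >> beam.Map(print))

The side input arrives as the extra keyword argument 'avg'; it is this
keyword-to-step wiring (via lookup_label and si_labels) whose ordering the
commit corrects.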