Fix side inputs on dataflow runner.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07daf3a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07daf3a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07daf3a5

Branch: refs/heads/master
Commit: 07daf3a54544ce842165ffe15264e43ebced28ba
Parents: 60901f8
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Fri Mar 31 14:57:44 2017 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Fri Mar 31 21:45:16 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/runners/dataflow/dataflow_runner.py    | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/07daf3a5/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index db433df..fe9f8c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -287,7 +287,7 @@ class DataflowRunner(PipelineRunner):
   def _add_singleton_step(self, label, full_label, tag, input_step):
     """Creates a CollectionToSingleton step used to handle ParDo side 
inputs."""
     # Import here to avoid adding the dependency for local running scenarios.
-    from google.cloud.dataflow.internal import apiclient
+    from apache_beam.runners.dataflow.internal import apiclient
     step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
     self.job.proto.steps.append(step.proto)
     step.add_property(PropertyNames.USER_NAME, full_label)
@@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner):
         [{PropertyNames.USER_NAME: (
             '%s.%s' % (full_label, PropertyNames.OUTPUT)),
           PropertyNames.ENCODING: step.encoding,
-          PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+          PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
     return step
 
   def run_Flatten(self, transform_node):
@@ -374,12 +374,10 @@ class DataflowRunner(PipelineRunner):
     si_dict = {}
     # We must call self._cache.get_pvalue exactly once due to refcounting.
     si_labels = {}
-    for side_pval in transform_node.side_inputs:
-      si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
     lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
       assert isinstance(side_pval, AsSideInput)
-      si_label = self._get_unique_step_name()
+      si_label = 'SideInput-' + self._get_unique_step_name()
       si_full_label = '%s/%s' % (transform_node.full_label, si_label)
       self._add_singleton_step(
           si_label, si_full_label, side_pval.pvalue.tag,
@@ -388,10 +386,13 @@ class DataflowRunner(PipelineRunner):
           '@type': 'OutputReference',
           PropertyNames.STEP_NAME: si_label,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
+      si_labels[side_pval] = si_label
 
     # Now create the step for the ParDo transform being handled.
     step = self._add_step(
-        TransformNames.DO, transform_node.full_label, transform_node,
+        TransformNames.DO,
+        transform_node.full_label + '/Do' if transform_node.side_inputs else 
'',
+        transform_node,
         transform_node.transform.side_output_tags)
     fn_data = self._pardo_fn_data(transform_node, lookup_label)
     step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))

Reply via email to