[ https://issues.apache.org/jira/browse/BEAM-4549?focusedWorklogId=111631&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-111631 ]
ASF GitHub Bot logged work on BEAM-4549:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jun/18 19:58
            Start Date: 13/Jun/18 19:58
    Worklog Time Spent: 10m
      Work Description: aaltay closed pull request #5623: [BEAM-4549] Use per-pipeline unique ids for side inputs in DataflowRunner
URL: https://github.com/apache/beam/pull/5623

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index ec25ce01ad1..ca9e892fc2d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -423,12 +423,13 @@ def _get_encoded_output_coder(self, transform_node, window_value=True): return self._get_typehint_based_encoding( element_type, window_coder=window_coder) - def _add_step(self, step_kind, step_label, transform_node, side_tags=()): + def _add_step(self, step_kind, step_label, transform_node, side_tags=(), + step_name=None): """Creates a Step object and adds it to the cache.""" # Import here to avoid adding the dependency for local running scenarios. # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.runners.dataflow.internal import apiclient - step = apiclient.Step(step_kind, self._get_unique_step_name()) + step = apiclient.Step(step_kind, step_name or self._get_unique_step_name()) self.job.proto.steps.append(step.proto) step.add_property(PropertyNames.USER_NAME, step_label) # Cache the node/step association for the main output of the transform node.
@@ -581,6 +582,8 @@ def run_ParDo(self, transform_node): input_tag = transform_node.inputs[0].tag input_step = self._cache.get_pvalue(transform_node.inputs[0]) + pardo_step_name = self._get_unique_step_name() + # Attach side inputs. si_dict = {} # We must call self._cache.get_pvalue exactly once due to refcounting. @@ -590,7 +593,7 @@ def run_ParDo(self, transform_node): for ix, side_pval in enumerate(transform_node.side_inputs): assert isinstance(side_pval, AsSideInput) step_name = 'SideInput-' + self._get_unique_step_name() - si_label = 'side%d' % ix + si_label = 'side-%s-%d' % (pardo_step_name, ix) pcollection_label = '%s.%s' % ( side_pval.pvalue.producer.full_label.split('/')[-1], side_pval.pvalue.tag if side_pval.pvalue.tag else 'out') @@ -621,7 +624,8 @@ def run_ParDo(self, transform_node): '/{}'.format(transform_name) if transform_node.side_inputs else ''), transform_node, - transform_node.transform.output_tags) + transform_node.transform.output_tags, + step_name=pardo_step_name) # Import here to avoid adding the dependency for local running scenarios. # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.runners.dataflow.internal import apiclient ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 111631)
    Time Spent: 1h 20m  (was: 1h 10m)

> Streaming pipelines with multiple side inputs fail on DataflowRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-4549
>                 URL: https://issues.apache.org/jira/browse/BEAM-4549
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Charles Chen
>            Assignee: Charles Chen
>            Priority: Major
>          Time Spent: 1h 20m
>  Remaining Estimate: 0h
>
> Streaming pipelines with multiple side inputs currently fail on
> DataflowRunner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)