yifanmai commented on a change in pull request #12185:
URL: https://github.com/apache/beam/pull/12185#discussion_r465914422



##########
File path: 
sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -685,6 +687,264 @@ def fix_side_input_pcoll_coders(stages, pipeline_context):
   return stages
 
 
+def eliminate_common_siblings(stages, context):
+  # type: (Iterable[Stage], TransformContext) -> Iterable[Stage]
+  """Runs common subexpression elimination for common siblings.
+
+  If stages have common input, an identical transform, and one output each,
+  then all but one stages will be eliminated, and the output of the remaining
+  will be connected to the original output PCollections of the eliminated
+  stages. This elimination runs only once, not recursively, and will only
+  eliminate the first stage after a common input, rather than a chain of
+  stages.
+  """
+
+  SiblingKey = collections.namedtuple(
+      'SiblingKey', ['spec_urn', 'spec_payload', 'inputs', 'environment_id'])
+
+  def get_sibling_key(transform):
+    """Returns a key that will be identical for common siblings."""
+    transform_output_keys = list(transform.outputs.keys())
+    # Return None as the sibling key for ineligible transforms.
+    if len(transform_output_keys
+          ) != 1 or transform.spec.urn != common_urns.primitives.PAR_DO.urn:
+      return None
+    return SiblingKey(
+        spec_urn=transform.spec.urn,
+        spec_payload=transform.spec.payload,
+        inputs=tuple(transform.inputs.items()),
+        environment_id=transform.environment_id)
+
+  # Group stages by keys.
+  stages_by_sibling_key = collections.defaultdict(list)
+  for stage in stages:
+    transform = only_transform(stage.transforms)
+    stages_by_sibling_key[get_sibling_key(transform)].append(stage)
+
+  # Eliminate stages and build the output PCollection remapping dictionary.
+  pcoll_id_remap = {}

Review comment:
       Noted. I'm not currently aware of any requirement to keeping this around 
beyond inside this optimizer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to