[ 
https://issues.apache.org/jira/browse/BEAM-4678?focusedWorklogId=174108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174108
 ]

ASF GitHub Bot logged work on BEAM-4678:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Dec/18 14:28
            Start Date: 11/Dec/18 14:28
    Worklog Time Spent: 10m 
      Work Description: robertwb closed pull request #7228: [BEAM-4678] Support combiner lifting in portable Flink runner.
URL: https://github.com/apache/beam/pull/7228
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise show it:

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto 
b/model/pipeline/src/main/proto/beam_runner_api.proto
index bbe2cfa91305..b691a636313b 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -250,6 +250,7 @@ message StandardPTransforms {
   }
   // Payload for all of these: CombinePayload
   enum CombineComponents {
+    // TODO(BEAM-6199): Remove these old URNs.
     COMBINE_PGBKCV = 0 [(beam_urn) = "beam:transform:combine_pgbkcv:v1"];
     COMBINE_MERGE_ACCUMULATORS = 1 [(beam_urn) = 
"beam:transform:combine_merge_accumulators:v1"];
     COMBINE_EXTRACT_OUTPUTS = 2 [(beam_urn) = 
"beam:transform:combine_extract_outputs:v1"];
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
index cebf5e34512e..bcca38476da7 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java
@@ -44,6 +44,15 @@
   private static final ImmutableMap<String, EnvironmentIdExtractor> 
KNOWN_URN_SPEC_EXTRACTORS =
       ImmutableMap.<String, EnvironmentIdExtractor>builder()
           .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, 
Environments::combineExtractor)
+          .put(
+              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
+              Environments::combineExtractor)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
+              Environments::combineExtractor)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
+              Environments::combineExtractor)
           .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, 
Environments::parDoExtractor)
           .put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, 
Environments::parDoExtractor)
           .put(PTransformTranslation.READ_TRANSFORM_URN, 
Environments::readExtractor)
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 86a414c4c508..ffe5f271aab0 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -91,6 +91,12 @@
       getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY);
   public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN =
       getUrn(CombineComponents.COMBINE_GROUPED_VALUES);
+  public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN =
+      getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE);
+  public static final String COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN =
+      getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS);
+  public static final String COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN =
+      getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS);
   public static final String RESHUFFLE_URN = 
getUrn(StandardPTransforms.Composites.RESHUFFLE);
   public static final String WRITE_FILES_TRANSFORM_URN =
       getUrn(StandardPTransforms.Composites.WRITE_FILES);
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
index c0c61eefd8eb..d6b5eca30504 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java
@@ -46,9 +46,18 @@
   private static final Map<String, FusibilityChecker> URN_FUSIBILITY_CHECKERS =
       ImmutableMap.<String, FusibilityChecker>builder()
           .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, 
GreedyPCollectionFusers::canFuseParDo)
+          .put(
+              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
+              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
+              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
+              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
           .put(
               PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
-              GreedyPCollectionFusers::canFuseAssignWindows)
+              GreedyPCollectionFusers::canFuseCompatibleEnvironment)
           .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, 
GreedyPCollectionFusers::canAlwaysFuse)
           .put(
               // GroupByKeys are runner-implemented only. PCollections 
consumed by a GroupByKey must
@@ -65,6 +74,15 @@
           .put(
               PTransformTranslation.PAR_DO_TRANSFORM_URN,
               GreedyPCollectionFusers::parDoCompatibility)
+          .put(
+              PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
+              GreedyPCollectionFusers::compatibleEnvironments)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
+              GreedyPCollectionFusers::compatibleEnvironments)
+          .put(
+              
PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
+              GreedyPCollectionFusers::compatibleEnvironments)
           .put(
               PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN,
               GreedyPCollectionFusers::compatibleEnvironments)
@@ -213,15 +231,15 @@ private static boolean parDoCompatibility(
   /**
    * A WindowInto can be fused into a stage if it executes in the same 
Environment as that stage.
    */
-  private static boolean canFuseAssignWindows(
-      PTransformNode window,
+  private static boolean canFuseCompatibleEnvironment(
+      PTransformNode operation,
       Environment environmemnt,
       @SuppressWarnings("unused") PCollectionNode candidate,
       @SuppressWarnings("unused") Collection<PCollectionNode> 
stagePCollections,
       QueryablePipeline pipeline) {
     // WindowInto transforms may not have an environment
-    Optional<Environment> windowEnvironment = pipeline.getEnvironment(window);
-    return environmemnt.equals(windowEnvironment.orElse(null));
+    Optional<Environment> operationEnvironment = 
pipeline.getEnvironment(operation);
+    return environmemnt.equals(operationEnvironment.orElse(null));
   }
 
   private static boolean compatibleEnvironments(
diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
index 5c3e447d959c..c46a57f01e55 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java
@@ -19,6 +19,9 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN;
+import static 
org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN;
 import static 
org.apache.beam.runners.core.construction.PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
@@ -162,6 +165,9 @@ private QueryablePipeline(Collection<String> transformIds, 
Components components
           MAP_WINDOWS_TRANSFORM_URN,
           READ_TRANSFORM_URN,
           CREATE_VIEW_TRANSFORM_URN,
+          COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN,
+          COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN,
+          COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN,
           SPLITTABLE_PROCESS_KEYED_URN,
           SPLITTABLE_PROCESS_ELEMENTS_URN);
 
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 62a2d299983d..e217fd050cc5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -435,131 +435,9 @@ def impulse_to_input(stages):
         yield stage
 
     def lift_combiners(stages):
-      """Expands CombinePerKey into pre- and post-grouping stages.
-
-      ... -> CombinePerKey -> ...
-
-      becomes
-
-      ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ...
-      """
-      for stage in stages:
-        assert len(stage.transforms) == 1
-        transform = stage.transforms[0]
-        if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn:
-          combine_payload = proto_utils.parse_Bytes(
-              transform.spec.payload, beam_runner_api_pb2.CombinePayload)
-
-          input_pcoll = pipeline_components.pcollections[only_element(
-              list(transform.inputs.values()))]
-          output_pcoll = pipeline_components.pcollections[only_element(
-              list(transform.outputs.values()))]
-
-          element_coder_id = input_pcoll.coder_id
-          element_coder = pipeline_components.coders[element_coder_id]
-          key_coder_id, _ = element_coder.component_coder_ids
-          accumulator_coder_id = combine_payload.accumulator_coder_id
-
-          key_accumulator_coder = beam_runner_api_pb2.Coder(
-              spec=beam_runner_api_pb2.SdkFunctionSpec(
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.coders.KV.urn)),
-              component_coder_ids=[key_coder_id, accumulator_coder_id])
-          key_accumulator_coder_id = add_or_get_coder_id(key_accumulator_coder)
-
-          accumulator_iter_coder = beam_runner_api_pb2.Coder(
-              spec=beam_runner_api_pb2.SdkFunctionSpec(
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.coders.ITERABLE.urn)),
-              component_coder_ids=[accumulator_coder_id])
-          accumulator_iter_coder_id = add_or_get_coder_id(
-              accumulator_iter_coder)
-
-          key_accumulator_iter_coder = beam_runner_api_pb2.Coder(
-              spec=beam_runner_api_pb2.SdkFunctionSpec(
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.coders.KV.urn)),
-              component_coder_ids=[key_coder_id, accumulator_iter_coder_id])
-          key_accumulator_iter_coder_id = add_or_get_coder_id(
-              key_accumulator_iter_coder)
-
-          precombined_pcoll_id = unique_name(
-              pipeline_components.pcollections, 'pcollection')
-          pipeline_components.pcollections[precombined_pcoll_id].CopyFrom(
-              beam_runner_api_pb2.PCollection(
-                  unique_name=transform.unique_name + '/Precombine.out',
-                  coder_id=key_accumulator_coder_id,
-                  windowing_strategy_id=input_pcoll.windowing_strategy_id,
-                  is_bounded=input_pcoll.is_bounded))
-
-          grouped_pcoll_id = unique_name(
-              pipeline_components.pcollections, 'pcollection')
-          pipeline_components.pcollections[grouped_pcoll_id].CopyFrom(
-              beam_runner_api_pb2.PCollection(
-                  unique_name=transform.unique_name + '/Group.out',
-                  coder_id=key_accumulator_iter_coder_id,
-                  windowing_strategy_id=output_pcoll.windowing_strategy_id,
-                  is_bounded=output_pcoll.is_bounded))
-
-          merged_pcoll_id = unique_name(
-              pipeline_components.pcollections, 'pcollection')
-          pipeline_components.pcollections[merged_pcoll_id].CopyFrom(
-              beam_runner_api_pb2.PCollection(
-                  unique_name=transform.unique_name + '/Merge.out',
-                  coder_id=key_accumulator_coder_id,
-                  windowing_strategy_id=output_pcoll.windowing_strategy_id,
-                  is_bounded=output_pcoll.is_bounded))
-
-          def make_stage(base_stage, transform):
-            return Stage(
-                transform.unique_name,
-                [transform],
-                downstream_side_inputs=base_stage.downstream_side_inputs,
-                must_follow=base_stage.must_follow)
-
-          yield make_stage(
-              stage,
-              beam_runner_api_pb2.PTransform(
-                  unique_name=transform.unique_name + '/Precombine',
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.combine_components.COMBINE_PGBKCV.urn,
-                      payload=transform.spec.payload),
-                  inputs=transform.inputs,
-                  outputs={'out': precombined_pcoll_id}))
-
-          yield make_stage(
-              stage,
-              beam_runner_api_pb2.PTransform(
-                  unique_name=transform.unique_name + '/Group',
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.primitives.GROUP_BY_KEY.urn),
-                  inputs={'in': precombined_pcoll_id},
-                  outputs={'out': grouped_pcoll_id}))
-
-          yield make_stage(
-              stage,
-              beam_runner_api_pb2.PTransform(
-                  unique_name=transform.unique_name + '/Merge',
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.combine_components
-                      .COMBINE_MERGE_ACCUMULATORS.urn,
-                      payload=transform.spec.payload),
-                  inputs={'in': grouped_pcoll_id},
-                  outputs={'out': merged_pcoll_id}))
-
-          yield make_stage(
-              stage,
-              beam_runner_api_pb2.PTransform(
-                  unique_name=transform.unique_name + '/ExtractOutputs',
-                  spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=common_urns.combine_components
-                      .COMBINE_EXTRACT_OUTPUTS.urn,
-                      payload=transform.spec.payload),
-                  inputs={'in': merged_pcoll_id},
-                  outputs=transform.outputs))
-
-        else:
-          yield stage
+      return fn_api_runner_transforms.lift_combiners(
+          stages,
+          fn_api_runner_transforms.TransformContext(pipeline_components))
 
     def expand_gbk(stages):
       """Transforms each GBK into a write followed by a read.
@@ -576,9 +454,12 @@ def expand_gbk(stages):
                 pipeline_components.pcollections[pcoll_id], 
pipeline_components)
 
           # This is used later to correlate the read and write.
-          grouping_buffer = create_buffer_id(stage.name, kind='group')
-          if stage.name not in pipeline_components.transforms:
-            pipeline_components.transforms[stage.name].CopyFrom(transform)
+          transform_id = stage.name
+          if transform != pipeline_components.transforms.get(transform_id):
+            transform_id = unique_name(
+                pipeline_components.transforms, stage.name)
+            pipeline_components.transforms[transform_id].CopyFrom(transform)
+          grouping_buffer = create_buffer_id(transform_id, kind='group')
           gbk_write = Stage(
               transform.unique_name + '/Write',
               [beam_runner_api_pb2.PTransform(
diff --git 
a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py 
b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
index 79418d99862c..335dd912b5e8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py
@@ -142,6 +142,168 @@ def leaf_transform_stages(
         yield stage
 
 
+def with_stages(pipeline_proto, stages):
+  new_proto = beam_runner_api_pb2.Pipeline()
+  new_proto.CopyFrom(pipeline_proto)
+  components = new_proto.components
+  components.transforms.clear()
+
+  parents = {
+      child: parent
+      for parent, proto in pipeline_proto.components.transforms.items()
+      for child in proto.subtransforms
+  }
+
+  def add_parent(child, parent):
+    if parent not in components.transforms:
+      components.transforms[parent].CopyFrom(
+          pipeline_proto.components.transforms[parent])
+      del components.transforms[parent].subtransforms[:]
+      if parent in parents:
+        add_parent(parent, parents[parent])
+    components.transforms[parent].subtransforms.append(child)
+
+  for stage in stages:
+    for transform in stage.transforms:
+      id = unique_name(components.transforms, stage.name)
+      components.transforms[id].CopyFrom(transform)
+      if stage.parent:
+        add_parent(id, stage.parent)
+
+  return new_proto
+
+
+def lift_combiners(stages, context):
+  """Expands CombinePerKey into pre- and post-grouping stages.
+
+  ... -> CombinePerKey -> ...
+
+  becomes
+
+  ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ...
+  """
+  for stage in stages:
+    assert len(stage.transforms) == 1
+    transform = stage.transforms[0]
+    if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn:
+      combine_payload = proto_utils.parse_Bytes(
+          transform.spec.payload, beam_runner_api_pb2.CombinePayload)
+
+      input_pcoll = context.components.pcollections[only_element(
+          list(transform.inputs.values()))]
+      output_pcoll = context.components.pcollections[only_element(
+          list(transform.outputs.values()))]
+
+      element_coder_id = input_pcoll.coder_id
+      element_coder = context.components.coders[element_coder_id]
+      key_coder_id, _ = element_coder.component_coder_ids
+      accumulator_coder_id = combine_payload.accumulator_coder_id
+
+      key_accumulator_coder = beam_runner_api_pb2.Coder(
+          spec=beam_runner_api_pb2.SdkFunctionSpec(
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.coders.KV.urn)),
+          component_coder_ids=[key_coder_id, accumulator_coder_id])
+      key_accumulator_coder_id = context.add_or_get_coder_id(
+          key_accumulator_coder)
+
+      accumulator_iter_coder = beam_runner_api_pb2.Coder(
+          spec=beam_runner_api_pb2.SdkFunctionSpec(
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.coders.ITERABLE.urn)),
+          component_coder_ids=[accumulator_coder_id])
+      accumulator_iter_coder_id = context.add_or_get_coder_id(
+          accumulator_iter_coder)
+
+      key_accumulator_iter_coder = beam_runner_api_pb2.Coder(
+          spec=beam_runner_api_pb2.SdkFunctionSpec(
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.coders.KV.urn)),
+          component_coder_ids=[key_coder_id, accumulator_iter_coder_id])
+      key_accumulator_iter_coder_id = context.add_or_get_coder_id(
+          key_accumulator_iter_coder)
+
+      precombined_pcoll_id = unique_name(
+          context.components.pcollections, 'pcollection')
+      context.components.pcollections[precombined_pcoll_id].CopyFrom(
+          beam_runner_api_pb2.PCollection(
+              unique_name=transform.unique_name + '/Precombine.out',
+              coder_id=key_accumulator_coder_id,
+              windowing_strategy_id=input_pcoll.windowing_strategy_id,
+              is_bounded=input_pcoll.is_bounded))
+
+      grouped_pcoll_id = unique_name(
+          context.components.pcollections, 'pcollection')
+      context.components.pcollections[grouped_pcoll_id].CopyFrom(
+          beam_runner_api_pb2.PCollection(
+              unique_name=transform.unique_name + '/Group.out',
+              coder_id=key_accumulator_iter_coder_id,
+              windowing_strategy_id=output_pcoll.windowing_strategy_id,
+              is_bounded=output_pcoll.is_bounded))
+
+      merged_pcoll_id = unique_name(
+          context.components.pcollections, 'pcollection')
+      context.components.pcollections[merged_pcoll_id].CopyFrom(
+          beam_runner_api_pb2.PCollection(
+              unique_name=transform.unique_name + '/Merge.out',
+              coder_id=key_accumulator_coder_id,
+              windowing_strategy_id=output_pcoll.windowing_strategy_id,
+              is_bounded=output_pcoll.is_bounded))
+
+      def make_stage(base_stage, transform):
+        return Stage(
+            transform.unique_name,
+            [transform],
+            downstream_side_inputs=base_stage.downstream_side_inputs,
+            must_follow=base_stage.must_follow,
+            parent=base_stage.name)
+
+      yield make_stage(
+          stage,
+          beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Precombine',
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.combine_components
+                  .COMBINE_PER_KEY_PRECOMBINE.urn,
+                  payload=transform.spec.payload),
+              inputs=transform.inputs,
+              outputs={'out': precombined_pcoll_id}))
+
+      yield make_stage(
+          stage,
+          beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Group',
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.primitives.GROUP_BY_KEY.urn),
+              inputs={'in': precombined_pcoll_id},
+              outputs={'out': grouped_pcoll_id}))
+
+      yield make_stage(
+          stage,
+          beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/Merge',
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.combine_components
+                  .COMBINE_PER_KEY_MERGE_ACCUMULATORS.urn,
+                  payload=transform.spec.payload),
+              inputs={'in': grouped_pcoll_id},
+              outputs={'out': merged_pcoll_id}))
+
+      yield make_stage(
+          stage,
+          beam_runner_api_pb2.PTransform(
+              unique_name=transform.unique_name + '/ExtractOutputs',
+              spec=beam_runner_api_pb2.FunctionSpec(
+                  urn=common_urns.combine_components
+                  .COMBINE_PER_KEY_EXTRACT_OUTPUTS.urn,
+                  payload=transform.spec.payload),
+              inputs={'in': merged_pcoll_id},
+              outputs=transform.outputs))
+
+    else:
+      yield stage
+
+
 def union(a, b):
   # Minimize the number of distinct sets.
   if not a or a == b:
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py 
b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 025cb1066780..98bc6609615e 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -27,6 +27,7 @@
 from apache_beam import metrics
 from apache_beam.options.pipeline_options import PortableOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
@@ -34,6 +35,7 @@
 from apache_beam.runners import pipeline_context
 from apache_beam.runners import runner
 from apache_beam.runners.job import utils as job_utils
+from apache_beam.runners.portability import fn_api_runner_transforms
 from apache_beam.runners.portability import portable_stager
 from apache_beam.runners.portability.job_server import DockerizedJobServer
 
@@ -132,6 +134,17 @@ def run_pipeline(self, pipeline, options):
           del proto_pipeline.components.transforms[sub_transform]
         del transform_proto.subtransforms[:]
 
+    # Preemptively apply combiner lifting, until all runners support it.
+    # This optimization is idempotent.
+    if not options.view_as(StandardOptions).streaming:
+      stages = list(fn_api_runner_transforms.leaf_transform_stages(
+          proto_pipeline.root_transform_ids, proto_pipeline.components))
+      stages = fn_api_runner_transforms.lift_combiners(
+          stages,
+          fn_api_runner_transforms.TransformContext(proto_pipeline.components))
+      proto_pipeline = fn_api_runner_transforms.with_stages(
+          proto_pipeline, stages)
+
     # TODO: Define URNs for options.
     # convert int values: https://issues.apache.org/jira/browse/BEAM-5509
     p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 174108)
    Time Spent: 1.5h  (was: 1h 20m)

> Support portable combiner lifting in Java Flink Runner
> ------------------------------------------------------
>
>                 Key: BEAM-4678
>                 URL: https://issues.apache.org/jira/browse/BEAM-4678
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-flink
>            Reporter: Daniel Oliveira
>            Assignee: Daniel Oliveira
>            Priority: Major
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Adjust the Flink Runner to support portable combiner lifting, as described in
> the following doc:
> https://s.apache.org/beam-runner-api-combine-model



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to