[ https://issues.apache.org/jira/browse/BEAM-4678?focusedWorklogId=174108&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-174108 ]
ASF GitHub Bot logged work on BEAM-4678: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Dec/18 14:28 Start Date: 11/Dec/18 14:28 Worklog Time Spent: 10m Work Description: robertwb closed pull request #7228: [BEAM-4678] Support combiner lifting in portable Flink runner. URL: https://github.com/apache/beam/pull/7228 This PR was merged from a forked repository. Because GitHub does not show the original diff of a pull request from a fork once it has been merged, the diff is reproduced below for the sake of provenance: diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto index bbe2cfa91305..b691a636313b 100644 --- a/model/pipeline/src/main/proto/beam_runner_api.proto +++ b/model/pipeline/src/main/proto/beam_runner_api.proto @@ -250,6 +250,7 @@ message StandardPTransforms { } // Payload for all of these: CombinePayload enum CombineComponents { + // TODO(BEAM-6199): Remove these old URNs. 
COMBINE_PGBKCV = 0 [(beam_urn) = "beam:transform:combine_pgbkcv:v1"]; COMBINE_MERGE_ACCUMULATORS = 1 [(beam_urn) = "beam:transform:combine_merge_accumulators:v1"]; COMBINE_EXTRACT_OUTPUTS = 2 [(beam_urn) = "beam:transform:combine_extract_outputs:v1"]; diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java index cebf5e34512e..bcca38476da7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Environments.java @@ -44,6 +44,15 @@ private static final ImmutableMap<String, EnvironmentIdExtractor> KNOWN_URN_SPEC_EXTRACTORS = ImmutableMap.<String, EnvironmentIdExtractor>builder() .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, Environments::combineExtractor) + .put( + PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, + Environments::combineExtractor) + .put( + PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, + Environments::combineExtractor) + .put( + PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, + Environments::combineExtractor) .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, Environments::parDoExtractor) .put(PTransformTranslation.SPLITTABLE_PROCESS_ELEMENTS_URN, Environments::parDoExtractor) .put(PTransformTranslation.READ_TRANSFORM_URN, Environments::readExtractor) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java index 86a414c4c508..ffe5f271aab0 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java +++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java @@ -91,6 +91,12 @@ getUrn(StandardPTransforms.Composites.COMBINE_GLOBALLY); public static final String COMBINE_GROUPED_VALUES_TRANSFORM_URN = getUrn(CombineComponents.COMBINE_GROUPED_VALUES); + public static final String COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN = + getUrn(CombineComponents.COMBINE_PER_KEY_PRECOMBINE); + public static final String COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN = + getUrn(CombineComponents.COMBINE_PER_KEY_MERGE_ACCUMULATORS); + public static final String COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN = + getUrn(CombineComponents.COMBINE_PER_KEY_EXTRACT_OUTPUTS); public static final String RESHUFFLE_URN = getUrn(StandardPTransforms.Composites.RESHUFFLE); public static final String WRITE_FILES_TRANSFORM_URN = getUrn(StandardPTransforms.Composites.WRITE_FILES); diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java index c0c61eefd8eb..d6b5eca30504 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/GreedyPCollectionFusers.java @@ -46,9 +46,18 @@ private static final Map<String, FusibilityChecker> URN_FUSIBILITY_CHECKERS = ImmutableMap.<String, FusibilityChecker>builder() .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, GreedyPCollectionFusers::canFuseParDo) + .put( + PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, + GreedyPCollectionFusers::canFuseCompatibleEnvironment) + .put( + PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, + GreedyPCollectionFusers::canFuseCompatibleEnvironment) + .put( + 
PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, + GreedyPCollectionFusers::canFuseCompatibleEnvironment) .put( PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, - GreedyPCollectionFusers::canFuseAssignWindows) + GreedyPCollectionFusers::canFuseCompatibleEnvironment) .put(PTransformTranslation.FLATTEN_TRANSFORM_URN, GreedyPCollectionFusers::canAlwaysFuse) .put( // GroupByKeys are runner-implemented only. PCollections consumed by a GroupByKey must @@ -65,6 +74,15 @@ .put( PTransformTranslation.PAR_DO_TRANSFORM_URN, GreedyPCollectionFusers::parDoCompatibility) + .put( + PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, + GreedyPCollectionFusers::compatibleEnvironments) + .put( + PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, + GreedyPCollectionFusers::compatibleEnvironments) + .put( + PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, + GreedyPCollectionFusers::compatibleEnvironments) .put( PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, GreedyPCollectionFusers::compatibleEnvironments) @@ -213,15 +231,15 @@ private static boolean parDoCompatibility( /** * A WindowInto can be fused into a stage if it executes in the same Environment as that stage. 
*/ - private static boolean canFuseAssignWindows( - PTransformNode window, + private static boolean canFuseCompatibleEnvironment( + PTransformNode operation, Environment environmemnt, @SuppressWarnings("unused") PCollectionNode candidate, @SuppressWarnings("unused") Collection<PCollectionNode> stagePCollections, QueryablePipeline pipeline) { // WindowInto transforms may not have an environment - Optional<Environment> windowEnvironment = pipeline.getEnvironment(window); - return environmemnt.equals(windowEnvironment.orElse(null)); + Optional<Environment> operationEnvironment = pipeline.getEnvironment(operation); + return environmemnt.equals(operationEnvironment.orElse(null)); } private static boolean compatibleEnvironments( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java index 5c3e447d959c..c46a57f01e55 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/QueryablePipeline.java @@ -19,6 +19,9 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.runners.core.construction.PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN; +import static org.apache.beam.runners.core.construction.PTransformTranslation.COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN; import static org.apache.beam.runners.core.construction.PTransformTranslation.CREATE_VIEW_TRANSFORM_URN; import static 
org.apache.beam.runners.core.construction.PTransformTranslation.FLATTEN_TRANSFORM_URN; import static org.apache.beam.runners.core.construction.PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN; @@ -162,6 +165,9 @@ private QueryablePipeline(Collection<String> transformIds, Components components MAP_WINDOWS_TRANSFORM_URN, READ_TRANSFORM_URN, CREATE_VIEW_TRANSFORM_URN, + COMBINE_PER_KEY_PRECOMBINE_TRANSFORM_URN, + COMBINE_PER_KEY_MERGE_ACCUMULATORS_TRANSFORM_URN, + COMBINE_PER_KEY_EXTRACT_OUTPUTS_TRANSFORM_URN, SPLITTABLE_PROCESS_KEYED_URN, SPLITTABLE_PROCESS_ELEMENTS_URN); diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py index 62a2d299983d..e217fd050cc5 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py @@ -435,131 +435,9 @@ def impulse_to_input(stages): yield stage def lift_combiners(stages): - """Expands CombinePerKey into pre- and post-grouping stages. - - ... -> CombinePerKey -> ... - - becomes - - ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ... 
- """ - for stage in stages: - assert len(stage.transforms) == 1 - transform = stage.transforms[0] - if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn: - combine_payload = proto_utils.parse_Bytes( - transform.spec.payload, beam_runner_api_pb2.CombinePayload) - - input_pcoll = pipeline_components.pcollections[only_element( - list(transform.inputs.values()))] - output_pcoll = pipeline_components.pcollections[only_element( - list(transform.outputs.values()))] - - element_coder_id = input_pcoll.coder_id - element_coder = pipeline_components.coders[element_coder_id] - key_coder_id, _ = element_coder.component_coder_ids - accumulator_coder_id = combine_payload.accumulator_coder_id - - key_accumulator_coder = beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.SdkFunctionSpec( - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.coders.KV.urn)), - component_coder_ids=[key_coder_id, accumulator_coder_id]) - key_accumulator_coder_id = add_or_get_coder_id(key_accumulator_coder) - - accumulator_iter_coder = beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.SdkFunctionSpec( - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.coders.ITERABLE.urn)), - component_coder_ids=[accumulator_coder_id]) - accumulator_iter_coder_id = add_or_get_coder_id( - accumulator_iter_coder) - - key_accumulator_iter_coder = beam_runner_api_pb2.Coder( - spec=beam_runner_api_pb2.SdkFunctionSpec( - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.coders.KV.urn)), - component_coder_ids=[key_coder_id, accumulator_iter_coder_id]) - key_accumulator_iter_coder_id = add_or_get_coder_id( - key_accumulator_iter_coder) - - precombined_pcoll_id = unique_name( - pipeline_components.pcollections, 'pcollection') - pipeline_components.pcollections[precombined_pcoll_id].CopyFrom( - beam_runner_api_pb2.PCollection( - unique_name=transform.unique_name + '/Precombine.out', - coder_id=key_accumulator_coder_id, - windowing_strategy_id=input_pcoll.windowing_strategy_id, - 
is_bounded=input_pcoll.is_bounded)) - - grouped_pcoll_id = unique_name( - pipeline_components.pcollections, 'pcollection') - pipeline_components.pcollections[grouped_pcoll_id].CopyFrom( - beam_runner_api_pb2.PCollection( - unique_name=transform.unique_name + '/Group.out', - coder_id=key_accumulator_iter_coder_id, - windowing_strategy_id=output_pcoll.windowing_strategy_id, - is_bounded=output_pcoll.is_bounded)) - - merged_pcoll_id = unique_name( - pipeline_components.pcollections, 'pcollection') - pipeline_components.pcollections[merged_pcoll_id].CopyFrom( - beam_runner_api_pb2.PCollection( - unique_name=transform.unique_name + '/Merge.out', - coder_id=key_accumulator_coder_id, - windowing_strategy_id=output_pcoll.windowing_strategy_id, - is_bounded=output_pcoll.is_bounded)) - - def make_stage(base_stage, transform): - return Stage( - transform.unique_name, - [transform], - downstream_side_inputs=base_stage.downstream_side_inputs, - must_follow=base_stage.must_follow) - - yield make_stage( - stage, - beam_runner_api_pb2.PTransform( - unique_name=transform.unique_name + '/Precombine', - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.combine_components.COMBINE_PGBKCV.urn, - payload=transform.spec.payload), - inputs=transform.inputs, - outputs={'out': precombined_pcoll_id})) - - yield make_stage( - stage, - beam_runner_api_pb2.PTransform( - unique_name=transform.unique_name + '/Group', - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.primitives.GROUP_BY_KEY.urn), - inputs={'in': precombined_pcoll_id}, - outputs={'out': grouped_pcoll_id})) - - yield make_stage( - stage, - beam_runner_api_pb2.PTransform( - unique_name=transform.unique_name + '/Merge', - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.combine_components - .COMBINE_MERGE_ACCUMULATORS.urn, - payload=transform.spec.payload), - inputs={'in': grouped_pcoll_id}, - outputs={'out': merged_pcoll_id})) - - yield make_stage( - stage, - beam_runner_api_pb2.PTransform( - 
unique_name=transform.unique_name + '/ExtractOutputs', - spec=beam_runner_api_pb2.FunctionSpec( - urn=common_urns.combine_components - .COMBINE_EXTRACT_OUTPUTS.urn, - payload=transform.spec.payload), - inputs={'in': merged_pcoll_id}, - outputs=transform.outputs)) - - else: - yield stage + return fn_api_runner_transforms.lift_combiners( + stages, + fn_api_runner_transforms.TransformContext(pipeline_components)) def expand_gbk(stages): """Transforms each GBK into a write followed by a read. @@ -576,9 +454,12 @@ def expand_gbk(stages): pipeline_components.pcollections[pcoll_id], pipeline_components) # This is used later to correlate the read and write. - grouping_buffer = create_buffer_id(stage.name, kind='group') - if stage.name not in pipeline_components.transforms: - pipeline_components.transforms[stage.name].CopyFrom(transform) + transform_id = stage.name + if transform != pipeline_components.transforms.get(transform_id): + transform_id = unique_name( + pipeline_components.transforms, stage.name) + pipeline_components.transforms[transform_id].CopyFrom(transform) + grouping_buffer = create_buffer_id(transform_id, kind='group') gbk_write = Stage( transform.unique_name + '/Write', [beam_runner_api_pb2.PTransform( diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py index 79418d99862c..335dd912b5e8 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_transforms.py @@ -142,6 +142,168 @@ def leaf_transform_stages( yield stage +def with_stages(pipeline_proto, stages): + new_proto = beam_runner_api_pb2.Pipeline() + new_proto.CopyFrom(pipeline_proto) + components = new_proto.components + components.transforms.clear() + + parents = { + child: parent + for parent, proto in pipeline_proto.components.transforms.items() + for child in proto.subtransforms + } + + def 
add_parent(child, parent): + if parent not in components.transforms: + components.transforms[parent].CopyFrom( + pipeline_proto.components.transforms[parent]) + del components.transforms[parent].subtransforms[:] + if parent in parents: + add_parent(parent, parents[parent]) + components.transforms[parent].subtransforms.append(child) + + for stage in stages: + for transform in stage.transforms: + id = unique_name(components.transforms, stage.name) + components.transforms[id].CopyFrom(transform) + if stage.parent: + add_parent(id, stage.parent) + + return new_proto + + +def lift_combiners(stages, context): + """Expands CombinePerKey into pre- and post-grouping stages. + + ... -> CombinePerKey -> ... + + becomes + + ... -> PreCombine -> GBK -> MergeAccumulators -> ExtractOutput -> ... + """ + for stage in stages: + assert len(stage.transforms) == 1 + transform = stage.transforms[0] + if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn: + combine_payload = proto_utils.parse_Bytes( + transform.spec.payload, beam_runner_api_pb2.CombinePayload) + + input_pcoll = context.components.pcollections[only_element( + list(transform.inputs.values()))] + output_pcoll = context.components.pcollections[only_element( + list(transform.outputs.values()))] + + element_coder_id = input_pcoll.coder_id + element_coder = context.components.coders[element_coder_id] + key_coder_id, _ = element_coder.component_coder_ids + accumulator_coder_id = combine_payload.accumulator_coder_id + + key_accumulator_coder = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.coders.KV.urn)), + component_coder_ids=[key_coder_id, accumulator_coder_id]) + key_accumulator_coder_id = context.add_or_get_coder_id( + key_accumulator_coder) + + accumulator_iter_coder = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.coders.ITERABLE.urn)), + 
component_coder_ids=[accumulator_coder_id]) + accumulator_iter_coder_id = context.add_or_get_coder_id( + accumulator_iter_coder) + + key_accumulator_iter_coder = beam_runner_api_pb2.Coder( + spec=beam_runner_api_pb2.SdkFunctionSpec( + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.coders.KV.urn)), + component_coder_ids=[key_coder_id, accumulator_iter_coder_id]) + key_accumulator_iter_coder_id = context.add_or_get_coder_id( + key_accumulator_iter_coder) + + precombined_pcoll_id = unique_name( + context.components.pcollections, 'pcollection') + context.components.pcollections[precombined_pcoll_id].CopyFrom( + beam_runner_api_pb2.PCollection( + unique_name=transform.unique_name + '/Precombine.out', + coder_id=key_accumulator_coder_id, + windowing_strategy_id=input_pcoll.windowing_strategy_id, + is_bounded=input_pcoll.is_bounded)) + + grouped_pcoll_id = unique_name( + context.components.pcollections, 'pcollection') + context.components.pcollections[grouped_pcoll_id].CopyFrom( + beam_runner_api_pb2.PCollection( + unique_name=transform.unique_name + '/Group.out', + coder_id=key_accumulator_iter_coder_id, + windowing_strategy_id=output_pcoll.windowing_strategy_id, + is_bounded=output_pcoll.is_bounded)) + + merged_pcoll_id = unique_name( + context.components.pcollections, 'pcollection') + context.components.pcollections[merged_pcoll_id].CopyFrom( + beam_runner_api_pb2.PCollection( + unique_name=transform.unique_name + '/Merge.out', + coder_id=key_accumulator_coder_id, + windowing_strategy_id=output_pcoll.windowing_strategy_id, + is_bounded=output_pcoll.is_bounded)) + + def make_stage(base_stage, transform): + return Stage( + transform.unique_name, + [transform], + downstream_side_inputs=base_stage.downstream_side_inputs, + must_follow=base_stage.must_follow, + parent=base_stage.name) + + yield make_stage( + stage, + beam_runner_api_pb2.PTransform( + unique_name=transform.unique_name + '/Precombine', + spec=beam_runner_api_pb2.FunctionSpec( + 
urn=common_urns.combine_components + .COMBINE_PER_KEY_PRECOMBINE.urn, + payload=transform.spec.payload), + inputs=transform.inputs, + outputs={'out': precombined_pcoll_id})) + + yield make_stage( + stage, + beam_runner_api_pb2.PTransform( + unique_name=transform.unique_name + '/Group', + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.primitives.GROUP_BY_KEY.urn), + inputs={'in': precombined_pcoll_id}, + outputs={'out': grouped_pcoll_id})) + + yield make_stage( + stage, + beam_runner_api_pb2.PTransform( + unique_name=transform.unique_name + '/Merge', + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.combine_components + .COMBINE_PER_KEY_MERGE_ACCUMULATORS.urn, + payload=transform.spec.payload), + inputs={'in': grouped_pcoll_id}, + outputs={'out': merged_pcoll_id})) + + yield make_stage( + stage, + beam_runner_api_pb2.PTransform( + unique_name=transform.unique_name + '/ExtractOutputs', + spec=beam_runner_api_pb2.FunctionSpec( + urn=common_urns.combine_components + .COMBINE_PER_KEY_EXTRACT_OUTPUTS.urn, + payload=transform.spec.payload), + inputs={'in': merged_pcoll_id}, + outputs=transform.outputs)) + + else: + yield stage + + def union(a, b): # Minimize the number of distinct sets. 
if not a or a == b: diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index 025cb1066780..98bc6609615e 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -27,6 +27,7 @@ from apache_beam import metrics from apache_beam.options.pipeline_options import PortableOptions from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import StandardOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_job_api_pb2_grpc @@ -34,6 +35,7 @@ from apache_beam.runners import pipeline_context from apache_beam.runners import runner from apache_beam.runners.job import utils as job_utils +from apache_beam.runners.portability import fn_api_runner_transforms from apache_beam.runners.portability import portable_stager from apache_beam.runners.portability.job_server import DockerizedJobServer @@ -132,6 +134,17 @@ def run_pipeline(self, pipeline, options): del proto_pipeline.components.transforms[sub_transform] del transform_proto.subtransforms[:] + # Preemptively apply combiner lifting, until all runners support it. + # This optimization is idempotent. + if not options.view_as(StandardOptions).streaming: + stages = list(fn_api_runner_transforms.leaf_transform_stages( + proto_pipeline.root_transform_ids, proto_pipeline.components)) + stages = fn_api_runner_transforms.lift_combiners( + stages, + fn_api_runner_transforms.TransformContext(proto_pipeline.components)) + proto_pipeline = fn_api_runner_transforms.with_stages( + proto_pipeline, stages) + # TODO: Define URNs for options. 
# convert int values: https://issues.apache.org/jira/browse/BEAM-5509 p_options = {'beam:option:' + k + ':v1': (str(v) if type(v) == int else v) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 174108) Time Spent: 1.5h (was: 1h 20m) > Support portable combiner lifting in Java Flink Runner > ------------------------------------------------------ > > Key: BEAM-4678 > URL: https://issues.apache.org/jira/browse/BEAM-4678 > Project: Beam > Issue Type: Sub-task > Components: runner-flink > Reporter: Daniel Oliveira > Assignee: Daniel Oliveira > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > Adjust Flink Runner to support portable combiner lifting as described in the > following doc: > https://s.apache.org/beam-runner-api-combine-model -- This message was sent by Atlassian JIRA (v7.6.3#76005)