This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch dont-fuse in repository https://gitbox.apache.org/repos/asf/beam.git
commit 091c4781678ff1d2fbe3a7a685bb291125c97315
Author: Kyle Weaver <kcwea...@google.com>
AuthorDate: Mon May 13 13:46:19 2019 -0700

    [BEAM-7282] portable Spark: don't fuse already optimized graph
---
 .../org/apache/beam/runners/spark/SparkPipelineRunner.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index 0e8a649..b0a1063 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -25,6 +25,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
@@ -58,7 +59,14 @@ public class SparkPipelineRunner implements PortablePipelineRunner {
 
     // Don't let the fuser fuse any subcomponents of native transforms.
     Pipeline trimmedPipeline = PipelineTrimmer.trim(pipeline, translator.knownUrns());
-    Pipeline fusedPipeline = GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
+
+    // Fused pipeline proto.
+    // TODO: Consider supporting partially-fused graphs.
+    RunnerApi.Pipeline fusedPipeline =
+        trimmedPipeline.getComponents().getTransformsMap().values().stream()
+                .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
+            ? trimmedPipeline
+            : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
 
     if (pipelineOptions.getFilesToStage() == null) {
       pipelineOptions.setFilesToStage(