Reinstate proto round trip in Java DirectRunner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0cb28cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0cb28cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0cb28cc

Branch: refs/heads/master
Commit: c0cb28cc30733f561d4cc6155be5738584956ebd
Parents: 3d99d07
Author: Kenn Knowles <k...@kennknowles.com>
Authored: Sat Sep 30 10:30:20 2017 -0700
Committer: Kenneth Knowles <k...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectOptions.java    |  8 --------
 .../org/apache/beam/runners/direct/DirectRunner.java | 15 ++++++---------
 2 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index af67306..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -76,10 +74,4 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
       return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
     }
   }
-
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Default.Boolean(false)
-  @Description("Control whether toProto/fromProto translations are applied to original Pipeline")
-  boolean isProtoTranslation();
-  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 35d55b1..d041a5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -160,15 +161,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    if (getPipelineOptions().isProtoTranslation()) {
-      try {
-        pipeline = PipelineTranslation.fromProto(
-            PipelineTranslation.toProto(originalPipeline));
-      } catch (IOException exception) {
-        throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
-      }
-    } else {
-      pipeline = originalPipeline;
+    try {
+      RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(originalPipeline);
+      pipeline = PipelineTranslation.fromProto(protoPipeline);
+    } catch (IOException exception) {
+      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);