Reinstate proto round trip in Java DirectRunner

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c0cb28cc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c0cb28cc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c0cb28cc

Branch: refs/heads/master
Commit: c0cb28cc30733f561d4cc6155be5738584956ebd
Parents: 3d99d07
Author: Kenn Knowles <k...@kennknowles.com>
Authored: Sat Sep 30 10:30:20 2017 -0700
Committer: Kenneth Knowles <k...@apache.org>
Committed: Tue Oct 17 12:45:11 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/direct/DirectOptions.java    |  8 --------
 .../org/apache/beam/runners/direct/DirectRunner.java | 15 ++++++---------
 2 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
index af67306..574ab46 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -76,10 +74,4 @@ public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
       return Math.max(Runtime.getRuntime().availableProcessors(), MIN_PARALLELISM);
     }
   }
-
-  @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Default.Boolean(false)
-  @Description("Control whether toProto/fromProto translations are applied to original Pipeline")
-  boolean isProtoTranslation();
-  void setProtoTranslation(boolean b);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c0cb28cc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 35d55b1..d041a5a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
@@ -160,15 +161,11 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   @Override
   public DirectPipelineResult run(Pipeline originalPipeline) {
     Pipeline pipeline;
-    if (getPipelineOptions().isProtoTranslation()) {
-      try {
-        pipeline = PipelineTranslation.fromProto(
-            PipelineTranslation.toProto(originalPipeline));
-      } catch (IOException exception) {
-        throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
-      }
-    } else {
-      pipeline = originalPipeline;
+    try {
+      RunnerApi.Pipeline protoPipeline = PipelineTranslation.toProto(originalPipeline);
+      pipeline = PipelineTranslation.fromProto(protoPipeline);
+    } catch (IOException exception) {
+      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
     }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);

Reply via email to