Repository: incubator-beam
Updated Branches:
  refs/heads/master 3ad767750 -> 8cc43aa70


[BEAM-851] Determine whether the pipeline must be translated in streaming mode (if not set)

A pipeline visitor (TranslationModeDetector) now detects whether the pipeline contains Read.Unbounded transforms and, if so, switches the runner to streaming mode.
This approach is based on Flink's PipelineTranslationOptimizer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc96b138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc96b138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc96b138

Branch: refs/heads/master
Commit: cc96b1381b6db849adf69daddecf30b9c61acf73
Parents: 3ad7677
Author: Ismaël Mejía <[email protected]>
Authored: Fri Nov 25 14:52:26 2016 +0100
Committer: Ismaël Mejía <[email protected]>
Committed: Sun Nov 27 11:18:12 2016 +0100

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 61 +++++++++++++++++++-
 .../streaming/StreamingTransformTranslator.java |  2 +-
 .../streaming/EmptyStreamAssertionTest.java     |  2 +
 .../streaming/FlattenStreamingTest.java         |  2 +
 .../streaming/SimpleStreamingWordCountTest.java |  1 +
 .../SparkTestPipelineOptionsForStreaming.java   |  6 --
 6 files changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index e800071..49e0113 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import 
org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -120,12 +121,12 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
     mOptions = options;
   }
 
-
   @Override
   public EvaluationResult run(Pipeline pipeline) {
     try {
       LOG.info("Executing pipeline using the SparkRunner.");
 
+      detectTranslationMode(pipeline);
       if (mOptions.isStreaming()) {
         SparkRunnerStreamingContextFactory contextFactory =
             new SparkRunnerStreamingContextFactory(pipeline, mOptions);
@@ -136,7 +137,7 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
         jssc.start();
 
         // if recovering from checkpoint, we have to reconstruct the 
EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new 
EvaluationContext(jssc.sc(),
+        return contextFactory.getCtxt() == null ? new 
EvaluationContext(jssc.sparkContext(),
             pipeline, jssc) : contextFactory.getCtxt();
       } else {
         JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
@@ -168,6 +169,62 @@ public final class SparkRunner extends 
PipelineRunner<EvaluationResult> {
   }
 
   /**
+   * Detect the translation mode for the pipeline and change options in case 
streaming
+   * translation is needed.
+   * @param pipeline the pipeline to traverse when looking for unbounded reads
+   */
+  private void detectTranslationMode(Pipeline pipeline) {
+    TranslationModeDetector detector = new TranslationModeDetector();
+    pipeline.traverseTopologically(detector);
+    if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) {
+      // set streaming mode if it's a streaming pipeline
+      this.mOptions.setStreaming(true);
+    }
+  }
+
+  /**
+   * The translation mode of the Beam Pipeline.
+   */
+  enum TranslationMode {
+    /** Uses the batch mode. */
+    BATCH,
+    /** Uses the streaming mode. */
+    STREAMING
+  }
+
+  /**
+   * Traverses the Pipeline to determine the {@link TranslationMode} for this 
pipeline.
+   */
+  static class TranslationModeDetector extends 
Pipeline.PipelineVisitor.Defaults {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TranslationModeDetector.class);
+
+    private TranslationMode translationMode;
+
+    TranslationModeDetector(TranslationMode defaultMode) {
+      this.translationMode = defaultMode;
+    }
+
+    TranslationModeDetector() {
+      this(TranslationMode.BATCH);
+    }
+
+    TranslationMode getTranslationMode() {
+      return translationMode;
+    }
+
+    @Override
+    public void visitPrimitiveTransform(TransformTreeNode node) {
+      if (translationMode.equals(TranslationMode.BATCH)) {
+        Class<? extends PTransform> transformClass = 
node.getTransform().getClass();
+        if (transformClass == Read.Unbounded.class) {
+          LOG.info("Found {}. Switching to streaming execution.", 
transformClass);
+          translationMode = TranslationMode.STREAMING;
+        }
+      }
+    }
+  }
+
+  /**
    * Evaluator on the pipeline.
    */
   public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index b30f079..6ed5b55 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -466,7 +466,7 @@ final class StreamingTransformTranslator {
       @SuppressWarnings("unchecked") TransformEvaluator<TransformT> 
transformEvaluator =
           (TransformEvaluator<TransformT>) EVALUATORS.get(clazz);
       checkState(transformEvaluator != null,
-          "No TransformEvaluator registered for for UNBOUNDED transform %s", 
clazz);
+          "No TransformEvaluator registered for UNBOUNDED transform %s", 
clazz);
       return transformEvaluator;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index d40bcff..656107a 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -57,6 +57,8 @@ public class EmptyStreamAssertionTest implements Serializable 
{
   @Test
   public void testAssertion() throws Exception {
     SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    options.setStreaming(true);
+
     Duration windowDuration = new Duration(options.getBatchIntervalMillis());
 
     Pipeline pipeline = Pipeline.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
index 3e75b18..d36796a 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java
@@ -63,6 +63,7 @@ public class FlattenStreamingTest {
   @Test
   public void testFlattenUnbounded() throws Exception {
     SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    options.setStreaming(true);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =
@@ -82,6 +83,7 @@ public class FlattenStreamingTest {
   @Test
   public void testFlattenBoundedUnbounded() throws Exception {
     SparkPipelineOptions options = 
commonOptions.withTmpCheckpointDir(checkpointParentDir);
+    options.setStreaming(true);
 
     Pipeline p = Pipeline.create(options);
     PCollection<String> w1 =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
index 9a15ff2..3734cf6 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java
@@ -64,6 +64,7 @@ public class SimpleStreamingWordCountTest implements 
Serializable {
   @Test
   public void testFixedWindows() throws Exception {
     SparkPipelineOptions options = 
pipelineOptions.withTmpCheckpointDir(checkpointParentDir);
+    options.setStreaming(true);
 
     // override defaults
     options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
index f74c74a..28f6d5d 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java
@@ -28,12 +28,6 @@ import org.junit.rules.TemporaryFolder;
  */
 public class SparkTestPipelineOptionsForStreaming extends 
SparkTestPipelineOptions {
 
-  @Override
-  protected void before() throws Throwable {
-    super.before();
-    options.setStreaming(true);
-  }
-
   public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent)
       throws IOException {
     // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...

Reply via email to