Repository: incubator-beam Updated Branches: refs/heads/master 3ad767750 -> 8cc43aa70
[BEAM-851] Determine if the pipeline must be translated into streaming mode (if not already set). A pipeline visitor (TranslationModeDetector) now detects whether there are Read.Unbounded transforms. This approach is based on Flink's PipelineTranslationOptimizer. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cc96b138 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cc96b138 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cc96b138 Branch: refs/heads/master Commit: cc96b1381b6db849adf69daddecf30b9c61acf73 Parents: 3ad7677 Author: Ismaël Mejía <[email protected]> Authored: Fri Nov 25 14:52:26 2016 +0100 Committer: Ismaël Mejía <[email protected]> Committed: Sun Nov 27 11:18:12 2016 +0100 ---------------------------------------------------------------------- .../apache/beam/runners/spark/SparkRunner.java | 61 +++++++++++++++++++- .../streaming/StreamingTransformTranslator.java | 2 +- .../streaming/EmptyStreamAssertionTest.java | 2 + .../streaming/FlattenStreamingTest.java | 2 + .../streaming/SimpleStreamingWordCountTest.java | 1 + .../SparkTestPipelineOptionsForStreaming.java | 6 -- 6 files changed, 65 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index e800071..49e0113 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator; import 
org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptionsValidator; @@ -120,12 +121,12 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { mOptions = options; } - @Override public EvaluationResult run(Pipeline pipeline) { try { LOG.info("Executing pipeline using the SparkRunner."); + detectTranslationMode(pipeline); if (mOptions.isStreaming()) { SparkRunnerStreamingContextFactory contextFactory = new SparkRunnerStreamingContextFactory(pipeline, mOptions); @@ -136,7 +137,7 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { jssc.start(); // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance. - return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sc(), + return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sparkContext(), pipeline, jssc) : contextFactory.getCtxt(); } else { JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions); @@ -168,6 +169,62 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> { } /** + * Detect the translation mode for the pipeline and change options in case streaming + * translation is needed. + * @param pipeline + */ + private void detectTranslationMode(Pipeline pipeline) { + TranslationModeDetector detector = new TranslationModeDetector(); + pipeline.traverseTopologically(detector); + if (detector.getTranslationMode().equals(TranslationMode.STREAMING)) { + // set streaming mode if it's a streaming pipeline + this.mOptions.setStreaming(true); + } + } + + /** + * The translation mode of the Beam Pipeline. + */ + enum TranslationMode { + /** Uses the batch mode. 
*/ + BATCH, + /** Uses the streaming mode. */ + STREAMING + } + + /** + * Traverses the Pipeline to determine the {@link TranslationMode} for this pipeline. + */ + static class TranslationModeDetector extends Pipeline.PipelineVisitor.Defaults { + private static final Logger LOG = LoggerFactory.getLogger(TranslationModeDetector.class); + + private TranslationMode translationMode; + + TranslationModeDetector(TranslationMode defaultMode) { + this.translationMode = defaultMode; + } + + TranslationModeDetector() { + this(TranslationMode.BATCH); + } + + TranslationMode getTranslationMode() { + return translationMode; + } + + @Override + public void visitPrimitiveTransform(TransformTreeNode node) { + if (translationMode.equals(TranslationMode.BATCH)) { + Class<? extends PTransform> transformClass = node.getTransform().getClass(); + if (transformClass == Read.Unbounded.class) { + LOG.info("Found {}. Switching to streaming execution.", transformClass); + translationMode = TranslationMode.STREAMING; + } + } + } + } + + /** * Evaluator on the pipeline. 
*/ public static class Evaluator extends Pipeline.PipelineVisitor.Defaults { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index b30f079..6ed5b55 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -466,7 +466,7 @@ final class StreamingTransformTranslator { @SuppressWarnings("unchecked") TransformEvaluator<TransformT> transformEvaluator = (TransformEvaluator<TransformT>) EVALUATORS.get(clazz); checkState(transformEvaluator != null, - "No TransformEvaluator registered for for UNBOUNDED transform %s", clazz); + "No TransformEvaluator registered for UNBOUNDED transform %s", clazz); return transformEvaluator; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java index d40bcff..656107a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java +++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java @@ -57,6 +57,8 @@ public class EmptyStreamAssertionTest implements Serializable { @Test public void testAssertion() throws Exception { SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + options.setStreaming(true); + Duration windowDuration = new Duration(options.getBatchIntervalMillis()); Pipeline pipeline = Pipeline.create(options); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index 3e75b18..d36796a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -63,6 +63,7 @@ public class FlattenStreamingTest { @Test public void testFlattenUnbounded() throws Exception { SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + options.setStreaming(true); Pipeline p = Pipeline.create(options); PCollection<String> w1 = @@ -82,6 +83,7 @@ public class FlattenStreamingTest { @Test public void testFlattenBoundedUnbounded() throws Exception { SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir); + options.setStreaming(true); Pipeline p = Pipeline.create(options); PCollection<String> w1 = http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java 
---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index 9a15ff2..3734cf6 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -64,6 +64,7 @@ public class SimpleStreamingWordCountTest implements Serializable { @Test public void testFixedWindows() throws Exception { SparkPipelineOptions options = pipelineOptions.withTmpCheckpointDir(checkpointParentDir); + options.setStreaming(true); // override defaults options.setBatchIntervalMillis(BATCH_INTERVAL.getMillis()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cc96b138/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java index f74c74a..28f6d5d 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/SparkTestPipelineOptionsForStreaming.java @@ -28,12 +28,6 @@ import org.junit.rules.TemporaryFolder; */ public class SparkTestPipelineOptionsForStreaming extends SparkTestPipelineOptions { - @Override - protected void before() throws Throwable { - super.before(); - 
options.setStreaming(true); - } - public SparkPipelineOptions withTmpCheckpointDir(TemporaryFolder parent) throws IOException { // tests use JUnit's TemporaryFolder path in the form of: /.../junit/...
