Repository: beam Updated Branches: refs/heads/master fe2369933 -> 3473055f2
Fix caching in Spark streaming by doing the cache update in the streaming context. Fix the order of calls. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ec81468 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ec81468 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ec81468 Branch: refs/heads/master Commit: 0ec8146848ba3cbb1afb51b5b9919b3ab3db7955 Parents: fe23699 Author: Jean-Baptiste Onofré <jbono...@apache.org> Authored: Fri Mar 24 15:27:04 2017 +0100 Committer: Amit Sela <amitsel...@gmail.com> Committed: Sat Mar 25 16:00:09 2017 +0300 ---------------------------------------------------------------------- .../main/java/org/apache/beam/runners/spark/SparkRunner.java | 8 +------- .../streaming/SparkRunnerStreamingContextFactory.java | 6 ++---- 2 files changed, 3 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0ec81468/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index fc5d4af..97532c4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -38,7 +38,6 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator; import org.apache.beam.runners.spark.translation.TransformTranslator; import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir; import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory; -import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator; import 
org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; @@ -163,11 +162,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(), contextFactory); - // update cache candidates - translator = new StreamingTransformTranslator.Translator( - new TransformTranslator.Translator()); - updateCacheCandidates(pipeline, translator, contextFactory.getEvaluationContext()); - // Checkpoint aggregator/metrics values jssc.addStreamingListener( new JavaStreamingListenerWrapper( @@ -269,7 +263,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> { /** * Evaluator that update/populate the cache candidates. */ - private void updateCacheCandidates( + public static void updateCacheCandidates( Pipeline pipeline, SparkPipelineTranslator translator, EvaluationContext evaluationContext) { http://git-wip-us.apache.org/repos/asf/beam/blob/0ec81468/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index c298886..98521e9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -83,6 +83,8 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF SparkRunner.initAccumulators(options, jsc); ctxt = new EvaluationContext(jsc, pipeline, jssc); + // update cache 
candidates + SparkRunner.updateCacheCandidates(pipeline, translator, ctxt); pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt)); ctxt.computeOutputs(); @@ -91,10 +93,6 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF return jssc; } - public EvaluationContext getEvaluationContext() { - return this.ctxt; - } - private void checkpoint(JavaStreamingContext jssc) { Path rootCheckpointPath = checkpointDir.getRootCheckpointDir(); Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();