Repository: beam
Updated Branches:
  refs/heads/master fe2369933 -> 3473055f2


Fix caching in Spark streaming by doing the cache update in the streaming context

Fix order of calls: update cache candidates before traversing the pipeline


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ec81468
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ec81468
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ec81468

Branch: refs/heads/master
Commit: 0ec8146848ba3cbb1afb51b5b9919b3ab3db7955
Parents: fe23699
Author: Jean-Baptiste Onofré <jbono...@apache.org>
Authored: Fri Mar 24 15:27:04 2017 +0100
Committer: Amit Sela <amitsel...@gmail.com>
Committed: Sat Mar 25 16:00:09 2017 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/spark/SparkRunner.java | 8 +-------
 .../streaming/SparkRunnerStreamingContextFactory.java        | 6 ++----
 2 files changed, 3 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0ec81468/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index fc5d4af..97532c4 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -38,7 +38,6 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
-import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
@@ -163,11 +162,6 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           JavaStreamingContext.getOrCreate(checkpointDir.getSparkCheckpointDir().toString(),
               contextFactory);
 
-      // update cache candidates
-      translator = new StreamingTransformTranslator.Translator(
-          new TransformTranslator.Translator());
-      updateCacheCandidates(pipeline, translator, contextFactory.getEvaluationContext());
-
       // Checkpoint aggregator/metrics values
       jssc.addStreamingListener(
           new JavaStreamingListenerWrapper(
@@ -269,7 +263,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
   /**
    * Evaluator that update/populate the cache candidates.
    */
-  private void updateCacheCandidates(
+  public static void updateCacheCandidates(
       Pipeline pipeline,
       SparkPipelineTranslator translator,
       EvaluationContext evaluationContext) {

http://git-wip-us.apache.org/repos/asf/beam/blob/0ec81468/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index c298886..98521e9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -83,6 +83,8 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     SparkRunner.initAccumulators(options, jsc);
 
     ctxt = new EvaluationContext(jsc, pipeline, jssc);
+    // update cache candidates
+    SparkRunner.updateCacheCandidates(pipeline, translator, ctxt);
     pipeline.traverseTopologically(new SparkRunner.Evaluator(translator, ctxt));
     ctxt.computeOutputs();
 
@@ -91,10 +93,6 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF
     return jssc;
   }
 
-  public EvaluationContext getEvaluationContext() {
-    return this.ctxt;
-  }
-
   private void checkpoint(JavaStreamingContext jssc) {
     Path rootCheckpointPath = checkpointDir.getRootCheckpointDir();
     Path sparkCheckpointPath = checkpointDir.getSparkCheckpointDir();

Reply via email to