Advance watermarks in the onBatchCompleted hook.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa31f18e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa31f18e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa31f18e

Branch: refs/heads/master
Commit: fa31f18e489d4cbe44fe4a9be7ba3d7dbee7c354
Parents: bbf3744
Author: Sela <ans...@paypal.com>
Authored: Sun Feb 12 18:31:14 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Mon Feb 20 11:30:14 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/spark/SparkRunner.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fa31f18e/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ebac375..52a080b 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -39,6 +39,7 @@ import org.apache.beam.runners.spark.translation.TransformEvaluator;
 import org.apache.beam.runners.spark.translation.TransformTranslator;
 import org.apache.beam.runners.spark.translation.streaming.Checkpoint.CheckpointDir;
 import org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.WatermarksListener;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
@@ -191,12 +192,15 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           new JavaStreamingListenerWrapper(
               new MetricsAccumulator.AccumulatorCheckpointingSparkListener()));
 
-      // register listeners.
+      // register user-defined listeners.
       for (JavaStreamingListener listener: mOptions.as(SparkContextOptions.class).getListeners()) {
         LOG.info("Registered listener {}." + listener.getClass().getSimpleName());
         jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
       }
 
+      // register Watermarks listener to broadcast the advanced WMs.
+      jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc)));
+
       startPipeline = executorService.submit(new Runnable() {
 
         @Override

Reply via email to