Repository: incubator-beam Updated Branches: refs/heads/master a0f649eac -> d790dfe1b
[BEAM-734] Add support for Spark Streaming Listeners. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b49abcf Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b49abcf Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b49abcf Branch: refs/heads/master Commit: 4b49abcf7d248e033b2bd8435dff031261f35b73 Parents: a0f649e Author: Sela <ans...@paypal.com> Authored: Sun Oct 9 13:44:58 2016 +0300 Committer: Sela <ans...@paypal.com> Committed: Fri Oct 14 12:58:47 2016 +0300 ---------------------------------------------------------------------- .../beam/runners/spark/SparkPipelineOptions.java | 18 ++++++++++++++++++ .../SparkRunnerStreamingContextFactory.java | 8 ++++++++ 2 files changed, 26 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java index 7afb68c..4c20b10 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java @@ -19,6 +19,8 @@ package org.apache.beam.runners.spark; import com.fasterxml.jackson.annotation.JsonIgnore; +import java.util.ArrayList; +import java.util.List; import org.apache.beam.sdk.options.ApplicationNameOptions; import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.DefaultValueFactory; @@ -26,6 +28,8 @@ import org.apache.beam.sdk.options.Description; import org.apache.beam.sdk.options.PipelineOptions; import 
org.apache.beam.sdk.options.StreamingOptions; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.streaming.api.java.JavaStreamingListener; + /** * Spark runner pipeline options. @@ -88,4 +92,18 @@ public interface SparkPipelineOptions extends PipelineOptions, StreamingOptions, @JsonIgnore JavaSparkContext getProvidedSparkContext(); void setProvidedSparkContext(JavaSparkContext jsc); + + @Description("Spark streaming listeners") + @Default.InstanceFactory(EmptyListenersList.class) + @JsonIgnore + List<JavaStreamingListener> getListeners(); + void setListeners(List<JavaStreamingListener> listeners); + + /** Returns an empty list, to avoid handling null. */ + static class EmptyListenersList implements DefaultValueFactory<List<JavaStreamingListener>> { + @Override + public List<JavaStreamingListener> create(PipelineOptions options) { + return new ArrayList<>(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java index b7a407c..79c87fb 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java @@ -33,6 +33,8 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; +import 
org.apache.spark.streaming.api.java.JavaStreamingListener; +import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +91,12 @@ public class SparkRunnerStreamingContextFactory implements JavaStreamingContextF } jssc.checkpoint(checkpointDir); + // register listeners. + for (JavaStreamingListener listener: options.getListeners()) { + LOG.info("Registered listener {}." + listener.getClass().getSimpleName()); + jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener)); + } + return jssc; }