Repository: incubator-beam
Updated Branches:
  refs/heads/master a0f649eac -> d790dfe1b


[BEAM-734] Add support for Spark Streaming Listeners.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4b49abcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4b49abcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4b49abcf

Branch: refs/heads/master
Commit: 4b49abcf7d248e033b2bd8435dff031261f35b73
Parents: a0f649e
Author: Sela <ans...@paypal.com>
Authored: Sun Oct 9 13:44:58 2016 +0300
Committer: Sela <ans...@paypal.com>
Committed: Fri Oct 14 12:58:47 2016 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/SparkPipelineOptions.java  | 18 ++++++++++++++++++
 .../SparkRunnerStreamingContextFactory.java       |  8 ++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
index 7afb68c..4c20b10 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineOptions.java
@@ -19,6 +19,8 @@
 package org.apache.beam.runners.spark;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.sdk.options.ApplicationNameOptions;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -26,6 +28,8 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+
 
 /**
  * Spark runner pipeline options.
@@ -88,4 +92,18 @@ public interface SparkPipelineOptions extends 
PipelineOptions, StreamingOptions,
   @JsonIgnore
   JavaSparkContext getProvidedSparkContext();
   void setProvidedSparkContext(JavaSparkContext jsc);
+
+  @Description("Spark streaming listeners")
+  @Default.InstanceFactory(EmptyListenersList.class)
+  @JsonIgnore
+  List<JavaStreamingListener> getListeners();
+  void setListeners(List<JavaStreamingListener> listeners);
+
+  /** Returns an empty list, top avoid handling null. */
+  static class EmptyListenersList implements 
DefaultValueFactory<List<JavaStreamingListener>> {
+    @Override
+    public List<JavaStreamingListener> create(PipelineOptions options) {
+      return new ArrayList<>();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4b49abcf/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
index b7a407c..79c87fb 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/SparkRunnerStreamingContextFactory.java
@@ -33,6 +33,8 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
+import org.apache.spark.streaming.api.java.JavaStreamingListener;
+import org.apache.spark.streaming.api.java.JavaStreamingListenerWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,6 +91,12 @@ public class SparkRunnerStreamingContextFactory implements 
JavaStreamingContextF
     }
     jssc.checkpoint(checkpointDir);
 
+    // register listeners.
+    for (JavaStreamingListener listener: options.getListeners()) {
+      LOG.info("Registered listener {}." + 
listener.getClass().getSimpleName());
+      jssc.addStreamingListener(new JavaStreamingListenerWrapper(listener));
+    }
+
     return jssc;
   }
 

Reply via email to