Repository: spark
Updated Branches:
  refs/heads/master de5e531d3 -> ace0db471


[SPARK-6328][PYTHON] Python API for StreamingListener

Author: Daniel Jalova <djal...@us.ibm.com>

Closes #9186 from djalova/SPARK-6328.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ace0db47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ace0db47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ace0db47

Branch: refs/heads/master
Commit: ace0db47141ffd457c2091751038fc291f6d5a8b
Parents: de5e531
Author: Daniel Jalova <djal...@us.ibm.com>
Authored: Mon Nov 16 11:29:27 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 16 11:29:27 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/__init__.py            |   3 +-
 python/pyspark/streaming/context.py             |   8 ++
 python/pyspark/streaming/listener.py            |  75 +++++++++++
 python/pyspark/streaming/tests.py               | 126 ++++++++++++++++++-
 .../api/java/JavaStreamingListener.scala        |  76 +++++++++++
 5 files changed, 286 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/__init__.py b/python/pyspark/streaming/__init__.py
index d2644a1..66e8f8e 100644
--- a/python/pyspark/streaming/__init__.py
+++ b/python/pyspark/streaming/__init__.py
@@ -17,5 +17,6 @@
 
 from pyspark.streaming.context import StreamingContext
 from pyspark.streaming.dstream import DStream
+from pyspark.streaming.listener import StreamingListener
 
-__all__ = ['StreamingContext', 'DStream']
+__all__ = ['StreamingContext', 'DStream', 'StreamingListener']

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 8be56c9..1388b6d 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -363,3 +363,11 @@ class StreamingContext(object):
         first = dstreams[0]
         jrest = [d._jdstream for d in dstreams[1:]]
         return DStream(self._jssc.union(first._jdstream, jrest), self, 
first._jrdd_deserializer)
+
+    def addStreamingListener(self, streamingListener):
+        """
+        Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] 
object for
+        receiving system events related to streaming.
+        """
+        self._jssc.addStreamingListener(self._jvm.JavaStreamingListenerWrapper(
+            self._jvm.PythonStreamingListenerWrapper(streamingListener)))

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/listener.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
new file mode 100644
index 0000000..b830797
--- /dev/null
+++ b/python/pyspark/streaming/listener.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["StreamingListener"]
+
+
+class StreamingListener(object):
+
+    def __init__(self):
+        pass
+
+    def onReceiverStarted(self, receiverStarted):
+        """
+        Called when a receiver has been started
+        """
+        pass
+
+    def onReceiverError(self, receiverError):
+        """
+        Called when a receiver has reported an error
+        """
+        pass
+
+    def onReceiverStopped(self, receiverStopped):
+        """
+        Called when a receiver has been stopped
+        """
+        pass
+
+    def onBatchSubmitted(self, batchSubmitted):
+        """
+        Called when a batch of jobs has been submitted for processing.
+        """
+        pass
+
+    def onBatchStarted(self, batchStarted):
+        """
+        Called when processing of a batch of jobs has started.
+        """
+        pass
+
+    def onBatchCompleted(self, batchCompleted):
+        """
+        Called when processing of a batch of jobs has completed.
+        """
+        pass
+
+    def onOutputOperationStarted(self, outputOperationStarted):
+        """
+        Called when processing of a job of a batch has started.
+        """
+        pass
+
+    def onOutputOperationCompleted(self, outputOperationCompleted):
+        """
+        Called when processing of a job of a batch has completed
+        """
+        pass
+
+    class Java:
+        implements = 
["org.apache.spark.streaming.api.java.PythonStreamingListener"]

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 6ee864d..2983028 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -48,6 +48,7 @@ from pyspark.streaming.kafka import Broker, KafkaUtils, OffsetRange, TopicAndPar
 from pyspark.streaming.flume import FlumeUtils
 from pyspark.streaming.mqtt import MQTTUtils
 from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
+from pyspark.streaming.listener import StreamingListener
 
 
 class PySparkStreamingTestCase(unittest.TestCase):
@@ -403,6 +404,128 @@ class BasicOperationTests(PySparkStreamingTestCase):
         self._test_func(input, func, expected)
 
 
+class StreamingListenerTests(PySparkStreamingTestCase):
+
+    duration = .5
+
+    class BatchInfoCollector(StreamingListener):
+
+        def __init__(self):
+            super(StreamingListener, self).__init__()
+            self.batchInfosCompleted = []
+            self.batchInfosStarted = []
+            self.batchInfosSubmitted = []
+
+        def onBatchSubmitted(self, batchSubmitted):
+            self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
+
+        def onBatchStarted(self, batchStarted):
+            self.batchInfosStarted.append(batchStarted.batchInfo())
+
+        def onBatchCompleted(self, batchCompleted):
+            self.batchInfosCompleted.append(batchCompleted.batchInfo())
+
+    def test_batch_info_reports(self):
+        batch_collector = self.BatchInfoCollector()
+        self.ssc.addStreamingListener(batch_collector)
+        input = [[1], [2], [3], [4]]
+
+        def func(dstream):
+            return dstream.map(int)
+        expected = [[1], [2], [3], [4]]
+        self._test_func(input, func, expected)
+
+        batchInfosSubmitted = batch_collector.batchInfosSubmitted
+        batchInfosStarted = batch_collector.batchInfosStarted
+        batchInfosCompleted = batch_collector.batchInfosCompleted
+
+        self.wait_for(batchInfosCompleted, 4)
+
+        self.assertGreaterEqual(len(batchInfosSubmitted), 4)
+        for info in batchInfosSubmitted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 
0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertEqual(info.schedulingDelay(), -1)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosStarted), 4)
+        for info in batchInfosStarted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 
0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), -1)
+                self.assertGreaterEqual(outputInfo.endTime(), -1)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertEqual(info.processingDelay(), -1)
+            self.assertEqual(info.totalDelay(), -1)
+            self.assertEqual(info.numRecords(), 0)
+
+        self.assertGreaterEqual(len(batchInfosCompleted), 4)
+        for info in batchInfosCompleted:
+            self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
+            self.assertGreaterEqual(info.submissionTime(), 0)
+
+            for streamId in info.streamIdToInputInfo():
+                streamInputInfo = info.streamIdToInputInfo()[streamId]
+                self.assertGreaterEqual(streamInputInfo.inputStreamId(), 0)
+                self.assertGreaterEqual(streamInputInfo.numRecords, 0)
+                for key in streamInputInfo.metadata():
+                    self.assertIsNotNone(streamInputInfo.metadata()[key])
+                self.assertIsNotNone(streamInputInfo.metadataDescription())
+
+            for outputOpId in info.outputOperationInfos():
+                outputInfo = info.outputOperationInfos()[outputOpId]
+                self.assertGreaterEqual(outputInfo.batchTime().milliseconds(), 
0)
+                self.assertGreaterEqual(outputInfo.id(), 0)
+                self.assertIsNotNone(outputInfo.name())
+                self.assertIsNotNone(outputInfo.description())
+                self.assertGreaterEqual(outputInfo.startTime(), 0)
+                self.assertGreaterEqual(outputInfo.endTime(), 0)
+                self.assertIsNone(outputInfo.failureReason())
+
+            self.assertGreaterEqual(info.schedulingDelay(), 0)
+            self.assertGreaterEqual(info.processingDelay(), 0)
+            self.assertGreaterEqual(info.totalDelay(), 0)
+            self.assertEqual(info.numRecords(), 0)
+
+
 class WindowFunctionTests(PySparkStreamingTestCase):
 
     timeout = 15
@@ -1308,7 +1431,8 @@ if __name__ == "__main__":
 
     os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
     testcases = [BasicOperationTests, WindowFunctionTests, 
StreamingContextTests, CheckpointTests,
-                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, 
MQTTStreamTests]
+                 KafkaStreamTests, FlumeStreamTests, FlumePollingStreamTests, 
MQTTStreamTests,
+                 StreamingListenerTests]
 
     if kinesis_jar_present is True:
         testcases.append(KinesisStreamTests)

http://git-wip-us.apache.org/repos/asf/spark/blob/ace0db47/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index 3442907..7bfd6bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -18,6 +18,82 @@
 package org.apache.spark.streaming.api.java
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.scheduler.StreamingListener
+
+/**
+ * Callback interface with one no-op default per streaming event. The Python
+ * StreamingListener class names this trait in its `Java.implements` list.
+ */
+private[streaming] trait PythonStreamingListener {
+
+  /** Invoked once a receiver has been started. */
+  def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
+
+  /** Invoked when a receiver has reported an error. */
+  def onReceiverError(receiverError: JavaStreamingListenerReceiverError): Unit = { }
+
+  /** Invoked once a receiver has been stopped. */
+  def onReceiverStopped(receiverStopped: JavaStreamingListenerReceiverStopped): Unit = { }
+
+  /** Invoked when a batch of jobs has been submitted for processing. */
+  def onBatchSubmitted(batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit = { }
+
+  /** Invoked when processing of a batch of jobs has started. */
+  def onBatchStarted(batchStarted: JavaStreamingListenerBatchStarted): Unit = { }
+
+  /** Invoked when processing of a batch of jobs has completed. */
+  def onBatchCompleted(batchCompleted: JavaStreamingListenerBatchCompleted): Unit = { }
+
+  /** Invoked when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit = { }
+
+  /** Invoked when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit = { }
+}
+
+/**
+ * Adapts a [[PythonStreamingListener]] to [[JavaStreamingListener]] by
+ * forwarding every event, unchanged, to the wrapped listener.
+ */
+private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
+  extends JavaStreamingListener {
+
+  /** Forwards the receiver-started event to the wrapped listener. */
+  override def onReceiverStarted(
+      receiverStarted: JavaStreamingListenerReceiverStarted): Unit =
+    listener.onReceiverStarted(receiverStarted)
+
+  /** Forwards the receiver-error event to the wrapped listener. */
+  override def onReceiverError(
+      receiverError: JavaStreamingListenerReceiverError): Unit =
+    listener.onReceiverError(receiverError)
+
+  /** Forwards the receiver-stopped event to the wrapped listener. */
+  override def onReceiverStopped(
+      receiverStopped: JavaStreamingListenerReceiverStopped): Unit =
+    listener.onReceiverStopped(receiverStopped)
+
+  /** Forwards the batch-submitted event to the wrapped listener. */
+  override def onBatchSubmitted(
+      batchSubmitted: JavaStreamingListenerBatchSubmitted): Unit =
+    listener.onBatchSubmitted(batchSubmitted)
+
+  /** Forwards the batch-started event to the wrapped listener. */
+  override def onBatchStarted(
+      batchStarted: JavaStreamingListenerBatchStarted): Unit =
+    listener.onBatchStarted(batchStarted)
+
+  /** Forwards the batch-completed event to the wrapped listener. */
+  override def onBatchCompleted(
+      batchCompleted: JavaStreamingListenerBatchCompleted): Unit =
+    listener.onBatchCompleted(batchCompleted)
+
+  /** Forwards the output-operation-started event to the wrapped listener. */
+  override def onOutputOperationStarted(
+      outputOperationStarted: JavaStreamingListenerOutputOperationStarted): Unit =
+    listener.onOutputOperationStarted(outputOperationStarted)
+
+  /** Forwards the output-operation-completed event to the wrapped listener. */
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted): Unit =
+    listener.onOutputOperationCompleted(outputOperationCompleted)
+}
 
 /**
  * A listener interface for receiving information about an ongoing streaming  
computation.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to