Repository: spark
Updated Branches:
  refs/heads/branch-2.3 130641102 -> 32bec6ca3


[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener

## What changes were proposed in this pull request?

The PySpark `StreamingListener` lacks an `onStreamingStarted` method.
This patch adds it, along with a test for it.
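
For illustration, here is a minimal sketch of a user-defined listener that overrides the new hook. It runs without Spark. `StreamingListenerStandIn` and `FakeStreamingStarted` are hypothetical stand-ins for `pyspark.streaming.StreamingListener` and the event object the JVM passes in. The `time()` accessor on the event is an assumption about how the Scala field surfaces through Py4J.

```python
class StreamingListenerStandIn(object):
    """Hypothetical stand-in mirroring the no-op hooks of PySpark's StreamingListener."""

    def onStreamingStarted(self, streamingStarted):
        pass

    def onBatchCompleted(self, batchCompleted):
        pass


class FakeStreamingStarted(object):
    """Hypothetical event object; assumes the start time is exposed as time()."""

    def __init__(self, time_ms):
        self._time_ms = time_ms

    def time(self):
        return self._time_ms


class StartTimeRecorder(StreamingListenerStandIn):
    """Records the start time carried by each streaming-started event."""

    def __init__(self):
        self.started_times = []

    def onStreamingStarted(self, streamingStarted):
        self.started_times.append(streamingStarted.time())


recorder = StartTimeRecorder()
# In PySpark the JVM would invoke this callback once the StreamingContext starts;
# here it is called directly to simulate that single notification.
recorder.onStreamingStarted(FakeStreamingStarted(1524103257000))
print(recorder.started_times)
```

With PySpark itself, the listener would subclass `StreamingListener` and be registered through `ssc.addStreamingListener(recorder)` before calling `ssc.start()`.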

This patch also includes a minor docstring improvement for `createDirectStream`, clarifying the type of `fromOffsets`.

Original PR is #21057.

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #21098 from viirya/SPARK-24014.

(cherry picked from commit 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0)
Signed-off-by: jerryshao <ss...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32bec6ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32bec6ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32bec6ca

Branch: refs/heads/branch-2.3
Commit: 32bec6ca3d9e47587c84f928d4166475fe29f596
Parents: 1306411
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Thu Apr 19 10:00:57 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Apr 19 10:01:13 2018 +0800

----------------------------------------------------------------------
 python/pyspark/streaming/kafka.py    | 3 ++-
 python/pyspark/streaming/listener.py | 6 ++++++
 python/pyspark/streaming/tests.py    | 7 +++++++
 3 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/kafka.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py
index fdb9308..ed2e0e7 100644
--- a/python/pyspark/streaming/kafka.py
+++ b/python/pyspark/streaming/kafka.py
@@ -104,7 +104,8 @@ class KafkaUtils(object):
         :param topics:  list of topic_name to consume.
         :param kafkaParams: Additional params for Kafka.
         :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
-                            point of the stream.
+                            point of the stream (a dictionary mapping `TopicAndPartition` to
+                            integers).
         :param keyDecoder:  A function used to decode key (default is utf8_decoder).
         :param valueDecoder:  A function used to decode value (default is utf8_decoder).
         :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess
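
The clarified `fromOffsets` description in the hunk above says the argument is a dictionary mapping `TopicAndPartition` to integer offsets. The sketch below builds such a mapping. It uses a hypothetical `TopicAndPartitionStandIn` namedtuple, which is hashable and therefore usable as a dictionary key. With PySpark the keys would be `pyspark.streaming.kafka.TopicAndPartition(topic, partition)` instances. The topic name and offsets are made up.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.streaming.kafka.TopicAndPartition;
# a namedtuple is hashable, so it can serve as a dictionary key.
TopicAndPartitionStandIn = namedtuple("TopicAndPartitionStandIn", ["topic", "partition"])


def build_from_offsets(topic, start_offsets):
    """Map partition i of `topic` to start_offsets[i], its (inclusive) starting offset."""
    return {
        TopicAndPartitionStandIn(topic, partition): int(offset)
        for partition, offset in enumerate(start_offsets)
    }


from_offsets = build_from_offsets("events", [0, 42, 100])
for tp, offset in sorted(from_offsets.items()):
    print(tp.topic, tp.partition, offset)
```

In a real job, the resulting dictionary would be passed as `fromOffsets=from_offsets` to `KafkaUtils.createDirectStream`.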

http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/listener.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py
index b830797..d4ecc21 100644
--- a/python/pyspark/streaming/listener.py
+++ b/python/pyspark/streaming/listener.py
@@ -23,6 +23,12 @@ class StreamingListener(object):
     def __init__(self):
         pass
 
+    def onStreamingStarted(self, streamingStarted):
+        """
+        Called when the streaming has been started.
+        """
+        pass
+
     def onReceiverStarted(self, receiverStarted):
         """
         Called when a receiver has been started
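
The new base-class method above is a no-op, so existing listeners that never override `onStreamingStarted` still have a handler once that event is delivered. The sketch below illustrates the idea with a hypothetical `dispatch` helper that looks up a hook by name, roughly as a callback bridge would. It is not PySpark's actual Py4J wiring; where a real bridge could not deliver the event, the helper simply returns `False`.

```python
class BaseListener(object):
    """Hypothetical stand-in for StreamingListener with default no-op hooks."""

    def onStreamingStarted(self, streamingStarted):
        pass

    def onReceiverStarted(self, receiverStarted):
        pass


class LegacyListener(BaseListener):
    """A listener written before onStreamingStarted existed; it overrides one hook only."""

    def __init__(self):
        self.receivers = []

    def onReceiverStarted(self, receiverStarted):
        self.receivers.append(receiverStarted)


class NoBaseListener(object):
    """A listener with no onStreamingStarted hook at all."""

    def onReceiverStarted(self, receiverStarted):
        pass


def dispatch(listener, hook_name, event):
    """Invoke the named callback if present; return False when the listener lacks it."""
    hook = getattr(listener, hook_name, None)
    if hook is None:
        return False
    hook(event)
    return True


legacy = LegacyListener()
print(dispatch(legacy, "onStreamingStarted", object()))  # prints True: inherited no-op runs
print(dispatch(NoBaseListener(), "onStreamingStarted", object()))  # prints False: hook missing
```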

http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index ca28c9b..1ec418a 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase):
             self.batchInfosCompleted = []
             self.batchInfosStarted = []
             self.batchInfosSubmitted = []
+            self.streamingStartedTime = []
+
+        def onStreamingStarted(self, streamingStarted):
+            self.streamingStartedTime.append(streamingStarted.time)
 
         def onBatchSubmitted(self, batchSubmitted):
             self.batchInfosSubmitted.append(batchSubmitted.batchInfo())
@@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase):
         batchInfosSubmitted = batch_collector.batchInfosSubmitted
         batchInfosStarted = batch_collector.batchInfosStarted
         batchInfosCompleted = batch_collector.batchInfosCompleted
+        streamingStartedTime = batch_collector.streamingStartedTime
 
         self.wait_for(batchInfosCompleted, 4)
 
+        self.assertEqual(len(streamingStartedTime), 1)
+
         self.assertGreaterEqual(len(batchInfosSubmitted), 4)
         for info in batchInfosSubmitted:
             self.assertGreaterEqual(info.batchTime().milliseconds(), 0)
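
The test above waits until at least four batches have completed before asserting that exactly one start event was recorded. A polling helper of that kind can be sketched as follows. `wait_for_length` is a hypothetical helper written for illustration, not the actual `wait_for` in `tests.py`, and the timeout and poll interval are assumptions.

```python
import threading
import time


def wait_for_length(items, n, timeout=10.0, interval=0.01):
    """Poll until len(items) >= n or `timeout` seconds elapse; return True on success."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if len(items) >= n:
            return True
        time.sleep(interval)
    return len(items) >= n


# Simulate listener callbacks appending to a shared list from another thread.
completed = []


def produce():
    for i in range(4):
        time.sleep(0.02)
        completed.append(i)


worker = threading.Thread(target=produce)
worker.start()
ok = wait_for_length(completed, 4, timeout=5.0)
worker.join()
print(ok, len(completed))
```

Polling with a deadline keeps the assertion from racing the asynchronous callbacks while still failing the test if events never arrive.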

