Repository: spark Updated Branches: refs/heads/branch-2.3 130641102 -> 32bec6ca3
[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingListener ## What changes were proposed in this pull request? The `StreamingListener` on the PySpark side seems to lack an `onStreamingStarted` method. This patch adds it and a test for it. This patch also includes a trivial doc improvement for `createDirectStream`. Original PR is #21057. ## How was this patch tested? Added test. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #21098 from viirya/SPARK-24014. (cherry picked from commit 8bb0df2c65355dfdcd28e362ff661c6c7ebc99c0) Signed-off-by: jerryshao <ss...@hortonworks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32bec6ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32bec6ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32bec6ca Branch: refs/heads/branch-2.3 Commit: 32bec6ca3d9e47587c84f928d4166475fe29f596 Parents: 1306411 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Apr 19 10:00:57 2018 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Thu Apr 19 10:01:13 2018 +0800 ---------------------------------------------------------------------- python/pyspark/streaming/kafka.py | 3 ++- python/pyspark/streaming/listener.py | 6 ++++++ python/pyspark/streaming/tests.py | 7 +++++++ 3 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/kafka.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/kafka.py b/python/pyspark/streaming/kafka.py index fdb9308..ed2e0e7 100644 --- a/python/pyspark/streaming/kafka.py +++ b/python/pyspark/streaming/kafka.py @@ -104,7 +104,8 @@ class KafkaUtils(object): :param topics: list of topic_name to consume. :param kafkaParams: Additional params for Kafka. 
:param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting - point of the stream. + point of the stream (a dictionary mapping `TopicAndPartition` to + integers). :param keyDecoder: A function used to decode key (default is utf8_decoder). :param valueDecoder: A function used to decode value (default is utf8_decoder). :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can assess http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/listener.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/listener.py b/python/pyspark/streaming/listener.py index b830797..d4ecc21 100644 --- a/python/pyspark/streaming/listener.py +++ b/python/pyspark/streaming/listener.py @@ -23,6 +23,12 @@ class StreamingListener(object): def __init__(self): pass + def onStreamingStarted(self, streamingStarted): + """ + Called when the streaming has been started. + """ + pass + def onReceiverStarted(self, receiverStarted): """ Called when a receiver has been started http://git-wip-us.apache.org/repos/asf/spark/blob/32bec6ca/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index ca28c9b..1ec418a 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -507,6 +507,10 @@ class StreamingListenerTests(PySparkStreamingTestCase): self.batchInfosCompleted = [] self.batchInfosStarted = [] self.batchInfosSubmitted = [] + self.streamingStartedTime = [] + + def onStreamingStarted(self, streamingStarted): + self.streamingStartedTime.append(streamingStarted.time) def onBatchSubmitted(self, batchSubmitted): self.batchInfosSubmitted.append(batchSubmitted.batchInfo()) @@ -530,9 +534,12 @@ class StreamingListenerTests(PySparkStreamingTestCase): batchInfosSubmitted = batch_collector.batchInfosSubmitted 
batchInfosStarted = batch_collector.batchInfosStarted batchInfosCompleted = batch_collector.batchInfosCompleted + streamingStartedTime = batch_collector.streamingStartedTime self.wait_for(batchInfosCompleted, 4) + self.assertEqual(len(streamingStartedTime), 1) + self.assertGreaterEqual(len(batchInfosSubmitted), 4) for info in batchInfosSubmitted: self.assertGreaterEqual(info.batchTime().milliseconds(), 0) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org