Repository: spark
Updated Branches:
  refs/heads/master a9a6b80c7 -> dd77e278b


[SPARK-11335][STREAMING] update kafka direct python docs on how to get the offset ranges for a KafkaRDD

tdas koeninger

This updates the Spark Streaming + Kafka Integration Guide doc with a working 
method to access the offsets of a `KafkaRDD` through Python.
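
As a rough illustration of the pattern this change documents, the sketch below runs without Spark or Kafka. `OffsetRange` and `StubKafkaRDD` are hypothetical stand-ins that mimic only the `offsetRanges()` method and the `topic`, `partition`, `fromOffset`, and `untilOffset` attributes used in the guide. `formatOffsetRanges` is an illustrative variant of `printOffsetRanges` that returns the lines instead of printing them, and the topic name and offsets are made-up sample values.

```python
from collections import namedtuple

# Hypothetical stand-in for an offset range object; it carries only the four
# attributes that the documented snippet reads.
OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "fromOffset", "untilOffset"])


class StubKafkaRDD(object):
    """Hypothetical stand-in that mimics only KafkaRDD.offsetRanges()."""

    def __init__(self, ranges):
        self._ranges = ranges

    def offsetRanges(self):
        return self._ranges


offsetRanges = []


def storeOffsetRanges(rdd):
    # Capture the ranges of the current batch in a module-level variable,
    # then pass the RDD through unchanged so the chain can continue.
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd


def formatOffsetRanges(rdd):
    # Read back the ranges captured by storeOffsetRanges for this batch.
    return ["%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
            for o in offsetRanges]


# Simulate one batch flowing through transform(...) and then foreachRDD(...).
batch = StubKafkaRDD([OffsetRange("events", 0, 100, 150),
                      OffsetRange("events", 1, 40, 75)])
lines = formatOffsetRanges(storeOffsetRanges(batch))
for line in lines:
    print(line)
```

In the real stream, `storeOffsetRanges` would be passed to `transform` and the printing function to `foreachRDD`, as in the diff below. The stub only demonstrates why the ranges are captured in one step and read in the next.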

Author: Nick Evans <m...@nicolasevans.org>

Closes #9289 from manygrams/update_kafka_direct_python_docs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd77e278
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd77e278
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd77e278

Branch: refs/heads/master
Commit: dd77e278b99e45c20fdefb1c795f3c5148d577db
Parents: a9a6b80
Author: Nick Evans <m...@nicolasevans.org>
Authored: Wed Nov 11 13:29:30 2015 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Nov 11 13:29:30 2015 -0800

----------------------------------------------------------------------
 docs/streaming-kafka-integration.md | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd77e278/docs/streaming-kafka-integration.md
----------------------------------------------------------------------
diff --git a/docs/streaming-kafka-integration.md b/docs/streaming-kafka-integration.md
index ab7f011..b00351b 100644
--- a/docs/streaming-kafka-integration.md
+++ b/docs/streaming-kafka-integration.md
@@ -181,7 +181,20 @@ Next, we discuss how to use this approach in your streaming application.
                );
        </div>
        <div data-lang="python" markdown="1">
-               Not supported yet
+               offsetRanges = []
+
+               def storeOffsetRanges(rdd):
+                   global offsetRanges
+                   offsetRanges = rdd.offsetRanges()
+                   return rdd
+
+               def printOffsetRanges(rdd):
+                   for o in offsetRanges:
+                       print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
+
+               directKafkaStream\
+                   .transform(storeOffsetRanges)\
+                   .foreachRDD(printOffsetRanges)
        </div>
        </div>
 


