[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605241#comment-14605241 ]
Saisai Shao commented on SPARK-8337:
------------------------------------

Hi [~juanrh], I think the best choice is to keep the Python API similar to Scala/Java. In Java/Scala we use offsetRanges like this:

{code}
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = number of Kafka partitions being consumed
  ...
}
{code}

It would be better for Python to follow the same pattern, and your implementation looks different. From my understanding, you return the offsetRange with each record of the KafkaRDD; but an offsetRange belongs to the RDD as a whole, not to its individual records, so this seems a little strange to me, and it forces you to serialize the offsetRange from the driver to each executor.

Here is what TD suggested, though some details still need to be figured out. I tried it briefly but am still blocked on a few things.

{quote}
I think the way it works is that the Java/Python-friendly DStream returned by the Java APIs of KafkaUtils is wrapped in Python's DStream class in dstream.py. The foreachRDD of that class uses another Python class, TransformFunction, to wrap the JavaRDDs into Python RDD objects and apply the user-defined Python function on them. To allow the wrapped Python RDDs to have a method called "offsetRanges", you have to:

1. Create a custom KafkaRDD Python class (extending Python's RDD class) which can wrap a KafkaRDD. This may actually require defining a JavaKafkaRDD class.
2. Create a custom KafkaTransformFunc Python class (extending Python's TransformFunc class) which wraps JavaRDDs into Python KafkaRDD objects and applies the user's function on those.
3. Create a custom KafkaDStream Python class (extending Python's DStream class) which overrides transform() and foreachRDD() to use KafkaTransformFunc instead of TransformFunc.

From my cursory look, this may work. Think about it.
TD
{quote}

> KafkaUtils.createDirectStream for python is lacking API/feature parity with the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
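As a rough illustration, the three steps TD outlines in the quoted suggestion could be sketched in Python as below. This is a self-contained toy, not the actual PySpark implementation: RDD, DStream, and TransformFunction are minimal stand-ins for the PySpark classes of the same names, and JavaKafkaRDD is a hypothetical handle for the JVM-side RDD that carries its batch's offset ranges.

```python
from collections import namedtuple

OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "fromOffset", "untilOffset"])

# Hypothetical handle for the JVM-side KafkaRDD: just the batch's data
# plus the offset ranges that describe where it came from.
JavaKafkaRDD = namedtuple("JavaKafkaRDD", ["data", "offset_ranges"])


class RDD:
    """Minimal stand-in for pyspark.rdd.RDD."""
    def __init__(self, data):
        self._data = data

    def collect(self):
        return list(self._data)


class TransformFunction:
    """Stand-in for the PySpark helper that wraps a JavaRDD into a
    Python RDD and applies the user's function to it."""
    def __init__(self, func):
        self.func = func

    def call(self, jrdd):
        return self.func(RDD(jrdd.data))


# Step 1: a KafkaRDD subclass that remembers and exposes offsetRanges().
class KafkaRDD(RDD):
    def __init__(self, data, offset_ranges):
        RDD.__init__(self, data)
        self._offset_ranges = offset_ranges

    def offsetRanges(self):
        return self._offset_ranges


# Step 2: a TransformFunction variant that wraps into KafkaRDD, not RDD.
class KafkaTransformFunction(TransformFunction):
    def call(self, jrdd):
        return self.func(KafkaRDD(jrdd.data, jrdd.offset_ranges))


# Step 3: a DStream subclass whose foreachRDD uses the Kafka wrapper.
class DStream:
    def __init__(self, jrdds):
        self._jrdds = jrdds  # pretend this is the stream of micro-batches

    def _transform_function(self, func):
        return TransformFunction(func)

    def foreachRDD(self, func):
        tf = self._transform_function(func)
        for jrdd in self._jrdds:
            tf.call(jrdd)


class KafkaDStream(DStream):
    def _transform_function(self, func):
        return KafkaTransformFunction(func)


# The user's function now sees a KafkaRDD and can ask for offset ranges,
# mirroring the Scala rdd.asInstanceOf[HasOffsetRanges].offsetRanges idiom.
ranges = [OffsetRange("topic1", 0, 0, 100)]
stream = KafkaDStream([JavaKafkaRDD(["msg-a", "msg-b"], ranges)])
seen = []
stream.foreachRDD(lambda rdd: seen.append((rdd.collect(), rdd.offsetRanges())))
```

Because offsetRanges lives on the RDD wrapper rather than on each record, nothing extra is shipped to executors; the metadata stays attached to the batch, which is exactly the objection raised above about the per-record approach.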