[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605241#comment-14605241 ]

Saisai Shao commented on SPARK-8337:
------------------------------------

Hi [~juanrh], I think the best choice is to keep the Python programming model 
consistent with Scala/Java. In Java/Scala, we use offsetRanges like this:

{code}
directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}
{code}

It would be better for Python to follow the same programming model. Your 
implementation looks different: from my understanding, you return the 
offsetRange with each record of the KafkaRDD. But an offsetRange describes the 
RDD as a whole, not individual records, so this seems a little strange to me; 
it also forces you to serialize the offsetRange from the driver to each 
executor.
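
For comparison, here is a rough sketch of what the Python side could look like 
if it mirrored the Scala pattern above. This is only an illustration: the 
offsetRanges() method on the RDD passed to foreachRDD, and the field names on 
the objects it returns, do not exist in the current Python API and are 
assumptions about the proposed design.

{code}
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="direct-kafka-offsets")
ssc = StreamingContext(sc, 2)

# Direct stream, as in the existing Python API.
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, ["my_topic"], {"metadata.broker.list": "localhost:9092"})

def handle(rdd):
    # Hypothetical offsetRanges() on the batch RDD, mirroring
    # rdd.asInstanceOf[HasOffsetRanges].offsetRanges in Scala;
    # len(rdd.offsetRanges()) = # of Kafka partitions being consumed.
    for o in rdd.offsetRanges():
        print("%s %d %d %d" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream.foreachRDD(handle)

ssc.start()
ssc.awaitTermination()
{code}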

Here is what TD suggested, though some details still need to be figured out. I 
tried it a bit but am still blocked on a few things; a rough sketch of the idea 
follows the quote.

{quote}
I think the way it works is that the Java/Python friendly DStream returned
by Java APIs of KafkaUtils, is wrapped in Python's DStream class in
dstream.py. The foreachRDD of that class uses another Python class
TransformFunction to wrap the JavaRDDs into Python's RDD objects and
applies the user defined python function on them. To allow the wrapped
Python RDDs to have a method called "offsetRanges", you have to
1. Create a custom KafkaRDD Python class (extending Python's RDD class)
which can wrap a KafkaRDD class. This may actually require defining a
JavaKafkaRDD class
2. Create a custom KafkaTransformFunc Python class (extending Python's
TransformFunc class) which wraps JavaRDDs into Python's KafkaRDD classes,
and applies user's function on those.
3. Create a custom KafkaDStream Python class (extending Python's DStream
class) which overrides transform() and foreachRDD() to use
KafkaTransformFunc instead of TransformFunc.
From my cursory look, this may work. Think about it.
TD
{quote}
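
For reference, here is a skeleton of how the three pieces might fit together. 
Everything below is an assumption about the suggested design, not existing 
PySpark code: the class names, the offset_ranges payload, and exactly which 
internal methods need overriding would have to be worked out against the real 
DStream/TransformFunction internals.

{code}
from pyspark.rdd import RDD
from pyspark.streaming.dstream import DStream
from pyspark.streaming.util import TransformFunction


class KafkaRDD(RDD):
    """Step 1 (hypothetical): a Python RDD that carries the offset ranges of
    the JVM KafkaRDD it wraps; exposing them may also require a JavaKafkaRDD
    wrapper on the JVM side."""
    def __init__(self, jrdd, ctx, jrdd_deserializer, offset_ranges):
        RDD.__init__(self, jrdd, ctx, jrdd_deserializer)
        self._offset_ranges = offset_ranges

    def offsetRanges(self):
        return self._offset_ranges


class KafkaTransformFunction(TransformFunction):
    """Step 2 (hypothetical): wrap incoming JavaRDDs into KafkaRDD instead of
    a plain RDD before applying the user's function. Which method to override
    depends on TransformFunction's internals."""
    pass


class KafkaDStream(DStream):
    """Step 3 (hypothetical): override transform() and foreachRDD() so they
    build a KafkaTransformFunction instead of a TransformFunction."""
    pass
{code}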


> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html


