[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14604633#comment-14604633 ]

Juan Rodríguez Hortalá edited comment on SPARK-8337 at 6/28/15 6:16 PM:
------------------------------------------------------------------------

Hi, 

I have done some work on the OffsetRange approach; the code is available at 
https://github.com/juanrh/spark/commit/56fbd5c38bd30b825a7818f1c56abb1f8b2beaff.
 I have added the following method to pyspark's KafkaUtils:

{code}
    @staticmethod
    def getOffsetRanges(rdd):
        scalaRdd = rdd._jrdd.rdd()
        offsetRangesArray = scalaRdd.offsetRanges()
        return [OffsetRange(topic=offsetRange.topic(),
                            partition=offsetRange.partition(),
                            fromOffset=offsetRange.fromOffset(),
                            untilOffset=offsetRange.untilOffset())
                for offsetRange in offsetRangesArray]
{code}
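
For reference, this is a minimal sketch of the intended use (hypothetical, 
not part of the commit above): calling it from foreachRDD, assuming that 
rdd._jrdd.rdd() still points at the underlying Scala KafkaRDD, i.e. that no 
pyspark wrapper has re-wrapped the RDD yet (as discussed below, the wrappers 
are exactly the problem):

{code}
# Hypothetical usage sketch: print the OffsetRange list of each batch.
# Assumes rdd._jrdd.rdd() is still the underlying Scala KafkaRDD.
def printOffsetRanges(rdd):
    for offsetRange in KafkaUtils.getOffsetRanges(rdd):
        print offsetRange

kafkaStream.foreachRDD(printOffsetRanges)
{code}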

This method is used in KafkaUtils.createDirectStreamJB, which is based on the 
original KafkaUtilsPythonHelper.createDirectStream. The main problem I have is 
that I don't know where to store the OffsetRange objects: the naive trick of 
adding them to the __dict__ of each Python RDD object doesn't work, because 
the new field is lost in the pyspark wrappers. So the new method 
createDirectStreamJB takes two additional options, one for performing an 
action on the OffsetRange list, and another for adding it to each record of 
the DStream (see the sketch after the docstring below):

{code}
def createDirectStreamJB(ssc, topics, kafkaParams, fromOffsets={},
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                         offsetRangeForeach=None, addOffsetRange=False):
    """
    FIXME: temporary working placeholder

    :param offsetRangeForeach: if not None, this should be a function from
        a list of OffsetRange to None; it is applied to the OffsetRange
        list of each rdd
    :param addOffsetRange: if False (default), output records have the
        shape (kafkaKey, kafkaValue); if True, output records have the
        shape (offsetRange, (kafkaKey, kafkaValue)), where offsetRange is
        the OffsetRange of the Spark partition containing the record
    """
{code}
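
Conceptually, the two options behave like the following per-batch sketch 
(illustrative only, with hypothetical names; the actual wiring is in the 
commit linked above):

{code}
# Illustrative per-batch sketch, not the actual implementation:
offsetRanges = KafkaUtils.getOffsetRanges(rdd)
if offsetRangeForeach is not None:
    # side-effecting callback on the OffsetRange list of this batch
    offsetRangeForeach(offsetRanges)
if addOffsetRange:
    # in the direct API each Kafka partition maps 1:1 to a Spark
    # partition, so each record can be paired with the OffsetRange
    # of the partition it lives in
    rdd = rdd.mapPartitionsWithIndex(
        lambda i, it: ((offsetRanges[i], kv) for kv in it))
{code}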

This is an example of using createDirectStreamJB:

{code}
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}

def offsetRangeForeach(offsetRangeList):
    print
    print
    for offsetRange in offsetRangeList:
        print offsetRange
    print
    print

kafkaStream = KafkaUtils.createDirectStreamJB(
    ssc, topics, kafkaParams,
    offsetRangeForeach=offsetRangeForeach, addOffsetRange=True)

# OffsetRange printed as <pyspark.streaming.kafka.OffsetRange object at
# 0x7f2fdc045950>, I guess due to some kind of pyspark proxy, so the
# fields are formatted explicitly
kafkaStrStream = kafkaStream.map(
    lambda (offRan, (k, v)): str(offRan._fromOffset) + " " +
                             str(offRan._untilOffset) + " " +
                             str(k) + " " + str(v))
# kafkaStream.pprint()
kafkaStrStream.pprint()
ssc.start()
ssc.awaitTermination(timeout=5)
{code}

which produces the following output:

{code}
15/06/28 12:36:03 INFO InputInfoTracker: remove old batch metadata: 
1435487761000 ms


OffsetRange(topic=test, partition=0, fromOffset=178, untilOffset=179)


15/06/28 12:36:04 INFO JobScheduler: Added jobs for time 1435487764000 ms
...
15/06/28 12:36:04 INFO DAGScheduler: Job 4 finished: runJob at 
PythonRDD.scala:366, took 0,075387 s
-------------------------------------------
Time: 2015-06-28 12:36:04
-------------------------------------------
178 179 None hola
()
15/06/28 12:36:04 INFO JobScheduler: Finished job streaming job 1435487764000 
ms.0 from job set of time 1435487764000 ms
...
15/06/28 12:36:05 INFO BlockManager: Removing RDD 12


OffsetRange(topic=test, partition=0, fromOffset=179, untilOffset=180)


15/06/28 12:36:06 INFO JobScheduler: Starting job streaming job 1435487766000 
ms.0 from job set of time 1435487766000 ms
....
15/06/28 12:36:06 INFO DAGScheduler: Job 6 finished: start at 
NativeMethodAccessorImpl.java:-2, took 0,077993 s
-------------------------------------------
Time: 2015-06-28 12:36:06
-------------------------------------------
179 180 None caracola
()
15/06/28 12:36:06 INFO JobScheduler: Finished job streaming job 1435487766000 
ms.0 from job set of time 1435487766000 ms
{code}

Any thoughts on this would be appreciated, in particular about a suitable 
place to store the list of OffsetRange objects.

Greetings, 

Juan






> KafkaUtils.createDirectStream for python is lacking API/feature parity with 
> the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html


