[ https://issues.apache.org/jira/browse/SPARK-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14604633#comment-14604633 ]
Juan Rodríguez Hortalá edited comment on SPARK-8337 at 6/28/15 6:16 PM:
------------------------------------------------------------------------

Hi,

I have worked a bit on the OffsetRange approach; you can access the code at https://github.com/juanrh/spark/commit/56fbd5c38bd30b825a7818f1c56abb1f8b2beaff. I have added the following method to the PySpark KafkaUtils:

{code}
@staticmethod
def getOffsetRanges(rdd):
    scalaRdd = rdd._jrdd.rdd()
    offsetRangesArray = scalaRdd.offsetRanges()
    return [OffsetRange(topic=offsetRange.topic(),
                        partition=offsetRange.partition(),
                        fromOffset=offsetRange.fromOffset(),
                        untilOffset=offsetRange.untilOffset())
            for offsetRange in offsetRangesArray]
{code}

This method is used in KafkaUtils.createDirectStreamJB, which is based on the original KafkaUtilsPythonHelper.createDirectStream. The main problem I have is that I don't know where to store the OffsetRange objects: the naive trick of adding them to the __dict__ of each Python RDD object doesn't work, because the new field is lost in the PySpark wrappers. So the new method createDirectStreamJB takes two additional options, one for performing an action on the OffsetRange list, and another for adding it to each record of the DStream:

{code}
def createDirectStreamJB(ssc, topics, kafkaParams, fromOffsets={},
                         keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                         offsetRangeForeach=None, addOffsetRange=False):
    """
    FIXME: temporary working placeholder

    :param offsetRangeForeach: if not None, this must be a function from a
        list of OffsetRange to None, and it is applied to the OffsetRange
        list of each RDD
    :param addOffsetRange: if False (the default), output records have the
        shape (kafkaKey, kafkaValue); if True, output records have the shape
        (offsetRange, (kafkaKey, kafkaValue)), where offsetRange is the
        OffsetRange value for the Spark partition the record belongs to
    """
{code}

This is an example of using createDirectStreamJB:

{code}
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 1)
topics = ["test"]
kafkaParams = {"metadata.broker.list": "localhost:9092"}

def offsetRangeForeach(offsetRangeList):
    print
    print
    for offsetRange in offsetRangeList:
        print offsetRange
    print
    print

kafkaStream = KafkaUtils.createDirectStreamJB(
    ssc, topics, kafkaParams,
    offsetRangeForeach=offsetRangeForeach, addOffsetRange=True)
# OffsetRange is printed as <pyspark.streaming.kafka.OffsetRange object at
# 0x7f2fdc045950>, I guess due to some kind of pyspark proxy
kafkaStrStream = kafkaStream.map(
    lambda (offRan, (k, v)): str(offRan._fromOffset) + " " +
        str(offRan._untilOffset) + " " + str(k) + " " + str(v))
# kafkaStream.pprint()
kafkaStrStream.pprint()

ssc.start()
ssc.awaitTermination(timeout=5)
{code}
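As a side note, if the opaque printout above comes from the Python OffsetRange class simply not defining __repr__ (rather than from a py4j proxy), then adding one would make the records readable in pprint. A minimal sketch of the class with a __repr__, assuming the fields are stored as _topic, _partition, _fromOffset and _untilOffset (the last two appear in the example above; the first two are assumed by symmetry with the constructor):

{code}
class OffsetRange(object):
    """Range of offsets from a single Kafka topic and partition."""

    def __init__(self, topic, partition, fromOffset, untilOffset):
        self._topic = topic
        self._partition = partition
        self._fromOffset = fromOffset
        self._untilOffset = untilOffset

    def __repr__(self):
        # Without __repr__, instances print with the default object repr:
        # <pyspark.streaming.kafka.OffsetRange object at 0x...>
        return "OffsetRange(topic=%s, partition=%d, fromOffset=%d, untilOffset=%d)" % (
            self._topic, self._partition, self._fromOffset, self._untilOffset)
{code}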
Running the example produces the following output:

{code}
15/06/28 12:36:03 INFO InputInfoTracker: remove old batch metadata: 1435487761000 ms

OffsetRange(topic=test, partition=0, fromOffset=178, untilOffset=179)

15/06/28 12:36:04 INFO JobScheduler: Added jobs for time 1435487764000 ms
...
15/06/28 12:36:04 INFO DAGScheduler: Job 4 finished: runJob at PythonRDD.scala:366, took 0,075387 s
-------------------------------------------
Time: 2015-06-28 12:36:04
-------------------------------------------
178 179 None hola
()
15/06/28 12:36:04 INFO JobScheduler: Finished job streaming job 1435487764000 ms.0 from job set of time 1435487764000 ms
...
15/06/28 12:36:05 INFO BlockManager: Removing RDD 12

OffsetRange(topic=test, partition=0, fromOffset=179, untilOffset=180)

15/06/28 12:36:06 INFO JobScheduler: Starting job streaming job 1435487766000 ms.0 from job set of time 1435487766000 ms
...
15/06/28 12:36:06 INFO DAGScheduler: Job 6 finished: start at NativeMethodAccessorImpl.java:-2, took 0,077993 s
-------------------------------------------
Time: 2015-06-28 12:36:06
-------------------------------------------
179 180 None caracola
()
15/06/28 12:36:06 INFO JobScheduler: Finished job streaming job 1435487766000 ms.0 from job set of time 1435487766000 ms
{code}

Any thoughts on this would be appreciated, in particular about a suitable place to store the list of OffsetRange objects.
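In the meantime, the offsetRangeForeach hook already makes it possible to track on the driver which offsets each batch processed, even without solving the storage question. A minimal sketch of such a callback, assuming (as the output above suggests) that it runs on the driver once per batch, and that OffsetRange keeps its fields in _topic, _partition, _fromOffset and _untilOffset:

{code}
# Collect the ranges of every processed batch on the driver, e.g. so they
# can later be persisted to an external store for offset bookkeeping.
processedRanges = []

def trackOffsetRanges(offsetRangeList):
    processedRanges.extend(offsetRangeList)
    for o in offsetRangeList:
        print "%s[%d]: offsets %d to %d" % (o._topic, o._partition,
                                            o._fromOffset, o._untilOffset)

kafkaStream = KafkaUtils.createDirectStreamJB(
    ssc, topics, kafkaParams, offsetRangeForeach=trackOffsetRanges)
{code}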
Greetings,

Juan


> KafkaUtils.createDirectStream for python is lacking API/feature parity with
> the Scala/Java version
> --------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8337
>                 URL: https://issues.apache.org/jira/browse/SPARK-8337
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Streaming
>    Affects Versions: 1.4.0
>            Reporter: Amit Ramesh
>            Priority: Critical
>
> See the following thread for context.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Re-Spark-1-4-Python-API-for-getting-Kafka-offsets-in-direct-mode-tt12714.html