Hi Juan, I have created a ticket for this: https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit

On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> If you want, I would be happy to work on this. I have worked with KafkaUtils.createDirectStream before, in a pull request that wasn't accepted: https://github.com/apache/spark/pull/5367. I'm fluent in Python and I'm starting to feel comfortable with Scala, so if someone opens a JIRA I can take it.
>
> Greetings,
>
> Juan Rodriguez

2015-06-12 15:59 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> The Scala API has two ways of calling createDirectStream. One of them allows you to pass a message handler that gets full access to the Kafka MessageAndMetadata, including the offset.
>
> I don't know why the Python API was developed with only one way to call createDirectStream, but the first thing I'd look at would be adding that functionality back in. If someone wants help creating a patch for that, just let me know.
>
> Dealing with offsets on a per-message basis may not be as efficient as dealing with them on a batch basis using the HasOffsetRanges interface... but if efficiency was a primary concern, you probably wouldn't be using Python anyway.
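For reference, a minimal Scala sketch of the two createDirectStream variants Cody describes, against the Spark 1.x spark-streaming-kafka API. The broker address, topic name, and batch interval are placeholders, not values from this thread:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(new SparkConf().setAppName("offset-sketch"), Seconds(5))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker

    // Variant 1: pass a set of topics. The result is a DStream of (key, value)
    // pairs with no per-message access to Kafka metadata such as the offset.
    val simpleStream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("my-topic"))

    // Variant 2: pass explicit starting offsets plus a message handler that sees
    // the full MessageAndMetadata, including topic, partition and offset.
    val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> 0L)
    val handlerStream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (Long, String)](
      ssc, kafkaParams, fromOffsets,
      (mmd: MessageAndMetadata[String, String]) => (mmd.offset, mmd.message()))

The second variant is the per-message access Cody refers to; the Python API at this point exposes only something like the first.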
On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> The Scala KafkaRDD uses a trait to handle this problem, but it is not as easy or straightforward in Python, where we would need a specific API to handle it. I'm not sure whether there is any simple workaround, so maybe we should think about it carefully.

2015-06-12 13:59 GMT+08:00 Amit Ramesh <a...@yelp.com>:

> Thanks, Jerry. That's what I suspected based on the code I looked at. Any pointers on what is needed to build in this support would be great. This is critical to the project we are currently working on.
>
> Thanks!

On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> OK, I get it. I think the Python-based Kafka direct API currently does not provide an equivalent of the Scala one; maybe we should figure out how to add this to the Python API as well.

2015-06-12 13:48 GMT+08:00 Amit Ramesh <a...@yelp.com>:

> Hi Jerry,
>
> Take a look at this example: https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>
> The offsets are needed because, as RDDs get generated within Spark, the offsets move further along. With the direct Kafka mode, the current offsets are no longer persisted in ZooKeeper but rather within Spark itself. If you want to be able to use ZooKeeper-based monitoring tools to keep track of progress, then this is needed.
>
> In my specific case we need to persist the Kafka offsets externally so that we can continue from where we left off after a code deployment. In other words, we need exactly-once processing guarantees across code deployments. Spark does not support any state persistence across deployments, so this is something we need to handle on our own.
>
> Hope that helps. Let me know if not.
>
> Thanks!
> Amit
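A rough Scala sketch of the save-and-resume pattern described in the message above, reusing the placeholder `ssc` and `kafkaParams` from the earlier sketch; `loadSavedOffsets` and `saveOffsets` are hypothetical helpers backed by an external store, not part of Spark:

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Hypothetical helpers backed by an external offset store; stubbed here.
    def loadSavedOffsets(): Map[TopicAndPartition, Long] =
      Map(TopicAndPartition("my-topic", 0) -> 0L)  // stand-in for a real lookup
    def saveOffsets(offsets: Map[TopicAndPartition, Long]): Unit =
      offsets.foreach { case (tp, until) => println(s"$tp -> $until") }  // stand-in for a real write

    // On startup, resume exactly where the previous deployment left off.
    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, loadSavedOffsets(),
      (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()))

    // After each batch, persist the end offset of every partition consumed.
    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process the batch, then:
      saveOffsets(ranges.map(o => TopicAndPartition(o.topic, o.partition) -> o.untilOffset).toMap)
    }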
On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Hi,
>
> What do you mean by getting the offsets from the RDD? From my understanding, the offsetRange is a parameter you pass to KafkaRDD, so why do you want to get back the one you previously set?
>
> Thanks
> Jerry

2015-06-12 12:36 GMT+08:00 Amit Ramesh <a...@yelp.com>:

> Congratulations on the release of 1.4!
>
> I have been trying out the direct Kafka support in Python but haven't been able to figure out how to get the offsets from the RDD. It looks like the documentation has yet to be updated to include Python examples (https://spark.apache.org/docs/latest/streaming-kafka-integration.html). I am specifically looking for the equivalent of https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2. I tried digging through the Python code but could not find anything related. Any pointers would be greatly appreciated.
>
> Thanks!
> Amit
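For context, the Scala example that link points to reads the offsets of each batch through the HasOffsetRanges interface, roughly as follows; this is a sketch rather than a copy of the documentation page, and `directKafkaStream` stands for the DStream returned by KafkaUtils.createDirectStream:

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Each RDD produced by the direct stream also implements HasOffsetRanges,
    // exposing the Kafka offset range consumed for every partition in the batch.
    directKafkaStream.foreachRDD { rdd =>
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"topic=${o.topic} partition=${o.partition} from=${o.fromOffset} until=${o.untilOffset}")
      }
    }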