Perfect! I'll start working on it.

2015-06-13 2:23 GMT+02:00 Amit Ramesh <a...@yelp.com>:
> Hi Juan,
>
> I have created a ticket for this:
> https://issues.apache.org/jira/browse/SPARK-8337
>
> Thanks!
> Amit
>
> On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá
> <juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi,
>>
>> If you want, I would be happy to work on this. I have worked with
>> KafkaUtils.createDirectStream before, in a pull request that wasn't
>> accepted: https://github.com/apache/spark/pull/5367. I'm fluent with
>> Python and I'm starting to feel comfortable with Scala, so if someone
>> opens a JIRA I can take it.
>>
>> Greetings,
>>
>> Juan Rodriguez
>>
>> 2015-06-12 15:59 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>>> The Scala API has two ways of calling createDirectStream. One of them
>>> allows you to pass a message handler that gets full access to the Kafka
>>> MessageAndMetadata, including the offset.
>>>
>>> I don't know why the Python API was developed with only one way to call
>>> createDirectStream, but the first thing I'd look at would be adding that
>>> functionality back in. If someone wants help creating a patch for that,
>>> just let me know.
>>>
>>> Dealing with offsets on a per-message basis may not be as efficient as
>>> dealing with them on a batch basis using the HasOffsetRanges
>>> interface... but if efficiency were a primary concern, you probably
>>> wouldn't be using Python anyway.
>>>
>>> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao <sai.sai.s...@gmail.com>
>>> wrote:
>>>
>>>> Scala KafkaRDD uses a trait to handle this problem, but that is not so
>>>> easy or straightforward in Python, where we would need a specific API
>>>> to handle this. I'm not sure whether there is any simple workaround;
>>>> maybe we should think carefully about it.
>>>>
>>>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>
>>>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>>>> Any pointers on what is needed to build in this support would be
>>>>> great. This is critical to the project we are currently working on.
>>>>>
>>>>> Thanks!
>>>>>
>>>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> OK, I get it. I think the current Python-based Kafka direct API does
>>>>>> not provide an equivalent to the Scala one; maybe we should figure
>>>>>> out how to add this to the Python API as well.
>>>>>>
>>>>>> 2015-06-12 13:48 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>>>
>>>>>>> Hi Jerry,
>>>>>>>
>>>>>>> Take a look at this example:
>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>>>>>>>
>>>>>>> The offsets are needed because as RDDs get generated within Spark,
>>>>>>> the offsets move further along. In direct Kafka mode the current
>>>>>>> offsets are no longer persisted in Zookeeper but rather within
>>>>>>> Spark itself. If you want to be able to use Zookeeper-based
>>>>>>> monitoring tools to keep track of progress, then this is needed.
>>>>>>>
>>>>>>> In my specific case we need to persist Kafka offsets externally so
>>>>>>> that we can continue from where we left off after a code
>>>>>>> deployment. In other words, we need exactly-once processing
>>>>>>> guarantees across code deployments. Spark does not support any
>>>>>>> state persistence across deployments, so this is something we need
>>>>>>> to handle on our own.
>>>>>>>
>>>>>>> Hope that helps. Let me know if not.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Amit
>>>>>>>
>>>>>>> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao
>>>>>>> <sai.sai.s...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> What do you mean by getting the offsets from the RDD? From my
>>>>>>>> understanding, the offsetRange is a parameter you supply to
>>>>>>>> KafkaRDD, so why do you still want to get back the one you
>>>>>>>> previously set?
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Jerry
>>>>>>>>
>>>>>>>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>>>>>
>>>>>>>>> Congratulations on the release of 1.4!
>>>>>>>>>
>>>>>>>>> I have been trying out the direct Kafka support in Python but
>>>>>>>>> haven't been able to figure out how to get the offsets from the
>>>>>>>>> RDD. It looks like the documentation is yet to be updated to
>>>>>>>>> include Python examples
>>>>>>>>> (https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>>>>>>> I am specifically looking for the equivalent of
>>>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>>>>>>>> I tried digging through the Python code but could not find
>>>>>>>>> anything related. Any pointers would be greatly appreciated.
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>> Amit
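[Editor's note] The pattern Amit describes, persisting per-partition offsets in an external store and resuming from them after a code deployment, can be sketched without Spark or Kafka at all. The sketch below is purely illustrative: `OffsetStore`, `process_batch`, and the toy partition log are hypothetical stand-ins, not Spark or Kafka APIs, and a real implementation would commit results and offsets in a single transaction against a durable store.

```python
# Spark-free sketch of the offset-tracking pattern discussed in the thread:
# read one batch starting at the stored offset, process it, commit the new
# offset, and resume correctly after a "redeployment". All names here are
# hypothetical illustrations, not Spark or Kafka APIs.

class OffsetStore:
    """Stands in for an external store (e.g. ZooKeeper or a database)."""
    def __init__(self):
        self._offsets = {}  # (topic, partition) -> next offset to read

    def load(self, topic, partition):
        return self._offsets.get((topic, partition), 0)

    def commit(self, topic, partition, next_offset):
        self._offsets[(topic, partition)] = next_offset


def process_batch(log, store, topic, partition, batch_size):
    """Process one batch from the stored offset, then commit the offset.

    In a real exactly-once pipeline the results and the new offset would
    be written atomically, in one transaction.
    """
    start = store.load(topic, partition)
    end = min(start + batch_size, len(log))
    results = [msg.upper() for msg in log[start:end]]  # placeholder work
    store.commit(topic, partition, end)
    return results


store = OffsetStore()
log = ["a", "b", "c", "d", "e"]  # a toy partition log

first = process_batch(log, store, "events", 0, batch_size=3)
# Simulate a redeployment: a fresh "driver" reuses the same external store
# and picks up exactly where the previous one left off.
second = process_batch(log, store, "events", 0, batch_size=3)

print(first, second, store.load("events", 0))
# → ['A', 'B', 'C'] ['D', 'E'] 5
```

This is the batch-granularity approach Cody alludes to with HasOffsetRanges: offsets are tracked once per batch rather than once per message, which is why the thread treats exposing the batch's offset ranges to Python as the key missing piece.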