Hi Juan,

I have created a ticket for this:
https://issues.apache.org/jira/browse/SPARK-8337

Thanks!
Amit


On Fri, Jun 12, 2015 at 3:17 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> If you want, I would be happy to work on this. I have worked with
> KafkaUtils.createDirectStream before, in a pull request that wasn't
> accepted (https://github.com/apache/spark/pull/5367). I'm fluent in
> Python and I'm starting to feel comfortable with Scala, so if someone opens
> a JIRA I can take it.
>
> Greetings,
>
> Juan Rodriguez
>
>
> 2015-06-12 15:59 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>
>> The Scala API has 2 ways of calling createDirectStream.  One of them
>> allows you to pass a message handler that gets full access to the Kafka
>> MessageAndMetadata, including offset.
>>
>> I don't know why the Python API was developed with only one way to call
>> createDirectStream, but the first thing I'd look at would be adding that
>> functionality back in.  If someone wants help creating a patch for that,
>> just let me know.
>>
>> Dealing with offsets on a per-message basis may not be as efficient as
>> dealing with them on a batch basis using the HasOffsetRanges interface...
>> but if efficiency was a primary concern, you probably wouldn't be using
>> Python anyway.
>>
>> On Fri, Jun 12, 2015 at 1:05 AM, Saisai Shao <sai.sai.s...@gmail.com>
>> wrote:
>>
>>> Scala KafkaRDD uses a trait to handle this problem, but that is not so
>>> easy or straightforward in Python, where we would need a specific API to
>>> handle it. I'm not sure whether there is any simple workaround, so maybe
>>> we should think carefully about it.
>>>
>>> 2015-06-12 13:59 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>
>>>>
>>>> Thanks, Jerry. That's what I suspected based on the code I looked at.
>>>> Any pointers on what is needed to build in this support would be great.
>>>> This is critical to the project we are currently working on.
>>>>
>>>> Thanks!
>>>>
>>>>
>>>> On Thu, Jun 11, 2015 at 10:54 PM, Saisai Shao <sai.sai.s...@gmail.com>
>>>> wrote:
>>>>
>>>>> OK, I get it. I think the Python-based Kafka direct API currently does
>>>>> not provide an equivalent of what Scala offers; maybe we should figure
>>>>> out how to add this to the Python API as well.
>>>>>
>>>>> 2015-06-12 13:48 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>>
>>>>>>
>>>>>> Hi Jerry,
>>>>>>
>>>>>> Take a look at this example:
>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2
>>>>>>
>>>>>> The offsets are needed because as RDDs get generated within Spark the
>>>>>> offsets move further along. With direct Kafka mode the current offsets
>>>>>> are no longer persisted in Zookeeper but rather within Spark itself. If
>>>>>> you want to be able to use Zookeeper-based monitoring tools to keep
>>>>>> track of progress, then this is needed.
>>>>>>
>>>>>> In my specific case we need to persist Kafka offsets externally so
>>>>>> that we can continue from where we left off after a code deployment. In
>>>>>> other words, we need exactly-once processing guarantees across code
>>>>>> deployments. Spark does not support any state persistence across
>>>>>> deployments so this is something we need to handle on our own.
>>>>>>
>>>>>> Hope that helps. Let me know if not.
>>>>>>
>>>>>> Thanks!
>>>>>> Amit
>>>>>>
>>>>>>
>>>>>> On Thu, Jun 11, 2015 at 10:02 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> What do you mean by getting the offsets from the RDD? From my
>>>>>>> understanding, the offsetRange is a parameter you pass to KafkaRDD,
>>>>>>> so why do you still want to get back the one you set previously?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Jerry
>>>>>>>
>>>>>>> 2015-06-12 12:36 GMT+08:00 Amit Ramesh <a...@yelp.com>:
>>>>>>>
>>>>>>>>
>>>>>>>> Congratulations on the release of 1.4!
>>>>>>>>
>>>>>>>> I have been trying out the direct Kafka support in Python but
>>>>>>>> haven't been able to figure out how to get the offsets from the RDD.
>>>>>>>> Looks like the documentation is yet to be updated to include Python
>>>>>>>> examples
>>>>>>>> (https://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>>>>>> I am specifically looking for the equivalent of
>>>>>>>> https://spark.apache.org/docs/latest/streaming-kafka-integration.html#tab_scala_2.
>>>>>>>> I tried digging through the Python code but could not find anything
>>>>>>>> related. Any pointers would be greatly appreciated.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Amit
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
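
For reference, the external offset bookkeeping described in the thread (persist
Kafka offsets outside Spark so a job can resume after a code deployment) could
look roughly like the sketch below. It is plain Python with no Spark
dependency. OffsetRange here is a hypothetical stand-in whose fields mirror the
Scala OffsetRange (topic, partition, fromOffset, untilOffset); save_offsets,
load_offsets, the "topic:partition" key format, and the JSON file store are all
assumptions made for illustration, not part of any Spark API.

```python
import json
import os
import tempfile
from collections import namedtuple

# Hypothetical stand-in for the per-batch offset information; the field
# names mirror the Scala OffsetRange but this is not a Spark class.
OffsetRange = namedtuple(
    "OffsetRange", ["topic", "partition", "from_offset", "until_offset"]
)


def load_offsets(path):
    """Return {"topic:partition": next_offset}, or {} if nothing is stored."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        return json.load(f)


def save_offsets(path, offset_ranges):
    """Record, per topic/partition, the offset to resume from next time."""
    state = load_offsets(path)
    for r in offset_ranges:
        # until_offset is exclusive, so it is the next offset to read.
        state["%s:%d" % (r.topic, r.partition)] = r.until_offset
    # Write to a temp file and rename it, so a crash mid-write cannot
    # leave a half-written offsets file behind.
    target_dir = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=target_dir)
    with os.fdopen(fd, "w") as f:
        json.dump(state, f)
    os.replace(tmp_path, path)
```

On restart, the dictionary returned by load_offsets would be the starting point
for each topic/partition. Note that saving offsets after processing a batch
only gives at-least-once behaviour on its own; exactly-once additionally
requires that the results and the offsets are committed atomically, or that
the output is idempotent.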
