As far as I can tell, Dibyendu's "cons" boil down to:

1. Spark checkpoints can't be recovered if you upgrade code
2. Some Spark transformations involve a shuffle, which can repartition data

It's not accurate to imply that either of those things is inherently a
"con" of the direct stream api.

Regarding checkpoints, nothing about the direct stream requires you to use
checkpoints.  You can save offsets in a checkpoint, in your own database, or
not save offsets at all (as James wants).  One might even say that the
direct stream api is . . . flexible . . . in that regard.
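To make the "no saved offsets" case concrete, here's a minimal sketch against the Spark 1.3 createDirectStream api. The broker address and topic name are placeholders, and `ssc` is assumed to be an existing StreamingContext; with no checkpoint and no stored offsets, the stream simply starts wherever auto.offset.reset points:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder broker/topic names -- substitute your own.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  // No checkpoint, no stored offsets: start at the most recent message.
  "auto.offset.reset" -> "largest"
)
val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))
```

Note that the direct stream doesn't commit offsets to zookeeper at all, so there's no consumer-group state to clean up; if you want to resume from a particular point instead, use the createDirectStream overload that takes explicit starting offsets.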

Regarding partitions, the direct stream api gives you the same ordering
guarantee as Kafka, namely that within a given partition messages will be
in increasing offset order.  Clearly, if you do a transformation that
repartitions the stream, that no longer holds.  Thing is, that doesn't
matter if you're saving offsets and results for each rdd in the driver.
The offset ranges for the original rdd don't change as a result of the
transformation you executed; they're immutable.
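A short sketch of the driver-side pattern described above (the `saveResultsAndOffsets` helper is hypothetical; it stands in for whatever transactional store you use):

```scala
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  // Capture offset ranges from the original KafkaRDD, in the driver.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // This transformation may shuffle/repartition, but offsetRanges
  // above is immutable and unaffected by it.
  val counts = rdd.map(_._2).countByValue()

  // Save results and offsets together, ideally in one transaction.
  saveResultsAndOffsets(counts, offsetRanges) // hypothetical helper
}
```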

Sure, you can get into trouble if you're trying to save offsets / results
per partition on the executors, after a shuffle of some kind. You can avoid
this pretty easily by just using normal Scala code to do your
transformation on the iterator inside a foreachPartition.  Again, this
isn't a "con" of the direct stream api, this is just a need to understand
how Spark works.
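A sketch of that per-partition pattern, assuming a hypothetical `savePartition` sink. Because no shuffle has occurred, the partition index still lines up with the offset range for the same Kafka partition:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    // Safe only because nothing upstream repartitioned the rdd.
    val osr = offsetRanges(TaskContext.get.partitionId)
    // Plain Scala transformation on the iterator -- no shuffle involved.
    val results = iter.map { case (_, value) => value.toUpperCase }
    savePartition(results, osr) // hypothetical per-partition sink
  }
}
```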



On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> The low-level consumer which Akhil mentioned has been running at Pearson
> for the last 4-5 months without any downtime. I think this is the most
> reliable "receiver-based" Kafka consumer for Spark as of today, if you
> want to put it that way.
>
> Prior to Spark 1.3, other receiver-based consumers used the Kafka high-level
> API, which has serious issues with re-balancing, weaker fault
> tolerance, and data loss.
>
> Cody's implementation using the direct stream is definitely a good approach,
> but both the direct-stream approach and the receiver-based low-level consumer
> approach have pros and cons. For example, the receiver-based approach needs a WAL
> for recovery from driver failure, which is an overhead for a system like Kafka.
> For the direct stream, offsets stored in the checkpoint directory are lost
> if the driver code is modified. You can manage offsets from your driver, but for
> derived streams generated from the direct stream, there is no guarantee
> that batches are processed in order (and offsets committed in order), etc.
>
>
> So whoever uses whichever consumer needs to study the pros and cons of both
> approaches before making a call.
>
> Regards,
> Dibyendu
>
>
>
>
>
>
>
> On Tue, May 12, 2015 at 8:10 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> Hi Cody,
>> I was just saying that I found more success and higher throughput with the
>> low-level Kafka API prior to KafkaRDDs, which seem to be the future. My
>> apologies if it came across that way. :)
>> On 12 May 2015 19:47, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>>> Akhil, I hope I'm misreading the tone of this. If you have personal
>>> issues at stake, please take them up outside of the public list.  If you
>>> have actual factual concerns about the kafka integration, please share them
>>> in a jira.
>>>
>>> Regarding reliability, here's a screenshot of a current production job
>>> with a 3-week uptime.  It was up for a month before that; we only took it
>>> down to change code.
>>>
>>> http://tinypic.com/r/2e4vkht/8
>>>
>>> Regarding flexibility, both of the apis available in spark will do what
>>> James needs, as I described.
>>>
>>>
>>>
>>> On Tue, May 12, 2015 at 8:55 AM, Akhil Das <ak...@sigmoidanalytics.com>
>>> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> If you are so sure, can you share a benchmark (which you ran for
>>>> days, maybe?) that you have done with the Kafka APIs provided by Spark?
>>>>
>>>> Thanks
>>>> Best Regards
>>>>
>>>> On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> I don't think it's accurate for Akhil to claim that the linked library
>>>>> is "much more flexible/reliable" than what's available in Spark at this
>>>>> point.
>>>>>
>>>>> James, what you're describing is the default behavior for the
>>>>> createDirectStream api, available as part of Spark since 1.3.  The Kafka
>>>>> parameter auto.offset.reset defaults to largest, i.e., start at the most
>>>>> recent available message.
>>>>>
>>>>> This is described at
>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>>>  The createDirectStream api implementation is described in detail at
>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>>>>>
>>>>> If for some reason you're stuck using an earlier version of Spark, you
>>>>> can accomplish what you want simply by starting the job using a new
>>>>> consumer group (there will be no prior state in zookeeper, so it will
>>>>> start consuming according to auto.offset.reset).
>>>>>
>>>>> On Tue, May 12, 2015 at 7:26 AM, James King <jakwebin...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Very nice! will try and let you know, thanks.
>>>>>>
>>>>>> On Tue, May 12, 2015 at 2:25 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Yep, you can try this low-level Kafka receiver
>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer. It's much more
>>>>>>> flexible/reliable than the one that comes with Spark.
>>>>>>>
>>>>>>> Thanks
>>>>>>> Best Regards
>>>>>>>
>>>>>>> On Tue, May 12, 2015 at 5:15 PM, James King <jakwebin...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> What I want is: if the driver dies for some reason and is
>>>>>>>> restarted, I want to read only messages that arrived in Kafka
>>>>>>>> following the restart of the driver program and re-connection to Kafka.
>>>>>>>>
>>>>>>>> Has anyone done this? any links or resources that can help explain
>>>>>>>> this?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> jk
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>