Cool. Thanks for the detailed response Cody.

Thanks
Best Regards

On Tue, May 19, 2015 at 6:43 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If those questions aren't answered by
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>
> please let me know so I can update it.
>
> If you set auto.offset.reset to largest, it will start at the largest
> offset.  Any messages before that will be skipped, so if prior runs of the
> job didn't consume them, they're lost.
>
> KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from
> a locality hint if you have kafka running on the same node as spark), and
> it doesn't have any long-running receivers.  Executors get whatever
> partitions the normal scheduler decides they should get.  If an executor
> fails, a different executor reads the offset range for the failed
> partition; they're immutable, so no difference in result.
>
> Deciding where to save offsets (or not) is up to you.  You can checkpoint,
> or store them yourself.
>
> On Mon, May 18, 2015 at 12:00 PM, Akhil Das <ak...@sigmoidanalytics.com>
> wrote:
>
>> I have played a bit with the directStream kafka api. Good work cody.
>> These are my findings and also can you clarify a few things for me (see
>> below).
>>
>> -> When "auto.offset.reset"-> "smallest" and you have 60GB of messages in
>> Kafka, it takes forever as it reads the whole 60GB at once. "largest" will
>> only read the latest messages.
>> -> To avoid this, you can actually limit the rate with
>> spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always
>> reads the same amount of data).
>> -> Number of partitions per batch = number of kafka partitions.
>>
>> -> In the case of driver failures, offset reset being set to "smallest"
>> will replay the whole messages and "largest" will only read those messages
>> which are pushed after the streaming job has started. What happens to those
>> messages which arrive in between?
>>
>> *Few things which are unclear:*
>>
>> -> If we have a kafka topic with 9 partitions, and spark cluster with 3
>> slaves, how does it decides which slave should read from which partition?
>> And what happens if a single slave fails while reading the data?
>>
>> -> By default it doesn't push the offsets of messages which are read
>> anywhere, then how does it replay the message in case of failures?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> You linked to a google mail tab, not a public archive, so I don't know
>>> exactly which conversation you're referring to.
>>>
>>> As far as I know, streaming only runs a single job at a time in the
>>> order they were defined, unless you turn on an experimental option for more
>>> parallelism (TD or someone more knowledgeable can chime in on this).  If
>>> you're talking about the possibility of the next job starting before the
>>> prior one has fully finished, because your processing is lagging behind...
>>> I'm not 100% sure this is possible because I've never observed it.
>>>
>>> The thing is, it's a moot point, because if you're saving offsets
>>> yourself transactionally, you already need to be verifying that offsets are
>>> correct (increasing without gaps) in order to handle restarts correctly.
>>>
>>> If you're super concerned about how batches get generated, the direct
>>> api gives you access to KafkaUtils.createRDD... just schedule your own rdds
>>> in the order you want.  Again, flexible.
>>>
>>>
>>>
>>>
>>> On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Thanks Cody for your email. I think my concern was not to get the
>>>> ordering of message within a partition , which as you said is possible if
>>>> one knows how Spark works. The issue is how Spark schedule jobs on every
>>>> batch  which is not on the same order they generated. So if that is not
>>>> guaranteed it does not matter if you manege order within your partition. So
>>>> depends on par-partition ordering to commit offset may leads to offsets
>>>> commit in wrong order.
>>>>
>>>> In this thread you have discussed this as well and some workaround  :
>>>>
>>>>
>>>> https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15
>>>>
>>>> So again , one need to understand every details of a Consumer to take a
>>>> decision if that solves their use case.
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> As far as I can tell, Dibyendu's "cons" boil down to:
>>>>>
>>>>> 1. Spark checkpoints can't be recovered if you upgrade code
>>>>> 2. Some Spark transformations involve a shuffle, which can repartition
>>>>> data
>>>>>
>>>>> It's not accurate to imply that either one of those things are
>>>>> inherently "cons" of the direct stream api.
>>>>>
>>>>> Regarding checkpoints, nothing about the direct stream requires you to
>>>>> use checkpoints.  You can save offsets in a checkpoint, your own database,
>>>>> or not save offsets at all (as James wants).  One might even say that the
>>>>> direct stream api is . . . flexible . . . in that regard.
>>>>>
>>>>> Regarding partitions, the direct stream api gives you the same
>>>>> ordering guarantee as Kafka, namely that within a given partition messages
>>>>> will be in increasing offset order.   Clearly if you do a transformation
>>>>> that repartitions the stream, that no longer holds.  Thing is, that 
>>>>> doesn't
>>>>> matter if you're saving offsets and results for each rdd in the driver.
>>>>> The offset ranges for the original rdd don't change as a result of the
>>>>> transformation you executed, they're immutable.
>>>>>
>>>>> Sure, you can get into trouble if you're trying to save offsets /
>>>>> results per partition on the executors, after a shuffle of some kind. You
>>>>> can avoid this pretty easily by just using normal scala code to do your
>>>>> transformation on the iterator inside a foreachPartition.  Again, this
>>>>> isn't a "con" of the direct stream api, this is just a need to understand
>>>>> how Spark works.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> The low level consumer which Akhil mentioned , has been running in
>>>>>> Pearson for last 4-5 months without any downtime. I think this one is the
>>>>>> reliable "Receiver Based" Kafka consumer as of today for Spark .. if you
>>>>>> say it that way ..
>>>>>>
>>>>>> Prior to Spark 1.3 other Receiver based consumers have used Kafka
>>>>>> High level APIs which has serious issue with re-balancing and lesser 
>>>>>> fault
>>>>>> tolerant aspect and data loss .
>>>>>>
>>>>>> Cody's implementation is definitely a good approach using direct
>>>>>> stream , but both direct stream based approach and receiver based low 
>>>>>> level
>>>>>> consumer approach has pros and cons. Like Receiver based approach need to
>>>>>> use WAL for recovery from Driver failure which is a overhead for Kafka 
>>>>>> like
>>>>>> system . For direct stream the offsets stored as check-pointed directory
>>>>>> got lost if driver code is modified ..you can manage offset from your
>>>>>> driver but for derived stream generated from this direct stream , there 
>>>>>> is
>>>>>> no guarantee that batches are processed is order ( and offsets commits in
>>>>>> order ) .. etc ..
>>>>>>
>>>>>> So whoever use whichever consumer need to study pros and cons of both
>>>>>> approach before taking a call ..
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, May 12, 2015 at 8:10 PM, Akhil Das <
>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>> I was just saying that i found more success and high throughput with
>>>>>>> the low level kafka api prior to KafkfaRDDs which is the future it 
>>>>>>> seems.
>>>>>>> My apologies if you felt it that way. :)
>>>>>>> On 12 May 2015 19:47, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Akhil, I hope I'm misreading the tone of this. If you have personal
>>>>>>>> issues at stake, please take them up outside of the public list.  If 
>>>>>>>> you
>>>>>>>> have actual factual concerns about the kafka integration, please share 
>>>>>>>> them
>>>>>>>> in a jira.
>>>>>>>>
>>>>>>>> Regarding reliability, here's a screenshot of a current production
>>>>>>>> job with a 3 week uptime  Was a month before that, only took it down to
>>>>>>>> change code.
>>>>>>>>
>>>>>>>> http://tinypic.com/r/2e4vkht/8
>>>>>>>>
>>>>>>>> Regarding flexibility, both of the apis available in spark will do
>>>>>>>> what James needs, as I described.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 12, 2015 at 8:55 AM, Akhil Das <
>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> If you are so sure, can you share a bench-marking (which you ran
>>>>>>>>> for days maybe?) that you have done with Kafka APIs provided by Spark?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I don't think it's accurate for Akhil to claim that the linked
>>>>>>>>>> library is "much more flexible/reliable" than what's available in 
>>>>>>>>>> Spark at
>>>>>>>>>> this point.
>>>>>>>>>>
>>>>>>>>>> James, what you're describing is the default behavior for the
>>>>>>>>>> createDirectStream api available as part of spark since 1.3.  The 
>>>>>>>>>> kafka
>>>>>>>>>> parameter auto.offset.reset defaults to largest, ie start at the most
>>>>>>>>>> recent available message.
>>>>>>>>>>
>>>>>>>>>> This is described at
>>>>>>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>>>>>>>>  The createDirectStream api implementation is described in detail at
>>>>>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>>>>>>>>>>
>>>>>>>>>> If for some reason you're stuck using an earlier version of
>>>>>>>>>> spark, you can accomplish what you want simply by starting the job 
>>>>>>>>>> using a
>>>>>>>>>> new consumer group (there will be no prior state in zookeeper, so it 
>>>>>>>>>> will
>>>>>>>>>> start consuming according to auto.offset.reset)
>>>>>>>>>>
>>>>>>>>>> On Tue, May 12, 2015 at 7:26 AM, James King <
>>>>>>>>>> jakwebin...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Very nice! will try and let you know, thanks.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 12, 2015 at 2:25 PM, Akhil Das <
>>>>>>>>>>> ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yep, you can try this lowlevel Kafka receiver
>>>>>>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer. Its much
>>>>>>>>>>>> more flexible/reliable than the one comes with Spark.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 12, 2015 at 5:15 PM, James King <
>>>>>>>>>>>> jakwebin...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What I want is if the driver dies for some reason and it is
>>>>>>>>>>>>> restarted I want to read only messages that arrived into Kafka 
>>>>>>>>>>>>> following
>>>>>>>>>>>>> the restart of the driver program and re-connection to Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone done this? any links or resources that can help
>>>>>>>>>>>>> explain this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> jk
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to