Cool. Thanks for the detailed response, Cody.

Thanks
Best Regards
On Tue, May 19, 2015 at 6:43 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If those questions aren't answered by
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>
> please let me know so I can update it.
>
> If you set auto.offset.reset to largest, it will start at the largest
> offset. Any messages before that will be skipped, so if prior runs of
> the job didn't consume them, they're lost.
>
> KafkaRDD / DirectStream doesn't make any scheduling decisions (aside
> from a locality hint if you have Kafka running on the same node as
> Spark), and it doesn't have any long-running receivers. Executors get
> whatever partitions the normal scheduler decides they should get. If an
> executor fails, a different executor reads the offset range for the
> failed partition; offset ranges are immutable, so there is no
> difference in the result.
>
> Deciding where to save offsets (or not) is up to you. You can
> checkpoint, or store them yourself.
>
> On Mon, May 18, 2015 at 12:00 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>
>> I have played a bit with the directStream Kafka API. Good work, Cody.
>> These are my findings; can you also clarify a few things for me (see
>> below)?
>>
>> -> When "auto.offset.reset" -> "smallest" and you have 60GB of
>> messages in Kafka, it takes forever, as it reads the whole 60GB at
>> once. "largest" will only read the latest messages.
>> -> To avoid this, you can limit the rate with
>> spark.streaming.kafka.maxRatePerPartition, which is pretty stable (it
>> always reads the same amount of data).
>> -> Number of partitions per batch = number of Kafka partitions.
>>
>> -> In the case of driver failure, offset reset set to "smallest" will
>> replay all the messages, and "largest" will only read messages pushed
>> after the streaming job has started. What happens to messages which
>> arrive in between?
>>
>> *A few things which are unclear:*
>>
>> -> If we have a Kafka topic with 9 partitions and a Spark cluster with
>> 3 slaves, how does it decide which slave should read from which
>> partition? And what happens if a single slave fails while reading the
>> data?
>>
>> -> By default it doesn't push the offsets of consumed messages
>> anywhere, so how does it replay the messages in case of failure?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> You linked to a Google Mail tab, not a public archive, so I don't
>>> know exactly which conversation you're referring to.
>>>
>>> As far as I know, streaming only runs a single job at a time, in the
>>> order they were defined, unless you turn on an experimental option
>>> for more parallelism (TD or someone more knowledgeable can chime in
>>> on this). If you're talking about the possibility of the next job
>>> starting before the prior one has fully finished because your
>>> processing is lagging behind... I'm not 100% sure this is possible,
>>> because I've never observed it.
>>>
>>> The thing is, it's a moot point, because if you're saving offsets
>>> yourself transactionally, you already need to be verifying that
>>> offsets are correct (increasing without gaps) in order to handle
>>> restarts correctly.
>>>
>>> If you're super concerned about how batches get generated, the direct
>>> api gives you access to KafkaUtils.createRDD... just schedule your
>>> own RDDs in the order you want. Again, flexible.
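For concreteness, a minimal sketch of the KafkaUtils.createRDD approach Cody
mentions, against the Spark 1.3 Scala API; the broker address, topic name, and
offset values are placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val sc = new SparkContext(new SparkConf().setAppName("kafka-batch"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    // Explicit, immutable offset ranges; each range becomes one partition of
    // the resulting KafkaRDD. You pick the ranges and the order you run them in.
    val offsetRanges = Array(
      OffsetRange("mytopic", 0, 100L, 200L), // partition 0, offsets [100, 200)
      OffsetRange("mytopic", 1, 100L, 200L)  // partition 1, offsets [100, 200)
    )

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    rdd.foreach { case (key, value) => println(value) }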
>>>
>>> On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>
>>>> Thanks Cody for your email. My concern was not about getting the
>>>> ordering of messages within a partition, which, as you said, is
>>>> possible if one knows how Spark works. The issue is that Spark does
>>>> not schedule the jobs for each batch in the same order the batches
>>>> were generated. If that is not guaranteed, it does not matter whether
>>>> you manage ordering within your partition: depending on per-partition
>>>> ordering to commit offsets may lead to offsets being committed in the
>>>> wrong order.
>>>>
>>>> You discussed this, and some workarounds, in this thread as well:
>>>>
>>>> https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15
>>>>
>>>> So again, one needs to understand every detail of a consumer to
>>>> decide whether it solves their use case.
>>>>
>>>> Regards,
>>>> Dibyendu
>>>>
>>>> On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>
>>>>> As far as I can tell, Dibyendu's "cons" boil down to:
>>>>>
>>>>> 1. Spark checkpoints can't be recovered if you upgrade code
>>>>> 2. Some Spark transformations involve a shuffle, which can
>>>>> repartition data
>>>>>
>>>>> It's not accurate to imply that either one of those things is
>>>>> inherently a "con" of the direct stream api.
>>>>>
>>>>> Regarding checkpoints, nothing about the direct stream requires you
>>>>> to use checkpoints. You can save offsets in a checkpoint, in your
>>>>> own database, or not save offsets at all (as James wants). One might
>>>>> even say that the direct stream api is . . . flexible . . . in that
>>>>> regard.
>>>>>
>>>>> Regarding partitions, the direct stream api gives you the same
>>>>> ordering guarantee as Kafka, namely that within a given partition
>>>>> messages will be in increasing offset order. Clearly, if you do a
>>>>> transformation that repartitions the stream, that no longer holds.
>>>>> Thing is, that doesn't matter if you're saving offsets and results
>>>>> for each rdd in the driver. The offset ranges for the original rdd
>>>>> don't change as a result of the transformation you executed; they're
>>>>> immutable.
>>>>>
>>>>> Sure, you can get into trouble if you're trying to save offsets /
>>>>> results per partition on the executors after a shuffle of some kind.
>>>>> You can avoid this pretty easily by just using normal Scala code to
>>>>> do your transformation on the iterator inside a foreachPartition.
>>>>> Again, this isn't a "con" of the direct stream api; it's just a need
>>>>> to understand how Spark works.
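A minimal sketch of the per-partition pattern Cody describes here, following
the approach in his blog post; broker and topic names are placeholders, and
saveResultsAndOffsets is a hypothetical stand-in for your transactional store:

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, TaskContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Hypothetical sink -- replace with your own transactional store.
    def saveResultsAndOffsets(topic: String, partition: Int,
                              untilOffset: Long, results: Iterator[Int]): Unit =
      results.foreach(r => println(s"$topic-$partition up to $untilOffset: $r"))

    val ssc = new StreamingContext(
      new SparkConf().setAppName("per-partition-offsets"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("mytopic"))

    stream.foreachRDD { rdd =>
      // Capture the offset ranges on the driver, before any shuffle can
      // move records away from their original Kafka partitions.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        // KafkaRDD partitions map 1:1 to offset ranges, so the partition id
        // indexes the array captured above.
        val osr = offsetRanges(TaskContext.get.partitionId)

        // Transform with plain Scala on the iterator -- no shuffle, so
        // per-partition ordering (and the offset range) still holds.
        val results = iter.map { case (_, value) => value.length }

        // Persist results and osr.untilOffset together, in one transaction.
        saveResultsAndOffsets(osr.topic, osr.partition, osr.untilOffset, results)
      }
    }

    ssc.start()
    ssc.awaitTermination()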
>>>>>
>>>>> On Tue, May 12, 2015 at 10:30 PM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>>>>
>>>>>> The low-level consumer which Akhil mentioned has been running at
>>>>>> Pearson for the last 4-5 months without any downtime. I think it is
>>>>>> the most reliable "receiver-based" Kafka consumer for Spark as of
>>>>>> today, if you want to put it that way.
>>>>>>
>>>>>> Prior to Spark 1.3, other receiver-based consumers used the Kafka
>>>>>> high-level APIs, which have serious issues with re-balancing,
>>>>>> weaker fault tolerance, and data loss.
>>>>>>
>>>>>> Cody's implementation using the direct stream is definitely a good
>>>>>> approach, but both the direct-stream approach and the
>>>>>> receiver-based low-level consumer approach have pros and cons. The
>>>>>> receiver-based approach needs a WAL for recovery from driver
>>>>>> failure, which is an overhead for a system like Kafka. For the
>>>>>> direct stream, the offsets stored in the checkpoint directory are
>>>>>> lost if the driver code is modified. You can manage offsets from
>>>>>> your driver, but for derived streams generated from this direct
>>>>>> stream there is no guarantee that batches are processed in order
>>>>>> (and offsets committed in order), etc.
>>>>>>
>>>>>> So whoever uses whichever consumer needs to study the pros and cons
>>>>>> of both approaches before making a call.
>>>>>>
>>>>>> Regards,
>>>>>> Dibyendu
>>>>>>
>>>>>> On Tue, May 12, 2015 at 8:10 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>
>>>>>>> Hi Cody,
>>>>>>> I was just saying that I found more success and higher throughput
>>>>>>> with the low-level Kafka API prior to KafkaRDDs, which seem to be
>>>>>>> the future. My apologies if it came across that way. :)
>>>>>>>
>>>>>>> On 12 May 2015 19:47, "Cody Koeninger" <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Akhil, I hope I'm misreading the tone of this. If you have
>>>>>>>> personal issues at stake, please take them up outside of the
>>>>>>>> public list. If you have actual factual concerns about the kafka
>>>>>>>> integration, please share them in a jira.
>>>>>>>>
>>>>>>>> Regarding reliability, here's a screenshot of a current
>>>>>>>> production job with 3 weeks of uptime. It was up for a month
>>>>>>>> before that; we only took it down to change code.
>>>>>>>>
>>>>>>>> http://tinypic.com/r/2e4vkht/8
>>>>>>>>
>>>>>>>> Regarding flexibility, both of the apis available in Spark will
>>>>>>>> do what James needs, as I described.
>>>>>>>>
>>>>>>>> On Tue, May 12, 2015 at 8:55 AM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> If you are so sure, can you share a benchmark (which you ran for
>>>>>>>>> days, maybe?) that you have done with the Kafka APIs provided by
>>>>>>>>> Spark?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Best Regards
>>>>>>>>>
>>>>>>>>> On Tue, May 12, 2015 at 7:22 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I don't think it's accurate for Akhil to claim that the linked
>>>>>>>>>> library is "much more flexible/reliable" than what's available
>>>>>>>>>> in Spark at this point.
>>>>>>>>>>
>>>>>>>>>> James, what you're describing is the default behavior for the
>>>>>>>>>> createDirectStream api available as part of Spark since 1.3.
>>>>>>>>>> The kafka parameter auto.offset.reset defaults to largest,
>>>>>>>>>> i.e., start at the most recent available message.
>>>>>>>>>>
>>>>>>>>>> This is described at
>>>>>>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>>>>>>>>> The createDirectStream api implementation is described in
>>>>>>>>>> detail at
>>>>>>>>>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>>>>>>>>>>
>>>>>>>>>> If for some reason you're stuck using an earlier version of
>>>>>>>>>> Spark, you can accomplish what you want simply by starting the
>>>>>>>>>> job using a new consumer group (there will be no prior state in
>>>>>>>>>> ZooKeeper, so it will start consuming according to
>>>>>>>>>> auto.offset.reset).
>>>>>>>>>>
>>>>>>>>>> On Tue, May 12, 2015 at 7:26 AM, James King <jakwebin...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Very nice! Will try and let you know, thanks.
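For reference, a minimal sketch of the two options Cody outlines, against the
Spark 1.3 Scala API; broker, ZooKeeper, topic, and group names are
placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf()
      .setAppName("latest-messages-only")
      // Optional: cap per-partition reads per batch, via the
      // spark.streaming.kafka.maxRatePerPartition setting Akhil mentions above.
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Spark 1.3+: direct stream. With no previously saved offsets,
    // auto.offset.reset defaults to "largest", i.e. only messages that
    // arrive after the job starts.
    val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc,
      Map("metadata.broker.list" -> "broker1:9092"),
      Set("mytopic"))

    // Pre-1.3: receiver-based stream. Start with a fresh consumer group so
    // there is no prior state in ZooKeeper; consumption then begins
    // according to auto.offset.reset (largest by default).
    val receiver = KafkaUtils.createStream(
      ssc, "zkhost:2181", "fresh-group-" + System.currentTimeMillis, Map("mytopic" -> 1))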
>>>>>>>>>>>
>>>>>>>>>>> On Tue, May 12, 2015 at 2:25 PM, Akhil Das <ak...@sigmoidanalytics.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yep, you can try this low-level Kafka receiver:
>>>>>>>>>>>> https://github.com/dibbhatt/kafka-spark-consumer. It's much
>>>>>>>>>>>> more flexible/reliable than the one that comes with Spark.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, May 12, 2015 at 5:15 PM, James King <jakwebin...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> What I want is: if the driver dies for some reason and is
>>>>>>>>>>>>> restarted, I want to read only messages that arrived in
>>>>>>>>>>>>> Kafka following the restart of the driver program and the
>>>>>>>>>>>>> re-connection to Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Has anyone done this? Any links or resources that can help
>>>>>>>>>>>>> explain this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> jk
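For completeness on James's original question: if you instead want a restarted
driver to pick up exactly where the previous run left off (rather than skip to
the latest messages), the checkpoint route Cody mentions above looks roughly
like this sketch; the checkpoint path and batch interval are placeholders, and
note Dibyendu's caveat that a checkpoint cannot be recovered after a code
change:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "hdfs:///tmp/kafka-direct-checkpoint" // placeholder

    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(
        new SparkConf().setAppName("checkpointed-direct-stream"), Seconds(10))
      ssc.checkpoint(checkpointDir)
      // ... set up the direct stream and output operations here ...
      ssc
    }

    // Clean start: builds a new context. After a driver restart: rebuilds the
    // context, including consumed offset ranges, from the checkpoint.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()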