I don't think there were any enhancements that would change this behavior.

On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith <secs...@gmail.com> wrote:
> On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Also, can you find from the Spark UI the breakdown of the stages in each
>> batch's jobs, and find which stage is taking more time after a while?
>
> Sure, will try to debug/troubleshoot. Are there enhancements to this
> specific API between 1.3 and 1.4 that can substantially change its
> behaviour?
>
>> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> When you say your old version was
>>>
>>> k = createStream .....
>>>
>>> were you manually creating multiple receivers? Because otherwise you're
>>> only using one receiver on one executor...
>
> Yes, sorry, the earlier/stable version was more like:
>
> kInStreams = (1 to n).map { _ => KafkaUtils.createStream ............ } // n being the number of Kafka partitions, 1 receiver per partition
> val k = ssc.union(kInStreams)
> val dataout = k.map(x => myFunc(x._2, someParams))
> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>
> Thanks,
>
> Tim
>
>>> If that's the case I'd try direct stream without the repartitioning.
>>>
>>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> Essentially, I went from:
>>>>
>>>> k = createStream .....
>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>
>>>> to:
>>>>
>>>> kIn = createDirectStream .....
>>>> k = kIn.repartition(numberOfExecutors) // since #kafka-partitions < #spark-executors
>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>> dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
>>>>
>>>> With the new API, the app starts up and works fine for a while, but I
>>>> guess it starts to deteriorate after a while. With the existing API,
>>>> "createStream", the app does deteriorate, but over a much longer
>>>> period: hours vs. days.
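For reference, a minimal end-to-end sketch of the two variants Tim describes, against the Spark 1.3-era Kafka APIs. The ZooKeeper/broker addresses, topic, group, executor count, and partition count below are made up, and myFunc, someParams, and myOutputFunc stand in for his elided application code:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-compare"), Seconds(10))

    // Receiver-based (old/stable) variant: one receiver per Kafka
    // partition, unioned into a single DStream. All values are assumptions.
    val n = 8 // assumed number of Kafka partitions
    val kInStreams = (1 to n).map { _ =>
      KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("mytopic" -> 1))
    }
    val kReceiver = ssc.union(kInStreams)

    // Direct (receiver-less) variant: one RDD partition per Kafka
    // partition, then repartitioned as in Tim's new code. Cody suggests
    // trying it without the repartition.
    val numberOfExecutors = 16 // assumption
    val kIn = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))
    val kDirect = kIn.repartition(numberOfExecutors)

    // Downstream pipeline is identical in both variants; myFunc,
    // someParams, and myOutputFunc are placeholders from the thread.
    val dataout = kDirect.map(x => myFunc(x._2, someParams))
    dataout.foreachRDD(rdd => rdd.foreachPartition(rec => myOutputFunc.write(rec)))

    ssc.start()
    ssc.awaitTermination()

The receiver variant needs one createStream per Kafka partition to parallelize ingest, while the direct variant already gets one RDD partition per Kafka partition, which is why Cody suggests dropping the repartition.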
>>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>
>>>>> Yes, please tell us what operation you are using.
>>>>>
>>>>> TD
>>>>>
>>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> Is there any more info you can provide / relevant code?
>>>>>>
>>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>
>>>>>>> Update on performance of the new API: the new code using the
>>>>>>> createDirectStream API ran overnight, and when I checked the app
>>>>>>> state in the morning, there were massive scheduling delays :(
>>>>>>>
>>>>>>> Not sure why; I haven't investigated a whole lot. For now, I've
>>>>>>> switched back to the createStream API build of my app. Yes, for the
>>>>>>> record, this is with CDH 5.4.1 and Spark 1.3.
>>>>>>>
>>>>>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the super-fast response, TD :)
>>>>>>>>
>>>>>>>> I will now go bug my Hadoop vendor to upgrade from 1.3 to 1.4.
>>>>>>>> Cloudera, are you listening? :D
>>>>>>>>
>>>>>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das
>>>>>>>> <tathagata.das1...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Are you using Spark 1.3.x? That explains it. This issue has been
>>>>>>>>> fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with
>>>>>>>>> more awesome stats. :)
>>>>>>>>>
>>>>>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I just switched from "createStream" to the "createDirectStream"
>>>>>>>>>> API for Kafka, and while things otherwise seem happy, the first
>>>>>>>>>> thing I noticed is that stream/receiver stats are gone from the
>>>>>>>>>> Spark UI :( Those stats were very handy for keeping an eye on the
>>>>>>>>>> health of the app.
>>>>>>>>>>
>>>>>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>>>>>> accumulators? It would be really nice to get back receiver-like
>>>>>>>>>> stats, even though I understand that "createDirectStream" is a
>>>>>>>>>> receiver-less design.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Tim
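On Tim's accumulator question, a minimal sketch, reusing the ssc/dataout/myOutputFunc placeholders from his snippets: a named accumulator incremented during the existing output pass gives a running record count in the driver log and shows up on the Spark UI's stage detail pages. Note that accumulator updates inside transformations are not exactly-once, so task retries can overcount.

    // Named accumulators are visible on the Spark UI's stage pages.
    val totalRecords = ssc.sparkContext.accumulator(0L, "Records processed")

    dataout.foreachRDD { rdd =>
      val before = totalRecords.value
      // Count records as a side effect of the existing output pass,
      // so no second job over the RDD is needed.
      rdd.map { rec => totalRecords += 1L; rec }
         .foreachPartition(recs => myOutputFunc.write(recs))
      // Runs on the driver after the batch's output job completes.
      println(s"Batch records: ${totalRecords.value - before}, " +
        s"total: ${totalRecords.value}")
    }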