Re: createDirectStream and Stats

2015-07-13 Thread Cody Koeninger
Reading from Kafka is always going to be bounded by the number of Kafka partitions you have, regardless of what you're using to read it. If most of your time is spent on calculation, not reading, then yes, a Spark repartition will help. If most of your time is spent just reading, you…
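A minimal sketch of the pattern being discussed, assuming the Kafka 0.8 direct API from Spark 1.3; ssc, the broker address, the topic name, myFunc, and the parallelism of 48 are all illustrative placeholders:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    // The direct stream yields exactly one Spark partition per Kafka partition.
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("mytopic"))

    // Repartitioning adds a shuffle, so it only pays off when the
    // per-record computation, not the read itself, dominates batch time.
    val widened = stream.repartition(48)
    val results = widened.map { case (_, value) => myFunc(value) }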

Re: createDirectStream and Stats

2015-07-12 Thread gaurav sharma
Hi guys, I too am facing a similar challenge with the direct stream. I have 3 Kafka partitions and am running Spark on 18 cores, with the parallelism level set to 48. I am running a simple map-reduce job on the incoming stream. Though the reduce stage takes milliseconds to seconds for around 15 million packets,…

Re: createDirectStream and Stats

2015-06-20 Thread Silvio Fiorito
Are you sure you were using all 100 executors even with the receiver model? Because in receiver mode, the number of partitions is dependent on the batch duration and block interval. It may not necessarily map directly to the number of executors in your app unless you've adjusted the block interval.
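For reference, a sketch of the relationship described here; the 2-second batch and 50 ms block interval are just example values, and the duration-suffix syntax assumes Spark 1.4 or later:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // In receiver mode, blocks are cut every spark.streaming.blockInterval
    // (default 200 ms), so partitions per receiver per batch is roughly
    // batchInterval / blockInterval -- independent of the executor count.
    val conf = new SparkConf()
      .setAppName("receiver-partitioning")
      .set("spark.streaming.blockInterval", "50ms") // 2000 ms / 50 ms = ~40 partitions
    val ssc = new StreamingContext(conf, Seconds(2))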

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
When you say your old version was k = createStream ..., were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... If that's the case, I'd try the direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith…
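The multi-receiver pattern being asked about looks roughly like this; a sketch against the receiver-based Kafka 0.8 API, where zkQuorum, group, and topicMap are placeholders:

    import org.apache.spark.streaming.kafka.KafkaUtils

    // Each createStream call allocates one receiver pinned to one executor;
    // union the streams to ingest in parallel, then process them as one.
    val numReceivers = 3
    val streams = (1 to numReceivers).map { _ =>
      KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    }
    val unified = ssc.union(streams)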

Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Yes, please tell us what operation you are using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API:…

Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Essentially, I went from:
  k = createStream ...
  val dataout = k.map(x => myFunc(x._2, someParams))
  dataout.foreachRDD(rdd => rdd.foreachPartition(rec => { myOutputFunc.write(rec) }))
To:
  kIn = createDirectStream ...
  k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors…
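A self-contained version of the "after" job with the shuffle made explicit; a sketch in which kafkaParams, topics, numberOfExecutors, myFunc, someParams, and myOutputFunc stand in for the original code:

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.KafkaUtils

    val kIn = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    // Shuffle up from #kafka-partitions to #executors before the CPU-heavy map.
    val k = kIn.repartition(numberOfExecutors)
    val dataout = k.map(x => myFunc(x._2, someParams))
    dataout.foreachRDD { rdd =>
      // rec is an Iterator over one partition's records
      rdd.foreachPartition(rec => myOutputFunc.write(rec))
    }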

Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
I don't think there were any enhancements that could change this behavior. On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the Spark UI the breakdown of the stages in each batch's…

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
If that's the case, you're still only using as many read executors as there are Kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith…

Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
I did try without repartition initially, but that was even worse because instead of the allocated 100 executors, only 30 (the number of Kafka partitions) had to do all the work. MyFunc is a CPU-bound task, so adding more memory per executor wouldn't help, and I saw that each…

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
So were you repartitioning with the original job as well? On Fri, Jun 19, 2015 at 9:36 PM, Tim Smith secs...@gmail.com wrote: I did try without repartition initially, but that was even worse because instead of the allocated 100 executors, only 30 (the number of Kafka…

Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Also, can you find from the Spark UI the breakdown of the stages in each batch's jobs, and see which stage is taking more time after a while? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: When you say your old version was k = createStream ..., were you…

Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the Spark UI the breakdown of the stages in each batch's jobs, and see which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific…

Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Update on performance of the new API: the new code using the createDirectStream API ran overnight, and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why; haven't investigated a whole lot. For now, switched back to the createStream API build of my…

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight, and when I checked the app state in the morning, there were massive…

Re: createDirectStream and Stats

2015-06-18 Thread Tim Smith
Thanks for the super-fast response, TD :) I will now go bug my Hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x? That explains it. This issue has been fixed in…

Re: createDirectStream and Stats

2015-06-18 Thread Tathagata Das
Are you using Spark 1.3.x? That explains it. This issue has been fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for…

createDirectStream and Stats

2015-06-18 Thread Tim Smith
Hi, I just switched from createStream to the createDirectStream API for Kafka, and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on the health of the app. What's the best way to…
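One stopgap while on 1.3 (the UI fix landed in 1.4, per the replies above): the direct stream exposes per-partition offset ranges, which can be logged as a rough throughput/health stat. A sketch using the HasOffsetRanges interface, where stream is the DStream returned by createDirectStream:

    import org.apache.spark.streaming.kafka.HasOffsetRanges

    stream.foreachRDD { rdd =>
      // The cast is valid only on the RDD produced directly by createDirectStream,
      // before any transformation that changes its type.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach { r =>
        println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
      }
    }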