Re: createDirectStream and Stats
Reading from kafka is always going to be bounded by the number of kafka partitions you have, regardless of what you're using to read it. If most of your time is coming from calculation, not reading, then yes a spark repartition will help. If most of your time is coming just from reading, you need more kafka partitions. On Sun, Jul 12, 2015 at 6:30 AM, gaurav sharma sharmagaura...@gmail.com wrote: Hi guys, I too am facing similar challenge with directstream. I have 3 Kafka Partitions. and running spark on 18 cores, with parallelism level set to 48. I am running simple map-reduce job on incoming stream. Though the reduce stage takes milliseconds-seconds for around 15 million packets, the Map stage takes around 4-5 minutes, since it creates only 3 tasks for Map stage(I believe 3 tasks because I have 3 kafka partitions and the JavaPairDStreamString, String kafkaStream = KafkaConnector.getKafkaStream(jsc); kafkaStream that i create in code is the parent Rdd for Map Job, so it would create only 3 tasks) I have 10 such jobs similar to above one working on same KafkaStream i create Could you guys please advise, if repartitioning the KafkaStream (taking into account the rechuffle at repartition stage) would optimize my overall batch processing time. On Sat, Jun 20, 2015 at 7:24 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Are you sure you were using all 100 executors even with the receiver model? Because in receiver mode, the number of partitions is dependent on the batch duration and block interval. It may not necessarily map directly to the number of executors in your app unless you've adjusted the block interval and batch duration. *From:* Tim Smith secs...@gmail.com *Sent:* Friday, June 19, 2015 10:36 PM *To:* user@spark.apache.org I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger
Re: createDirectStream and Stats
Hi guys, I too am facing similar challenge with directstream. I have 3 Kafka Partitions. and running spark on 18 cores, with parallelism level set to 48. I am running simple map-reduce job on incoming stream. Though the reduce stage takes milliseconds-seconds for around 15 million packets, the Map stage takes around 4-5 minutes, since it creates only 3 tasks for Map stage(I believe 3 tasks because I have 3 kafka partitions and the JavaPairDStreamString, String kafkaStream = KafkaConnector.getKafkaStream(jsc); kafkaStream that i create in code is the parent Rdd for Map Job, so it would create only 3 tasks) I have 10 such jobs similar to above one working on same KafkaStream i create Could you guys please advise, if repartitioning the KafkaStream (taking into account the rechuffle at repartition stage) would optimize my overall batch processing time. On Sat, Jun 20, 2015 at 7:24 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Are you sure you were using all 100 executors even with the receiver model? Because in receiver mode, the number of partitions is dependent on the batch duration and block interval. It may not necessarily map directly to the number of executors in your app unless you've adjusted the block interval and batch duration. *From:* Tim Smith secs...@gmail.com *Sent:* Friday, June 19, 2015 10:36 PM *To:* user@spark.apache.org I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to
Re: createDirectStream and Stats
Are you sure you were using all 100 executors even with the receiver model? Because in receiver mode, the number of partitions is dependent on the batch duration and block interval. It may not necessarily map directly to the number of executors in your app unless you've adjusted the block interval and batch duration. From: Tim Smithmailto:secs...@gmail.com Sent: Friday, June 19, 2015 10:36 PM To: user@spark.apache.orgmailto:user@spark.apache.org I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.commailto:secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.commailto:secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.orgmailto:c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.commailto:secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.commailto:secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.commailto:tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.commailto:secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem
Re: createDirectStream and Stats
when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
I dont think there was any enhancments that can change this behavior. On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
So were you repartitioning with the original job as well? On Fri, Jun 19, 2015 at 9:36 PM, Tim Smith secs...@gmail.com wrote: I did try without repartition, initially, but that was even more horrible because instead of the allocated 100 executors, only 30 (which is the number of kafka partitions) would have to do the work. The MyFunc is a CPU bound task so adding more memory per executor wouldn't help and I saw that each of the 30 executors was only using one thread/core on each Spark box. I could go and play with threading in MyFunc but I don't want to mess with threading with all the parallelism already involved and I don't think in-app threading outside of what the framework does is really desirable. With repartition, there is shuffle involved, but at least the computation load spreads across all 100 executors instead of just 30. On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote: If that's the case, you're still only using as many read executors as there are kafka partitions. I'd remove the repartition. If you weren't doing any shuffles in the old job, and are doing a shuffle in the new job, it's not really comparable. On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote: On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote: Also, can you find from the spark UI the break up of the stages in each batch's jobs, and find which stage is taking more time after a while? Sure, will try to debug/troubleshoot. Are there enhancements to this specific API between 1.3 and 1.4 that can substantially change it's behaviour? On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote: when you say your old version was k = createStream . were you manually creating multiple receivers? Because otherwise you're only using one receiver on one executor... Yes, sorry, the earlier/stable version was more like: kInStreams = (1 to n).map{_ = KafkaUtils.createStream // n being the number of kafka partitions, 1 receiver per partition val k = ssc.union(kInStreams) val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) Thanks, Tim If that's the case I'd try direct stream without the repartitioning. On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote: Essentially, I went from: k = createStream . val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) To: kIn = createDirectStream . k = kIn.repartition(numberOfExecutors) //since #kafka partitions #spark-executors val dataout = k.map(x=myFunc(x._2,someParams)) dataout.foreachRDD ( rdd = rdd.foreachPartition(rec = { myOutputFunc.write(rec) }) With the new API, the app starts up and works fine for a while but I guess starts to deteriorate after a while. With the existing API createStream, the app does deteriorate but over a much longer period, hours vs days. On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote: Yes, please tell us what operation are you using. TD On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote: Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Is there any more info you can provide / relevant code? On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote: Update on performance of the new API: the new code using the createDirectStream API ran overnight and when I checked the app state in the morning, there were massive scheduling delays :( Not sure why and haven't investigated a whole lot. For now, switched back to the createStream API build of my app. Yes, for the record, this is with CDH 5.4.1 and Spark 1.3. On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote: Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Thanks for the super-fast response, TD :) I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are you listening? :D On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
Re: createDirectStream and Stats
Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :) On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote: Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim
createDirectStream and Stats
Hi, I just switched from createStream to the createDirectStream API for kafka and while things otherwise seem happy, the first thing I noticed is that stream/receiver stats are gone from the Spark UI :( Those stats were very handy for keeping an eye on health of the app. What's the best way to re-create those in the Spark UI? Maintain Accumulators? Would be really nice to get back receiver-like stats even though I understand that createDirectStream is a receiver-less design. Thanks, Tim