Re: createDirectStream and Stats

2015-07-13 Thread Cody Koeninger
Reading from kafka is always going to be bounded by the number of kafka
partitions you have, regardless of what you're using to read it.

If most of your time is coming from calculation, not reading, then yes a
spark repartition will help.  If most of your time is coming just from
reading, you need more kafka partitions.
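
A minimal Scala sketch of that suggestion, assuming the Spark 1.3 direct Kafka API; the broker address, topic name, numCores and processRecord below are placeholders rather than anything from this thread:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("direct-stream-sketch")
val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder broker
val topics = Set("mytopic")                                      // placeholder topic
val numCores = 48                                                // illustrative value
def processRecord(value: String): Int = value.length             // stand-in for the CPU-heavy per-record work

// The direct stream gives one RDD partition per Kafka partition...
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// ...so if the per-record work (not the read) is the bottleneck, repartition
// before the expensive map to use all cores, at the cost of one shuffle.
val processed = stream.repartition(numCores).map { case (_, value) => processRecord(value) }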

On Sun, Jul 12, 2015 at 6:30 AM, gaurav sharma sharmagaura...@gmail.com
wrote:

 Hi guys,

 I too am facing a similar challenge with directstream.

 I have 3 Kafka Partitions.

 and am running Spark on 18 cores, with the parallelism level set to 48.

 I am running simple map-reduce job on incoming stream.

 Though the reduce stage takes milliseconds to seconds for around 15 million
 packets, the map stage takes around 4-5 minutes, since only 3 tasks are
 created for the map stage (I believe 3 tasks because I have 3 Kafka
 partitions and the

 JavaPairDStream<String, String> kafkaStream =
 KafkaConnector.getKafkaStream(jsc);

 kafkaStream that I create in code is the parent RDD for the map job, so it
 would create only 3 tasks).


 I have 10 such jobs similar to the above one working on the same KafkaStream I
 create.

 Could you guys please advise whether repartitioning the KafkaStream (taking
 into account the reshuffle at the repartition stage) would optimize my overall
 batch processing time?


 On Sat, Jun 20, 2015 at 7:24 PM, Silvio Fiorito 
 silvio.fior...@granturing.com wrote:

  Are you sure you were using all 100 executors even with the receiver
 model? Because in receiver mode, the number of partitions is dependent on
 the batch duration and block interval. It may not necessarily map directly
 to the number of executors in your app unless you've adjusted the block
 interval and batch duration.

  *From:* Tim Smith secs...@gmail.com
 *Sent:* Friday, June 19, 2015 10:36 PM
 *To:* user@spark.apache.org

  I did try without repartition, initially, but that was even more
 horrible because instead of the allocated 100 executors, only 30 (which is
 the number of kafka partitions) would have to do the work. The MyFunc is
 a CPU bound task so adding more memory per executor wouldn't help and I saw
 that each of the 30 executors was only using one thread/core on each Spark
 box. I could go and play with threading in MyFunc but I don't want to mess
 with threading with all the parallelism already involved and I don't think
 in-app threading outside of what the framework does is really desirable.

  With repartition, there is shuffle involved, but at least the
 computation load spreads across all 100 executors instead of just 30.




 On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org
 wrote:

 If that's the case, you're still only using as many read executors as
 there are kafka partitions.

  I'd remove the repartition. If you weren't doing any shuffles in the
 old job, and are doing a shuffle in the new job, it's not really comparable.

 On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

  On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in
 each batch's jobs, and find which stage is taking more time after a while?


  Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


   On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

  k = createStream .

  were you manually creating multiple receivers?  Because otherwise
 you're only using one receiver on one executor...


  Yes, sorry, the earlier/stable version was more like:
  kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  Thanks,

  Tim






  If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

  Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  With the new API, the app starts up and works fine for a while but
 I guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

  TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger 
 

Re: createDirectStream and Stats

2015-07-12 Thread gaurav sharma
Hi guys,

I too am facing a similar challenge with directstream.

I have 3 Kafka Partitions.

and am running Spark on 18 cores, with the parallelism level set to 48.

I am running simple map-reduce job on incoming stream.

Though the reduce stage takes milliseconds to seconds for around 15 million
packets, the map stage takes around 4-5 minutes, since only 3 tasks are
created for the map stage (I believe 3 tasks because I have 3 Kafka
partitions and the

JavaPairDStream<String, String> kafkaStream =
KafkaConnector.getKafkaStream(jsc);

kafkaStream that I create in code is the parent RDD for the map job, so it
would create only 3 tasks).


I have 10 such jobs similar to the above one working on the same KafkaStream I
create.

Could you guys please advise whether repartitioning the KafkaStream (taking
into account the reshuffle at the repartition stage) would optimize my overall
batch processing time?
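
For what it's worth, a hedged Scala sketch of what that repartition would look like in this shape of job; the original code here is Java, and kafkaStream, myMapFunc and myReduceFunc below are stand-ins, not the actual job:

// 3 Kafka partitions -> only 3 map tasks unless the stream is widened first.
// Repartitioning to the configured parallelism (48) adds a shuffle of the raw
// records, but lets the expensive map stage run on all 18 cores instead of 3.
val widened = kafkaStream.repartition(48)
val mapped  = widened.map(record => myMapFunc(record))
val reduced = mapped.reduce((a, b) => myReduceFunc(a, b))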


On Sat, Jun 20, 2015 at 7:24 PM, Silvio Fiorito 
silvio.fior...@granturing.com wrote:

  Are you sure you were using all 100 executors even with the receiver
 model? Because in receiver mode, the number of partitions is dependent on
 the batch duration and block interval. It may not necessarily map directly
 to the number of executors in your app unless you've adjusted the block
 interval and batch duration.

  *From:* Tim Smith secs...@gmail.com
 *Sent:* Friday, June 19, 2015 10:36 PM
 *To:* user@spark.apache.org

  I did try without repartition, initially, but that was even more
 horrible because instead of the allocated 100 executors, only 30 (which is
 the number of kafka partitions) would have to do the work. The MyFunc is
 a CPU bound task so adding more memory per executor wouldn't help and I saw
 that each of the 30 executors was only using one thread/core on each Spark
 box. I could go and play with threading in MyFunc but I don't want to mess
 with threading with all the parallelism already involved and I don't think
 in-app threading outside of what the framework does is really desirable.

  With repartition, there is shuffle involved, but at least the
 computation load spreads across all 100 executors instead of just 30.




 On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org
 wrote:

 If that's the case, you're still only using as many read executors as
 there are kafka partitions.

  I'd remove the repartition. If you weren't doing any shuffles in the
 old job, and are doing a shuffle in the new job, it's not really comparable.

 On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

  On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


  Sure, will try to debug/troubleshoot. Are there enhancements to this
  specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


   On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

  k = createStream .

  were you manually creating multiple receivers?  Because otherwise
 you're only using one receiver on one executor...


  Yes, sorry, the earlier/stable version was more like:
  kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  Thanks,

  Tim






  If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

  Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

  With the new API, the app starts up and works fine for a while but
 I guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

  TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
  wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com
 wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state 
 in
 the morning, there were massive scheduling delays :(

  Not sure why and haven't investigated a whole lot. For now,
 switched back to 

Re: createDirectStream and Stats

2015-06-20 Thread Silvio Fiorito
Are you sure you were using all 100 executors even with the receiver model? 
Because in receiver mode, the number of partitions is dependent on the batch 
duration and block interval. It may not necessarily map directly to the number 
of executors in your app unless you've adjusted the block interval and batch 
duration.
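
For reference, a sketch of the knobs being described here; the 10-second batch and 200 ms block interval are example values only, not anything from this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-partitioning-sketch")
  // In receiver mode each receiver cuts its input into blocks every block
  // interval, and each block becomes one partition of that batch's RDD.
  .set("spark.streaming.blockInterval", "200")   // milliseconds in Spark 1.x; the default is 200

val ssc = new StreamingContext(conf, Seconds(10))

// partitions per receiver per batch ~= batch duration / block interval
//                                    = 10,000 ms / 200 ms = 50,
// regardless of how many executors the app has.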

From: Tim Smith secs...@gmail.com
Sent: Friday, June 19, 2015 10:36 PM
To: user@spark.apache.org

I did try without repartition, initially, but that was even more horrible 
because instead of the allocated 100 executors, only 30 (which is the number of 
kafka partitions) would have to do the work. The MyFunc is a CPU bound task 
so adding more memory per executor wouldn't help and I saw that each of the 30 
executors was only using one thread/core on each Spark box. I could go and play 
with threading in MyFunc but I don't want to mess with threading with all the 
parallelism already involved and I don't think in-app threading outside of what 
the framework does is really desirable.

With repartition, there is shuffle involved, but at least the computation load 
spreads across all 100 executors instead of just 30.




On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger 
c...@koeninger.org wrote:
If that's the case, you're still only using as many read executors as there are 
kafka partitions.

I'd remove the repartition. If you weren't doing any shuffles in the old job, 
and are doing a shuffle in the new job, it's not really comparable.

On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith 
secs...@gmail.com wrote:
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das 
t...@databricks.com wrote:
Also, can you find from the spark UI the break up of the stages in each batch's 
jobs, and find which stage is taking more time after a while?

Sure, will try to debug/troubleshoot. Are there enhancements to this specific 
API between 1.3 and 1.4 that can substantially change its behaviour?

On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger 
c...@koeninger.org wrote:
when you say your old version was

k = createStream .

were you manually creating multiple receivers?  Because otherwise you're only 
using one receiver on one executor...

Yes, sorry, the earlier/stable version was more like:
kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
// n being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))

Thanks,

Tim





If that's the case I'd try direct stream without the repartitioning.


On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith 
secs...@gmail.com wrote:
Essentially, I went from:
k = createStream ...
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))

To:
kIn = createDirectStream ...
k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))

With the new API, the app starts up and works fine for a while but I guess 
starts to deteriorate after a while. With the existing API createStream, the 
app does deteriorate but over a much longer period, hours vs days.






On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das 
t...@databricks.com wrote:
Yes, please tell us what operation are you using.

TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger 
c...@koeninger.org wrote:
Is there any more info you can provide / relevant code?

On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith 
secs...@gmail.com wrote:
Update on performance of the new API: the new code using the createDirectStream 
API ran overnight and when I checked the app state in the morning, there were 
massive scheduling delays :(

Not sure why and haven't investigated a whole lot. For now, switched back to 
the createStream API build of my app. Yes, for the record, this is with CDH 
5.4.1 and Spark 1.3.



On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith 
secs...@gmail.com wrote:
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera, are 
you listening? :D





On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
Are you using Spark 1.3.x ? That explains. This issue has been fixed in Spark 
1.4.0. Bonus you get a fancy new streaming UI with more awesome stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith 
secs...@gmail.com wrote:
Hi,

I just switched from createStream to the createDirectStream API for kafka 
and while things otherwise seem 

Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
when you say your old version was

k = createStream .

were you manually creating multiple receivers?  Because otherwise you're
only using one receiver on one executor...

If that's the case I'd try direct stream without the repartitioning.


On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I guess
 starts to deteriorate after a while. With the existing API createStream,
 the app does deteriorate but over a much longer period, hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed
 in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I 
 noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim











Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Yes, please tell us what operation are you using.

TD

On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched back
 to the createStream API build of my app. Yes, for the record, this is with
 CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
 are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim









Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Essentially, I went from:
k = createStream ...
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))

To:
kIn = createDirectStream ...
k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))

With the new API, the app starts up and works fine for a while but I guess
starts to deteriorate after a while. With the existing API createStream,
the app does deteriorate but over a much longer period, hours vs days.






On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed
 in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I 
 noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim










Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
I don't think there were any enhancements that can change this behavior.

On Fri, Jun 19, 2015 at 6:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this 
 is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first thing 
 I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim














Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
If that's the case, you're still only using as many read executors as there
are kafka partitions.

I'd remove the repartition. If you weren't doing any shuffles in the old
job, and are doing a shuffle in the new job, it's not really comparable.

On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this 
 is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first thing 
 I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim














Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
I did try without repartition, initially, but that was even more horrible
because instead of the allocated 100 executors, only 30 (which is the
number of kafka partitions) would have to do the work. The MyFunc is a
CPU bound task so adding more memory per executor wouldn't help and I saw
that each of the 30 executors was only using one thread/core on each Spark
box. I could go and play with threading in MyFunc but I don't want to mess
with threading with all the parallelism already involved and I don't think
in-app threading outside of what the framework does is really desirable.

With repartition, there is shuffle involved, but at least the computation
load spreads across all 100 executors instead of just 30.




On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org wrote:

 If that's the case, you're still only using as many read executors as
 there are kafka partitions.

 I'd remove the repartition. If you weren't doing any shuffles in the old
 job, and are doing a shuffle in the new job, it's not really comparable.

 On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise
 you're only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com
 wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state 
 in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now,
 switched back to the createStream API build of my app. Yes, for the 
 record,
 this is with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with 
 more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first 
 thing I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim















Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
So were you repartitioning with the original job as well?

On Fri, Jun 19, 2015 at 9:36 PM, Tim Smith secs...@gmail.com wrote:

 I did try without repartition, initially, but that was even more horrible
 because instead of the allocated 100 executors, only 30 (which is the
 number of kafka partitions) would have to do the work. The MyFunc is a
 CPU bound task so adding more memory per executor wouldn't help and I saw
 that each of the 30 executors was only using one thread/core on each Spark
 box. I could go and play with threading in MyFunc but I don't want to mess
 with threading with all the parallelism already involved and I don't think
 in-app threading outside of what the framework does is really desirable.

 With repartition, there is shuffle involved, but at least the computation
 load spreads across all 100 executors instead of just 30.




 On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger c...@koeninger.org
 wrote:

 If that's the case, you're still only using as many read executors as
 there are kafka partitions.

 I'd remove the repartition. If you weren't doing any shuffles in the old
 job, and are doing a shuffle in the new job, it's not really comparable.

 On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith secs...@gmail.com wrote:

 On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com
 wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


 Sure, will try to debug/troubleshoot. Are there enhancements to this
 specific API between 1.3 and 1.4 that can substantially change its
 behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise
 you're only using one receiver on one executor...


 Yes, sorry, the earlier/stable version was more like:
 kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
 // n being the number of kafka partitions, 1 receiver per partition
 val k = ssc.union(kInStreams)
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 Thanks,

 Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
  wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com
 wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state 
 in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now,
 switched back to the createStream API build of my app. Yes, for the 
 record,
 this is with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com
 wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with 
 more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first 
 thing I
 noticed is that stream/receiver stats are gone from the Spark UI 
 :( Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI?
 Maintain Accumulators? Would be really nice to get back 
 receiver-like stats
 even though I understand that createDirectStream is a 
 receiver-less
 design.

 Thanks,

 Tim
















Re: createDirectStream and Stats

2015-06-19 Thread Tathagata Das
Also, can you find from the spark UI the break up of the stages in each
batch's jobs, and find which stage is taking more time after a while?



On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...

 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed
 in Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream API
 for kafka and while things otherwise seem happy, the first thing I 
 noticed
 is that stream/receiver stats are gone from the Spark UI :( Those stats
 were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim












Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das t...@databricks.com wrote:

 Also, can you find from the spark UI the break up of the stages in each
 batch's jobs, and find which stage is taking more time after a while?


Sure, will try to debug/troubleshoot. Are there enhancements to this
specific API between 1.3 and 1.4 that can substantially change its
behaviour?


 On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger c...@koeninger.org
 wrote:

 when you say your old version was

 k = createStream .

 were you manually creating multiple receivers?  Because otherwise you're
 only using one receiver on one executor...


Yes, sorry, the earlier/stable version was more like:
kInStreams = (1 to n).map{ _ => KafkaUtils.createStream(...) }
// n being the number of kafka partitions, 1 receiver per partition
val k = ssc.union(kInStreams)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
myOutputFunc.write(rec) }))
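
Roughly, filled in with placeholder arguments (the ZooKeeper quorum, consumer group and topic name are invented here, and myFunc, someParams and myOutputFunc remain the same stand-ins as above), that receiver-per-partition pattern looks like:

import org.apache.spark.streaming.kafka.KafkaUtils

val n = 30                       // one receiver per Kafka partition
val zkQuorum = "zk1:2181"        // placeholder ZooKeeper quorum
val group = "my-consumer-group"  // placeholder consumer group

val kInStreams = (1 to n).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, group, Map("mytopic" -> 1))
}
val k = ssc.union(kInStreams)    // merge the n receiver streams into one DStream
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD { rdd =>
  rdd.foreachPartition { rec => myOutputFunc.write(rec) }
}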

Thanks,

Tim






 If that's the case I'd try direct stream without the repartitioning.


 On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith secs...@gmail.com wrote:

 Essentially, I went from:
 k = createStream ...
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 To:
 kIn = createDirectStream ...
 k = kIn.repartition(numberOfExecutors) // since #kafka partitions < #spark-executors
 val dataout = k.map(x => myFunc(x._2, someParams))
 dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
 myOutputFunc.write(rec) }))

 With the new API, the app starts up and works fine for a while but I
 guess starts to deteriorate after a while. With the existing API
 createStream, the app does deteriorate but over a much longer period,
 hours vs days.






 On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das t...@databricks.com
 wrote:

 Yes, please tell us what operation are you using.

 TD

 On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger c...@koeninger.org
 wrote:

 Is there any more info you can provide / relevant code?

 On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched
 back to the createStream API build of my app. Yes, for the record, this 
 is
 with CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
 Cloudera, are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been
 fixed in Spark 1.4.0. Bonus you get a fancy new streaming UI with more
 awesome stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com
 wrote:

 Hi,

 I just switched from createStream to the createDirectStream
 API for kafka and while things otherwise seem happy, the first thing I
 noticed is that stream/receiver stats are gone from the Spark UI :( 
 Those
 stats were very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats 
 even
 though I understand that createDirectStream is a receiver-less 
 design.

 Thanks,

 Tim













Re: createDirectStream and Stats

2015-06-19 Thread Tim Smith
Update on performance of the new API: the new code using the
createDirectStream API ran overnight and when I checked the app state in
the morning, there were massive scheduling delays :(

Not sure why and haven't investigated a whole lot. For now, switched back
to the createStream API build of my app. Yes, for the record, this is with
CDH 5.4.1 and Spark 1.3.



On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
 are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim







Re: createDirectStream and Stats

2015-06-19 Thread Cody Koeninger
Is there any more info you can provide / relevant code?

On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith secs...@gmail.com wrote:

 Update on performance of the new API: the new code using the
 createDirectStream API ran overnight and when I checked the app state in
 the morning, there were massive scheduling delays :(

 Not sure why and haven't investigated a whole lot. For now, switched back
 to the createStream API build of my app. Yes, for the record, this is with
 CDH 5.4.1 and Spark 1.3.



 On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith secs...@gmail.com wrote:

 Thanks for the super-fast response, TD :)

 I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
 are you listening? :D





 On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim








Re: createDirectStream and Stats

2015-06-18 Thread Tim Smith
Thanks for the super-fast response, TD :)

I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4. Cloudera,
are you listening? :D





On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Are you using Spark 1.3.x ? That explains. This issue has been fixed in
 Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
 stats. :)

 On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim






Re: createDirectStream and Stats

2015-06-18 Thread Tathagata Das
Are you using Spark 1.3.x ? That explains. This issue has been fixed in
Spark 1.4.0. Bonus you get a fancy new streaming UI with more awesome
stats. :)

On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith secs...@gmail.com wrote:

 Hi,

 I just switched from createStream to the createDirectStream API for
 kafka and while things otherwise seem happy, the first thing I noticed is
 that stream/receiver stats are gone from the Spark UI :( Those stats were
 very handy for keeping an eye on health of the app.

 What's the best way to re-create those in the Spark UI? Maintain
 Accumulators? Would be really nice to get back receiver-like stats even
 though I understand that createDirectStream is a receiver-less design.

 Thanks,

 Tim





createDirectStream and Stats

2015-06-18 Thread Tim Smith
Hi,

I just switched from createStream to the createDirectStream API for
kafka and while things otherwise seem happy, the first thing I noticed is
that stream/receiver stats are gone from the Spark UI :( Those stats were
very handy for keeping an eye on health of the app.

What's the best way to re-create those in the Spark UI? Maintain
Accumulators? Would be really nice to get back receiver-like stats even
though I understand that createDirectStream is a receiver-less design.
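
One hedged way to approximate those per-batch numbers with the direct stream, using the Spark 1.x named-accumulator API (the accumulator name and dataout here are just illustrative, reusing the variable names from later in this thread):

// Named accumulators show up on the stage detail pages of the Spark UI.
val recordCount = ssc.sparkContext.accumulator(0L, "records processed")

dataout.foreachRDD { (rdd, time) =>
  val n = rdd.count()                      // records in this batch
  recordCount += n
  println(s"Batch at $time: $n records")   // or push to an external metrics system
}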

Thanks,

Tim