Recovery techniques for Spark Streaming scheduling delay

2016-06-22 Thread C. Josephson
We have a Spark Streaming application that has basically zero scheduling
delay for hours, but then suddenly it jumps up to multiple minutes and
spirals out of control (see screenshot of job manager here:
http://i.stack.imgur.com/kSftN.png)

This happens after a while even if we double the batch interval.

We are not sure what causes the delay to happen (theories include garbage
collection). The cluster has generally low CPU utilization regardless of
whether we use 3, 5 or 10 slaves.

We are really reluctant to further increase the batch interval, since the
delay is zero for such long periods. Are there any techniques to improve
recovery time from a sudden spike in scheduling delay? We've tried letting it
recover on its own, but that takes hours, if it recovers at all.

Thanks,
-cjoseph


Re: Streaming scheduling delay

2015-03-01 Thread Josh J
On Fri, Feb 13, 2015 at 2:21 AM, Gerard Maas gerard.m...@gmail.com wrote:

 KafkaOutputServicePool


Could you please give example code of what a KafkaOutputServicePool would
look like? When I tried object pooling I ended up with various
NotSerializableExceptions.

Thanks!
Josh
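
Not having Gerard's actual class, here is only a rough sketch of what such a
pool could look like, built on Apache Commons Pool 2 and reusing the
KafkaOutputService name and (topic, propsMap) constructor arguments that
appear later in this thread; the pool object, producerTopic and propsMap
names are assumptions:

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

// Factory that knows how to build one writer; KafkaOutputService is the
// user-defined class from this thread, its argument types are assumed here.
class WriterFactory(topic: String, props: Map[String, String])
    extends BasePooledObjectFactory[KafkaOutputService] {
  override def create(): KafkaOutputService = new KafkaOutputService(topic, props)
  override def wrap(w: KafkaOutputService): PooledObject[KafkaOutputService] =
    new DefaultPooledObject(w)
}

// Built lazily once per executor JVM, so nothing non-serializable is ever
// shipped from the driver (which is what causes the NotSerializableException).
object KafkaOutputServicePool {
  @volatile private var pool: GenericObjectPool[KafkaOutputService] = _
  def apply(topic: String, props: Map[String, String]): GenericObjectPool[KafkaOutputService] = {
    if (pool == null) synchronized {
      if (pool == null) pool = new GenericObjectPool(new WriterFactory(topic, props))
    }
    pool
  }
}

// Usage inside the output action: borrow, write, return.
outdata.foreachRDD { rdd =>
  rdd.foreachPartition { rec =>
    val pool = KafkaOutputServicePool(producerTopic, propsMap)  // plain serializable values
    val writer = pool.borrowObject()
    try writer.output(rec)
    finally pool.returnObject(writer)
  }
}

The key point for serialization is that the pool lives in an object and is
created lazily on the executor, so only plain strings and maps are captured by
the foreachPartition closure.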


Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
1. Can you try count()? take() often does not force the entire computation.
2. Can you give the full log? From the log it seems that the blocks are
added to two nodes but the tasks seem to be launched on different nodes. I
don't see any message removing the blocks, so I need the whole log to debug
this.

TD
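
A minimal way to test point 1 against the code quoted below (outdata is the
transformed DStream defined in that snippet) might be:

// rdd.take(1) may evaluate only the first partition and hide most of the
// work; count() forces every partition of every batch to be computed.
outdata.foreachRDD { rdd =>
  val n = rdd.count()
  println(s"batch processed $n records")
}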


On Feb 12, 2015 8:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
 partitions total) -> apply some transformation logic to each partition ->
 write out each partition to kafka (kafka has 23 partitions). Let me increase
 the number of partitions on the kafka side and see if that helps.


 Here's what the main block of code looks like (I added persistence back):

 val kInStreams = (1 to otherConf(inStreamCount).toInt).map { _ =>
   KafkaUtils.createStream(ssc, otherConf(kafkaConsumerZk).toString,
     otherConf(kafkaConsumerGroupId).toString,
     Map(otherConf(kafkaConsumerTopic).toString -> 1),
     StorageLevel.MEMORY_AND_DISK_SER)
 }

 if (!configMap.keySet.isEmpty)
 {
     // Process stream from each receiver separately
     // (do not attempt to merge all streams and then re-partition,
     // this causes un-necessary and high amount of shuffle in the job)
     for (k <- kInStreams)
     {
         // Re-partition stream from each receiver across all compute nodes
         // to spread out processing load and allow per partition processing,
         // and set persistence level to spill to disk along with serialization
         val kInMsgParts =
           k.repartition(otherConf(dStreamPartitions).toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         val outdata =
           kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         // Write each transformed partition to Kafka via the writer object
         outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
             val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)
             writer.output(rec)
         }) )
     }
 }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
(TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
(TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
(TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
(TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 3]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
(TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 4]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
(TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 5]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
(TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 6]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
(TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) [duplicate 7]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0
(TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could
not compute split, block input-4-1423758372200 not found) 

Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think this code will still introduce a shuffle even when you call
repartition on each input stream. Actually, this style of implementation
will generate more jobs (one job per input stream) than unioning them into
one stream with DStream.union(), and union normally has no special
overhead as far as I understand.

Also, as Cody said, creating a Producer per partition could be a potential
overhead; a producer pool or sharing the Producer for one executor might be
better :).
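
A sketch of the union approach, reusing ssc and kInStreams from Tim's snippet
below (the partition count of 150 just mirrors the number mentioned in this
thread):

import org.apache.spark.storage.StorageLevel

// One unioned DStream -> one job per batch, instead of one job per receiver
// stream; repartition and persist once on the combined stream.
val unified = ssc.union(kInStreams)
val kInMsgParts = unified
  .repartition(150)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)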


 // Process stream from each receiver separately
 // (do not attempt to merge all streams and then re-partition,
this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
{
 // Re-partition stream from each receiver across all
compute nodes to spread out processing load and allows per partition
processing
 // and, set persistence level to spill to disk along with
serialization
 val kInMsgParts =
k.repartition(otherConf(dStreamPartitions).toInt).

2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)

  writer.output(rec)
 })
 )


 So this is creating a new kafka producer for every new output partition,
 right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
 partitions total) -> apply some transformation logic to each partition ->
 write out each partition to kafka (kafka has 23 partitions). Let me increase
 the number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence back):

 val kInStreams = (1 to otherConf(inStreamCount).toInt).map { _ =>
   KafkaUtils.createStream(ssc, otherConf(kafkaConsumerZk).toString,
     otherConf(kafkaConsumerGroupId).toString,
     Map(otherConf(kafkaConsumerTopic).toString -> 1),
     StorageLevel.MEMORY_AND_DISK_SER)
 }

 if (!configMap.keySet.isEmpty)
 {
     // Process stream from each receiver separately
     // (do not attempt to merge all streams and then re-partition,
     // this causes un-necessary and high amount of shuffle in the job)
     for (k <- kInStreams)
     {
         // Re-partition stream from each receiver across all compute nodes
         // to spread out processing load and allow per partition processing,
         // and set persistence level to spill to disk along with serialization
         val kInMsgParts =
           k.repartition(otherConf(dStreamPartitions).toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         val outdata =
           kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         // Write each transformed partition to Kafka via the writer object
         outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
             val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)
             writer.output(rec)
         }) )
     }
 }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
 compute split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception
 (Could not compute split, block input-4-1423758372200 not found) [duplicate
 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception
 (Could not compute split, block input-4-1423758372200 not found) [duplicate
 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
 (TID 1042584) on 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Gerard,

Great write-up and really good guidance in there.

I have to be honest, I don't know why, but setting the # of partitions for each
dStream to a low number (5-10) just causes the app to choke/crash. Setting
it to 20 gets the app going, but with not-so-great delays. Bumping it up to 30,
I start winning the war: processing time is consistently below the batch
window (20 seconds), except for one batch every few batches where the compute
time spikes to 10x the usual.

Following your guide, I took out some logInfo statements I had in the app,
but it didn't seem to make much difference :(

With a higher time window (20 seconds), I got the app to run stably for a
few hours but then ran into the dreaded java.lang.Exception: Could not
compute split, block input-0-1423761240800 not found. I wonder if I need to
add RDD persistence back?
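
For reference, a sketch of the two receiver-side knobs usually aimed at that
could not compute split error: replicated block storage and, on Spark 1.2+,
the receiver write-ahead log. The checkpoint path, zkQuorum, groupId and topic
below are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("kafka-streaming")
  // Spark 1.2+: log received blocks to the checkpoint dir so they survive executor loss
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(20))
ssc.checkpoint("hdfs:///path/to/checkpoint-dir")   // required for the write-ahead log

// the _2 storage level keeps a second in-cluster replica of each received block
val stream = KafkaUtils.createStream(ssc, zkQuorum, groupId,
  Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)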

Also, I am reaching out to Virdata with some ProServ inquiries.

Thanks





On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com wrote:

 Hi Tim,

 From this: "There are 5 kafka receivers and each incoming stream is
 split into 40 partitions", I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually window size (in seconds), the total delay ranges from a
 minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for troubleshooting
 the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

 Could this be an issue with the driver being a bottlneck? All the
 executors posting their logs/stats to the driver?

 Thanks,

 Tim

Re: Streaming scheduling delay

2015-02-12 Thread Tathagata Das
Hey Tim,

Let me get the key points.
1. If you are not writing back to Kafka, the delay is stable? That is,
instead of foreachRDD { // write to kafka }, if you do dstream.count(),
then the delay is stable. Right?
2. If so, then Kafka is the bottleneck. Is the number of partitions that
you spoke of in the second mail what determines the parallelism in
writes? Is it stable with 30 partitions?

Regarding the block exception, could you give me a trace of info-level
logging that leads to this error? Basically I want to trace the lifecycle of
the block.

On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith secs...@gmail.com wrote:

 Hi Gerard,

 Great write-up and really good guidance in there.

 I have to be honest, I don't know why but setting # of partitions for each
 dStream to a low number (5-10) just causes the app to choke/crash. Setting
 it to 20 gets the app going but with not so great delays. Bump it up to 30
 and I start winning the war where processing time is consistently below
 batch time window (20 seconds) except for a batch every few batches where
 the compute time spikes 10x the usual.

 Following your guide, I took out some logInfo statements I had in the
 app but didn't seem to make much difference :(

 With a higher time window (20 seconds), I got the app to run stably for a
 few hours but then ran into the dreaded java.lang.Exception: Could not
 compute split, block input-0-1423761240800 not found. Wonder if I need to
 add RDD persistence back?

 Also, I am reaching out to Virdata with some ProServ inquiries.

 Thanks





 On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas gerard.m...@gmail.com
 wrote:

 Hi Tim,

 From this: "There are 5 kafka receivers and each incoming stream is
 split into 40 partitions", I suspect that you're creating too many tasks
 for Spark to process on time.
 Could you try some of the 'knobs' I describe here to see if that would
 help?

 http://www.virdata.com/tuning-spark/

 -kr, Gerard.

 On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith secs...@gmail.com wrote:

 Just read the thread Are these numbers abnormal for spark streaming?
 and I think I am seeing similar results - that is - increasing the window
 seems to be the trick here. I will have to monitor for a few hours/days
 before I can conclude (there are so many knobs/dials).



 On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually window size (in seconds), the total delay ranges from a
 minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess
 excess ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for
 troubleshooting the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
1) Yes, if I disable writing out to kafka and replace it with some very
lightweight action like rdd.take(1), the app is stable.

2) The partitions I spoke of in the previous mail are the number of
partitions I create from each dStream. But yes, since I do processing and
writing out per partition, each dStream partition ends up getting written
to a kafka partition. Flow is, broadly:
5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
partitions total) -> apply some transformation logic to each partition ->
write out each partition to kafka (kafka has 23 partitions). Let me increase
the number of partitions on the kafka side and see if that helps.

Here's what the main block of code looks like (I added persistence back):

val kInStreams = (1 to otherConf(inStreamCount).toInt).map { _ =>
  KafkaUtils.createStream(ssc, otherConf(kafkaConsumerZk).toString,
    otherConf(kafkaConsumerGroupId).toString,
    Map(otherConf(kafkaConsumerTopic).toString -> 1),
    StorageLevel.MEMORY_AND_DISK_SER)
}

if (!configMap.keySet.isEmpty)
{
    // Process stream from each receiver separately
    // (do not attempt to merge all streams and then re-partition,
    // this causes un-necessary and high amount of shuffle in the job)
    for (k <- kInStreams)
    {
        // Re-partition stream from each receiver across all compute nodes
        // to spread out processing load and allow per partition processing,
        // and set persistence level to spill to disk along with serialization
        val kInMsgParts =
          k.repartition(otherConf(dStreamPartitions).toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

        val outdata =
          kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

        // Write each transformed partition to Kafka via the writer object
        outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
            val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)
            writer.output(rec)
        }) )
    }
}


Here's the life-cycle of a lost block:

15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID
1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute
split, block input-4-1423758372200 not found
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID
1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 1]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID
1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 2]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID
1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 3]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID
1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 4]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID
1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 5]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID
1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 6]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID
1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 7]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID
1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 8]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID
1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not
compute split, block input-4-1423758372200 not found) [duplicate 9]
15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0
(TID 1042609) on executor 

Re: Streaming scheduling delay

2015-02-12 Thread Cody Koeninger
outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

 val writer = new
KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)

 writer.output(rec)
}) )


So this is creating a new kafka producer for every new output partition,
right?  Have you tried pooling the producers?
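
A lighter-weight variant than a full pool is one writer shared per executor
JVM, created lazily on the executor so it is never serialized with the
closure. A sketch, again reusing the KafkaOutputService name from the snippet
above with assumed argument types; producerTopic and propsMap stand in for the
values in that code:

// Hypothetical JVM-level holder: each executor builds its writer on first use
// and every task on that executor then reuses it.
object SharedKafkaWriter {
  @volatile private var service: KafkaOutputService = _
  def get(topic: String, props: Map[String, String]): KafkaOutputService = {
    if (service == null) synchronized {
      if (service == null) service = new KafkaOutputService(topic, props)
    }
    service
  }
}

outdata.foreachRDD { rdd =>
  rdd.foreachPartition { rec =>
    // only plain serializable values (strings, maps) are captured by this closure
    SharedKafkaWriter.get(producerTopic, propsMap).output(rec)
  }
}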

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
 partitions total) -> apply some transformation logic to each partition ->
 write out each partition to kafka (kafka has 23 partitions). Let me increase
 the number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence back):

 val kInStreams = (1 to otherConf(inStreamCount).toInt).map { _ =>
   KafkaUtils.createStream(ssc, otherConf(kafkaConsumerZk).toString,
     otherConf(kafkaConsumerGroupId).toString,
     Map(otherConf(kafkaConsumerTopic).toString -> 1),
     StorageLevel.MEMORY_AND_DISK_SER)
 }

 if (!configMap.keySet.isEmpty)
 {
     // Process stream from each receiver separately
     // (do not attempt to merge all streams and then re-partition,
     // this causes un-necessary and high amount of shuffle in the job)
     for (k <- kInStreams)
     {
         // Re-partition stream from each receiver across all compute nodes
         // to spread out processing load and allow per partition processing,
         // and set persistence level to spill to disk along with serialization
         val kInMsgParts =
           k.repartition(otherConf(dStreamPartitions).toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         val outdata =
           kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

         // Write each transformed partition to Kafka via the writer object
         outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
             val writer = new KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)
             writer.output(rec)
         }) )
     }
 }


 Here's the life-cycle of a lost block:

 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
 memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
 disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
 compute split, block input-4-1423758372200 not found
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 1]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 2]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
 (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 3]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 4]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 5]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 6]
 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
 not compute split, block input-4-1423758372200 not found) [duplicate 7]
 15/02/12 16:32:27 INFO 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
I replaced the writeToKafka statements with rdd.count() and sure enough,
I have a stable app with total delay well within the batch window (20
seconds). Here are the total delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time
142380806 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time
142380808 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time
142380810 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time
142380812 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time
142380814 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time
142380816 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time
142380818 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time
142380820 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time
142380822 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time
142380824 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time
142380826 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time
142380828 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time
142380830 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time
142380832 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time
142380834 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time
142380836 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time
142380838 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time
142380840 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time
142380842 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time
142380844 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time
142380846 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time
142380848 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time
142380850 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time
142380852 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time
142380854 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time
142380856 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time
142380858 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time
142380860 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time
142380862 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time
142380864 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time
142380866 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time
142380868 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time
142380870 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time
142380872 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time
142380874 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time
142380876 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time
142380878 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time
142380880 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time
142380882 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time
142380884 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith secs...@gmail.com wrote:

 TD - I will try count() and report back. Meanwhile, attached is the entire
 driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any 

Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Yes, you can try it. For example, if you have a cluster of 10 executors and 60
Kafka partitions, you can try to choose 10 receivers * 2 consumer threads,
so each thread will ideally consume 3 partitions; if you increase the
threads to 6, each thread will ideally consume 1 partition. What I think is
important is that each executor will have a receiver, so the data will be
distributed across the executors.
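
In code, that layout could look roughly like this (ssc, zkQuorum, groupId and
topic are placeholders):

// 10 receivers with 2 consumer threads each (the map value is threads per
// topic), i.e. 20 consumers for 60 Kafka partitions; union them into one stream.
val receivers = (1 to 10).map { _ =>
  KafkaUtils.createStream(ssc, zkQuorum, groupId, Map(topic -> 2),
    StorageLevel.MEMORY_AND_DISK_SER)
}
val unified = ssc.union(receivers)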

If you have a large cluster where even the number of executors is more than
the Kafka partitions, maybe you need to increase the Kafka partitions to
increase the parallelism, otherwise some of the computation resources may
be idle.

Besides, if executors * consumers > Kafka partitions, the consumers beyond
the partition count will be idle; each partition can only be consumed by one
consumer.

We have an in-house benchmark cluster with such a deploy criterion. I'm not
sure if it works for you, but you can try it.

Thanks
Saisai

2015-02-13 15:19 GMT+08:00 Tim Smith secs...@gmail.com:

 Hi Saisai,

 If I understand correctly, you are suggesting that control parallelism by
 having number of consumers/executors at least 1:1 for number of kafka
 partitions. For example, if I have 50 partitions for a kafka topic then
 either have:
 - 25 or more executors, 25 receivers, each receiver set to 2 consumer
 threads per topic, or,
 - 50 or more executors, 50 receivers, each receiver set to 1 consumer
 thread per topic

 Actually, both executors and total consumers can be more than the number
 of kafka partitions (some will probably sit idle).

 But do away with dStream partitioning altogether.

 Right?

 Thanks,

 - Tim




 On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think maybe you can try this way:

 create Receiver per executor and specify thread for each topic large than
 1, and the total number of consumer thread will be: total consumer =
 (receiver number) * (thread number), and make sure this total consumer is
 less than or equal to Kafka partition number. In this case, I think the
 parallelism is enough, received blocks are distributed to each executor. So
 you don't need to repartition to increase the parallelism.

 Besides for Kafka's high-level API, Kafka partitions may not be equally
 distributed to all the receivers, so some tasks may process more data than
 other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
 that will be more balanced because each Kafka partition mapping to Spark
 partition.


 Besides set partition count to 1 for each dStream means
 dstream.repartition(1) ? If so I think it will still introduce shuffle and
 move all the data into one partition.

 Thanks
 Saisai

 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the
 entire driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry,
 I am not really a programmer. I did see the connection pooling advise in
 the Spark Streaming Programming guide as an optimization but wasn't sure
 how to implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and didn't work. Even If I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf(dStreamPartitions).toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 

Re: Streaming scheduling delay

2015-02-12 Thread Saisai Shao
Hi Tim,

I think maybe you can try this way:

create a Receiver per executor and specify a thread count per topic larger
than 1; the total number of consumer threads will be: total consumers =
(receiver number) * (thread number). Make sure this total consumer count is
less than or equal to the Kafka partition number. In this case, I think the
parallelism is enough and received blocks are distributed to each executor, so
you don't need to repartition to increase the parallelism.

Besides, with Kafka's high-level API, Kafka partitions may not be equally
distributed to all the receivers, so some tasks may process more data than
other tasks. Another option is DirectKafkaInputDStream in Spark 1.3,
which will be more balanced because each Kafka partition maps to a Spark
partition.
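
For reference, the Spark 1.3 direct stream is created roughly like this
(broker list and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// No receivers and no received blocks: each Kafka partition maps 1:1 to a Spark partition.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val directStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))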


Besides, does set partition count to 1 for each dStream mean
dstream.repartition(1)? If so, I think it will still introduce a shuffle and
move all the data into one partition.

Thanks
Saisai

2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the entire
 driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and didn't work. Even If I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf(dStreamPartitions).toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)

  writer.output(rec)

 }) )


 So this is creating a new kafka producer for every new output partition,
 right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
 partitions total) -> apply some transformation logic to each partition ->
 write out each partition to kafka (kafka has 23 partitions). Let me increase
 the number of partitions on the kafka side and see if that helps.

 Here's what the main block of code looks like (I added persistence
 back):

 val kInStreams = (1 to otherConf(inStreamCount).toInt).map { _ =>
   KafkaUtils.createStream(ssc, otherConf(kafkaConsumerZk).toString,
     otherConf(kafkaConsumerGroupId).toString,
     Map(otherConf(kafkaConsumerTopic).toString -> 1),
     StorageLevel.MEMORY_AND_DISK_SER)
 }

 if (!configMap.keySet.isEmpty)
 {
   // Process stream from each receiver separately
   // (do not attempt to merge all streams and then re-partition,
   // this causes un-necessary and high amount of shuffle in the job)
   for (k <- kInStreams)
 {
  // Re-partition stream from each 

Re: Streaming scheduling delay

2015-02-12 Thread Tim Smith
Hi Saisai,

If I understand correctly, you are suggesting that I control parallelism by
having the number of consumers/executors at least 1:1 with the number of kafka
partitions. For example, if I have 50 partitions for a kafka topic then
either have:
- 25 or more executors, 25 receivers, each receiver set to 2 consumer
threads per topic, or,
- 50 or more executors, 50 receivers, each receiver set to 1 consumer
thread per topic

Actually, both executors and total consumers can be more than the number of
kafka partitions (some will probably sit idle).

But do away with dStream partitioning altogether.

Right?

Thanks,

- Tim




On Thu, Feb 12, 2015 at 11:03 PM, Saisai Shao sai.sai.s...@gmail.com
wrote:

 Hi Tim,

 I think maybe you can try this way:

 create Receiver per executor and specify thread for each topic large than
 1, and the total number of consumer thread will be: total consumer =
 (receiver number) * (thread number), and make sure this total consumer is
 less than or equal to Kafka partition number. In this case, I think the
 parallelism is enough, received blocks are distributed to each executor. So
 you don't need to repartition to increase the parallelism.

 Besides for Kafka's high-level API, Kafka partitions may not be equally
 distributed to all the receivers, so some tasks may process more data than
 other tasks. another way you can try DirectKafkaInputDStream in Spark 1.3,
 that will be more balanced because each Kafka partition mapping to Spark
 partition.


 Besides set partition count to 1 for each dStream means
 dstream.repartition(1) ? If so I think it will still introduce shuffle and
 move all the data into one partition.

 Thanks
 Saisai

 2015-02-13 13:54 GMT+08:00 Tim Smith secs...@gmail.com:

 TD - I will try count() and report back. Meanwhile, attached is the
 entire driver log that includes the error logs about missing blocks.

 Cody - Let me research a bit about how to do connection pooling. Sorry, I
 am not really a programmer. I did see the connection pooling advise in the
 Spark Streaming Programming guide as an optimization but wasn't sure how to
 implement it. But do you think it will have a significant impact on
 performance?

 Saisai - I think, ideally, I'd rather not do any dStream partitioning.
 Instead have 1 receiver for each kafka partition (so in this case 23
 receivers for 23 kafka partitions) and then have as many or more executors
 to handle processing of the dStreams. Right? Trouble is, I tried this
 approach and didn't work. Even If I set 23 receivers, and set partition
 count to 1 for each dStream (effectively, no stream splitting), my
 performance is extremely poor/laggy. Should I modify my code to remove
 dStream partitioning altogether and then try setting as many receivers as
 kafka partitions?





 On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Hi Tim,

 I think this code will still introduce shuffle even when you call
 repartition on each input stream. Actually this style of implementation
 will generate more jobs (job per each input stream) than union into one
 stream as called DStream.union(), and union normally has no special
 overhead as I understood.

 Also as Cody said, creating Producer per partition could be a potential
 overhead, producer pool or sharing the Producer for one executor might be
 better :).


  // Process stream from each receiver separately
  // (do not attempt to merge all streams and then re-partition,
 this causes un-necessary and high amount of shuffle in the job)
  for (k <- kInStreams)
 {
  // Re-partition stream from each receiver across all
 compute nodes to spread out processing load and allows per partition
 processing
  // and, set persistence level to spill to disk along
 with serialization
  val kInMsgParts =
 k.repartition(otherConf(dStreamPartitions).toInt).

 2015-02-13 13:27 GMT+08:00 Cody Koeninger c...@koeninger.org:

 outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {

  val writer = new
 KafkaOutputService(otherConf(kafkaProducerTopic).toString, propsMap)

  writer.output(rec)

 }) )


 So this is creating a new kafka producer for every new output
 partition, right?  Have you tried pooling the producers?

 On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith secs...@gmail.com wrote:

 1) Yes, if I disable writing out to kafka and replace it with some very
 lightweight action like rdd.take(1), the app is stable.

 2) The partitions I spoke of in the previous mail are the number of
 partitions I create from each dStream. But yes, since I do processing and
 writing out per partition, each dStream partition ends up getting written
 to a kafka partition. Flow is, broadly:
 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150
 partitions total) -> apply some transformation logic to each partition ->
 write out each partition to kafka (kafka has 23 partitions). Let me increase the
 number of 

Re: Streaming scheduling delay

2015-02-11 Thread Tim Smith
Just read the thread "Are these numbers abnormal for spark streaming?" and
I think I am seeing similar results - that is, increasing the window seems
to be the trick here. I will have to monitor for a few hours/days before I
can conclude (there are so many knobs/dials).



On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith secs...@gmail.com wrote:

 On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
 streaming app that consumes data from Kafka and writes it back to Kafka
 (different topic). My big problem has been Total Delay. While execution
 time is usually window size (in seconds), the total delay ranges from a
 minutes to hours(s) (keeps going up).

 For a little while, I thought I had solved the issue by bumping up the
 driver memory. Then I expanded my Kafka cluster to add more nodes and the
 issue came up again. I tried a few things to smoke out the issue and
 something tells me the driver is the bottleneck again:

 1) From my app, I took out the entire write-out-to-kafka piece. Sure
 enough, execution, scheduling delay and hence total delay fell to sub
 second. This assured me that whatever processing I do before writing back
 to kafka isn't the bottleneck.

 2) In my app, I had RDD persistence set at different points but my code
 wasn't really re-using any RDDs so I took out all explicit persist()
 statements. And added, spar...unpersist to true in the context. After
 this, it doesn't seem to matter how much memory I give my executor, the
 total delay seems to be in the same range. I tried per executor memory from
 2G to 12G with no change in total delay so executors aren't memory starved.
 Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
 used when per executor memory is set to 2GB, for example.

 3) Input rate in the kafka consumer restricts spikes in incoming data.

 4) Tried FIFO and FAIR but didn't make any difference.

 5) Adding executors beyond a certain points seems useless (I guess excess
 ones just sit idle).

 At any given point in time, the SparkUI shows only one batch pending
 processing. So with just one batch pending processing, why would the
 scheduling delay run into minutes/hours if execution time is within the
 batch window duration? There aren't any failed stages or jobs.

 Right now, I have 100 executors ( i have tried setting executors from
 50-150), each with 2GB and 4 cores and the driver running with 16GB. There
 are 5 kafka receivers and each incoming stream is split into 40 partitions.
 Per receiver, input rate is restricted to 2 messages per second.

 Can anyone help me with clues or areas to look into, for troubleshooting
 the issue?

 One nugget I found buried in the code says:
 The scheduler delay includes the network delay to send the task to the
 worker machine and to send back the result (but not the time to fetch the
 task result, if it needed to be fetched from the block manager on the
 worker).

 https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

 Could this be an issue with the driver being a bottlneck? All the
 executors posting their logs/stats to the driver?

 Thanks,

 Tim


Streaming scheduling delay

2015-02-11 Thread Tim Smith
On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka
(different topic). My big problem has been Total Delay. While execution
time is usually within the window size (in seconds), the total delay ranges
from minutes to hours (and keeps going up).

For a little while, I thought I had solved the issue by bumping up the
driver memory. Then I expanded my Kafka cluster to add more nodes and the
issue came up again. I tried a few things to smoke out the issue and
something tells me the driver is the bottleneck again:

1) From my app, I took out the entire write-out-to-kafka piece. Sure
enough, execution, scheduling delay and hence total delay fell to sub
second. This assured me that whatever processing I do before writing back
to kafka isn't the bottleneck.

2) In my app, I had RDD persistence set at different points but my code
wasn't really re-using any RDDs so I took out all explicit persist()
statements. And added, spar...unpersist to true in the context. After
this, it doesn't seem to matter how much memory I give my executor, the
total delay seems to be in the same range. I tried per executor memory from
2G to 12G with no change in total delay so executors aren't memory starved.
Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB
used when per executor memory is set to 2GB, for example.

3) Input rate in the kafka consumer restricts spikes in incoming data.

4) Tried FIFO and FAIR but didn't make any difference.

5) Adding executors beyond a certain point seems useless (I guess excess
ones just sit idle).

At any given point in time, the SparkUI shows only one batch pending
processing. So with just one batch pending processing, why would the
scheduling delay run into minutes/hours if execution time is within the
batch window duration? There aren't any failed stages or jobs.

Right now, I have 100 executors (I have tried setting executors from
50-150), each with 2GB and 4 cores, and the driver running with 16GB. There
are 5 kafka receivers and each incoming stream is split into 40 partitions.
Per receiver, input rate is restricted to 2 messages per second.
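
For context, the settings described in this mail map roughly onto the
following knobs (values mirror the numbers above; whether they go into
SparkConf or spark-submit flags depends on the deploy mode):

// Sketch of the configuration described above.
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "4")
  .set("spark.streaming.receiver.maxRate", "2")   // records/sec per receiver
  .set("spark.streaming.unpersist", "true")       // let Spark drop finished RDDs
// driver memory must be set before the driver JVM starts, e.g.
//   spark-submit --driver-memory 16g ...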

Can anyone help me with clues or areas to look into, for troubleshooting
the issue?

One nugget I found buried in the code says:
The scheduler delay includes the network delay to send the task to the
worker machine and to send back the result (but not the time to fetch the
task result, if it needed to be fetched from the block manager on the
worker).
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Could this be an issue with the driver being a bottleneck? All the executors
posting their logs/stats to the driver?

Thanks,

Tim