outdata.foreachRDD(rdd => rdd.foreachPartition(rec => {
  val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
  writer.output(rec)
}))


So this is creating a new kafka producer for every new output partition,
right?  Have you tried pooling the producers?
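
A minimal sketch of what pooling could look like, assuming the writer is safe to reuse across partitions. StubWriter is a hypothetical stand-in for KafkaOutputService (whose implementation is not shown in this thread), and WriterPool is an illustrative name, not an existing Spark or Kafka API:

```scala
import scala.collection.mutable

// Hypothetical stand-in for KafkaOutputService; a real writer would hold a
// Kafka producer and send each record instead of just counting them.
class StubWriter(val topic: String) {
  def output(records: Iterator[String]): Int = records.size
}

// One writer per topic per JVM, created lazily on first use and then reused.
object WriterPool {
  private val writers = mutable.HashMap.empty[String, StubWriter]
  private var createdCount = 0

  // Number of writers constructed so far (only here to make reuse visible).
  def created: Int = synchronized { createdCount }

  def get(topic: String): StubWriter = synchronized {
    writers.getOrElseUpdate(topic, {
      createdCount += 1
      new StubWriter(topic)
    })
  }
}
```

Inside the job this would be called as rdd.foreachPartition(rec => WriterPool.get(topic).output(rec)). A Scala object is initialised once per executor JVM, so each executor would build one writer per topic rather than one per partition per batch. With several cores per executor, concurrent tasks would share that writer, so this only helps if the writer is thread-safe.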

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secs...@gmail.com> wrote:

> 1) Yes, if I disable writing out to Kafka and replace it with a very
> lightweight action such as rdd.take(1), the app is stable.
>
> 2) The partitions I spoke of in the previous mail are the number of
> partitions I create from each dStream. But yes, since I do processing and
> writing out, per partition, each dStream partition ends up getting written
> to a kafka partition. Flow is, broadly:
> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
> partitions) -> Apply some transformation logic to each partition -> write
> out each partition to kafka (kafka has 23 partitions). Let me increase the
> number of partitions on the kafka side and see if that helps.
>
> Here's what the main block of code looks like (I added persistence back):
>
> val kInStreams = (1 to otherConf("inStreamCount").toInt).map { _ =>
>   KafkaUtils.createStream(ssc,
>     otherConf("kafkaConsumerZk").toString,
>     otherConf("kafkaConsumerGroupId").toString,
>     Map(otherConf("kafkaConsumerTopic").toString -> 1),
>     StorageLevel.MEMORY_AND_DISK_SER)
> }
>
> if (!configMap.keySet.isEmpty) {
>   // Process the stream from each receiver separately. Do not merge all
>   // streams and then repartition: that causes an unnecessary and large
>   // amount of shuffle in the job.
>   for (k <- kInStreams) {
>     // Repartition the stream from each receiver across all compute nodes
>     // to spread out the processing load and allow per-partition
>     // processing, and set the persistence level to serialize and spill
>     // to disk.
>     val kInMsgParts = k
>       .repartition(otherConf("dStreamPartitions").toInt)
>       .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>     val outdata = kInMsgParts
>       .map(x => myfunc(x._2, configMap))
>       .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>     // Write each transformed partition to Kafka via the "writer" object
>     outdata.foreachRDD(rdd => rdd.foreachPartition(rec => {
>       val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>       writer.output(rec)
>     }))
>   }
> }
>
>
> Here's the life-cycle of a lost block:
>
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
> memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
> memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
> disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
> disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
> (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
> compute split, block input-4-1423758372200 not found
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
> (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 1]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
> (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 2]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
> (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 3]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
> (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 4]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
> (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 5]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
> (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 6]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
> (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 7]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0
> (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 8]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0
> (TID 1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 9]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0
> (TID 1042609) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 10]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.11 in stage 16291.0
> (TID 1042611) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 11]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.12 in stage 16291.0
> (TID 1042615) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 12]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.13 in stage 16291.0
> (TID 1042618) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 13]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.14 in stage 16291.0
> (TID 1042622) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 14]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.15 in stage 16291.0
> (TID 1042625) on executor nodedn1-16-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 15]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.16 in stage 16291.0
> (TID 1042628) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 16]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.17 in stage 16291.0
> (TID 1042633) on executor nodedn1-16-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 17]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.18 in stage 16291.0
> (TID 1042636) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 18]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.19 in stage 16291.0
> (TID 1042640) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 19]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.20 in stage 16291.0
> (TID 1042642) on executor nodedn1-16-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 20]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.21 in stage 16291.0
> (TID 1042645) on executor nodedn1-16-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 21]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.22 in stage 16291.0
> (TID 1042648) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 22]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.23 in stage 16291.0
> (TID 1042654) on executor nodedn1-23-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 23]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.24 in stage 16291.0
> (TID 1042660) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 24]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.25 in stage 16291.0
> (TID 1042662) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 25]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.26 in stage 16291.0
> (TID 1042663) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 26]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.27 in stage 16291.0
> (TID 1042665) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 27]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.28 in stage 16291.0
> (TID 1042667) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 28]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.29 in stage 16291.0
> (TID 1042671) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 29]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.30 in stage 16291.0
> (TID 1042672) on executor nodedn1-12-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 30]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.31 in stage 16291.0
> (TID 1042674) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 31]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.32 in stage 16291.0
> (TID 1042677) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 32]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.33 in stage 16291.0
> (TID 1042680) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 33]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.34 in stage 16291.0
> (TID 1042681) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 34]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.35 in stage 16291.0
> (TID 1042682) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 35]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.36 in stage 16291.0
> (TID 1042687) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 36]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.37 in stage 16291.0
> (TID 1042689) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 37]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.38 in stage 16291.0
> (TID 1042690) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 38]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.39 in stage 16291.0
> (TID 1042693) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 39]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.40 in stage 16291.0
> (TID 1042697) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 40]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.41 in stage 16291.0
> (TID 1042700) on executor nodedn1-13-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 41]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.42 in stage 16291.0
> (TID 1042706) on executor nodedn1-15-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 42]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.43 in stage 16291.0
> (TID 1042710) on executor nodedn1-14-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 43]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.44 in stage 16291.0
> (TID 1042713) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 44]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.45 in stage 16291.0
> (TID 1042716) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 45]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.46 in stage 16291.0
> (TID 1042718) on executor nodedn1-15-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 46]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.47 in stage 16291.0
> (TID 1042720) on executor nodedn1-12-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 47]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.48 in stage 16291.0
> (TID 1042724) on executor nodedn1-12-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 48]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.49 in stage 16291.0
> (TID 1042729) on executor nodedn1-16-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 49]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.50 in stage 16291.0
> (TID 1042730) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 50]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.51 in stage 16291.0
> (TID 1042733) on executor nodedn1-21-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 51]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.52 in stage 16291.0
> (TID 1042736) on executor nodedn1-19-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 52]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.53 in stage 16291.0
> (TID 1042740) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 53]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.54 in stage 16291.0
> (TID 1042743) on executor nodedn1-22-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 54]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0
> (TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 55]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0
> (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 56]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0
> (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 57]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0
> (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 58]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0
> (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 59]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0
> (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 60]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0
> (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 61]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0
> (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 62]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0
> (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could
> not compute split, block input-4-1423758372200 not found) [duplicate 63]
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 54
> in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in
> stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception:
> Could not compute split, block input-4-1423758372200 not found
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent
> failure: Lost task 54.63 in stage 16291.0 (TID 1042793,
> nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block
> input-4-1423758372200 not found
>
>
> Thanks for looking into it.
>
>
>
>
>
> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> Hey Tim,
>>
>> Let me get the key points.
>> 1. If you are not writing back to Kafka, the delay is stable? That is,
>> instead of "foreachRDD { // write to kafka }"  if you do "dstream.count",
>> then the delay is stable. Right?
>> 2. If so, then Kafka is the bottleneck. Is it the number of partitions
>> that you spoke of in the second mail that determines the parallelism in
>> writes? Is it stable with 30 partitions?
>>
>> Regarding the block exception, could you give me a trace of the INFO-level
>> logging that leads to this error? Basically, I want to trace the lifecycle
>> of the block.
>>
>> TD
>>
>>
>>
>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> Hi Gerard,
>>>
>>> Great write-up and really good guidance in there.
>>>
>>> I have to be honest, I don't know why but setting # of partitions for
>>> each dStream to a low number (5-10) just causes the app to choke/crash.
>>> Setting it to 20 gets the app going but with not so great delays. Bump it
>>> up to 30 and I start winning the war where processing time is consistently
>>> below batch time window (20 seconds) except for a batch every few batches
>>> where the compute time spikes 10x the usual.
>>>
>>> Following your guide, I took out some "logInfo" statements I had in the
>>> app, but that didn't seem to make much difference :(
>>>
>>> With a higher time window (20 seconds), I got the app to run stably for
>>> a few hours but then ran into the dreaded "java.lang.Exception: Could not
>>> compute split, block input-0-1423761240800 not found". Wonder if I need to
>>> add RDD persistence back?
>>>
>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>>>
>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <gerard.m...@gmail.com>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> From this: " There are 5 kafka receivers and each incoming stream is
>>>> split into 40 partitions"  I suspect that you're creating too many
>>>> tasks for Spark to process on time.
>>>> Could you try some of the 'knobs' I describe here to see if that would
>>>> help?
>>>>
>>>> http://www.virdata.com/tuning-spark/
>>>>
>>>> -kr, Gerard.
>>>>
>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secs...@gmail.com> wrote:
>>>>
>>>>> Just read the thread "Are these numbers abnormal for spark streaming?"
>>>>> and I think I am seeing similar results - that is - increasing the window
>>>>> seems to be the trick here. I will have to monitor for a few hours/days
>>>>> before I can conclude (there are so many knobs/dials).
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
>>>>>> streaming app that consumes data from Kafka and writes it back to Kafka
>>>>>> (different topic). My big problem has been Total Delay. While execution
>>>>>> time is usually less than the window size (in seconds), the total delay
>>>>>> ranges from minutes to hours (and keeps going up).
>>>>>>
>>>>>> For a little while, I thought I had solved the issue by bumping up
>>>>>> the driver memory. Then I expanded my Kafka cluster to add more nodes and
>>>>>> the issue came up again. I tried a few things to smoke out the issue and
>>>>>> something tells me the driver is the bottleneck again:
>>>>>>
>>>>>> 1) From my app, I took out the entire write-out-to-kafka piece. Sure
>>>>>> enough, execution, scheduling delay and hence total delay fell to sub
>>>>>> second. This assured me that whatever processing I do before writing back
>>>>>> to kafka isn't the bottleneck.
>>>>>>
>>>>>> 2) In my app, I had RDD persistence set at different points but my
>>>>>> code wasn't really re-using any RDDs, so I took out all explicit persist()
>>>>>> statements and set "spar...unpersist" to "true" in the context. After
>>>>>> this, it doesn't seem to matter how much memory I give my executors; the
>>>>>> total delay stays in the same range. I tried per-executor memory from
>>>>>> 2G to 12G with no change in total delay, so the executors aren't memory
>>>>>> starved. Also, in the SparkUI, under the Executors tab, all executors show
>>>>>> 0/1060MB used when per-executor memory is set to 2GB, for example.
>>>>>>
>>>>>> 3) The input rate is capped in the Kafka consumer, which restricts
>>>>>> spikes in incoming data.
>>>>>>
>>>>>> 4) Tried the FIFO and FAIR schedulers, but that didn't make any difference.
>>>>>>
>>>>>> 5) Adding executors beyond a certain point seems useless (I guess
>>>>>> excess ones just sit idle).
>>>>>>
>>>>>> At any given point in time, the SparkUI shows only one batch pending
>>>>>> processing. So with just one batch pending processing, why would the
>>>>>> scheduling delay run into minutes/hours if execution time is within the
>>>>>> batch window duration? There aren't any failed stages or jobs.
>>>>>>
>>>>>> Right now, I have 100 executors (I have tried setting executors from
>>>>>> 50-150), each with 2GB and 4 cores, and the driver running with 16GB.
>>>>>> There are 5 Kafka receivers and each incoming stream is split into 40
>>>>>> partitions. Per receiver, input rate is restricted to 20000 messages
>>>>>> per second.
>>>>>>
>>>>>> Can anyone help me with clues or areas to look into, for
>>>>>> troubleshooting the issue?
>>>>>>
>>>>>> One nugget I found buried in the code says:
>>>>>> "The scheduler delay includes the network delay to send the task to
>>>>>> the worker machine and to send back the result (but not the time to fetch
>>>>>> the task result, if it needed to be fetched from the block manager on the
>>>>>> worker)."
>>>>>>
>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>
>>>>>> Could this be an issue with the driver being a bottleneck? All the
>>>>>> executors posting their logs/stats to the driver?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tim
