outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
  val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
  writer.output(rec)
}) )

So this is creating a new kafka producer for every output partition in every batch, right? Have you tried pooling the producers?

On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secs...@gmail.com> wrote:

> 1) Yes, if I disable writing out to kafka and replace it with some very lightweight action like rdd.take(1), the app is stable.
>
> 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do processing and writing out per partition, each dStream partition ends up getting written to a kafka partition. Flow is, broadly:
> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150 partitions) -> Apply some transformation logic to each partition -> write out each partition to kafka (kafka has 23 partitions). Let me increase the number of partitions on the kafka side and see if that helps.
>
> Here's what the main block of code looks like (I added persistence back):
>
> val kInStreams = (1 to otherConf("inStreamCount").toInt).map { _ =>
>   KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString,
>     Map(otherConf("kafkaConsumerTopic").toString -> 1), StorageLevel.MEMORY_AND_DISK_SER)
> }
>
> if (!configMap.keySet.isEmpty)
> {
>   // Process stream from each receiver separately
>   // (do not attempt to merge all streams and then re-partition; this causes an unnecessary and high amount of shuffle in the job)
>   for (k <- kInStreams)
>   {
>     // Re-partition stream from each receiver across all compute nodes to spread out processing load and allow per-partition processing,
>     // and set persistence level to spill to disk along with serialization
>     val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>     val outdata = kInMsgParts.map(x => myfunc(x._2, configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>
>     // Write each transformed partition to Kafka via the "writer" object
>     outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>       val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>       writer.output(rec)
>     })
>     )
>   }
> }
>
> Here's the life-cycle of a lost block:
>
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 1]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 2]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 3]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 4]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 5]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 6]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 7]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 8]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID 1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 9]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0 (TID 1042609) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 10]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.11 in stage 16291.0 (TID 1042611) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 11]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.12 in stage 16291.0 (TID 1042615) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 12]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.13 in stage 16291.0 (TID 1042618) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 13]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.14 in stage 16291.0 (TID 1042622) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 14]
> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.15 in stage 16291.0 (TID 1042625) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 15]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.16 in stage 16291.0 (TID 1042628) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 16]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.17 in stage 16291.0 (TID 1042633) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 17]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.18 in stage 16291.0 (TID 1042636) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 18]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.19 in stage 16291.0 (TID 1042640) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 19]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.20 in stage 16291.0 (TID 1042642) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 20]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.21 in stage 16291.0 (TID 1042645) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 21]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.22 in stage 16291.0 (TID 1042648) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 22]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.23 in stage 16291.0 (TID 1042654) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 23]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.24 in stage 16291.0 (TID 1042660) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 24]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.25 in stage 16291.0 (TID 1042662) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 25]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.26 in stage 16291.0 (TID 1042663) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 26]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.27 in stage 16291.0 (TID 1042665) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 27]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.28 in stage 16291.0 (TID 1042667) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 28]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.29 in stage 16291.0 (TID 1042671) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 29]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.30 in stage 16291.0 (TID 1042672) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 30]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.31 in stage 16291.0 (TID 1042674) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 31]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.32 in stage 16291.0 (TID 1042677) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 32]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.33 in stage 16291.0 (TID 1042680) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 33]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.34 in stage 16291.0 (TID 1042681) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 34]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.35 in stage 16291.0 (TID 1042682) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 35]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.36 in stage 16291.0 (TID 1042687) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 36]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.37 in stage 16291.0 (TID 1042689) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 37]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.38 in stage 16291.0 (TID 1042690) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 38]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.39 in stage 16291.0 (TID 1042693) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 39]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.40 in stage 16291.0 (TID 1042697) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 40]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.41 in stage 16291.0 (TID 1042700) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 41]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.42 in stage 16291.0 (TID 1042706) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 42]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.43 in stage 16291.0 (TID 1042710) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 43]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.44 in stage 16291.0 (TID 1042713) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 44]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.45 in stage 16291.0 (TID 1042716) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 45]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.46 in stage 16291.0 (TID 1042718) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 46]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.47 in stage 16291.0 (TID 1042720) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 47]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.48 in stage 16291.0 (TID 1042724) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 48]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.49 in stage 16291.0 (TID 1042729) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 49]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.50 in stage 16291.0 (TID 1042730) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 50]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.51 in stage 16291.0 (TID 1042733) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 51]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.52 in stage 16291.0 (TID 1042736) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 52]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.53 in stage 16291.0 (TID 1042740) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 53]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.54 in stage 16291.0 (TID 1042743) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 54]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0 (TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 55]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0 (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 56]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0 (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 57]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0 (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 58]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0 (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 59]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0 (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 60]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0 (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 61]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0 (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 62]
> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0 (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 63]
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>
> Thanks for looking into it.
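One way to answer the pooling question is to replace the `new KafkaOutputService(...)` call inside every `foreachPartition` with a per-JVM pool held in a Scala `object`. Below is a minimal sketch under stated assumptions. `OutputService`, `OutputServicePool`, `PoolDemo` and the `"some.setting"` property are hypothetical stand-ins, not KafkaOutputService or the Kafka client API. The sketch also assumes the wrapped producer is safe to share between tasks running on the same executor.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable

// Hypothetical stand-in for KafkaOutputService. A real version would wrap a
// Kafka producer built from `props`; this one only counts records so the
// sketch runs without Kafka on the classpath.
final class OutputService(val topic: String, val props: Map[String, String]) {
  private val sent = new AtomicLong(0L)
  def output(records: Iterator[String]): Unit = records.foreach(_ => sent.incrementAndGet())
  def sentCount: Long = sent.get()
}

// Pool keyed by (topic, props). A Scala `object` is initialized once per JVM,
// so all tasks in one executor share these instances instead of creating a
// new producer for every partition of every batch.
object OutputServicePool {
  private val services = mutable.Map.empty[(String, Map[String, String]), OutputService]
  private var created = 0

  def get(topic: String, props: Map[String, String]): OutputService = synchronized {
    services.getOrElseUpdate((topic, props), {
      created += 1
      new OutputService(topic, props)
    })
  }

  def createdCount: Int = synchronized(created)
}

object PoolDemo {
  def main(args: Array[String]): Unit = {
    val props = Map("some.setting" -> "value") // hypothetical producer settings
    // Simulate one batch: 5 streams x 30 partitions, each writing its records.
    for (partition <- 1 to 150) {
      val writer = OutputServicePool.get("out-topic", props)
      writer.output(Iterator(s"record-from-partition-$partition"))
    }
    // In a fresh JVM this prints 1: every partition reused the same instance.
    println(s"instances created: ${OutputServicePool.createdCount}")
  }
}
```

In the streaming job, the `foreachPartition` body would then become something like `rec => OutputServicePool.get(topic, propsMap).output(rec)`, with `topic` and `propsMap` captured as plain values. The pool never closes anything. A real version would also need a shutdown hook, or something similar, to flush and close producers when the executor exits.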
>
> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Hey Tim,
>>
>> Let me get the key points.
>> 1. If you are not writing back to Kafka, the delay is stable? That is, if instead of "foreachRDD { // write to kafka }" you do "dstream.count", then the delay is stable. Right?
>> 2. If so, then Kafka is the bottleneck. Is it the number of partitions that you spoke of in the second mail that determines the parallelism in writes? Is it stable with 30 partitions?
>>
>> Regarding the block exception, could you give me a trace of info-level logging that leads to this error? Basically, I want to trace the lifecycle of the block.
>>
>> TD
>>
>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> Hi Gerard,
>>>
>>> Great write-up and really good guidance in there.
>>>
>>> I have to be honest, I don't know why, but setting the # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going but with not-so-great delays. Bump it up to 30 and I start winning the war: processing time is consistently below the batch time window (20 seconds), except for one batch every few batches where the compute time spikes to 10x the usual.
>>>
>>> Following your guide, I took out some "logInfo" statements I had in the app, but it didn't seem to make much difference :(
>>>
>>> With a higher time window (20 seconds), I got the app to run stably for a few hours, but then ran into the dreaded "java.lang.Exception: Could not compute split, block input-0-1423761240800 not found". I wonder if I need to add RDD persistence back?
>>>
>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>
>>> Thanks
>>>
>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> From this: "There are 5 kafka receivers and each incoming stream is split into 40 partitions", I suspect that you're creating too many tasks for Spark to process on time.
>>>> Could you try some of the 'knobs' I describe here to see if that would help?
>>>>
>>>> http://www.virdata.com/tuning-spark/
>>>>
>>>> -kr, Gerard.
>>>>
>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secs...@gmail.com> wrote:
>>>>
>>>>> Just read the thread "Are these numbers abnormal for spark streaming?" and I think I am seeing similar results: increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials).
>>>>>
>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually < window size (in seconds), the total delay ranges from minutes to hours (and keeps going up).
>>>>>>
>>>>>> For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue, and something tells me the driver is the bottleneck again:
>>>>>>
>>>>>> 1) From my app, I took out the entire write-out-to-kafka piece. Sure enough, execution time, scheduling delay and hence total delay fell to sub-second. This assured me that whatever processing I do before writing back to kafka isn't the bottleneck.
>>>>>>
>>>>>> 2) In my app, I had RDD persistence set at different points, but my code wasn't really re-using any RDDs, so I took out all explicit persist() statements and set "spar...unpersist" to "true" in the context. After this, it doesn't seem to matter how much memory I give my executors; the total delay stays in the same range. I tried per-executor memory from 2G to 12G with no change in total delay, so executors aren't memory starved. Also, in the SparkUI, under the Executors tab, all executors show 0/1060MB used when per-executor memory is set to 2GB, for example.
>>>>>>
>>>>>> 3) Input rate in the kafka consumer restricts spikes in incoming data.
>>>>>>
>>>>>> 4) Tried FIFO and FAIR, but it didn't make any difference.
>>>>>>
>>>>>> 5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle).
>>>>>>
>>>>>> At any given point in time, the SparkUI shows only one batch pending processing. So with just one batch pending processing, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs.
>>>>>>
>>>>>> Right now, I have 100 executors (I have tried from 50 to 150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 kafka receivers and each incoming stream is split into 40 partitions. Per receiver, input rate is restricted to 20000 messages per second.
>>>>>>
>>>>>> Can anyone help me with clues or areas to look into for troubleshooting the issue?
>>>>>>
>>>>>> One nugget I found buried in the code says:
>>>>>> "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)."
>>>>>>
>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>
>>>>>> Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Tim
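The figures in Tim's first message can be checked against Gerard's task-count remark and against point 5 (extra executors sitting idle). Below is a back-of-envelope sketch. It uses only numbers quoted in this thread: 5 receivers, 40 partitions per stream, 20000 messages/s per receiver, 100 executors with 4 cores each, and the 20-second batch window from a later message. It assumes each receiver-based Kafka stream keeps one executor core busy for its receiver. `CapacityCheck` is a hypothetical name.

```scala
// Back-of-envelope capacity check using figures quoted in this thread.
// Assumes each receiver-based Kafka stream holds one executor core for its receiver.
object CapacityCheck {
  val receivers = 5
  val partitionsPerStream = 40        // later reduced to 30 in the thread
  val maxRatePerReceiver = 20000L     // messages per second, per receiver
  val batchSeconds = 20L              // batch window mentioned in a later message
  val executors = 100
  val coresPerExecutor = 4

  // Cores left for processing tasks once every receiver has claimed one.
  val taskSlots: Int = executors * coresPerExecutor - receivers
  // Tasks in the post-repartition stages, summed over all five streams:
  // an upper bound on how many processing tasks can be runnable at once.
  val postRepartitionTasks: Int = receivers * partitionsPerStream
  // Upper bound on messages per batch if every receiver runs at its rate cap.
  val maxMessagesPerBatch: Long = receivers * maxRatePerReceiver * batchSeconds
  // Average messages each of those tasks would handle at that cap.
  val maxMessagesPerTask: Long = maxMessagesPerBatch / postRepartitionTasks

  def main(args: Array[String]): Unit = {
    println(s"task slots: $taskSlots, post-repartition tasks: $postRepartitionTasks")
    println(s"idle slots even with every task running at once: ${taskSlots - postRepartitionTasks}")
    println(s"max messages per batch: $maxMessagesPerBatch, per task: $maxMessagesPerTask")
  }
}
```

The cluster has 395 slots and at most 200 processing tasks, so at least 195 cores would have nothing to do. That is consistent with the observation that adding executors stopped helping. The same arithmetic with 30 partitions per stream gives 150 tasks.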