On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (to a different topic). My big problem has been Total Delay: while execution time is usually less than the window size (in seconds), the total delay ranges from minutes to hours and keeps growing.
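In case it helps, here is a stripped-down sketch of what the app does (topic names, broker addresses and the batch interval are placeholders, and the real transformation logic is more involved):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaToKafka {
  def main(args: Array[String]): Unit = {
    // batch interval is a placeholder here
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-to-kafka"), Seconds(10))

    // receiver-based Kafka input stream; keep only the message values
    val input = KafkaUtils.createStream(ssc, "zk:2181", "consumer-group", Map("in-topic" -> 1))
      .map(_._2)

    // the processing step (simplified)
    val output = input.map(process)

    // write each partition back out to a different Kafka topic
    output.foreachRDD { rdd =>
      rdd.foreachPartition { part =>
        val props = new Properties()
        props.put("metadata.broker.list", "broker:9092")
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        part.foreach(msg => producer.send(new KeyedMessage[String, String]("out-topic", msg)))
        producer.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  def process(msg: String): String = msg  // placeholder for the real processing
}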
For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the problem, and something tells me the driver is the bottleneck again:

1) From my app, I took out the entire write-out-to-Kafka piece. Sure enough, execution time, scheduling delay and hence total delay fell to sub-second. This assured me that whatever processing I do before writing back to Kafka isn't the bottleneck.

2) In my app, I had RDD persistence set at different points, but my code wasn't really re-using any RDDs, so I took out all explicit persist() statements and set "spar...unpersist" to "true" in the context. After this, it doesn't seem to matter how much memory I give my executors; the total delay stays in the same range. I tried per-executor memory from 2G to 12G with no change in total delay, so the executors aren't memory-starved. Also, in the Spark UI, under the Executors tab, all executors show 0/1060MB used when per-executor memory is set to 2GB, for example.

3) The input rate on the Kafka consumers restricts spikes in incoming data.

4) I tried FIFO and FAIR scheduling, but it didn't make any difference.

5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle).

At any given point in time, the Spark UI shows only one batch pending processing. So with just one batch pending, why would the scheduling delay run into minutes or hours when the execution time is within the batch window duration? There aren't any failed stages or jobs.

Right now, I have 100 executors (I have tried setting the executor count from 50 to 150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 Kafka receivers, and each incoming stream is split into 40 partitions. Per receiver, the input rate is restricted to 20000 messages per second.

Can anyone help me with clues or areas to look into for troubleshooting this issue?

One nugget I found buried in the code says: "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)."
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver?

Thanks,
Tim
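P.S. For completeness, here is roughly how the resources and streaming knobs from the points above are set, summarized as config keys (in practice most of these are passed on the spark-submit command line, and a couple of the property names may depend on the cluster manager):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-to-kafka")
  .set("spark.executor.instances", "100")            // tried 50-150; name may differ per cluster manager
  .set("spark.executor.memory", "2g")                // tried 2g-12g with no change in total delay
  .set("spark.executor.cores", "4")
  .set("spark.driver.memory", "16g")                 // actually set via --driver-memory at launch
  .set("spark.streaming.receiver.maxRate", "20000")  // per-receiver cap; 5 receivers total
  .set("spark.streaming.unpersist", "true")          // no explicit persist() left in my code
  .set("spark.scheduler.mode", "FIFO")               // also tried FAIR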