On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a
streaming app that consumes data from Kafka and writes it back to Kafka
(a different topic). My big problem has been total delay. While execution
time is usually less than the window size (in seconds), the total delay
ranges from minutes to hours and keeps going up.

For a little while, I thought I had solved the issue by bumping up the
driver memory. Then I expanded my Kafka cluster to add more nodes and the
issue came up again. I tried a few things to smoke out the issue and
something tells me the driver is the bottleneck again:

1) From my app, I took out the entire write-out-to-Kafka piece (roughly
the writeOut loop in the sketch after this list). Sure enough, execution
time, scheduling delay and hence total delay fell to sub-second. This
assured me that whatever processing I do before writing back to Kafka
isn't the bottleneck.

2) In my app, I had RDD persistence set at different points, but my code
wasn't really re-using any RDDs, so I took out all explicit persist()
calls and set "spar...unpersist" to "true" in the context (see the config
sketch after this list). After this, it doesn't seem to matter how much
memory I give my executors; the total delay stays in the same range. I
tried per-executor memory from 2G to 12G with no change in total delay,
so the executors aren't memory starved. Also, in the Spark UI, under the
Executors tab, all executors show 0/1060MB used when per-executor memory
is set to 2GB, for example.

3) The input rate on the Kafka receivers is capped, which limits spikes in
incoming data.

4) I tried both FIFO and FAIR scheduling modes, but it didn't make any
difference.

5) Adding executors beyond a certain point seems useless (I guess the
excess ones just sit idle).
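
For reference, here is roughly what points 1-4 refer to (Scala, Spark
1.2-era APIs). The topic and broker names are placeholders, and I'm
assuming the unpersist flag from point 2 is spark.streaming.unpersist:

  import java.util.Properties
  import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.dstream.DStream

  val conf = new SparkConf()
    .setAppName("kafka-to-kafka")
    .set("spark.streaming.unpersist", "true")          // point 2 (assuming this is the flag meant)
    .set("spark.streaming.receiver.maxRate", "20000")  // point 3: cap messages/sec per receiver
    // .set("spark.scheduler.mode", "FAIR")            // point 4: tried FIFO and FAIR here

  val ssc = new StreamingContext(conf, Seconds(10))    // 10s batch window is a placeholder

  // Point 1: the kind of write-out-to-Kafka loop that was removed for the test.
  // The producer is created on the executor, per partition, so it never
  // has to be serialized from the driver.
  def writeOut(stream: DStream[String]): Unit = {
    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val props = new Properties()
        props.put("metadata.broker.list", "broker1:9092")  // placeholder broker list
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        val producer = new Producer[String, String](new ProducerConfig(props))
        records.foreach(r => producer.send(new KeyedMessage[String, String]("out-topic", r)))
        producer.close()
      }
    }
  }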

At any given point in time, the Spark UI shows only one batch pending
processing. So with just one batch pending, why would the scheduling
delay run into minutes or hours when execution time is within the batch
window duration? There aren't any failed stages or jobs.

Right now, I have 100 executors (I have tried anywhere from 50 to 150),
each with 2GB and 4 cores, and the driver running with 16GB. There are 5
Kafka receivers and each incoming stream is split into 40 partitions.
Per receiver, the input rate is restricted to 20,000 messages per second.
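
The receiving side is wired up more or less like the sketch below (the
topic, consumer group and ZooKeeper strings are placeholders, not the
real values):

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val ssc = new StreamingContext(
    new SparkConf().setAppName("kafka-to-kafka"), Seconds(10))

  // 5 receivers on the input topic, unioned and then split into 40 partitions
  val kafkaStreams = (1 to 5).map { _ =>
    KafkaUtils.createStream(ssc, "zk1:2181", "consumer-group",
      Map("in-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
  }
  val unified = ssc.union(kafkaStreams).repartition(40)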

Can anyone give me some clues or areas to look into for troubleshooting
this issue?

One nugget I found buried in the code says:
"The scheduler delay includes the network delay to send the task to the
worker machine and to send back the result (but not the time to fetch the
task result, if it needed to be fetched from the block manager on the
worker)."
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Could this be an issue with the driver being a bottleneck, with all the
executors posting their logs/stats back to the driver?

Thanks,

Tim
