Hello,

I'm running a small Spark Streaming instance: a 4-node cluster with
1000 records per second coming in. For each record, I query Cassandra,
update some very simple stats, and send the results back to
Cassandra. I'm using 10-second mini-batches, and each one typically
takes 8 seconds to process. However, every so often a mini-batch
takes significantly longer to process -- generally between 30 seconds
and 2+ minutes (I'm rate-limiting my input, so the mini-batch size
isn't changing). It might be 5 minutes or 45 minutes before one of
these slow batches shows up, but they always happen.

I've been trying to debug this problem for a while now, but I'm
stuck. I turned on Spark's debug logging and looked through
the logs from one of these slowdowns. I found that 3 of my 4
executor nodes were completely silent in the logs -- the timestamp
just jumped 30 seconds between consecutive lines. Before the silence,
every task assigned to those nodes finished and no new ones were
assigned. Then, at the end of those 30 seconds, the logs show them
receiving an akka message (shown below), after which they start
running tasks again and things go back to normal.


10:34:54,072 DEBUG -- sparkExecutor-akka.actor.default-dispatcher-6
akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - [actor]
received message
AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@700b204a),false)
from Actor[akka://sparkExecutor/deadLetters]
10:34:54,072 DEBUG -- sparkExecutor-akka.actor.default-dispatcher-6
akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - Received
RPC message: 
AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@700b204a),false)
10:34:54,073  INFO -- sparkExecutor-akka.actor.default-dispatcher-6
executor.CoarseGrainedExecutorBackend - Got assigned task 151600
10:34:54,073 DEBUG -- sparkExecutor-akka.actor.default-dispatcher-6
akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - [actor]
handled message (0.215506 ms)
AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@700b204a),false)
from Actor[akka://sparkExecutor/deadLetters]
10:34:54,073 DEBUG -- sparkExecutor-akka.actor.default-dispatcher-6
akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - [actor]
received message
AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@10793472),false)
from Actor[akka://sparkExecutor/deadLetters]
10:34:54,073 DEBUG -- sparkExecutor-akka.actor.default-dispatcher-6
akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1 - Received
RPC message: 
AkkaMessage(LaunchTask(org.apache.spark.util.SerializableBuffer@10793472),false)
10:34:54,073  INFO -- Executor task launch worker-1
executor.Executor - Running task 2.0 in stage 16466.0 (TID 151600)



As I mentioned, that is on 3 of the 4 executors. On the driver, during
that 30-second period, the logs show it only taking in input, making
blocks from it, and sending those blocks to the 4th executor. That 4th
executor's logs for the same period show it running no tasks and
only receiving those blocks. At the end of that period, the
driver and the 4th executor suddenly start doing work again, just like
the other executors.
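
In case it helps anyone reproduce this, the silent stretches described
above can be located mechanically rather than by eye. The following is
only a minimal sketch, not part of Spark: the names parse_timestamp and
find_gaps and the 10-second threshold are my own assumptions, and it
assumes every log entry starts with an HH:MM:SS,mmm timestamp as in the
excerpt above and that the log does not cross midnight.

```python
import re
from datetime import timedelta

# Leading "HH:MM:SS,mmm" timestamp, as seen in the log excerpt above.
TIMESTAMP = re.compile(r"^(\d{2}):(\d{2}):(\d{2}),(\d{3})\b")


def parse_timestamp(line):
    """Return the line's leading timestamp as time since midnight, or None."""
    match = TIMESTAMP.match(line)
    if match is None:
        return None  # continuation of a wrapped log entry, no timestamp
    hours, minutes, seconds, millis = (int(part) for part in match.groups())
    return timedelta(hours=hours, minutes=minutes,
                     seconds=seconds, milliseconds=millis)


def find_gaps(lines, threshold_seconds=10.0):
    """Yield (last_line_before, first_line_after, gap_seconds) for each
    pair of consecutive timestamped lines further apart than the threshold.

    Assumption: timestamps are non-decreasing and stay within one day.
    """
    previous_time = None
    previous_line = None
    for line in lines:
        current_time = parse_timestamp(line)
        if current_time is None:
            continue  # skip lines without a timestamp
        if previous_time is not None:
            gap = (current_time - previous_time).total_seconds()
            if gap > threshold_seconds:
                yield previous_line.rstrip("\n"), line.rstrip("\n"), gap
        previous_time, previous_line = current_time, line
```

Passing an open executor log file to find_gaps, for example
list(find_gaps(open(path))) with path being wherever that executor's
log lives, returns the lines on either side of each silence, which
makes it easier to compare what each executor and the driver were
doing just before and just after.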

Can anyone give me any ideas (even vague ones!) of things to check to
figure out what's happening here?
