Hi all,

I am running a Spark Streaming application with ReliableKafkaReceiver (Spark 1.2.0).
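For reference, the stream is set up roughly like the sketch below (the ZooKeeper quorum, consumer group, topic name and checkpoint path are placeholders, not my real values). With the receiver write-ahead log enabled, KafkaUtils.createStream uses ReliableKafkaReceiver in 1.2:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    // Enabling the receiver WAL is what makes createStream pick ReliableKafkaReceiver
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(10))
    // The write-ahead log is written under the checkpoint directory on HDFS
    ssc.checkpoint("hdfs:///user/max/streaming-checkpoint")

    // Placeholder ZooKeeper quorum, consumer group and topic
    val stream = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181",
      "example-consumer-group",
      Map("example-topic" -> 1),
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}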
I constantly get the following exception:

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)

After the exception, the ReliableKafkaReceiver stays in ACTIVE status but stops receiving data from Kafka. The Kafka message handler thread is in the BLOCKED state:

Thread 92: KafkaMessageHandler-0 (BLOCKED)
        org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
        org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
        org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
        java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        java.util.concurrent.FutureTask.run(FutureTask.java:262)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

Sometimes when the exception is thrown, I also see warning messages like these:

15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)

In the past, I never had such a problem with KafkaReceiver. What causes this exception, and how can I solve it?

Thanks in advance,
Max
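P.S. From the stack trace, the timeout seems to come from the Await.result call in WriteAheadLogBasedBlockHandler.storeBlock, which lines up with the slow HDFS ack warnings. Would raising that timeout be a reasonable workaround while I investigate the datanodes? A sketch of what I mean, assuming (I have not verified this) that the undocumented spark.streaming.receiver.blockStoreTimeout property, in seconds, is what backs the 30-second wait in 1.2:

// Assumption: spark.streaming.receiver.blockStoreTimeout is read (in seconds) by
// WriteAheadLogBasedBlockHandler; raising it would only mask slow WAL writes to HDFS,
// not fix the underlying datanode slowness.
val conf = new SparkConf()
  .setAppName("ReliableKafkaExample")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.blockStoreTimeout", "60") // default is 30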