Hi all,

I am running a Spark Streaming application with ReliableKafkaReceiver (Spark 
1.2.0). I keep getting the following exception:

15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
        at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
        at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
        at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
        at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
        at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
        at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
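
As far as I can tell from the trace, the TimeoutException comes from scala.concurrent.Await.result giving up on a future that did not complete within the limit. Here is a minimal standalone sketch of that mechanism only. It is not Spark code: slowWrite and storeWithTimeout are hypothetical names, and the short delays stand in for the slow write and the 30-second limit seen above.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitTimeoutSketch {
  // Hypothetical stand-in for a slow storage write; not a Spark API.
  def slowWrite(delayMs: Long): Future[String] = Future {
    Thread.sleep(delayMs)
    "stored"
  }

  // Blocks the calling thread until the write finishes or the timeout
  // elapses. Returns Left(exception message) if the wait timed out.
  def storeWithTimeout(delayMs: Long, timeout: FiniteDuration): Either[String, String] =
    try Right(Await.result(slowWrite(delayMs), timeout))
    catch { case e: TimeoutException => Left(e.getMessage) }

  def main(args: Array[String]): Unit = {
    // Write finishes well inside the limit: the result is returned.
    println(storeWithTimeout(delayMs = 10, timeout = 1.second))
    // Write takes longer than the limit: Await.result throws TimeoutException.
    println(storeWithTimeout(delayMs = 2000, timeout = 200.millis))
  }
}
```

Note that the timeout only abandons the wait. The underlying task keeps running in the background, so a timeout by itself does not say whether the write eventually succeeded.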

After the exception, ReliableKafkaReceiver stayed in ACTIVE status but stopped 
receiving data from Kafka. A thread dump shows the Kafka message handler thread 
in the BLOCKED state:

Thread 92: KafkaMessageHandler-0 (BLOCKED)
org.apache.spark.streaming.receiver.BlockGenerator.addDataWithCallback(BlockGenerator.scala:123)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeMessageAndMetadata(ReliableKafkaReceiver.scala:185)
org.apache.spark.streaming.kafka.ReliableKafkaReceiver$MessageHandler.run(ReliableKafkaReceiver.scala:247)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
java.util.concurrent.FutureTask.run(FutureTask.java:262)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Sometimes, when the exception is thrown, I also see warning messages like these:

15/01/12 01:08:07 WARN hdfs.DFSClient: Slow ReadProcessor read fields took 30533ms (threshold=30000ms); ack: seqno: 113 status: SUCCESS status: SUCCESS downstreamAckTimeNanos: 30524893062, targets: [172.20.xxx.xxx:50010, 172.20.xxx.xxx:50010]
15/01/12 01:08:07 WARN hdfs.DFSClient: Slow waitForAckedSeqno took 30526ms (threshold=30000ms)

I never had this problem in the past with KafkaReceiver. What causes this 
exception, and how can I fix it?

Thanks in advance,
Max
