[ https://issues.apache.org/jira/browse/SPARK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276917#comment-14276917 ]
Max Xu commented on SPARK-5220:
-------------------------------

Current situation:
1. The TimeoutException is thrown by ReliableKafkaReceiver; the receiver logs the error and does nothing else.
2. The block pushing thread in BlockGenerator is terminated by the exception.
3. ReliableKafkaReceiver stays ACTIVE but is effectively dead afterward, because the blocksForPushing queue is full and no block pushing thread is left to drain it.

So when the exception occurs, what the user sees is a streaming application that keeps running but sits there doing nothing. (We use a supervisor to restart failed applications, but if the application keeps running while doing nothing, we also need a separate monitor that watches the data flow and kills the app.)

Can ReliableKafkaReceiver handle the TimeoutException? For example, retry pushing the block when a TimeoutException is thrown? Or restart itself? Or at least fail the streaming application? A rough sketch of the retry/restart idea is below the quoted issue.

> keepPushingBlocks in BlockGenerator terminated when an exception occurs,
> which causes the block pushing thread to terminate and blocks receiver
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>
> I am running a Spark streaming application with ReliableKafkaReceiver. It
> uses BlockGenerator to push blocks to BlockManager. However, writing WALs to
> HDFS may time out, which causes keepPushingBlocks in BlockGenerator to
> terminate.
>
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
>
> After that the block pushing thread is gone and no subsequent blocks can be pushed
> into the BlockManager. In turn this blocks the receiver from receiving new data.
> So when running my app and the TimeoutException happens, the
> ReliableKafkaReceiver stays in ACTIVE status but doesn't do anything at all.
> The application effectively hangs.
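As a rough sketch of the retry/restart idea (only an illustration, not existing Spark behaviour: the RetryingReceiver class name, the maxRetries parameter and the retry-then-restart policy are my assumptions; store(), restart() and logWarning() are the real methods inherited from org.apache.spark.streaming.receiver.Receiver and org.apache.spark.Logging), a custom receiver in the spirit of ReliableKafkaReceiver could catch the TimeoutException around store() and, after a few retries, call restart() so the receiver is at least restarted instead of silently hanging:

{code:scala}
import java.util.concurrent.TimeoutException

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver sketch, not actual Spark code: the class name,
// maxRetries and the retry-then-restart policy are assumptions. store(),
// restart() and logWarning() are the real inherited Receiver/Logging methods.
class RetryingReceiver(storageLevel: StorageLevel, maxRetries: Int = 3)
  extends Receiver[(String, String)](storageLevel) with Logging {

  override def onStart(): Unit = { /* start consumer threads here */ }
  override def onStop(): Unit = { /* stop consumer threads here */ }

  /** Store a block, retrying on WAL write timeouts instead of failing silently. */
  def storeWithRetry(buffer: ArrayBuffer[(String, String)], metadata: Any): Unit = {
    var attempt = 0
    while (true) {
      try {
        store(buffer, metadata) // may block on the HDFS WAL write and time out
        return
      } catch {
        case e: TimeoutException if attempt < maxRetries =>
          attempt += 1
          logWarning(s"WAL write timed out, retrying ($attempt/$maxRetries)", e)
        case e: TimeoutException =>
          // Give up: restart the receiver rather than leaving it ACTIVE with a
          // dead block pushing thread and a full blocksForPushing queue.
          restart("WAL write timed out after retries", e)
          return
      }
    }
  }
}
{code}

Something along these lines would at least surface the failure; combined with an external supervisor it would let the application recover instead of staying ACTIVE while doing nothing.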