[ 
https://issues.apache.org/jira/browse/SPARK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276974#comment-14276974
 ] 

Max Xu commented on SPARK-5220:
-------------------------------

I believe that with https://github.com/apache/spark/pull/3655, exceptions are 
handled inside storeBlockAndCommitOffset in ReliableKafkaReceiver, so the block 
pushing thread shouldn't exit (no exception will reach 
keepPushingBlocks). The change stops the receiver after three retries. What 
happens after the receiver is stopped? Does the application fail?
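
To make the behavior I'm asking about concrete, here is a minimal Java sketch of the "retry a few times, then stop the receiver" pattern. It is not the code from the pull request: FakeReceiver, storeWithRetry and the attempt count passed to it are hypothetical names chosen for illustration, and the sketch assumes the failed store is simply re-run with no backoff.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

public class RetryThenStop {

    /** Hypothetical stand-in for a receiver; this is not Spark's Receiver class. */
    static class FakeReceiver {
        volatile boolean stopped = false;
        volatile String stopReason = null;

        void stop(String reason) {
            stopped = true;
            stopReason = reason;
        }
    }

    /**
     * Runs storeAction up to maxAttempts times. Returns true on the first
     * success. If every attempt throws, stops the receiver and returns false,
     * so no exception propagates to the caller's push loop.
     */
    static boolean storeWithRetry(Callable<Void> storeAction, int maxAttempts,
                                  FakeReceiver receiver) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                storeAction.call();
                return true;
            } catch (Exception e) {
                last = e; // swallowed here, so the calling loop never sees it
            }
        }
        receiver.stop("store failed after " + maxAttempts + " attempts: " + last);
        return false;
    }

    public static void main(String[] args) {
        FakeReceiver receiver = new FakeReceiver();
        // Simulate a store that always times out (assumed failure mode).
        boolean ok = storeWithRetry(() -> {
            throw new TimeoutException("simulated WAL write timeout");
        }, 3, receiver);
        System.out.println("stored=" + ok + ", receiverStopped=" + receiver.stopped);
    }
}
```

In this sketch the push loop survives because the exception never leaves storeWithRetry, but the receiver ends up stopped. Whether anything restarts it, or whether the application fails, is exactly the open question.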

> keepPushingBlocks in BlockGenerator terminated when an exception occurs, 
> which causes the block pushing thread to terminate and blocks receiver  
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>
> I am running a Spark Streaming application with ReliableKafkaReceiver. It 
> uses BlockGenerator to push blocks to BlockManager. However, writing WALs to 
> HDFS may time out, which causes keepPushingBlocks in BlockGenerator to 
> terminate:
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
> The block pushing thread then exits, so no subsequent blocks can be pushed 
> into BlockManager. This in turn blocks the receiver from receiving new data.
> So when the TimeoutException occurs while my app is running, the 
> ReliableKafkaReceiver stays in ACTIVE status but does nothing at all. 
> The application goes rogue.
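
The failure mode described in the quoted report can be reproduced in miniature. The Java sketch below is not BlockGenerator's code: Pusher, runFragile and runResilient are hypothetical names, and the "fragile" variant assumes, based on the "Error in block pushing thread" log line, that the exception is caught outside the loop and only logged.

```java
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class PushLoopDemo {

    /** Hypothetical push callback; stands in for whatever stores a block. */
    interface Pusher {
        void push(int block) throws Exception;
    }

    /** try/catch wraps the whole loop: the first failure ends the thread. */
    static int runFragile(int[] blocks, Pusher pusher) throws InterruptedException {
        AtomicInteger pushed = new AtomicInteger();
        Thread t = new Thread(() -> {
            try {
                for (int block : blocks) {
                    pusher.push(block);
                    pushed.incrementAndGet();
                }
            } catch (Exception e) {
                // Logged, but the loop is already gone; later blocks are never pushed.
                System.err.println("Error in block pushing thread: " + e);
            }
        });
        t.start();
        t.join();
        return pushed.get();
    }

    /** try/catch inside the loop: a failed block does not end the thread. */
    static int runResilient(int[] blocks, Pusher pusher) throws InterruptedException {
        AtomicInteger pushed = new AtomicInteger();
        Thread t = new Thread(() -> {
            for (int block : blocks) {
                try {
                    pusher.push(block);
                    pushed.incrementAndGet();
                } catch (Exception e) {
                    System.err.println("Push failed for block " + block + ": " + e);
                }
            }
        });
        t.start();
        t.join();
        return pushed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int[] blocks = {1, 2, 3, 4};
        // Simulated timeout on the second block only.
        Pusher failsOnSecond = block -> {
            if (block == 2) throw new TimeoutException("simulated timeout");
        };
        System.out.println("fragile pushed " + runFragile(blocks, failsOnSecond));
        System.out.println("resilient pushed " + runResilient(blocks, failsOnSecond));
    }
}
```

Note that the resilient variant simply skips the failed block, which a reliable receiver presumably cannot afford; it only shows that the thread stays alive. A real fix would have to retry or surface the failure.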




