[ https://issues.apache.org/jira/browse/SPARK-5220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14276924#comment-14276924 ]

Saisai Shao commented on SPARK-5220:
------------------------------------

Yeah, I totally understand your requirement; maybe we should figure out a more 
elegant way to deal with this situation. There is currently a pull request that 
tries to partially fix the problem you mentioned 
(https://github.com/apache/spark/pull/3655), but I don't think it is a thorough 
fix, because the push thread will still eventually exit and block the whole 
system.
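
To make the direction concrete, here is a minimal sketch of a push loop that 
survives non-fatal errors instead of letting the thread exit. This is not the 
actual Spark code; ResilientPusher, pushBlock, and reportError are hypothetical 
stand-ins for the corresponding members of BlockGenerator and 
ReceiverSupervisor:

{code:scala}
// Hedged sketch, not the actual Spark implementation: one possible
// shape for a block pushing loop that reports transient failures and
// keeps running, rather than dying on the first exception.
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.util.control.NonFatal

class ResilientPusher[B](
    blocksForPushing: ArrayBlockingQueue[B],
    pushBlock: B => Unit,
    reportError: (String, Throwable) => Unit) {

  @volatile private var stopped = false

  def stop(): Unit = { stopped = true }

  // Analogue of BlockGenerator.keepPushingBlocks: poll for blocks and
  // push them, surviving failures so later blocks can still flow.
  def keepPushingBlocks(): Unit = {
    while (!stopped) {
      val block = blocksForPushing.poll(100, TimeUnit.MILLISECONDS)
      if (block != null) {
        try {
          pushBlock(block) // may throw, e.g. a WAL write timing out
        } catch {
          case NonFatal(e) =>
            // Report the failure but keep the loop alive; the
            // supervisor can decide whether to restart the receiver.
            reportError("Failed to push block, continuing", e)
        }
      }
    }
  }
}
{code}

Whether the right policy is to retry, drop the block, or restart the receiver 
is a separate question, but in any of those designs the loop itself should not 
silently terminate.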

> keepPushingBlocks in BlockGenerator terminates when an exception occurs, 
> which causes the block pushing thread to terminate and blocks the receiver  
> -------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5220
>                 URL: https://issues.apache.org/jira/browse/SPARK-5220
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Max Xu
>
> I am running a Spark streaming application with ReliableKafkaReceiver. It 
> uses BlockGenerator to push blocks to BlockManager. However, writing WALs to 
> HDFS may time out, which causes keepPushingBlocks in BlockGenerator to 
> terminate.
> 15/01/12 19:07:06 ERROR receiver.BlockGenerator: Error in block pushing thread
> java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>         at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>         at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>         at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>         at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>         at scala.concurrent.Await$.result(package.scala:107)
>         at org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler.storeBlock(ReceivedBlockHandler.scala:176)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushAndReportBlock(ReceiverSupervisorImpl.scala:160)
>         at org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.pushArrayBuffer(ReceiverSupervisorImpl.scala:126)
>         at org.apache.spark.streaming.receiver.Receiver.store(Receiver.scala:124)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver.org$apache$spark$streaming$kafka$ReliableKafkaReceiver$$storeBlockAndCommitOffset(ReliableKafkaReceiver.scala:207)
>         at org.apache.spark.streaming.kafka.ReliableKafkaReceiver$GeneratedBlockHandler.onPushBlock(ReliableKafkaReceiver.scala:275)
>         at org.apache.spark.streaming.receiver.BlockGenerator.pushBlock(BlockGenerator.scala:181)
>         at org.apache.spark.streaming.receiver.BlockGenerator.org$apache$spark$streaming$receiver$BlockGenerator$$keepPushingBlocks(BlockGenerator.scala:154)
>         at org.apache.spark.streaming.receiver.BlockGenerator$$anon$1.run(BlockGenerator.scala:86)
> Then the block pushing thread is done and no subsequent blocks can be pushed 
> into BlockManager. In turn, this blocks the receiver from receiving new data.
> So when my app is running and the TimeoutException happens, the 
> ReliableKafkaReceiver stays in ACTIVE status but doesn't do anything at all. 
> The application hangs.
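
For context on the trace above: the TimeoutException comes out of an 
Await.result inside WriteAheadLogBasedBlockHandler.storeBlock, which blocks on 
the in-flight writes with a fixed timeout. A reduced sketch of that blocking 
pattern (simplified, hypothetical names; not the actual Spark source):

{code:scala}
// Reduced sketch of the pattern behind the stack trace, not the actual
// Spark source: the BlockManager write and the WAL write run as
// futures, and the caller blocks on both. A slow HDFS write surfaces
// here as a java.util.concurrent.TimeoutException, which in SPARK-5220
// propagates up through keepPushingBlocks and ends the push thread.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object WalStoreSketch {
  def storeBlock(writeToBlockManager: () => Unit,
                 writeToWal: () => Unit,
                 timeout: FiniteDuration = 30.seconds): Unit = {
    val storeInBlockManager = Future { writeToBlockManager() }
    val storeInWal = Future { writeToWal() }
    // Throws TimeoutException if either write is still pending after
    // `timeout`; nothing here catches it, so it escapes to the caller.
    Await.result(storeInBlockManager.zip(storeInWal), timeout)
  }
}
{code}

If I remember correctly, the 30-second figure is the default for this code 
path and can be raised via configuration 
(spark.streaming.receiver.blockStoreTimeout), but that only postpones the 
failure: the push thread still dies on the first exception that escapes 
pushBlock.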


