[ 
https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated SPARK-19280:
---------------------------------
    Priority: Critical  (was: Major)

> Failed Recovery from checkpoint caused by a multi-threading issue in the
> Spark Streaming scheduler
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19280
>                 URL: https://issues.apache.org/jira/browse/SPARK-19280
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Nan Zhu
>            Priority: Critical
>
> In one of our applications we found the following issue: the application
> recovers from a checkpoint file named "checkpoint-***166700000" whose
> embedded timestamp is ***166500000, and therefore restarts from the very
> beginning of the stream. Because our application relies on external,
> periodically cleaned data (cleaned in sync with checkpoint cleanup), the
> recovery simply fails.
> We identified a potential issue in Spark Streaming checkpointing and describe
> it with the following example. We propose a fix at the end of this JIRA.
> 1. The application properties: Batch Duration: 20000 ms, Functionality: a
> single stream calling ReduceByKeyAndWindow and print, Window Size: 60000 ms,
> Slide Duration: 20000 ms (a minimal sketch of this setup is given right after
> the walkthrough below).
> 2. The RDD at 166500000 is generated and the corresponding job is submitted
> to the execution ThreadPool; meanwhile, a DoCheckpoint message is sent to
> JobGenerator's queue.
> 3. The job at 166500000 finishes and a JobCompleted message is sent to
> JobScheduler's queue; meanwhile, the job at 166520000 is submitted to the
> execution ThreadPool and, similarly, a DoCheckpoint is sent to JobGenerator's
> queue.
> 4. JobScheduler's message processing thread (I will call it JS-EventLoop) is
> not scheduled by the operating system for a long time, and during this period
> the jobs for 166520000 - 166700000 are generated and completed.
> 5. The message processing thread of JobGenerator (JG-EventLoop) is scheduled
> and processes all DoCheckpoint messages for jobs ranging from 166520000 to
> 166700000, and the checkpoint files are successfully written. CRITICAL: at
> this moment, lastCheckpointTime is 166700000.
> 6. JS-EventLoop is scheduled and processes all JobCompleted messages for jobs
> ranging from 166520000 to 166700000. CRITICAL: a ClearMetadata message is
> pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The message queue now contains 20 ClearMetadata messages and JG-EventLoop
> is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs that
> fall outside the rememberDuration window. In our case, ReduceByKeyAndWindow
> sets rememberDuration to 100000 (the rememberDuration of
> ReducedWindowedDStream (40000) + windowSize), so that only the RDDs in
> (166600000, 166700000] are kept (this arithmetic is worked out in a sketch
> after the walkthrough). The ClearMetadata processing logic then pushes a
> DoCheckpoint to JobGenerator's queue.
> 8. JG-EventLoop is scheduled again and processes the DoCheckpoint for
> 166500000. VERY CRITICAL: at this step, all RDDs at or before 166600000
> (including the RDD at 166500000) have already been removed, and the
> checkpoint data is updated as in
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53
> and
> https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After step 8, a checkpoint file named /path/checkpoint-166700000 is
> created, but it carries the timestamp 166500000. At this moment the
> application crashes.
> 10. The application recovers from /path/checkpoint-166700000 and tries to get
> the RDD with validTime 166500000. Of course it does not find it and has to
> recompute. In the case of ReduceByKeyAndWindow, it has to recursively slice
> RDDs all the way back to the start of the stream. When the stream depends on
> external data, the recovery fails. In the case of Kafka, the recovered RDDs
> are not the same as the originals, because currentOffsets has already been
> updated to its value at 166700000.
> The proposed fix:
> 0. A hot fix would be to set the timestamp of the checkpoint file to
> lastCheckpointTime instead of using the timestamp of the Checkpoint instance
> (any side effects?); see the sketch below.
> 1. ClearMetadata should become ClearMetadataAndCheckpoint.
> 2. A long-term fix would be to merge JobScheduler and JobGenerator; I do not
> see any need to have two threads here.
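> Proposal 0 keeps the timestamp recorded in the checkpoint consistent with the
> lastCheckpointTime used for its file name. A rough sketch of the mismatch and
> of the proposed hot fix; the names below are hypothetical and do not mirror
> the actual CheckpointWriter code:
>
> object CheckpointNameMismatch {
>   // Hypothetical stand-in for the data serialized into a checkpoint file.
>   case class Snapshot(checkpointTime: Long)
>
>   def fileName(dir: String, time: Long): String = s"$dir/checkpoint-$time"
>
>   def main(args: Array[String]): Unit = {
>     val lastCheckpointTime = 166700000L            // step 5
>     val staleSnapshot      = Snapshot(166500000L)  // step 8
>     // Today (step 9): the file name and the serialized timestamp disagree.
>     println(fileName("/path", lastCheckpointTime)) // /path/checkpoint-166700000
>     println(staleSnapshot.checkpointTime)          // 166500000
>     // Proposal 0: write the snapshot with lastCheckpointTime instead, so that
>     // recovery from /path/checkpoint-166700000 looks up a batch whose data
>     // actually exists in the checkpointed state.
>     val fixed = staleSnapshot.copy(checkpointTime = lastCheckpointTime)
>     println(fixed.checkpointTime)                  // 166700000
>   }
> }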



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
