[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15831209#comment-15831209 ]
Nan Zhu commented on SPARK-19280:
---------------------------------

[~zsxwing] Thanks for the reply.

0) I do not think the content of the checkpoint file (except the timestamp) has any problem. A checkpoint request for an earlier time exists only to reduce the size of future checkpoints (because a checkpoint request for an earlier timestamp follows a ClearMetadata for that same timestamp). Based on this, we should not recover from that "just-for-reducing-size" moment; we should recover from the latestCheckpointTime. I do not see how recovering from that moment would lose data (I may be missing something...).

1) Can you elaborate? My original idea was simply to avoid writing a checkpoint with an earlier timestamp (if I missed any data-loss case in 0, then forget about this one...).

2) The long-term fix also brings other benefits, including fixing SPARK-19233, which is the second issue caused by the multiple threads here. I am volunteering to work on it and would be happy if you or TD could be the shepherd.

> Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
> ----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19280
>                 URL: https://issues.apache.org/jira/browse/SPARK-19280
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
>            Reporter: Nan Zhu
>            Priority: Critical
>
> In one of our applications, we found the following issue: the application, recovering from a checkpoint file named "checkpoint-***166700000" but carrying the timestamp ***166500000, recovers from the very beginning of the stream, and because our application relies on external, periodically-cleaned data (synced with checkpoint cleanup), the recovery simply fails.
> We identified a potential issue in Spark Streaming checkpointing and describe it with the following example. We propose a fix at the end of this JIRA.
> 1. The application's properties: batch duration 20000 ms; functionality: a single stream calling ReduceByKeyAndWindow and print; window size 60000 ms; slide duration 20000 ms (see the first sketch after this walkthrough).
> 2. The RDD at 166500000 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to JobGenerator's queue.
> 3. The job at 166500000 finishes and a JobCompleted message is sent to JobScheduler's queue; meanwhile, the job at 166520000 is submitted to the execution ThreadPool and, similarly, a DoCheckpoint is sent to JobGenerator's queue.
> 4. JobScheduler's message-processing thread (I will use JS-EventLoop to identify it) is not scheduled by the operating system for a long time, and during this period the jobs for 166520000 - 166700000 are generated and completed.
> 5. JobGenerator's message-processing thread (JG-EventLoop) is scheduled and processes all DoCheckpoint messages for the jobs ranging from 166520000 to 166700000, and the checkpoint files are successfully written. CRITICAL: at this moment, lastCheckpointTime is 166700000.
> 6. JS-EventLoop is scheduled and processes all JobCompleted messages for the jobs ranging from 166520000 to 166700000. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted.
> 7. The message queue now contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs outside the rememberDuration window. In our case, ReduceByKeyAndWindow sets rememberDuration to 100000 ms (the rememberDuration of ReducedWindowedDStream, 40000, plus the window size), so only the RDDs in (166600000, 166700000] are kept. The ClearMetadata processing logic then pushes a DoCheckpoint to JobGenerator's queue (see the second sketch after this walkthrough).
> 8. JG-EventLoop is scheduled again to process the DoCheckpoint for 166500000. VERY CRITICAL: at this step, every RDD no later than 166600000 has been removed, and the checkpoint data is updated as in https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.
> 9. After 8, a checkpoint named /path/checkpoint-166700000 is created, but with the timestamp 166500000. At this moment, the application crashed.
> 10. The application recovers from /path/checkpoint-166700000 and tries to get the RDD with validTime 166500000. Of course it does not find it and has to recompute. In the case of ReduceByKeyAndWindow, it has to recursively slice RDDs all the way back to the start of the stream. When the stream depends on external data, it cannot recover successfully. In the case of Kafka, the recovered RDDs are not the same as the originals, since currentOffsets has been updated to its value at 166700000.
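>
> To make the setup in 1 concrete, here is a minimal sketch of an application with the same shape. The socket source, host, port, and checkpoint path are illustrative assumptions, not the original application; any single DStream feeding ReduceByKeyAndWindow with these durations reproduces the shape.
>
> {code:scala}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Milliseconds, StreamingContext}
>
> object WindowedShape {
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("SPARK-19280-shape")
>     val ssc = new StreamingContext(conf, Milliseconds(20000)) // batch duration
>
>     // Checkpointing must be enabled: the inverse-reduce form of
>     // reduceByKeyAndWindow creates a ReducedWindowedDStream, which
>     // requires a checkpoint directory.
>     ssc.checkpoint("/path/checkpoint") // illustrative path
>
>     val counts = ssc.socketTextStream("localhost", 9999) // illustrative source
>       .map(word => (word, 1L))
>       .reduceByKeyAndWindow(
>         (a: Long, b: Long) => a + b, // reduce
>         (a: Long, b: Long) => a - b, // inverse reduce
>         Milliseconds(60000),         // window size
>         Milliseconds(20000))         // slide duration
>
>     counts.print()
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}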
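>
> The interleaving in 2-9 can also be condensed into a toy model, below, written in plain Scala with no Spark dependency. The message names mirror JobGenerator's events, but the single queue, the writer, and the crash point are illustrative assumptions rather than Spark's actual code.
>
> {code:scala}
> import scala.collection.mutable
>
> object CheckpointRaceSketch {
>
>   sealed trait GeneratorEvent
>   final case class DoCheckpoint(time: Long) extends GeneratorEvent
>   final case class ClearMetadata(time: Long) extends GeneratorEvent
>
>   // The writer names the file after the newest checkpoint time it has seen,
>   // even when the Checkpoint instance it persists carries an older timestamp.
>   var lastCheckpointTime = Long.MinValue
>
>   def write(checkpointTime: Long): Unit = {
>     lastCheckpointTime = lastCheckpointTime.max(checkpointTime)
>     println(s"/path/checkpoint-$lastCheckpointTime <- Checkpoint(timestamp = $checkpointTime)")
>   }
>
>   def main(args: Array[String]): Unit = {
>     val jgQueue = mutable.Queue.empty[GeneratorEvent]
>
>     // Steps 4-5: JS-EventLoop is starved, so JG-EventLoop first drains the
>     // DoCheckpoint messages for 166520000..166700000.
>     (166520000L to 166700000L by 20000L).foreach(t => jgQueue.enqueue(DoCheckpoint(t)))
>
>     // Steps 6-7: JS-EventLoop wakes up and each JobCompleted pushes a
>     // ClearMetadata; only the earliest one is modeled here, for brevity.
>     jgQueue.enqueue(ClearMetadata(166500000L))
>
>     while (jgQueue.nonEmpty) jgQueue.dequeue() match {
>       case DoCheckpoint(t) =>
>         write(t)
>       case ClearMetadata(t) =>
>         // Step 7: clearing metadata re-enqueues a checkpoint request for the
>         // old, already-cleared time, so that future checkpoints shrink.
>         jgQueue.enqueue(DoCheckpoint(t))
>     }
>     // The last line printed is
>     //   /path/checkpoint-166700000 <- Checkpoint(timestamp = 166500000)
>     // If the application crashes here (step 9), recovery reads a file named
>     // for 166700000 but replays from 166500000, whose RDDs were just cleared.
>   }
> }
> {code}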
>
> The proposed fix:
> 0. A hot fix would be to set the timestamp of the checkpoint file to lastCheckpointTime instead of using the timestamp of the Checkpoint instance (any side effects?); a rough sketch follows below.
> 1. ClearMetadata should become ClearMetadataAndCheckpoint.
> 2. A long-term fix would be to merge JobScheduler and JobGenerator; I do not see any necessity for having two threads here.
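>
> To illustrate 0: the sketch below is shaped loosely after JobGenerator.doCheckpoint and shows only the idea. The lastCheckpointTime field is assumed here for illustration; a real patch would have to live wherever Spark tracks the newest checkpoint time, and the side effects asked about above remain to be checked.
>
> {code:scala}
> // Illustrative sketch only, not an actual patch against Spark.
> private var lastCheckpointTime: Time = null
>
> private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean): Unit = {
>   if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
>     if (lastCheckpointTime == null || time > lastCheckpointTime) {
>       lastCheckpointTime = time
>     }
>     ssc.graph.updateCheckpointData(time)
>     // Stamp the persisted Checkpoint with lastCheckpointTime rather than the
>     // (possibly older) time of a "just-for-reducing-size" request, so that
>     // recovery starts from the newest batch instead of a cleared one.
>     checkpointWriter.write(new Checkpoint(ssc, lastCheckpointTime), clearCheckpointDataLater)
>   }
> }
> {code}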