[ https://issues.apache.org/jira/browse/SPARK-19280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nan Zhu updated SPARK-19280:
----------------------------
Description:

In one of our applications, we found the following issue: the application recovers from a checkpoint file named "checkpoint-***166700000" that carries the timestamp ***166500000, and therefore restarts from the very beginning of the stream. Because our application relies on external, periodically-cleaned data (synced with checkpoint cleanup), the recovery simply failed.

We identified a potential issue in Spark Streaming checkpointing and describe it with the following example. We propose a fix at the end of this JIRA.

1. The application properties: Batch Duration: 20000, Functionality: a single stream calling reduceByKeyAndWindow and print, Window Size: 60000, Slide Duration: 20000. (A minimal sketch of this setup appears right after this list.)

2. The RDD at 166500000 is generated and the corresponding job is submitted to the execution ThreadPool. Meanwhile, a DoCheckpoint message is sent to JobGenerator's queue.

3. The job at 166500000 finishes and a JobCompleted message is sent to JobScheduler's queue; meanwhile, the job at 166520000 is submitted to the execution ThreadPool and, similarly, a DoCheckpoint message is sent to JobGenerator's queue.

4. JobScheduler's message-processing thread (I will call it JS-EventLoop) is not scheduled by the operating system for a long time; during this period, the jobs for 166520000 - 166700000 are generated and completed.

5. JobGenerator's message-processing thread (JG-EventLoop) is scheduled and processes all DoCheckpoint messages for the jobs from 166520000 to 166700000, and the checkpoint files are successfully written. CRITICAL: at this moment, lastCheckpointTime is 166700000.

6. JS-EventLoop is scheduled and processes all JobCompleted messages for the jobs from 166520000 to 166700000. CRITICAL: a ClearMetadata message is pushed to JobGenerator's message queue for EACH JobCompleted.

7. The message queue now contains 20 ClearMetadata messages and JG-EventLoop is scheduled to process them. CRITICAL: ClearMetadata removes all RDDs that fall outside the rememberDuration window. In our case, reduceByKeyAndWindow sets rememberDuration to 100000 (the rememberDuration of ReducedWindowedDStream (40000) + windowSize), so only the RDDs in (166600000, 166700000] are kept. The ClearMetadata processing logic then pushes a DoCheckpoint message to JobGenerator's queue.

8. JG-EventLoop is scheduled again to process the DoCheckpoint for 166500000. VERY CRITICAL: at this step, all RDDs at or before 166600000 (including the one for 166500000) have already been removed, and the checkpoint data is updated as in https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L53 and https://github.com/apache/spark/blob/a81e336f1eddc2c6245d807aae2c81ddc60eabf9/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala#L59.

9. After step 8, a checkpoint file named /path/checkpoint-166700000 is created, but it carries the timestamp 166500000. At this moment, the application crashes.

10. The application recovers from /path/checkpoint-166700000 and tries to get the RDD with validTime 166500000. Of course it does not find it and has to recompute. In the case of reduceByKeyAndWindow, it has to recursively slice RDDs all the way back to the start of the stream. When the stream depends on external data, recovery does not succeed. In the case of Kafka, the recovered RDDs would not be the same as the original ones, because currentOffsets has been updated to its value at 166700000.
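For concreteness, a minimal sketch of the kind of application described in step 1 could look like the following. The checkpoint directory, the socket source, the local master, and the object name are placeholders for this illustration, not details taken from the original application:

{code:scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object WindowedWordCount {
  def createContext(checkpointDir: String): StreamingContext = {
    // local[2] only so the sketch can run standalone; not part of the original report
    val conf = new SparkConf().setAppName("ReduceByKeyAndWindowExample").setMaster("local[2]")
    // Batch duration 20000 ms, as in step 1
    val ssc = new StreamingContext(conf, Milliseconds(20000))
    ssc.checkpoint(checkpointDir)

    // Any keyed DStream works; a socket source is used here purely as a stand-in
    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    val counts = words
      .map(w => (w, 1L))
      // Window size 60000 ms, slide duration 20000 ms, as in step 1; the inverse
      // reduce function makes this a ReducedWindowedDStream, which must be checkpointed
      .reduceByKeyAndWindow(
        (a: Long, b: Long) => a + b,
        (a: Long, b: Long) => a - b,
        Milliseconds(60000),
        Milliseconds(20000))
    counts.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/path/checkpoint"  // placeholder checkpoint directory
    // Recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}

Using the inverse-reduce variant of reduceByKeyAndWindow is what brings ReducedWindowedDStream into play, which is why checkpointing is enabled above.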
The proposed fix:

0. A hot fix would be to set the timestamp of the checkpoint file to lastCheckpointTime instead of using the timestamp of the Checkpoint instance (any side effects?). (A small sketch of this idea follows this list.)

1. ClearMetadata should become ClearMetadataAndCheckpoint.

2. A long-term fix would be to merge JobScheduler and JobGenerator; I do not see any necessity for having two threads here.
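To make item 0 concrete, here is a small, purely illustrative sketch of the file-name/timestamp mismatch from step 9 and of one possible reading of the hot fix. None of the names below (CheckpointState, CheckpointNaming, writeAsReported, writeWithHotFix) come from the Spark code base; they only model the relationship between the file name and the timestamp that recovery later looks up.

{code:scala}
// Illustrative model only; not Spark code.
case class CheckpointState(checkpointTime: Long)

object CheckpointNaming {
  // Behaviour described in step 9: the file is named with lastCheckpointTime
  // (166700000) while the serialized state still carries the stale batch time
  // of the DoCheckpoint message being processed (166500000).
  def writeAsReported(lastCheckpointTime: Long, doCheckpointTime: Long): (String, CheckpointState) =
    (s"/path/checkpoint-$lastCheckpointTime", CheckpointState(doCheckpointTime))

  // One possible reading of the hot fix in item 0: stamp the written state with
  // lastCheckpointTime too, so the file name and the time used for recovery
  // lookup can no longer diverge.
  def writeWithHotFix(lastCheckpointTime: Long): (String, CheckpointState) =
    (s"/path/checkpoint-$lastCheckpointTime", CheckpointState(lastCheckpointTime))
}

object HotFixDemo extends App {
  val (file, state) = CheckpointNaming.writeAsReported(166700000L, 166500000L)
  // Recovery picks the newest file by name, then looks up the RDD at state.checkpointTime,
  // which was already cleared in step 7 -> recomputation back to the start of the stream.
  println(s"$file carries ${state.checkpointTime}")

  val (fixedFile, fixedState) = CheckpointNaming.writeWithHotFix(166700000L)
  // With the hot fix the two times agree, and the RDD at 166700000 is still retained.
  println(s"$fixedFile carries ${fixedState.checkpointTime}")
}
{code}

Whether stamping the written state with lastCheckpointTime has other side effects (the question raised in item 0) would still have to be verified against the real checkpoint-writing and recovery paths.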
> Failed Recovery from checkpoint caused by the multi-threads issue in Spark Streaming scheduler
> -----------------------------------------------------------------------------------------------
>
> Key: SPARK-19280
> URL: https://issues.apache.org/jira/browse/SPARK-19280
> Project: Spark
> Issue Type: Bug
> Affects Versions: 1.6.3, 2.0.0, 2.0.1, 2.0.2, 2.1.0
> Reporter: Nan Zhu

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)