[ https://issues.apache.org/jira/browse/SPARK-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tathagata Das resolved SPARK-5233.
----------------------------------
    Resolution: Fixed
    Fix Version/s: 1.2.1
                   1.3.0

> Error replay of WAL when recovered from driver failure
> ------------------------------------------------------
>
>                 Key: SPARK-5233
>                 URL: https://issues.apache.org/jira/browse/SPARK-5233
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>             Fix For: 1.3.0, 1.2.1
>
> Spark Streaming writes every event into the WAL for driver recovery, so the
> event sequence in the WAL may look like this:
> {code}
> BlockAdditionEvent ---> BlockAdditionEvent ---> BlockAdditionEvent --->
> BatchAllocationEvent ---> BatchCleanupEvent ---> BlockAdditionEvent --->
> BlockAdditionEvent ---> 'Driver Down Time' ---> BlockAdditionEvent --->
> BlockAdditionEvent ---> BatchAllocationEvent
> {code}
> When the driver recovers from a failure, it replays all of the existing
> metadata WAL to restore the correct state. In this situation, the two
> BlockAdditionEvents written just before the failure are put back into the
> received-block queue. After the driver restarts, newly arriving blocks are
> put into the same queue, and the follow-up BatchAllocationEvent drains the
> whole queue into one allocatedBlocks. As a result, old data that does not
> belong to this batch is mixed into it. Here is the partial log:
> {code}
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>block store result for batch 1421140750000 ms
> ....
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,46704,480)
> 197757 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47188,480)
> 197758 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47672,480)
> 197759 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48156,480)
> 197760 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48640,480)
> 197761 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,49124,480)
> 197762 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,0,44184)
> 197763 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,44188,58536)
> 197764 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,102728,60168)
> 197765 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,162900,64584)
> 197766 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,227488,51240)
> {code}
> The old log file
> "/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201"
> is obviously far older than the current batch interval, yet its data is
> fetched again and added to the current batch for processing.
> This issue is subtle, because in the previous code we never deleted the old
> received-data WAL, so as far as I know it could go unnoticed while still
> producing unwanted results.
> Basically, this happens because we miss some BatchAllocationEvents when
> recovering from a failure. I think we need to replay and apply all the
> events correctly.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
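[Editor's note] The queue-draining race described in the issue can be sketched with a minimal model. This is a hypothetical simulation, not Spark's actual ReceivedBlockTracker API: the `Tracker` class, its method names, and the block labels are all illustrative.

```python
from collections import deque

class Tracker:
    """Toy model of a received-block queue drained by batch allocation."""

    def __init__(self):
        self.queue = deque()   # blocks received but not yet allocated
        self.batches = {}      # batch_time -> list of allocated blocks

    def add_block(self, block):
        # Models a BlockAdditionEvent (either replayed from the WAL
        # or triggered by newly received data).
        self.queue.append(block)

    def allocate_batch(self, batch_time):
        # Models a BatchAllocationEvent: drains *everything* queued,
        # regardless of when each block actually arrived.
        self.batches[batch_time] = list(self.queue)
        self.queue.clear()

tracker = Tracker()

# WAL before the crash: two BlockAdditionEvents were logged, but the
# driver went down before the matching BatchAllocationEvent was written.
# On recovery, replay re-queues those old blocks:
tracker.add_block("old-1")
tracker.add_block("old-2")

# After restart, new data arrives and the next allocation fires:
tracker.add_block("new-1")
tracker.allocate_batch(1421140750000)

# The new batch now contains the stale pre-crash blocks as well:
print(tracker.batches[1421140750000])  # ['old-1', 'old-2', 'new-1']
```

Because the replayed BlockAdditionEvents have no matching BatchAllocationEvent, the stale blocks sit in the queue until the next allocation sweeps them into a batch they do not belong to, which is exactly the mixing the partial log above shows.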