[ https://issues.apache.org/jira/browse/SPARK-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tathagata Das resolved SPARK-5233.
----------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.1
                   1.3.0

> Error replay of WAL when recovered from driver failure
> ------------------------------------------------------
>
>                 Key: SPARK-5233
>                 URL: https://issues.apache.org/jira/browse/SPARK-5233
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>             Fix For: 1.3.0, 1.2.1
>
>
> Spark Streaming writes every event into the WAL for driver recovery; the 
> sequence in the WAL may look like this:
> {code}
> BlockAdditionEvent ---> BlockAdditionEvent ---> BlockAdditionEvent ---> 
> BatchAllocationEvent ---> BatchCleanupEvent ---> BlockAdditionEvent ---> 
> BlockAdditionEvent ---> 'Driver Down Time' ---> BlockAdditionEvent ---> 
> BlockAdditionEvent ---> BatchAllocationEvent
> {code}
> When the driver recovers from a failure, it replays all the existing metadata 
> in the WAL to restore the right state. In this situation, the two 
> BlockAdditionEvents written before the driver went down are put back into the 
> received-block queue. After the driver restarts, newly incoming blocks are 
> also put into this queue, and the follow-up BatchAllocationEvent builds an 
> allocatedBlocks by draining the whole queue. So old data that does not belong 
> to this batch is mixed into it; here is the partial log:
> {code}
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>block store result for batch 1421140750000 ms
> ....
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,46704,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47188,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47672,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48156,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48640,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,49124,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,0,44184)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,44188,58536)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,102728,60168)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,162900,64584)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,227488,51240)
> {code}
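The replay behavior described above can be modeled with a small sketch (illustrative names only, not the actual internals of Spark's ReceivedBlockTracker): pre-crash BlockAdditionEvents whose BatchAllocationEvent was never logged are left in the queue during replay, and the first post-restart allocation drains them into the wrong batch.

```scala
import scala.collection.mutable

// Simplified model for illustration; not Spark's real event classes.
sealed trait WalEvent
case class BlockAdditionEvent(blockId: String) extends WalEvent
case class BatchAllocationEvent(batchTime: Long) extends WalEvent

class TrackerModel {
  private val queue = mutable.Queue[String]()
  val batches = mutable.Map[Long, List[String]]()

  def handle(event: WalEvent): Unit = event match {
    case BlockAdditionEvent(id) => queue.enqueue(id)
    case BatchAllocationEvent(time) =>
      // Allocation drains the WHOLE queue: any stale block still queued
      // is mixed into the new batch.
      batches(time) = queue.toList
      queue.clear()
  }
}

// Replay: two pre-crash additions whose allocation event was never
// logged, then a post-restart addition and allocation.
val tracker = new TrackerModel
Seq(BlockAdditionEvent("old-1"), BlockAdditionEvent("old-2"),
    BlockAdditionEvent("new-1"), BatchAllocationEvent(1421140750000L))
  .foreach(tracker.handle)

// The new batch wrongly contains the stale pre-crash blocks:
assert(tracker.batches(1421140750000L) == List("old-1", "old-2", "new-1"))
```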
> The old log file 
> "/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-14211406"
> is obviously far older than the current batch interval, yet its data is 
> fetched again and added to this batch for processing.
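The staleness is visible in the segment file names themselves. Assuming the WAL file name encodes start/end timestamps as `log-<startMs>-<endMs>` (an inference from the paths in the log above, not a documented contract), comparing the end timestamp with the batch time shows how far the replayed segment predates the batch:

```scala
// Illustrative check, not Spark code: parse the assumed
// log-<startMs>-<endMs> naming scheme of the WAL segment.
val segmentFile = "log-1421140593201-1421140653201"
val Array(_, startMs, endMs) = segmentFile.split("-")
val batchTimeMs = 1421140750000L

// The segment ended well before the current batch started:
assert(endMs.toLong < batchTimeMs)
println(s"segment ended ${batchTimeMs - endMs.toLong} ms before the batch")
```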
> This issue is subtle: it did not surface in the previous code because we 
> never deleted the old received-data WAL. As far as I can tell, it leads to 
> incorrect results. Fundamentally, we miss some BatchAllocationEvents when 
> recovering from failure; I think we need to replay and re-apply all the 
> events correctly.
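One way to make the recovery consistent (a sketch under assumed semantics, not the fix that was actually merged for this ticket): after replaying the logged events in order, any blocks whose BatchAllocationEvent was never written are still pending; sweeping them into a dedicated recovery batch before accepting new input keeps them from mixing into the first post-restart batch.

```scala
import scala.collection.mutable

// Hypothetical event/tracker names for illustration only.
sealed trait WalEvent
case class BlockAddition(blockId: String) extends WalEvent
case class BatchAllocation(batchTime: Long) extends WalEvent

// Replay logged events in order, then sweep any still-pending
// (never-allocated) blocks into a dedicated recovery batch.
def recover(events: Seq[WalEvent],
            recoveryBatch: Long): Map[Long, List[String]] = {
  val pending = mutable.ListBuffer[String]()
  val batches = mutable.Map[Long, List[String]]()
  events.foreach {
    case BlockAddition(id) => pending += id
    case BatchAllocation(t) =>
      batches(t) = pending.toList
      pending.clear()
  }
  if (pending.nonEmpty) {          // blocks with no logged allocation
    batches(recoveryBatch) = pending.toList
    pending.clear()
  }
  batches.toMap
}

// "old-1"/"old-2" had no BatchAllocation before the crash; they land in
// the recovery batch instead of the next real batch.
val batches = recover(
  Seq(BlockAddition("b0"), BatchAllocation(100L),
      BlockAddition("old-1"), BlockAddition("old-2")),
  recoveryBatch = 200L)
assert(batches(100L) == List("b0"))
assert(batches(200L) == List("old-1", "old-2"))
```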



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
