[ https://issues.apache.org/jira/browse/SPARK-5233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14285603#comment-14285603 ]

Saisai Shao commented on SPARK-5233:
------------------------------------

Hi TD, maybe I didn't describe the problem clearly.

The problem is this: if the metadata WAL ends with a {{BlockAdditionEvent}} 
when the driver fails, then on recovery the driver reads all the events from 
the WAL and replays them to restore state. Because the WAL ends with a 
{{BlockAdditionEvent}}, that event is left in the queue waiting for a 
{{BatchAllocationEvent}} to aggregate it; since the matching 
{{BatchAllocationEvent}} was never written, the old data stays in the queue 
indefinitely. Meanwhile, events for newly received data are also put into the 
same queue, so when the next {{BatchAllocationEvent}} arrives, all the queued 
events are aggregated together and the old {{BlockAdditionEvent}}s are mixed 
into the new batch's receiver input. As I understand it, this leads to 
incorrect processing.

{code}
......---> BatchCleanupEvent ---> BlockAdditionEvent ---> BlockAdditionEvent 
---> 'Driver Down Time' ---> BlockAdditionEvent ---> BlockAdditionEvent 
---> BatchAllocationEvent

These four BlockAdditionEvents (2 old, 2 new) will be aggregated when the 
job is submitted.
{code}
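To make the failure mode concrete, here is a minimal toy model (hypothetical Python, not Spark's real ReceivedBlockTracker API) of a queue where BlockAdditionEvents wait and a BatchAllocationEvent drains everything into one batch:

```python
from collections import deque

# Toy model of the driver-side received-block queue. Names and structure
# are illustrative only; the real tracker lives in Spark's Scala code.
class TrackerModel:
    def __init__(self):
        self.queue = deque()   # blocks waiting for allocation
        self.allocated = {}    # batch time -> blocks in that batch

    def add_block(self, block):
        # Corresponds to replaying or receiving a BlockAdditionEvent.
        self.queue.append(block)

    def allocate_batch(self, batch_time):
        # Corresponds to a BatchAllocationEvent: drains the WHOLE queue.
        self.allocated[batch_time] = list(self.queue)
        self.queue.clear()

tracker = TrackerModel()

# Replaying a WAL that ended with two unallocated additions before the crash:
tracker.add_block("old-1")
tracker.add_block("old-2")   # driver went down here, no allocation written

# After recovery, new data arrives and the next allocation happens:
tracker.add_block("new-1")
tracker.add_block("new-2")
tracker.allocate_batch(1421140750000)

# The new batch now contains the two stale pre-failure blocks as well.
print(tracker.allocated[1421140750000])
# -> ['old-1', 'old-2', 'new-1', 'new-2']
```

The drain-everything behavior of the allocation step is exactly why the two pre-failure blocks leak into the post-recovery batch.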

> Error replay of WAL when recovered from driver failure
> -----------------------------------------------------
>
>                 Key: SPARK-5233
>                 URL: https://issues.apache.org/jira/browse/SPARK-5233
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Saisai Shao
>
> Spark Streaming writes all events into the WAL for driver recovery; the 
> sequence in the WAL may look like this:
> {code}
> BlockAdditionEvent ---> BlockAdditionEvent ---> BlockAdditionEvent ---> 
> BatchAllocationEvent ---> BatchCleanupEvent ---> BlockAdditionEvent ---> 
> BlockAdditionEvent ---> 'Driver Down Time' ---> BlockAdditionEvent ---> 
> BlockAdditionEvent ---> BatchAllocationEvent
> {code}
> When the driver recovers from a failure, it replays all the existing 
> metadata WAL entries to rebuild the correct state. In this scenario, the two 
> BlockAdditionEvents written before the failure are put into the 
> received-block queue. After the driver restarts, newly received blocks are 
> also put into this queue, and the next BatchAllocationEvent drains the whole 
> queue into one allocatedBlocks. As a result, old data that does not belong 
> to this batch is mixed into it. Here is a partial log:
> {code}
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>block store result for batch 1421140750000 ms
> ....
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,46704,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47188,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,47672,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48156,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,48640,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-1421140653201,49124,480)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,0,44184)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,44188,58536)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,102728,60168)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,162900,64584)
> 15/01/13 17:19:10 INFO KafkaInputDStream: >>>>>>>>>>>>>log segment: WriteAheadLogFileSegment(file: /home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140747074-1421140807074,227488,51240)
> {code}
> The old log file 
> "/home/jerryshao/project/apache-spark/checkpoint-wal-test/receivedData/0/log-1421140593201-14211406"
>  is clearly far older than the current batch interval, yet its segments are 
> fetched again and added to the current batch for processing.
> This issue is subtle: in the previous code we never deleted the old 
> received-data WAL, so it went unnoticed, but as far as I can tell it leads 
> to incorrect results. The root cause is that the BatchAllocationEvents for 
> some blocks are missing when recovering from failure. I think we need to 
> replay and apply all the events correctly on recovery.
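One possible shape of a fix, sketched here as hypothetical Python on a toy replay loop (not Spark's real recovery code): after replaying the WAL, any BlockAdditionEvents left unallocated are drained into their own recovery batch, so they cannot leak into the next live allocation.

```python
from collections import deque

def replay_wal(events, recovery_batch_time):
    """Toy WAL replay. `events` is a list of (kind, payload) tuples;
    leftover unallocated blocks are assigned to a dedicated recovery
    batch instead of being left in the queue. Illustrative only."""
    queue, allocated = deque(), {}
    for kind, payload in events:
        if kind == "BlockAdditionEvent":
            queue.append(payload)
        elif kind == "BatchAllocationEvent":
            # payload is the batch time; drain the queue into that batch.
            allocated[payload] = list(queue)
            queue.clear()
    # The fix: drain leftovers into a recovery batch so they are processed
    # (or deliberately dropped) rather than mixed into new incoming data.
    if queue:
        allocated[recovery_batch_time] = list(queue)
        queue.clear()
    return allocated

wal = [
    ("BlockAdditionEvent", "b1"),
    ("BatchAllocationEvent", 1000),
    ("BlockAdditionEvent", "b2"),   # driver went down after this write
    ("BlockAdditionEvent", "b3"),
]
print(replay_wal(wal, recovery_batch_time=1500))
# -> {1000: ['b1'], 1500: ['b2', 'b3']}
```

Whether the leftover blocks should be reprocessed under a recovered batch or discarded is a separate design decision; the key point is that the replay must account for them explicitly.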



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
