[ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16891259#comment-16891259
 ] 

Matthias J. Sax commented on KAFKA-8574:
----------------------------------------

I cannot remember all the details. What was the deadlock issue? We should 
hand over tasks from one thread to another within an instance using the .lock 
files as synchronization points.
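
Rough sketch of the hand-over I have in mind (names are illustrative, not the 
actual StateDirectory API; an in-memory per-task lock stands in for the 
on-disk .lock file to keep the example self-contained, since a plain java.nio 
FileLock does not block between threads of the same JVM). The old owner 
releases the task's lock only after it has written the checkpoint, and the new 
owner reads the checkpoint only after it has acquired that lock:

{code:java}
// Illustrative hand-over sketch, not actual StateDirectory code.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

class TaskHandOverSketch {
    // one lock per task id; plays the role of the task directory's .lock file
    private static final ConcurrentMap<String, ReentrantLock> TASK_LOCKS = new ConcurrentHashMap<>();

    private static ReentrantLock lockFor(String taskId) {
        return TASK_LOCKS.computeIfAbsent(taskId, id -> new ReentrantLock());
    }

    // Old owner: write the checkpoint while holding the task lock, then release
    // it -- that release is the hand-over point. (In the real proposal the old
    // owner would already hold the lock for as long as it owns the task.)
    static void suspendAndCheckpoint(String taskId, Runnable writeCheckpoint) {
        ReentrantLock lock = lockFor(taskId);
        lock.lock();
        try {
            writeCheckpoint.run();   // flush changelog offsets to .checkpoint
        } finally {
            lock.unlock();           // new owner may proceed from here on
        }
    }

    // New owner: block on the task lock before reading the checkpoint, so the
    // read can never observe the pre-hand-over state of the file.
    static void initializeFromCheckpoint(String taskId, Runnable readCheckpoint) {
        ReentrantLock lock = lockFor(taskId);
        lock.lock();
        try {
            readCheckpoint.run();
        } finally {
            lock.unlock();
        }
    }
}
{code}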

Another high-level idea would be to actually write the checkpoint file on 
suspend() (instead of close()) and delete it on resume()? \cc [~guozhang]
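
To make that second idea concrete, a minimal sketch (method names mirror 
StreamTask, but the bodies are purely illustrative and use a plain file write 
instead of OffsetCheckpoint):

{code:java}
// Illustrative sketch only: tie the checkpoint lifecycle to suspend()/resume()
// instead of close().
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Map;

class SuspendCheckpointSketch {
    private final File checkpointFile;

    SuspendCheckpointSketch(File taskDir) {
        this.checkpointFile = new File(taskDir, ".checkpoint");
    }

    // suspend(): persist the changelog offsets right away, so whichever thread
    // picks the task up next sees a checkpoint that matches the on-disk store.
    void suspend(Map<String, Long> changelogOffsets) throws IOException {
        StringBuilder sb = new StringBuilder();
        changelogOffsets.forEach((partition, offset) ->
                sb.append(partition).append(' ').append(offset).append('\n'));
        Files.write(checkpointFile.toPath(), sb.toString().getBytes());
    }

    // resume(): the task stays on this thread, so the on-disk checkpoint is
    // stale by definition and must not be read by anyone -- delete it.
    void resume() throws IOException {
        Files.deleteIfExists(checkpointFile.toPath());
    }
}
{code}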

> EOS race condition during task transition leads to LocalStateStore truncation 
> in Kafka Streams 2.0.1
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8574
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8574
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: William Greer
>            Priority: Major
>
> *Overview*
>  While using EOS in Kafka Streams there is a race condition where the 
> checkpoint file is written by the previous owning thread (Thread A) after the 
> new owning thread (Thread B) has read the checkpoint file. Thread B then 
> starts a restoration since no checkpoint file was found. A re-balance occurs 
> before Thread B completes the restoration and a third thread (Thread C) 
> becomes the owning thread. Thread C reads the checkpoint file written by 
> Thread A, which does not correspond to the current state of the RocksDB state 
> store. When this race condition occurs the state store will have the most 
> recent records and some amount of the oldest records but will be missing some 
> amount of records in between. If A->Z represents the entire changelog to the 
> present, then when this scenario occurs the state store would contain records 
> [A->K and Y->Z], i.e. the state store is missing records K->Y.
>   
>  This race condition is possible due to dirty writes and dirty reads of the 
> checkpoint file.
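>   
>  A minimal stand-alone sketch of that dirty read/write (hypothetical class, 
> not Streams code): nothing orders Thread A's write against Thread B's read, 
> so B sees "no checkpoint" while a stale checkpoint appears on disk right 
> afterwards for the next reader to trust.
> {code:java}
> // Hypothetical reproduction of the interleaving, not actual Streams code.
> import java.io.File;
> import java.nio.file.Files;
> import java.util.Optional;
> 
> class CheckpointRaceSketch {
>     static final File CHECKPOINT = new File("/tmp/task-1_1/.checkpoint");
> 
>     // Thread B (and later Thread C) read like this -- no lock is held.
>     static Optional<String> read() throws Exception {
>         return CHECKPOINT.exists()
>                 ? Optional.of(new String(Files.readAllBytes(CHECKPOINT.toPath())))
>                 : Optional.empty();   // Thread B: empty -> wipe store, full restore
>     }
> 
>     // Thread A writes like this after its task was suspended -- again no lock.
>     static void write(String offsets) throws Exception {
>         Files.createDirectories(CHECKPOINT.getParentFile().toPath());
>         Files.write(CHECKPOINT.toPath(), offsets.getBytes());
>     }
> }
> // Interleaving: B.read() -> empty; A.write("changelog-1 42"); C.read() -> "changelog-1 42",
> // which describes data that no longer fully exists in the store that B wiped
> // and only partially restored.
> {code}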
>   
>  *Example:*
>  Thread refers to a Kafka Streams StreamThread [0]
>  Thread A, B and C are running in the same JVM in the same streams 
> application.
>   
>  Scenario:
>  Thread-A is in RUNNING state and up to date on partition 1.
>  Thread-A is suspended on 1. This does not write a checkpoint file because 
> EOS is enabled [1]
>  Thread-B is assigned to 1
>  Thread-B does not find checkpoint in StateManager [2]
>  Thread-A is assigned a different partition. The task manager writes the 
> suspended tasks' checkpoints to disk. The checkpoint for 1 is written. [3]
>  Thread-B deletes the LocalStore and starts restoring. The deletion of the 
> LocalStore does not delete the checkpoint file. [4]
>  Thread-C is revoked
>  Thread-A is revoked
>  Thread-B is revoked while still in the assigned status and does not write a 
> checkpoint file.
>  - Note: Thread-B never reaches the running state; it remains in the 
> PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
> Thread-C is assigned 1
>  Thread-C finds the checkpoint in StateManager. This checkpoint corresponds 
> to where Thread-A left the state store for partition 1, not to where Thread-B 
> left it.
>  Thread-C begins restoring from checkpoint. The state store is missing an 
> unknown number of records at this point
>  Thread-B is assigned; it does not write a checkpoint file for partition 1, 
> because it had not reached a running status before being revoked (the sketch 
> below shows why the stale checkpoint survived step [4])
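>   
>  The crux of step [4] in a stand-alone sketch (directory layout and method 
> name are illustrative): wiping the store's directory leaves the task-level 
> .checkpoint file behind, which is exactly the file Thread-C later trusts.
> {code:java}
> // Illustrative sketch of step [4], not the actual AbstractStateManager code.
> import java.io.File;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.Comparator;
> import java.util.stream.Stream;
> 
> class StoreWipeSketch {
>     // Assumed layout: <taskDir>/.checkpoint and <taskDir>/rocksdb/<storeName>/...
>     static void wipeStore(File taskDir, String storeName) throws Exception {
>         File storeDir = new File(new File(taskDir, "rocksdb"), storeName);
>         // recursively delete only the store's own directory ...
>         try (Stream<Path> paths = Files.walk(storeDir.toPath())) {
>             paths.sorted(Comparator.reverseOrder())
>                  .map(Path::toFile)
>                  .forEach(File::delete);
>         }
>         // ... while <taskDir>/.checkpoint is untouched, so the next owner of
>         // the task can read offsets describing data the store no longer has.
>     }
> }
> {code}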
>   
>  [0] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
>  [1] 
> [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
>  [2] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
>  [3] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
>  [4] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
>  Specifically 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
>  is where the state store is deleted but the checkpoint file is not.
>   
>  *How we recovered:*
>  1. Deleted the impacted state store. This triggered multiple exceptions and 
> initiated a re-balance.
>   
>  *Possible approaches to address this issue:*
>  1. Add a collection of global task locks to protect concurrent access to the 
> checkpoint file, with the lock for a suspended task being released only after 
> closeNonAssignedSuspendedTasks, and the locks for the newly assigned tasks 
> being acquired only after that release.
>  2. Delete the checkpoint file in EOS when partitions are revoked. This 
> doesn't address the race condition, but it would ensure that the checkpoint 
> file is never ahead of the LocalStore in EOS. It would, however, increase the 
> likelihood of triggering a full restoration of a LocalStore on partition 
> movement between threads on one host (see the sketch after this list).
>  3. Configure task stickiness for StreamThreads, e.g. if a host with multiple 
> StreamThreads is assigned a task the host had before, prefer to assign that 
> task to the thread on the host that had it before.
>  4. Add a new state that splits the PARTITIONS_ASSIGNED state into a "clean 
> up previous assignment" step and a "bootstrap new assignment" step. This 
> would require all valid threads to complete the clean-up step before any 
> thread could progress into the bootstrap step.
>  5. Force a checkpoint of the current position during PARTITIONS_REVOKED. I 
> don't think this addresses the race condition but I think it mitigates the 
> truncation scenario.
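>   
>  For approach 2, roughly (the hook below is hypothetical, not the actual 
> TaskManager API):
> {code:java}
> // Sketch of approach 2: under EOS, drop the checkpoint whenever the task is
> // revoked, so a stale checkpoint can never be ahead of the local store.
> import java.io.File;
> import java.nio.file.Files;
> 
> class EosRevocationSketch {
>     static void onTaskRevoked(File taskDir, boolean eosEnabled) throws Exception {
>         if (eosEnabled) {
>             // trades longer restores (no checkpoint -> restore from scratch on
>             // thread movement within a host) for never reading stale offsets
>             Files.deleteIfExists(new File(taskDir, ".checkpoint").toPath());
>         }
>     }
> }
> {code}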
>   
> *Made less likely by KAFKA-7672*
>  It seems the fix for https://issues.apache.org/jira/browse/KAFKA-7672 
> introduces a forced checkpoint under EOS, so this truncation scenario may be 
> less likely for 2.2.0, but not for earlier versions. The change-set for 
> KAFKA-7672 doesn't address the race conditions around reading and writing the 
> checkpoint files. As far as I can tell it is still possible for a 
> StreamThread to not have completed the checkpoint write in PARTITIONS_REVOKED 
> before another StreamThread has completed the checkpoint read in 
> PARTITIONS_ASSIGNED.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
