William Greer created KAFKA-8187:
------------------------------------

             Summary: State store record loss across multiple reassignments 
when using standby tasks
                 Key: KAFKA-8187
                 URL: https://issues.apache.org/jira/browse/KAFKA-8187
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.0.1
            Reporter: William Greer


Overview:
There is a race condition that can cause a partitioned state store to be 
missing records up to an offset when using standby tasks.

When a reassignment occurs and a task is migrated to a StandbyTask in another
StreamThread/TaskManager on the same JVM, lock contention can prevent the
StandbyTask on the currently assigned StreamThread from acquiring the state
directory lock. The acquisition is not retried, because all of the active
StreamTasks for that StreamThread are already running. If the StandbyTask does
not acquire the lock before the StreamThread enters the RUNNING state, then
the StandbyTask will not consume any records. If there is no subsequent
reassignment before the second execution of the stateDirCleaner thread, then
the task directory for the StandbyTask will be deleted. When the next
reassignment occurs, the offset that was read by the StandbyTask at creation
time, before acquiring the lock, will be written back to the state store
directory. This write re-creates the state store directory.


An example:
StreamThread(A) and StreamThread(B) are running on the same JVM in the same 
streams application.

StreamThread(A) has StandbyTask 1_0
StreamThread(B) has no tasks

A reassignment is triggered by another host in the streams application fleet.

StreamThread(A) is notified with a PARTITIONS_REVOKED event for the thread's
one task
StreamThread(B) is notified with a PARTITIONS_ASSIGNED event of a standby task 
for 1_0

Here begins the race condition.
StreamThread(B) creates the StandbyTask which reads the current checkpoint from 
disk.
StreamThread(B) then attempts to updateNewAndRestoringTasks() for its assigned
tasks. [0]
StreamThread(B) initializes the new tasks for the active and standby tasks. [1] 
[2]
StreamThread(B) attempts to lock the state directory for task 1_0 but fails 
with a LockException [3], since StreamThread(A) still holds the lock.
StreamThread(B) returns true from updateNewAndRestoringTasks() due to the check 
at [4] which only checks that the active assigned tasks are running.
StreamThread(B) state is set to RUNNING
StreamThread(A) closes the previous StandbyTask, specifically calling
closeStateManager() [5], which releases the state directory lock
StreamThread(A) state is set to RUNNING

Streams application for this host has completed re-balancing and is now in the 
RUNNING state.

State at this point is the following: State directory exists for 1_0 and all 
data is present.
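
The sequence above can be modelled in a few lines. This is only a minimal,
single-threaded sketch of the interleaving, not Kafka Streams code: DirLocks,
ThreadModel and their methods are hypothetical stand-ins for the state
directory lock and for the created/running task sets of a StreamThread. It
shows that a readiness check that only looks at active tasks reports "ready"
while the standby is still waiting for its lock.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the race; none of these names are Kafka Streams APIs.
final class StandbyLockRaceSketch {

    // Stand-in for the per-task state directory lock shared within one JVM.
    static final class DirLocks {
        private final Map<String, String> owners = new HashMap<>();

        boolean tryLock(String taskId, String thread) {
            String owner = owners.putIfAbsent(taskId, thread);
            return owner == null || owner.equals(thread);
        }

        void unlock(String taskId, String thread) {
            owners.remove(taskId, thread); // only removes if 'thread' is the owner
        }
    }

    // Stand-in for one StreamThread's view of its assigned tasks.
    static final class ThreadModel {
        final String name;
        final Set<String> activeCreated = new HashSet<>();
        final Set<String> activeRunning = new HashSet<>();
        final Set<String> standbyCreated = new HashSet<>();
        final Set<String> standbyRunning = new HashSet<>();

        ThreadModel(String name) {
            this.name = name;
        }

        // One initialization pass: a task whose lock is taken stays in "created".
        void initializeNewTasks(DirLocks locks) {
            moveIfLocked(activeCreated, activeRunning, locks);
            moveIfLocked(standbyCreated, standbyRunning, locks);
        }

        private void moveIfLocked(Set<String> from, Set<String> to, DirLocks locks) {
            for (String taskId : new HashSet<>(from)) {
                if (locks.tryLock(taskId, name)) {
                    from.remove(taskId);
                    to.add(taskId);
                }
            }
        }

        // The check as described in this report: only active tasks count.
        boolean readyActiveOnly() {
            return activeCreated.isEmpty();
        }

        // A stricter variant that also waits for the standbys.
        boolean readyIncludingStandbys() {
            return activeCreated.isEmpty() && standbyCreated.isEmpty();
        }
    }

    public static void main(String[] args) {
        DirLocks locks = new DirLocks();
        locks.tryLock("1_0", "A");            // StreamThread(A) still holds 1_0

        ThreadModel b = new ThreadModel("B");
        b.standbyCreated.add("1_0");          // StreamThread(B) is assigned standby 1_0
        b.initializeNewTasks(locks);          // the lock attempt fails

        System.out.println("ready (active only):   " + b.readyActiveOnly());
        System.out.println("ready (with standbys): " + b.readyIncludingStandbys());
        System.out.println("standbys running:      " + b.standbyRunning);
    }
}
```

In the model, a second initializeNewTasks() pass after StreamThread(A)
releases the lock would start the standby. The problem described here is that
no such pass happens once the thread is RUNNING.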

Then, 1 to 2 intervals of [6] (10 minutes by default) after the reassignment
has completed, the stateDirCleaner thread will execute [7].

The stateDirCleaner will then do [8], which finds the directory 1_0, finds that
there isn't an active lock for that directory, acquires the lock, and deletes
the directory.

State at this point is the following: State directory does not exist for 1_0.
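
The "1 to 2 intervals" timing can be illustrated with a small model of the
cleaner. This is a sketch under assumptions, not the code at [8]: it assumes
the cleaner runs once per interval and deletes a task directory only when it
is unlocked and its last modification is older than one interval. The method
name and the map/set parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the periodic cleaner; not the StateDirectory code.
final class StateDirCleanerSketch {

    // Deletes every task dir that is unlocked and older than cleanupDelayMs.
    // lastModifiedMs maps task id -> last modification time of its directory.
    static List<String> cleanRemovedTasks(Map<String, Long> lastModifiedMs,
                                          Set<String> lockedTasks,
                                          long nowMs,
                                          long cleanupDelayMs) {
        List<String> deleted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastModifiedMs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> dir = it.next();
            boolean unlocked = !lockedTasks.contains(dir.getKey());
            boolean expired = nowMs > dir.getValue() + cleanupDelayMs;
            if (unlocked && expired) {
                deleted.add(dir.getKey());
                it.remove();
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        long delayMs = 10 * 60 * 1000L;       // 10 minutes, the default noted above
        Map<String, Long> dirs = new HashMap<>();
        dirs.put("1_0", 1_000L);              // last touched just after start-up
        Set<String> locked = new HashSet<>(); // nobody holds the lock for 1_0

        // First run, one interval in: the directory is not old enough yet.
        System.out.println(cleanRemovedTasks(dirs, locked, delayMs, delayMs));
        // Second run, two intervals in: the directory is deleted.
        System.out.println(cleanRemovedTasks(dirs, locked, 2 * delayMs, delayMs));
    }
}
```

Had the StandbyTask on StreamThread(B) acquired the lock, 1_0 would be in the
locked set and the cleaner would skip it on every run.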

When the next reassignment occurs, the offset that was read by StreamThread(B)
during construction of the StandbyTask for 1_0 will be written back to disk.
This write re-creates the state store directory and writes the .checkpoint file
with the old offset.

State at this point is the following: State directory exists for 1_0 with a 
'.checkpoint' file in it, but there is no other state store data in the 
directory.
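
The write-back can be reproduced on a scratch directory with plain file
operations. This is an illustrative sketch only: the file name "store-data"
and the checkpoint content "100" are made-up stand-ins, and the real
.checkpoint format is not reproduced here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical reproduction of the stale checkpoint write-back on a temp dir.
final class StaleCheckpointSketch {

    // Returns the file names left in the task directory after the sequence.
    static List<String> run() {
        try {
            Path taskDir = Files.createTempDirectory("streams-state").resolve("1_0");
            Path checkpoint = taskDir.resolve(".checkpoint");
            Path storeData = taskDir.resolve("store-data");

            // Initial state: store data plus a checkpoint that matches it.
            Files.createDirectories(taskDir);
            Files.write(storeData, "records up to offset 100".getBytes(StandardCharsets.UTF_8));
            Files.write(checkpoint, "100".getBytes(StandardCharsets.UTF_8));

            // Standby construction: the checkpoint is read before any lock is held.
            byte[] offsetInMemory = Files.readAllBytes(checkpoint);

            // The cleaner finds the directory unlocked and removes it.
            Files.delete(storeData);
            Files.delete(checkpoint);
            Files.delete(taskDir);

            // Next reassignment: the in-memory offset is written back,
            // which re-creates the directory without any store data.
            Files.createDirectories(taskDir);
            Files.write(checkpoint, offsetInMemory);

            List<String> names = new ArrayList<>();
            try (Stream<Path> entries = Files.list(taskDir)) {
                entries.forEach(p -> names.add(p.getFileName().toString()));
            }
            Collections.sort(names);
            return names;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // only the checkpoint file remains
    }
}
```

The checkpoint now claims the store is caught up to an offset, while the data
that should back that claim is gone.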

If this host is assigned the active task for 1_0, then the state store will be
missing all records from before the offset that was read at the previous
reassignment.
If this host is assigned the standby task for 1_0 then the lock will be 
acquired and the standby will start to consume records, but it will still be 
missing all records from before the offset that was read at the previous 
reassignment.
If this host is not assigned 1_0, then the state directory will get cleaned up 
by the stateDirCleaner thread 10 to 20 minutes later and the record loss issue 
will be hidden.

[0] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L865-L869
[1] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L324-L340
[2] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L65-L84
[3] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L212-L236
[4] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L332
[5] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L245-L264
[6] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L797
[7] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java#L798-L803
[8] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java#L262-L327


How we recovered from the record loss:
1. Stop the streams application
2. Delete the impacted task directory to remove the .checkpoint file
3. Restart the streams application
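
Step 2 can be scripted. The following is a minimal sketch, assuming the task
directory lives at <state.dir>/<application.id>/<task id>; the class and method
names are hypothetical, and it should only be run while the streams
application is stopped.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

// Hypothetical helper for recovery step 2; run only while the app is stopped.
final class TaskDirRecoverySketch {

    // Deletes <stateDir>/<applicationId>/<taskId>, including the .checkpoint
    // file. Returns false if the directory does not exist.
    static boolean deleteTaskDir(Path stateDir, String applicationId, String taskId) {
        Path taskDir = stateDir.resolve(applicationId).resolve(taskId);
        if (!Files.isDirectory(taskDir)) {
            return false;
        }
        try {
            Files.walkFileTree(taskDir, new SimpleFileVisitor<Path>() {
                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
                        throws IOException {
                    Files.delete(file);
                    return FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult postVisitDirectory(Path dir, IOException exc)
                        throws IOException {
                    if (exc != null) {
                        throw exc;
                    }
                    Files.delete(dir); // children are already gone at this point
                    return FileVisitResult.CONTINUE;
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return true;
    }

    public static void main(String[] args) {
        if (args.length != 3) {
            System.err.println("usage: <state.dir> <application.id> <task id>");
            return;
        }
        System.out.println(deleteTaskDir(Paths.get(args[0]), args[1], args[2]));
    }
}
```

With the directory and its .checkpoint file gone, the task has no stale offset
to resume from after the restart.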


Some possible ways of addressing this issue could be the following:
1. Check that the assigned standbys are running in addition to the assigned 
active tasks before returning in this method [1]
2. Only write the checkpoint file for a task if the thread still holds the
state directory lock for the task [9]. On close, StandbyTasks commit the
offsets they have in memory. [10]
3. Read the checkpoint file after acquiring the locks for a task in the 
StreamThread.
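
Options 2 and 3 can be sketched together on a toy model. This is not a patch
against the classes linked above: TaskDir and GuardedStandby are hypothetical
names, and the "disk" is a pair of fields. The point is that the offset is
only loaded once the lock is held, and only written back while it is still
held.

```java
// Hypothetical model of options 2 and 3; not Kafka Streams code.
final class LockGuardedCheckpointSketch {

    // Toy stand-in for a task directory: a lock owner and a checkpoint offset.
    static final class TaskDir {
        String lockOwner;       // null when nobody holds the lock
        Long checkpointOnDisk;  // null when the directory has been deleted
    }

    static final class GuardedStandby {
        private final String thread;
        private final TaskDir dir;
        private Long offsetInMemory; // stays null until the lock is held

        GuardedStandby(String thread, TaskDir dir) {
            this.thread = thread;
            this.dir = dir;
        }

        // Option 3: read the checkpoint only after the lock was acquired.
        boolean initialize() {
            if (dir.lockOwner != null && !dir.lockOwner.equals(thread)) {
                return false; // lock contention, nothing has been read
            }
            dir.lockOwner = thread;
            offsetInMemory = dir.checkpointOnDisk;
            return true;
        }

        // Option 2: write the checkpoint only while still holding the lock.
        void close() {
            if (!thread.equals(dir.lockOwner)) {
                return; // never initialized, so there is nothing valid to write
            }
            if (offsetInMemory != null) {
                dir.checkpointOnDisk = offsetInMemory;
            }
            dir.lockOwner = null;
        }
    }

    public static void main(String[] args) {
        TaskDir dir = new TaskDir();
        dir.lockOwner = "A";          // StreamThread(A) still holds the lock
        dir.checkpointOnDisk = 100L;

        GuardedStandby standby = new GuardedStandby("B", dir);
        System.out.println("initialized: " + standby.initialize());

        dir.lockOwner = null;         // StreamThread(A) releases the lock
        dir.checkpointOnDisk = null;  // the cleaner deletes the directory

        standby.close();              // no stale offset is written back
        System.out.println("checkpoint after close: " + dir.checkpointOnDisk);
    }
}
```

Either guard alone would prevent the stale write in this model. Option 1
addresses a different part of the problem, namely that the standby never
starts consuming in the first place.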

[9] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L325
[10] 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java#L125-L148



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
