We are using Kafka Streams version 1.1.0.

We made some changes to the Kafka Streams code, and we are observing the
following sequence of events in our production environment. We want to
understand whether the same sequence of events is also possible in the
unmodified 1.1.0 version.

time T0

StreamThread-1: got assigned standby tasks 0_1 and 0_2
StreamThread-2: got assigned standby task 0_3

time T1 -----

Now suppose there is a consumer group rebalance.

Task 0_1 is assigned to StreamThread-2 (i.e., standby task 0_1 moves from
StreamThread-1 to StreamThread-2).

time T2 ------

StreamThread-2 sees that the new standby task 0_1 is assigned to it and tries
to initializeStateStores for 0_1, but gets a *LockException* because the
*owningThread* for the lock is still StreamThread-1.

The LockException is swallowed in the *initializeNewTasks* function of
*AssignedTasks.java*, so 0_1 remains in the *created* map inside
*AssignedTasks.java*.
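To make step T2 concrete, here is a minimal sketch that models only what is described above: a per-task lock owned by a thread, and an initialization loop that swallows the LockException and leaves the task in *created*. It is a hypothetical, heavily simplified model, not the Kafka Streams source: the class StandbyLockModel, its lock method and its running map are illustrative names we introduced, and lock() merely stands in for initializeStateStores.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical, heavily simplified model of step T2. It is NOT Kafka Streams'
// StateDirectory/AssignedTasks code; names only mirror the report for readability.
class StandbyLockModel {

    // Stand-in for the LockException seen in the report.
    static class LockException extends RuntimeException {
        LockException(String message) {
            super(message);
        }
    }

    final Map<String, String> locks = new HashMap<>();   // taskId -> owning thread
    final Map<String, String> created = new HashMap<>(); // taskId -> thread it is assigned to
    final Map<String, String> running = new HashMap<>(); // tasks whose stores initialized

    // Grants the lock unless a different thread already owns it.
    void lock(String taskId, String thread) {
        String owner = locks.get(taskId);
        if (owner != null && !owner.equals(thread)) {
            throw new LockException("task " + taskId + " is locked by " + owner);
        }
        locks.put(taskId, thread);
    }

    // Mirrors the reported behaviour: a LockException is swallowed and the task
    // is simply left in 'created'.
    void initializeNewTasks() {
        Iterator<Map.Entry<String, String>> it = created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            try {
                lock(entry.getKey(), entry.getValue()); // stands in for initializeStateStores
                running.put(entry.getKey(), entry.getValue());
                it.remove();
            } catch (LockException e) {
                // swallowed: nothing is logged or rethrown in this model
            }
        }
    }

    public static void main(String[] args) {
        StandbyLockModel m = new StandbyLockModel();
        m.locks.put("0_1", "StreamThread-1");   // T0/T1: thread 1 still holds 0_1's lock
        m.created.put("0_1", "StreamThread-2"); // T1: 0_1 reassigned to thread 2
        m.initializeNewTasks();                 // T2
        System.out.println("created=" + m.created.keySet() + ", running=" + m.running.keySet());
    }
}
```

Running main shows 0_1 still in *created* and nothing in running, which is the state the report describes at the end of T2.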

time T3 ------

StreamThread-1 realizes that 0_1 has not been reassigned to it and closes the
suspended task.
As part of closing the suspended task, the entry for 0_1 is deleted from the
*locks* map in the *unlock* function of StateDirectory.java.

time T4 ------

*CleanupThread* comes along after *cleanupDelayMs*, decides that the 0_1
directory in the local file system is obsolete, and deletes the directory!

Since the local directory for the task has been deleted and 0_1 is still in
the *created* map, the changelog topic-partitions for standby task 0_1 will not
be read until the next rebalance!


Please let us know whether this is a valid sequence. If not, what guards
prevent it?

We see that in https://issues.apache.org/jira/browse/KAFKA-6122 the retries
around locks were removed. Could you please explain why the retry mechanism
was removed?
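For reference, "retries around locks" here means, in general terms, something like the bounded retry below. This is a hypothetical illustration of the general idea only; it is not the code removed in KAFKA-6122, and LockRetry, lockWithRetry, maxAttempts and backoffMs are names we made up.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded retry around a lock attempt. This is NOT the
// code removed by KAFKA-6122, only an illustration of the general idea.
final class LockRetry {
    private LockRetry() {}

    // Calls tryLock up to maxAttempts times, sleeping backoffMs between failed
    // attempts. Returns true as soon as one attempt succeeds, false otherwise.
    static boolean lockWithRetry(BooleanSupplier tryLock, int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (tryLock.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated lock that only becomes free on the third attempt.
        int[] calls = {0};
        boolean ok = lockWithRetry(() -> ++calls[0] >= 3, 5, 10L);
        System.out.println("acquired=" + ok + " after " + calls[0] + " attempts");
    }
}
```

Note that a loop like this blocks the calling thread between attempts, which is one trade-off any such retry has to weigh.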
