We are using Kafka Streams version 1.1.0, with some local changes to the Kafka Streams code. We are observing the following sequence of events in our production environment and want to understand whether this sequence is also possible in stock 1.1.0.
time T0
------
StreamThread-1: got assigned standby tasks 0_1, 0_2
StreamThread-2: got assigned standby task 0_3

time T1
------
A consumer group rebalance happens, and task 0_1 is assigned to StreamThread-2 (i.e. the 0_1 standby task moved from StreamThread-1 to StreamThread-2).

time T2
------
StreamThread-2 sees that the new standby task, 0_1, is assigned to it and tries to initializeStateStores for 0_1, but gets a *LockException* because the *owningThread* for the lock is still StreamThread-1. The LockException is swallowed in the *initializeNewTasks* function of *AssignedTasks.java*, and 0_1 remains in the *created* map inside *AssignedTasks.java*.

time T3
------
StreamThread-1 realizes that 0_1 is no longer assigned to it and closes the suspended task. As part of closing the suspended task, the entry for 0_1 is deleted from the *locks* map in the *unlock* function of StateDirectory.java.

time T4
------
The *CleanupThread* comes along after *cleanupDelayMs* and decides the 0_1 directory in the local file system is obsolete, and deletes the directory!

Since the local directory for the task is deleted and 0_1 is still in the *created* map, the changelog topic-partitions will not be read for the 0_1 standby task until the next rebalance.

Please let us know whether this is a valid sequence. If not, what guards prevent it? We also see that in https://issues.apache.org/jira/browse/KAFKA-6122 the retries around locks were removed. Could you explain why the retry mechanism was removed?
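To make the suspected race concrete, here is a simplified, hypothetical model of it. The class and method names (StateDirectoryModel, AssignedTasksModel, cleanableBy) are illustrative only and are not the real Kafka Streams 1.1.0 classes; the sketch just reproduces the shape of the T0-T4 sequence described above: a swallowed LockException leaves the task in *created*, the old owner then unlocks, and the cleanup logic sees an unlocked directory as deletable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StandbyLockRace {

    static class LockException extends RuntimeException {
        LockException(String m) { super(m); }
    }

    // Rough analogue of StateDirectory's lock bookkeeping:
    // taskId -> name of the owning thread.
    static class StateDirectoryModel {
        final Map<String, String> locks = new HashMap<>();

        synchronized void lock(String taskId, String thread) {
            String owner = locks.get(taskId);
            if (owner != null && !owner.equals(thread)) {
                throw new LockException("task " + taskId + " owned by " + owner);
            }
            locks.put(taskId, thread);
        }

        synchronized void unlock(String taskId, String thread) {
            if (thread.equals(locks.get(taskId))) {
                locks.remove(taskId);
            }
        }

        // CleanupThread analogue: a task directory with no lock entry
        // looks obsolete and would be deleted.
        synchronized boolean cleanableBy(String taskId) {
            return !locks.containsKey(taskId);
        }
    }

    // Rough analogue of AssignedTasks.initializeNewTasks swallowing
    // LockException: the task silently stays in `created`.
    static class AssignedTasksModel {
        final Set<String> created = new HashSet<>();
        final Set<String> running = new HashSet<>();

        void initializeNewTasks(StateDirectoryModel dir, String thread) {
            for (String taskId : new HashSet<>(created)) {
                try {
                    dir.lock(taskId, thread);
                    created.remove(taskId);
                    running.add(taskId);
                } catch (LockException e) {
                    // swallowed: task remains in `created`, no retry
                }
            }
        }
    }

    public static void main(String[] args) {
        StateDirectoryModel dir = new StateDirectoryModel();

        // T0: StreamThread-1 owns the lock for standby task 0_1.
        dir.lock("0_1", "StreamThread-1");

        // T2: after rebalance, StreamThread-2 tries to initialize 0_1.
        AssignedTasksModel t2 = new AssignedTasksModel();
        t2.created.add("0_1");
        t2.initializeNewTasks(dir, "StreamThread-2");
        System.out.println("0_1 still in created: " + t2.created.contains("0_1"));

        // T3: StreamThread-1 closes the suspended task and releases the lock.
        dir.unlock("0_1", "StreamThread-1");

        // T4: the cleanup pass sees no lock entry for 0_1 and would delete
        // its directory, even though StreamThread-2 still holds 0_1 in `created`.
        System.out.println("cleanup would delete 0_1 dir: " + dir.cleanableBy("0_1"));
    }
}
```

Running this prints `true` for both checks, which is exactly the window we are asking about: nothing in this simplified model re-attempts the lock before the cleanup pass runs.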