[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sabit updated KAFKA-16025: -- Attachment: (was: Screenshot 1702750558363.png)

Key: KAFKA-16025
URL: https://issues.apache.org/jira/browse/KAFKA-16025
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.4.0
Environment: Linux
Reporter: Sabit
Priority: Major

Hello,

We are encountering an issue where, during rebalancing, streams threads on one client get stuck in rebalancing. After enabling debug logs, we saw that some tasks were failing to initialize because they could not grab a lock in the StateDirectory:

{{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}

We were able to reproduce this behavior reliably on 3.4.0. This is the sequence that triggers the bug.

Assume a streams consumer group with 5 instances (A, B, C, D, E), each with 5 threads (1-5), where the consumers run stateful tasks with state stores on disk. There are 10 active tasks and 10 standby tasks.
# Instance A is deactivated.
# As an example, let's say task 0_1, previously on instance B, moves to instance C.
# Task 0_1 leaves behind its state directory on instance B's disk, currently unused, and no lock for it exists in instance B's StateDirectory in-memory lock tracker.
# Instance A is re-activated.
# Streams thread 1 on instance B is asked to re-join the consumer group because a new member was added.
# As part of re-joining, thread 1 lists non-empty state directories in order to report the offsets it has in its state stores as part of its metadata. Thread 1 sees that the directory for 0_1 is not empty.
# The cleanup thread on instance B runs. The cleanup thread locks state store 0_1, sees the directory for 0_1 was last modified more than `state.cleanup.delay.ms` ago, deletes it, and unlocks it successfully.
# Thread 1 takes a lock on directory 0_1 because it was found non-empty before, unaware that the cleanup ran between the time of the check and the lock. It tracks this lock in its own in-memory store, in addition to StateDirectory's in-memory lock store.
# Thread 1 successfully joins the consumer group.
# After every consumer in the group joins, assignments are calculated, and then every consumer calls sync group to receive the new assignments.
# Thread 1 on instance B calls sync group but gets an error: the group coordinator has triggered a new rebalance and all members must rejoin the group.
# Thread 1 again lists non-empty state directories in order to report the offsets it has in its state stores as part of its metadata. Prior to doing so, it clears its in-memory store tracking the locks it has taken for the purpose of gathering rebalance metadata.
# Thread 1 no longer takes a lock on 0_1, as it is now empty.
# However, the lock on 0_1 owned by thread 1 remains in StateDirectory.
# All consumers re-join and sync successfully, receiving their new assignments.
# Thread 2 on instance B is assigned task 0_1.
# Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is still held by thread 1.
# Thread 2 remains in the rebalancing state and cannot make progress on task 0_1 or any other tasks it has assigned.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
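The sequence above is a classic check-then-act race, and it can be reproduced deterministically in a small single-threaded simulation. The sketch below is an illustrative model only: `StateDirectorySim`, its methods, and the thread names are hypothetical stand-ins for the real `StateDirectory`/`TaskManager` machinery, with the cleaner's deletion interleaved between the directory listing and the lock.

```java
import java.util.*;

// Hypothetical minimal model of the race; not the real Kafka Streams classes.
class StateDirectorySim {
    private final Map<String, String> lockOwners = new HashMap<>(); // taskId -> owning thread
    private final Set<String> nonEmptyDirs = new HashSet<>();

    StateDirectorySim(String... dirs) { nonEmptyDirs.addAll(Arrays.asList(dirs)); }

    List<String> listNonEmptyTaskDirectories() { return new ArrayList<>(nonEmptyDirs); }

    boolean lock(String taskId, String thread) {
        String prev = lockOwners.putIfAbsent(taskId, thread);
        return prev == null || prev.equals(thread);
    }

    void unlock(String taskId, String thread) { lockOwners.remove(taskId, thread); }

    void cleanerDelete(String taskId) {
        // Cleanup thread: lock, delete the directory contents, unlock.
        if (lock(taskId, "CleanupThread")) {
            nonEmptyDirs.remove(taskId);
            unlock(taskId, "CleanupThread");
        }
    }

    boolean isLocked(String taskId) { return lockOwners.containsKey(taskId); }
}

public class OrphanedLockDemo {
    public static void main(String[] args) {
        StateDirectorySim dir = new StateDirectorySim("0_1");
        Set<String> lockedTaskDirectories = new HashSet<>(); // thread 1's own tracker

        // Step 6: thread 1 sees 0_1 as non-empty while gathering rebalance metadata.
        List<String> candidates = dir.listNonEmptyTaskDirectories();

        // Step 7: the cleanup thread deletes 0_1 between the listing and the lock.
        dir.cleanerDelete("0_1");

        // Step 8: thread 1 locks the now-empty directory anyway.
        for (String t : candidates) {
            if (dir.lock(t, "StreamThread-1")) lockedTaskDirectories.add(t);
        }

        // Steps 11-13: a second rebalance; thread 1 clears its tracker and re-lists.
        // 0_1 is empty now, so it is never re-locked -- but never unlocked either.
        lockedTaskDirectories.clear();
        for (String t : dir.listNonEmptyTaskDirectories()) {
            if (dir.lock(t, "StreamThread-1")) lockedTaskDirectories.add(t);
        }

        // Steps 17-18: the orphaned lock blocks thread 2 from taking 0_1.
        System.out.println(dir.isLocked("0_1"));               // prints "true"
        System.out.println(dir.lock("0_1", "StreamThread-2")); // prints "false"
    }
}
```

Running the simulation shows the end state from the report: the lock on 0_1 is still attributed to thread 1 even though thread 1 no longer tracks it, so thread 2's lock attempt fails forever.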
[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sabit updated KAFKA-16025: -- Description: (the reproduction steps quoted above)

was:

Hello,

We are encountering an issue where, during rebalancing, streams threads on one client get stuck in rebalancing. After enabling debug logs, we saw that some tasks were failing to initialize because they could not grab a lock in the StateDirectory:

{{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}

Reading the comments for TaskManager, this seems like an expected case; however, in our situation the thread would get stuck on this permanently. After looking at the logs, we came to understand that whenever tasks 0_51, 0_37, 0_107, 0_219, or 0_93 were assigned to this client, the assigned threads would get stuck because they could not grab the lock. We took a heap dump of this JVM and found that all of these tasks were locked by StreamThread-21 (see attachment). Additionally, each of these task directories exists on the client but is an empty directory.

The sequence of events that led to this state: initially, all of these tasks were being processed on the impacted client, either as active or standby tasks. One client dropped out of the consumer group, so these tasks were rebalanced away from the impacted client. When we tried to bring up a new client to replace the one that dropped out, the impacted client could not initialize these 5 tasks it had initially been processing. Sample of one timeline:

{{# Task moved away from the original consumer thread}}
{{2023-12-13 22:45:58.24Z stream-thread [i-0f1a5e7a42158e
[jira] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025 ] Sabit deleted comment on KAFKA-16025: --- was (Author: JIRAUSER303392):

I think this bug can occur when rebalancing is triggered in the middle of the cleanup thread running, due to multiple calls to [tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180]. `tryToLockAllNonEmptyTaskDirectories` does not check whether the directory has become empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`. On that next call, the previously locked tasks in `lockedTaskDirectories` are cleared, and any tasks whose directories are now empty are not re-inserted into `lockedTaskDirectories`; their locks in the `StateDirectory` are now orphaned to this thread.

Logs that correspond to this:
- CleanupThread starts on `i-0f1a5e7a42158e04b`
{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 60ms).}}
- New member joins the consumer group
{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}}
- Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread is deleting directories. This is where Thread-21 and the cleanup thread race to lock the same directories and reach a state where thread 21 thought it locked non-empty directories, but they were actually emptied by the cleaner thread. The only thing that doesn't line up here is that the delete time of 0_37 is before the stream thread began re-joining, but the timeline of the other deletions aligns well:
{{2023-12-13 22:57:17.98Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}}
- Now, when this state is reached and [TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199] is called, the tasks with now-empty directories would get unlocked as expected. We would see this with the log ["Adding newly assigned partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333] as a result of the sync group request succeeding following a successful join request. However, the sync group request fails for this thread because another rebalance is triggered:
{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Successfully joined group with generation Generation{generationId=122096, memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', protocol='stream'}}}
{{2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] SyncGroup failed: The group began another rebal
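The root cause described in this comment suggests a defensive pattern: after acquiring a task-directory lock, re-check that the directory is still non-empty, and release the lock immediately if the cleaner emptied it in the meantime. The sketch below is a hypothetical, self-contained model of that mitigation; the method names echo those discussed above but this is not the actual Kafka Streams implementation.

```java
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: lock, then re-check emptiness, releasing on failure
// so the lock cannot be orphaned. All names here are hypothetical.
public class LockRecheckSketch {
    private final Map<String, String> lockOwners = new ConcurrentHashMap<>();
    private final Set<String> nonEmptyDirs;

    LockRecheckSketch(Set<String> nonEmptyDirs) { this.nonEmptyDirs = nonEmptyDirs; }

    boolean lock(String taskId, String owner) {
        String prev = lockOwners.putIfAbsent(taskId, owner);
        return prev == null || prev.equals(owner);
    }

    void unlock(String taskId, String owner) { lockOwners.remove(taskId, owner); }

    boolean directoryIsEmpty(String taskId) { return !nonEmptyDirs.contains(taskId); }

    /** Lock only directories that are STILL non-empty once the lock is held. */
    Set<String> tryToLockAllNonEmptyTaskDirectories(String thread, List<String> candidates) {
        Set<String> locked = new HashSet<>();
        for (String taskId : candidates) {
            if (!lock(taskId, thread)) continue;
            if (directoryIsEmpty(taskId)) {
                unlock(taskId, thread); // re-check closes the race window
            } else {
                locked.add(taskId);
            }
        }
        return locked;
    }

    public static void main(String[] args) {
        LockRecheckSketch sd = new LockRecheckSketch(new HashSet<>(Arrays.asList("0_2")));
        // "0_1" was seen as non-empty when listed, but the cleaner emptied it since.
        Set<String> locked =
            sd.tryToLockAllNonEmptyTaskDirectories("StreamThread-1", Arrays.asList("0_1", "0_2"));
        System.out.println(locked);                           // prints "[0_2]"
        System.out.println(sd.lock("0_1", "StreamThread-2")); // prints "true": no orphaned lock
    }
}
```

With the re-check in place, the lock on the emptied directory is released in the same call that took it, so a later rebalance cannot leave it stranded on the first thread.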
[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761 ] Sabit edited comment on KAFKA-16025 at 12/19/23 10:23 PM: -- I think this bug can occur when rebalancing is triggered in the middle of the cleanup thread running due to multiple calls to [tryToLockAllNonEmptyTaskDirectories|#L1154-L1180]. This is because `tryToLockAllNonEmptyTaskDirectories` does not check if the directory becomes empty between calls to `listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`, so on the next call to `tryToLockAllNonEmptyTaskDirectories`, the prior locked tasks in `lockedTaskDirectories` are cleared, and the directory is now seen as empty, so any tasks that were in `lockedTaskDirectories` that now have empty directories are not re-inserted into `lockedTaskDirectories`, and now have orphaned locks to this thread in the `StateDirectory`. Logs that correspond to this: - CleanupThread starts on `i-0f1a5e7a42158e04b` {{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 60ms).}} - New member joins the consumer group {{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}} - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group as the CleanupThread is deleting directories - This is where Thread-21 and the CleanUp thread race to lock the same directories and reach a state where thread 21 thought it locked non-empty directories, but they actually got emptied by the cleaner thread. 
The only thing that doesn't lineup here is that the delete time of 0_37 is before the stream thread began re-joining, but the timeline of other deletions aligns well {{2023-12-13 22:57:17.98Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}} {{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}} - Now, if when this state is reached and [TaskManager#handleRebalanceComplete|#L180-L199]] is called, the tasks with now empty directories will get unlocked as expected. 
We would see this with the log ["Adding newly assigned partitions"|#L316-L333]] as a result of the sync group request succeeding following a successful join request. However, the sync group request fails for this thread because another rebalance is triggered: {{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}} {{{}2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Successfully joined group with generation Generation{generationId=122096, memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', protocol='stream'{ {{{}2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b
[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761 ] Sabit edited comment on KAFKA-16025 at 12/19/23 10:23 PM: -- I think this bug can occur when rebalancing is triggered in the middle of the cleanup thread running due to multiple calls to [tryToLockAllNonEmptyTaskDirectories|#L1154-L1180]]. This is because `tryToLockAllNonEmptyTaskDirectories` does not check if the directory becomes empty between calls to `listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`, so on the next call to `tryToLockAllNonEmptyTaskDirectories`, the prior locked tasks in `lockedTaskDirectories` are cleared, and the directory is now seen as empty, so any tasks that were in `lockedTaskDirectories` that now have empty directories are not re-inserted into `lockedTaskDirectories`, and now have orphaned locks to this thread in the `StateDirectory`. Logs that correspond to this: - CleanupThread starts on `i-0f1a5e7a42158e04b` {{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 60ms).}} - New member joins the consumer group {{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}} - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group as the CleanupThread is deleting directories - This is where Thread-21 and the CleanUp thread race to lock the same directories and reach a state where thread 21 thought it locked non-empty directories, but they actually got emptied by the cleaner thread. 
The only thing that doesn't lineup here is that the delete time of 0_37 is before the stream thread began re-joining, but the timeline of other deletions aligns well {{2023-12-13 22:57:17.98Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}} {{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}} - Now, if when this state is reached and [TaskManager#handleRebalanceComplete|#L180-L199]] is called, the tasks with now empty directories will get unlocked as expected. 
We would see this with the log ["Adding newly assigned partitions"|#L316-L333]] as a result of the sync group request succeeding following a successful join request. However, the sync group request fails for this thread because another rebalance is triggered: {{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}} {{{}2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Successfully joined group with generation Generation{generationId=122096, memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', protocol='stream'{ {{{}2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04
[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761 ] Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM: -- I think this bug can occur when rebalancing is triggered in the middle of the cleanup thread running due to multiple calls to [tryToLockAllNonEmptyTaskDirectories|#L1154-L1180]]. This is because `tryToLockAllNonEmptyTaskDirectories` does not check if the directory becomes empty between calls to `listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`, so on the next call to `tryToLockAllNonEmptyTaskDirectories`, the prior locked tasks in `lockedTaskDirectories` are cleared, and the directory is now seen as empty, so any tasks that were in `lockedTaskDirectories` that now have empty directories are not re-inserted into `lockedTaskDirectories`, and now have orphaned locks to this thread in the `StateDirectory`. Logs that correspond to this: - CleanupThread starts on `i-0f1a5e7a42158e04b` {{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 60ms).}} - New member joins the consumer group {{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}} - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group as the CleanupThread is deleting directories - This is where Thread-21 and the CleanUp thread race to lock the same directories and reach a state where thread 21 thought it locked non-empty directories, but they actually got emptied by the cleaner thread. 
The only thing that doesn't lineup here is that the delete time of 0_37 is before the stream thread began re-joining, but the timeline of other deletions aligns well {{2023-12-13 22:57:17.98Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}} {{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}} {{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 60ms).}} {{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}} - Now, if when this state is reached and [TaskManager#handleRebalanceComplete#handleRebalanceComplete|[https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199]] 
is called, the tasks with now empty directories will get unlocked as expected. We would see this with the log ["Adding newly assigned partitions"|#L316-L333]] as a result of the sync group request succeeding following a successful join request. However, the sync group request fails for this thread because another rebalance is triggered: {{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}} {{{}2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Successfully joined group with generation Generation{generationId=122096, memberId='i-0f1a5
[jira] [Comment Edited] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761 ] Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM: --

I think this bug can occur when rebalancing is triggered while the cleanup thread is running, across multiple calls to [tryToLockAllNonEmptyTaskDirectories](https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180). `tryToLockAllNonEmptyTaskDirectories` does not check whether a directory becomes empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`, and that next invocation first clears `lockedTaskDirectories`. Any previously locked task whose directory has become empty in the meantime is not re-inserted into `lockedTaskDirectories`, so its lock in the `StateDirectory` is now orphaned to this thread.

Logs that correspond to this:

- CleanupThread starts on `i-0f1a5e7a42158e04b`:

{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 60ms).}}

- New member joins the consumer group:

{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}}

- Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread is deleting directories. This is where Thread-21 and the cleanup thread race to lock the same directories and reach a state where Thread-21 thought it locked non-empty directories, but they were actually emptied by the cleaner thread. The only thing that doesn't line up here is that the delete time of 0_37 is before the stream thread began re-joining, but the timeline of the other deletions aligns well:

{{2023-12-13 22:57:17.98Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 60ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}}

- Now, if [TaskManager#handleRebalanceComplete](https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199) is called once this state is reached, the tasks with now-empty directories will get unlocked as expected. We would see this with the log ["Adding newly assigned partitions"](https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333) as a result of the sync group request succeeding after a successful join request. However, the sync group request fails for this thread because another rebalance is triggered:

{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=
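The window described in this comment (a directory emptied between `listNonEmptyTaskDirectories` and `lock`) can be illustrated with a minimal, self-contained Java sketch. This is not Kafka's actual implementation — the `StateDirSketch` class, its maps, and the re-check after locking are all hypothetical — but it shows how re-validating emptiness after taking the lock would avoid leaving an orphaned lock:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the race: lockOwners maps taskId -> owning thread,
// dirNonEmpty maps taskId -> whether the on-disk directory still has data.
class StateDirSketch {
    final Map<String, String> lockOwners = new HashMap<>();
    final Map<String, Boolean> dirNonEmpty = new HashMap<>();
    final Set<String> lockedTaskDirectories = new HashSet<>();

    // Snapshot of directories that were non-empty at listing time.
    List<String> listNonEmptyTaskDirectories() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Boolean> e : dirNonEmpty.entrySet()) {
            if (e.getValue()) out.add(e.getKey());
        }
        return out;
    }

    boolean lock(String taskId, String thread) {
        return lockOwners.putIfAbsent(taskId, thread) == null;
    }

    void unlock(String taskId, String thread) {
        lockOwners.remove(taskId, thread);
    }

    // Lock every directory that was non-empty when listed, but re-check it
    // after locking: if the cleanup thread emptied it in between, release
    // the lock instead of silently keeping it (the orphaning bug).
    void tryToLockAllNonEmptyTaskDirectories(List<String> listing, String thread) {
        lockedTaskDirectories.clear();
        for (String taskId : listing) {
            if (lock(taskId, thread)) {
                if (Boolean.TRUE.equals(dirNonEmpty.get(taskId))) {
                    lockedTaskDirectories.add(taskId);
                } else {
                    unlock(taskId, thread); // emptied after listing: don't orphan
                }
            }
        }
    }
}
```

In the buggy sequence, 0_51's directory is emptied after the listing; without the re-check, the thread would keep the lock while `lockedTaskDirectories` no longer contains the task, so `releaseLockedUnassignedTaskDirectories` would never release it.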
[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sabit updated KAFKA-16025: --

Description:

Hello,

We are encountering an issue where, during rebalancing, we see streams threads on one client get stuck in rebalancing. Upon enabling debug logs, we saw that some tasks were having trouble initializing due to a failure to grab a lock in the StateDirectory:

{{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}

Reading the comments for TaskManager, this seems like an expected case; however, in our situation the thread would get stuck on this permanently. After looking at the logs, we came to understand that whenever tasks 0_51, 0_37, 0_107, 0_219, or 0_93 were assigned to this client, the assigned threads would get stuck because they could not grab the lock. We took a heap dump of this JVM and found that all of these tasks were locked by StreamThread-21 (see attachment). Additionally, each of these task directories exists on the client but is an empty directory.

The sequence of events that led to this state: initially, all of these tasks were being processed on the impacted client, either as active or standby tasks. One client dropped out of the consumer group, so these tasks were rebalanced away from the impacted client. When we try to bring up a new client to replace the one that dropped out, the impacted client cannot initialize the 5 tasks it was initially processing.

Sample of one timeline:

{{# Task moved away from the original consumer thread}}
{{2023-12-13 22:45:58.24Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
{{2023-12-13 22:45:58.263000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
{{# Directory cleaned up}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}}
{{# Cannot initialize task when it is re-assigned to this client}}
{{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}

Reading through the StateDirectory, it wasn't immediately obvious how we could arrive at a situation where the task is locked by a thread it had not yet been assigned to, while the directory was cleaned up but is now empty instead of deleted. We didn't observe any filesystem issues on this client around this time either.
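The "will retry" line in the timeline above is the visible symptom: the retry can only succeed if the owning thread eventually releases the lock. A minimal sketch (hypothetical names, not Kafka's code) of why an orphaned owner blocks initialization indefinitely:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of per-task directory locks: taskId -> owning thread.
class LockRetrySketch {
    static final ConcurrentMap<String, String> owners = new ConcurrentHashMap<>();

    // Attempt task initialization up to maxAttempts times; each failed try
    // corresponds to a "Failed to lock the state directory ... will retry" log.
    static boolean tryInitialize(String taskId, String thread, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String owner = owners.putIfAbsent(taskId, thread);
            if (owner == null || owner.equals(thread)) {
                return true; // lock acquired (or already held by this thread)
            }
            // Another thread holds the lock; try again on the next loop pass.
        }
        return false; // stuck: the owner never released the lock
    }
}
```

No number of retries helps while StreamThread-21 still appears as the owner; only releasing the orphaned lock (or restarting the client) unblocks the task.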
[jira] [Created] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
Sabit created KAFKA-16025: -

Summary: Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
Key: KAFKA-16025
URL: https://issues.apache.org/jira/browse/KAFKA-16025
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.4.0
Environment: Linux
Reporter: Sabit
Attachments: Screenshot 1702750558363.png

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (KAFKA-16025) Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sabit updated KAFKA-16025: -- Description: Hello, We are encountering an issue where during rebalancing, we see streams threads on one client get stuck in rebalancing. Upon enabling debug logs, we saw that some tasks were having issues initializing due to failure to grab a lock in the StateDirectory: {}2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry{} Reading the comments for TaskManager, this seems like an expected case, however in our situation, the thread would get stuck on this permanently. After looking at the logs, we came to understand that whenever tasks 0_51, 0_37, 0_107, 0_219, or 0_93 were being assigned to this client, those threads would get stuck due to being unable to grab the lock. We took a heap dump of this jvm and found that all of these tasks were being locks by StreamThread-21 (see attachment). Additionally, each of these task directories exist on the instance but are empty directories. The sequence of events that occurred for us to arrive at this state is that initially, all of these tasks were being processed on the impacted client, either as active or standby tasks. We had one client drop out of the consumer group, so these tasks were rebalanced away from the client. When we try to bring up a new client to replace the one that dropped out, the impacted client cannot initialize these 5 tasks it was initially processing. 
Sample of one timeline:
{{# Task moved away from the original consumer thread}}
{{2023-12-13 22:45:58.24Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
{{2023-12-13 22:45:58.263000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
{{# Directory cleaned up}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 60ms).}}
{{# Cannot initialize task when it is re-assigned to this client}}
{{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}
Reading through the StateDirectory, it wasn't immediately obvious how we could arrive in a situation where the task is locked by a thread it had not yet been assigned to, while the directory was cleaned up but is now empty instead of being deleted. We didn't observe any filesystem issues on this client around this time either.
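The failure mode described above can be illustrated with a minimal sketch of an in-memory, per-task lock registry. This is a hypothetical model, not the real Kafka Streams StateDirectory code: if the thread recorded as the lock owner never releases its entry (for example, because the task was closed and its directory deleted on a different path), every later {{tryLock}} by another thread fails indefinitely, which matches the heap-dump observation of StreamThread-21 holding locks for tasks now assigned to StreamThread-14.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of a StateDirectory-style lock tracker.
// Names and structure are illustrative and do not match the real class.
public class TaskLockRegistry {
    // taskId -> name of the thread currently holding the lock
    private final Map<String, String> lockedTasks = new ConcurrentHashMap<>();

    // Returns true if the calling thread now owns (or already owned) the lock.
    public boolean tryLock(String taskId, String threadName) {
        String owner = lockedTasks.putIfAbsent(taskId, threadName);
        return owner == null || owner.equals(threadName);
    }

    // The entry is removed only if the caller is the recorded owner. If a task
    // is closed and its directory deleted without this being called, the stale
    // entry is orphaned and blocks all future owners.
    public void unlock(String taskId, String threadName) {
        lockedTasks.remove(taskId, threadName);
    }

    public String ownerOf(String taskId) {
        return lockedTasks.get(taskId);
    }

    public static void main(String[] args) {
        TaskLockRegistry registry = new TaskLockRegistry();
        // StreamThread-21 locks 0_51 and never releases it (the leaked state
        // seen in the heap dump).
        registry.tryLock("0_51", "StreamThread-21");
        // After a rebalance, StreamThread-14 is assigned 0_51 and retries
        // forever: tryLock always fails because the stale entry remains.
        boolean acquired = registry.tryLock("0_51", "StreamThread-14");
        System.out.println("StreamThread-14 acquired 0_51: " + acquired);
        System.out.println("owner: " + registry.ownerOf("0_51"));
    }
}
```

Under this model, the only recovery is for the recorded owner to call {{unlock}}, which is exactly what cannot happen once the owning thread has already closed the task.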
[jira] [Commented] (KAFKA-12679) Rebalancing a restoring or running task may cause directory livelocking with newly created task
[ https://issues.apache.org/jira/browse/KAFKA-12679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17797281#comment-17797281 ] Sabit commented on KAFKA-12679: --- Would it be possible to add some logging as to which thread owns the lock when this happens? We are currently experiencing this issue repeatedly on one client, and it would be helpful to understand which other streams thread is the lock owner in case that thread is behaving unexpectedly. We are on streams 3.4.x and not using the stateUpdater. > Rebalancing a restoring or running task may cause directory livelocking with > newly created task > --- > > Key: KAFKA-12679 > URL: https://issues.apache.org/jira/browse/KAFKA-12679 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.1 > Environment: Broker and client version 2.6.1 > Multi-node broker cluster > Multi-node, auto scaling streams app instances >Reporter: Peter Nahas >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.7.0 > > Attachments: Backoff-between-directory-lock-attempts.patch > > > If a task that uses a state store is in the restoring state or in a running > state and the task gets rebalanced to a separate thread on the same instance, > the newly created task will attempt to lock the state store directory while > the first thread is continuing to use it. This is totally normal and expected > behavior when the first thread is not yet aware of the rebalance. However, > that newly created task is effectively running a while loop with no backoff > waiting to lock the directory: > # TaskManager tells the task to restore in `tryToCompleteRestoration` > # The task attempts to lock the directory > # The lock attempt fails and throws a > `org.apache.kafka.streams.errors.LockException` > # TaskManager catches the exception, stops further processing on the task > and reports that not all tasks have restored > # The StreamThread `runLoop` continues to run. 
> I've seen some documentation indicate that there is supposed to be a backoff > when this condition occurs, but there does not appear to be any in the code. > The result is that if this goes on for long enough, the lock-loop may > dominate CPU usage in the process and starve out the old stream thread task > processing. > > When in this state, the DEBUG level logging for TaskManager will produce a > steady stream of messages like the following: > {noformat} > 2021-03-30 20:59:51,098 DEBUG --- [StreamThread-10] o.a.k.s.p.i.TaskManager > : stream-thread [StreamThread-10] Could not initialize 0_34 due > to the following exception; will retry > org.apache.kafka.streams.errors.LockException: stream-thread > [StreamThread-10] standby-task [0_34] Failed to lock the state directory for > task 0_34 > {noformat} > > > I've attached a git formatted patch to resolve the issue. Simply detect the > scenario and sleep for the backoff time in the appropriate StreamThread. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
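The fix direction described in the attached patch (sleeping between failed lock attempts rather than spinning) can be sketched as follows. This is an illustrative outline under assumed names, not the actual patch: the {{DirectoryLock}} interface, method names, and backoff values are hypothetical, standing in for the non-blocking lock attempt the report describes.

```java
// Hedged sketch of retry-with-backoff between directory-lock attempts,
// replacing the tight no-backoff retry loop described in the report.
public class LockRetryBackoff {
    // Hypothetical stand-in for a non-blocking directory lock attempt.
    public interface DirectoryLock {
        boolean tryLock();
    }

    // Retries with exponential backoff until the lock is acquired or roughly
    // maxWaitMs has been spent sleeping; returns whether it was acquired.
    // The 10ms start and 1s cap are illustrative values only.
    public static boolean lockWithBackoff(DirectoryLock lock, long maxWaitMs)
            throws InterruptedException {
        long backoffMs = 10;
        long waitedMs = 0;
        while (!lock.tryLock()) {
            if (waitedMs >= maxWaitMs) {
                return false; // give up; caller reports restoration incomplete
            }
            Thread.sleep(backoffMs);          // yield CPU instead of spinning
            waitedMs += backoffMs;
            backoffMs = Math.min(backoffMs * 2, 1000); // cap the backoff
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated lock that succeeds on the third attempt, as when the old
        // thread finally notices the rebalance and releases the directory.
        final int[] attempts = {0};
        DirectoryLock lock = () -> ++attempts[0] >= 3;
        boolean acquired = lockWithBackoff(lock, 5000);
        System.out.println("acquired after " + attempts[0] + " attempts: " + acquired);
    }
}
```

The key design point is that each failed attempt cedes the CPU, so the old stream thread that still holds the directory can make progress and eventually release it, instead of being starved by the lock-loop.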