[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761 ]
Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM:
----------------------------------------------------------

I think this bug can occur when rebalancing is triggered while the cleanup thread is running, which results in multiple calls to [tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180]. `tryToLockAllNonEmptyTaskDirectories` does not check whether a directory has become empty between the calls to `listNonEmptyTaskDirectories` and `lock`, so it can end up locking an empty directory. `releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before the next invocation of `tryToLockAllNonEmptyTaskDirectories`. On that next call, the prior contents of `lockedTaskDirectories` are cleared, and any tasks whose directories are now empty are not re-inserted into `lockedTaskDirectories`, yet their locks in the `StateDirectory` still belong to this thread and are left orphaned.

Logs that correspond to this:

- The CleanupThread starts on `i-0f1a5e7a42158e04b`:
{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed (cleanup delay is 600000ms).}}
- A new member joins the consumer group:
{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown member id joins group 0.agg in Stable state. Created a new member id i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and add to the group.}}
- StreamThread-21 on `i-0f1a5e7a42158e04b` re-joins the group while the CleanupThread is still deleting directories.
- This is where StreamThread-21 and the cleanup thread race to lock the same directories and reach a state where StreamThread-21 thought it locked non-empty directories, but they were actually emptied by the cleanup thread.
The only thing that doesn't line up here is that the deletion of 0_37 happens before the stream thread began re-joining, but the timeline of the other deletions aligns well:

{{2023-12-13 22:57:17.980000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for task 0_37 as 653970ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg generation 122096 (__consumer_offsets-7)}}

- Now, if [TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199] were called once this state is reached, the tasks with now-empty directories would get unlocked as expected. We would see this with the log ["Adding newly assigned partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333] as a result of the sync group request succeeding after a successful join request. However, the sync group request fails for this thread because another rebalance is triggered:
{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance group 0.agg in state PreparingRebalance with old generation 122096 (__consumer_offsets-7) (reason: Updating metadata for member i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Successfully joined group with generation Generation{generationId=122096, memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', protocol='stream'}}}
{{2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] SyncGroup failed: The group began another rebalance. Need to re-join the group.
Sent generation was Generation{generationId=122096, memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', protocol='stream'}}}
{{2023-12-13 22:57:22.233000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)}}
{{2023-12-13 22:57:22.233000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] (Re-)joining group}}
- The next join request then executes `tryToLockAllNonEmptyTaskDirectories` again, which clears the prior contents of `lockedTaskDirectories` without realizing that some of those entries refer to directories that have since become empty; those directories are no longer listed, so their locks are never re-recorded and remain orphaned.

I still need to reproduce this issue to confirm it, but the fix is rather simple: in `tryToLockAllNonEmptyTaskDirectories`, check whether the directory is still non-empty after taking the lock. If it has become empty, unlock it.
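To make the proposed fix concrete, here is a minimal, self-contained Java sketch of the idea. It is not a patch against the real `TaskManager`/`StateDirectory`; `FakeStateDirectory`, `directoryIsEmpty`, and the use of plain strings as task ids are hypothetical stand-ins. The only point it demonstrates is re-checking emptiness after the lock has been acquired and unlocking immediately if the cleanup thread emptied the directory in between.

{code:java}
import java.io.File;
import java.util.HashSet;
import java.util.Set;

public class LockNonEmptyDirsSketch {

    private final Set<String> lockedTaskDirectories = new HashSet<>();
    private final FakeStateDirectory stateDirectory =
        new FakeStateDirectory(new File("/tmp/kafka-streams"));

    void tryToLockAllNonEmptyTaskDirectories() {
        // As in the real method, start from a clean slate: only directories
        // locked in this pass should end up in lockedTaskDirectories.
        lockedTaskDirectories.clear();

        for (final File taskDir : stateDirectory.listNonEmptyTaskDirectories()) {
            final String taskId = taskDir.getName();
            if (stateDirectory.lock(taskId)) {
                // Proposed fix: the cleanup thread may have emptied the directory
                // between listNonEmptyTaskDirectories() and lock(). Re-check after
                // acquiring the lock and release it if the directory is now empty,
                // so no orphaned lock is left behind.
                if (stateDirectory.directoryIsEmpty(taskDir)) {
                    stateDirectory.unlock(taskId);
                } else {
                    lockedTaskDirectories.add(taskId);
                }
            }
        }
    }

    /** Hypothetical stand-in for the parts of StateDirectory used above. */
    static class FakeStateDirectory {
        private final File stateDir;
        private final Set<String> locks = new HashSet<>();

        FakeStateDirectory(final File stateDir) {
            this.stateDir = stateDir;
        }

        File[] listNonEmptyTaskDirectories() {
            final File[] dirs = stateDir.listFiles(f -> f.isDirectory() && !directoryIsEmpty(f));
            return dirs == null ? new File[0] : dirs;
        }

        synchronized boolean lock(final String taskId) {
            return locks.add(taskId);
        }

        synchronized void unlock(final String taskId) {
            locks.remove(taskId);
        }

        boolean directoryIsEmpty(final File dir) {
            final File[] contents = dir.listFiles();
            return contents == null || contents.length == 0;
        }
    }
}
{code}

With a check like this in place, a directory that is emptied between `listNonEmptyTaskDirectories` and `lock` never ends up holding a lock that no later call to `releaseLockedUnassignedTaskDirectories` will clean up.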
> Streams StateDirectory has orphaned locks after rebalancing, blocking future rebalancing
> -----------------------------------------------------------------------------------------
>
> Key: KAFKA-16025
> URL: https://issues.apache.org/jira/browse/KAFKA-16025
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.4.0
> Environment: Linux
> Reporter: Sabit
> Priority: Major
> Attachments: Screenshot 1702750558363.png
>
> Hello,
>
> We are encountering an issue where, during rebalancing, we see streams threads on one client get stuck in rebalancing. Upon enabling debug logs, we saw that some tasks were having issues initializing due to failure to grab a lock in the StateDirectory:
>
> {{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}
>
> Reading the comments for TaskManager, this seems like an expected case; however, in our situation the thread would get stuck on this permanently. After looking at the logs, we came to understand that whenever tasks 0_51, 0_37, 0_107, 0_219, or 0_93 were assigned to this client, the assigned threads would get stuck due to being unable to grab the lock. We took a heap dump of this JVM and found that all of these tasks were locked by StreamThread-21 (see attachment). Additionally, each of these task directories exists on the client but is an empty directory.
>
> The sequence of events that led us to this state is that initially, all of these tasks were being processed on the impacted client, either as active or standby tasks. We had one client drop out of the consumer group, so these tasks were rebalanced away from the client. When we try to bring up a new client to replace the one that dropped out, the impacted client cannot initialize these 5 tasks it was initially processing.
> Sample of one timeline:
>
> {{# Task moved away from the original consumer thread}}
> {{2023-12-13 22:45:58.240000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
> {{2023-12-13 22:45:58.263000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
> {{# Directory cleaned up}}
> {{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed (cleanup delay is 600000ms).}}
> {{# Cannot initialize task when it is re-assigned to this client}}
> {{2023-12-14 22:51:57.352000Z stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed to lock the state directory for task 0_51; will retry}}
>
> Reading through the StateDirectory, it wasn't immediately obvious how we could arrive in a situation where the task is locked by a thread it had not yet been assigned to, while the directory was cleaned up but is now empty instead of being deleted. We didn't observe any filesystem issues on this client around this time either.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)