[ 
https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17798761#comment-17798761
 ] 

Sabit edited comment on KAFKA-16025 at 12/19/23 10:22 PM:
----------------------------------------------------------

I think this bug can occur when a rebalance is triggered while the cleanup 
thread is running, because 
[tryToLockAllNonEmptyTaskDirectories|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1154-L1180] 
is then called more than once. `tryToLockAllNonEmptyTaskDirectories` does not 
check whether a directory becomes empty between the calls to 
`listNonEmptyTaskDirectories` and `lock`, so it can lock an empty directory. 
`releaseLockedUnassignedTaskDirectories` is not guaranteed to be called before 
the next invocation of `tryToLockAllNonEmptyTaskDirectories`. On that next 
call, `lockedTaskDirectories` is cleared and the directory is now seen as 
empty, so any tasks that were in `lockedTaskDirectories` and now have empty 
directories are not re-inserted. Their locks, still owned by this thread in 
the `StateDirectory`, are orphaned.
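
The sequence above can be modelled with a small in-memory sketch. Everything 
in it (`FakeStateDirectory`, `FakeTaskManager`, the `betweenListAndLock` hook, 
the `simulate` method) is a hypothetical stand-in written for illustration, 
not the real Kafka Streams code. It only assumes that a lock request succeeds 
when the task is unlocked or already owned by the requesting thread.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model; these are NOT the real Kafka Streams classes.
final class OrphanedLockSketch {

    // Stand-in for StateDirectory: tracks lock owners and non-empty directories.
    static final class FakeStateDirectory {
        final Map<String, String> lockOwners = new HashMap<>();
        final Set<String> nonEmptyDirs = new HashSet<>();

        // Succeeds if the task is unlocked or already locked by the same thread.
        boolean lock(String taskId, String thread) {
            String owner = lockOwners.putIfAbsent(taskId, thread);
            return owner == null || owner.equals(thread);
        }
    }

    // Stand-in for the per-thread TaskManager bookkeeping.
    static final class FakeTaskManager {
        final String thread;
        final FakeStateDirectory stateDirectory;
        final Set<String> lockedTaskDirectories = new HashSet<>();

        FakeTaskManager(String thread, FakeStateDirectory stateDirectory) {
            this.thread = thread;
            this.stateDirectory = stateDirectory;
        }

        // Clears the tracked set, lists non-empty directories, then locks them.
        // The hook simulates the cleanup thread running between list and lock.
        void tryToLockAllNonEmptyTaskDirectories(Runnable betweenListAndLock) {
            lockedTaskDirectories.clear();
            Set<String> listed = new HashSet<>(stateDirectory.nonEmptyDirs);
            betweenListAndLock.run();
            for (String taskId : listed) {
                if (stateDirectory.lock(taskId, thread)) {
                    lockedTaskDirectories.add(taskId);
                }
            }
        }
    }

    // Returns the task ids that are still locked but no longer tracked.
    static Set<String> simulate() {
        FakeStateDirectory dir = new FakeStateDirectory();
        dir.nonEmptyDirs.add("0_51");
        dir.nonEmptyDirs.add("0_37");
        FakeTaskManager tm = new FakeTaskManager("StreamThread-21", dir);

        // First join: 0_51 is emptied after it was listed, but still gets locked.
        tm.tryToLockAllNonEmptyTaskDirectories(() -> dir.nonEmptyDirs.remove("0_51"));
        // SyncGroup fails, so nothing is released. Second join: 0_51 is not listed.
        tm.tryToLockAllNonEmptyTaskDirectories(() -> { });

        Set<String> orphaned = new HashSet<>(dir.lockOwners.keySet());
        orphaned.removeAll(tm.lockedTaskDirectories);
        return orphaned;
    }

    public static void main(String[] args) {
        System.out.println("Orphaned locks: " + simulate()); // prints [0_51]
    }
}
```

In this model the lock on 0_51 stays in the directory's owner map while the 
task manager has forgotten it, which is the orphaned state described above.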

Logs that correspond to this:
 - CleanupThread starts on `i-0f1a5e7a42158e04b`

{{2023-12-13 22:57:03.033000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_143 for task 0_143 as 692300ms has elapsed 
(cleanup delay is 600000ms).}}
 - New member joins the consumer group

{{2023-12-13 22:57:17.142000Z [GroupCoordinator 2]: Static Member with unknown 
member id joins group 0.agg in Stable state. Created a new member id 
i-07cc34cd33f2886e4-20-892934e1-760f-415e-948c-538c11cbb2c9 for this member and 
add to the group.}}
 - Thread 21 on `i-0f1a5e7a42158e04b` re-joins the group while the 
CleanupThread is deleting directories
    - This is where Thread 21 and the CleanupThread race to lock the same 
directories and reach a state where Thread 21 believes it has locked non-empty 
directories that the CleanupThread has actually emptied. The only thing that 
doesn't line up is that 0_37 was deleted before the stream thread began 
re-joining, but the timeline of the other deletions aligns well.

{{2023-12-13 22:57:17.980000Z    stream-thread 
[i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_37 for 
task 0_37 as 653970ms has elapsed (cleanup delay is 600000ms).}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request 
joining group due to: group is already rebalancing}}
{{2023-12-13 22:57:18.659000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
(Re-)joining group}}
{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed 
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:19.391000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_23 for task 0_23 as 664025ms has elapsed 
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.136000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_93 for task 0_93 as 673331ms has elapsed 
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:20.796000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_107 for task 0_107 as 659430ms has elapsed 
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.382000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_219 for task 0_219 as 600952ms has elapsed 
(cleanup delay is 600000ms).}}
{{2023-12-13 22:57:21.969000Z [GroupCoordinator 2]: Stabilized group 0.agg 
generation 122096 (__consumer_offsets-7)}}
 - Now, if 
[TaskManager#handleRebalanceComplete|https://github.com/apache/kafka/blob/3.4/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L180-L199] 
is called once this state is reached, the tasks with now-empty directories 
will get unlocked as expected. We would see this with the log 
["Adding newly assigned partitions"|https://github.com/apache/kafka/blob/3.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L316-L333] 
as a result of the sync group request succeeding following a successful join 
request. However, the sync group request fails for this thread because another 
rebalance is triggered:

{{2023-12-13 22:57:22.029000Z [GroupCoordinator 2]: Preparing to rebalance 
group 0.agg in state PreparingRebalance with old generation 122096 
(__consumer_offsets-7) (reason: Updating metadata for member 
i-0b2771f8e7a021034-28-d6bab48b-d1e9-4b36-8597-9818fc655d3d during Stable)}}
{{2023-12-13 22:57:22.091000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
Successfully joined group with generation Generation{generationId=122096, 
memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', 
protocol='stream'}}}
{{2023-12-13 22:57:22.191000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] SyncGroup 
failed: The group began another rebalance. Need to re-join the group. Sent 
generation was Generation{generationId=122096, 
memberId='i-0f1a5e7a42158e04b-21-8a648b21-3210-44f8-92cc-9896c5b07e0f', 
protocol='stream'}}}
{{2023-12-13 22:57:22.233000Z [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] Request 
joining group due to: rebalance failed due to 'The group is rebalancing, so a 
rejoin is needed.' (RebalanceInProgressException)}}
{{2023-12-13 22:57:22.233000Z    [Consumer instanceId=i-0f1a5e7a42158e04b-21, 
clientId=i-0f1a5e7a42158e04b-StreamThread-21-consumer, groupId=0.agg] 
(Re-)joining group}}
 - The next join request then executes `tryToLockAllNonEmptyTaskDirectories` 
again, which clears the prior contents of `lockedTaskDirectories` without 
accounting for the locks it still holds but cannot re-acquire, because those 
directories have since become empty.

I need to reproduce this issue to confirm it, but the fix is rather simple: 
Just check if the directory is still non-empty in 
`tryToLockAllNonEmptyTaskDirectories` after taking the lock. If it has become 
empty, unlock it.
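
A minimal sketch of that check, assuming a hypothetical `TaskDirectories` 
interface with `lock`, `unlock`, and `isNonEmpty` methods (these names are 
invented for illustration and are not the `StateDirectory` API, and this is 
not the actual patch):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed check; not the actual Kafka Streams patch.
final class RecheckAfterLockSketch {

    // Assumed minimal view of the directory operations the check would need.
    interface TaskDirectories {
        boolean lock(String taskId);
        void unlock(String taskId);
        boolean isNonEmpty(String taskId);
    }

    // Locks each listed task, then keeps the lock only if the directory is
    // still non-empty; otherwise the lock is released immediately.
    static Set<String> lockStillNonEmpty(TaskDirectories dirs, Set<String> listedAsNonEmpty) {
        Set<String> locked = new HashSet<>();
        for (String taskId : listedAsNonEmpty) {
            if (!dirs.lock(taskId)) {
                continue; // owned by another thread
            }
            if (dirs.isNonEmpty(taskId)) {
                locked.add(taskId);
            } else {
                dirs.unlock(taskId); // emptied between listing and locking
            }
        }
        return locked;
    }

    // In-memory stand-in used only to exercise the method above.
    static final class InMemoryDirectories implements TaskDirectories {
        final Set<String> held = new HashSet<>();
        final Set<String> nonEmpty = new HashSet<>();

        @Override public boolean lock(String taskId) { held.add(taskId); return true; }
        @Override public void unlock(String taskId) { held.remove(taskId); }
        @Override public boolean isNonEmpty(String taskId) { return nonEmpty.contains(taskId); }
    }

    // Returns the locks still held after one pass over the listed tasks.
    static Set<String> demo() {
        InMemoryDirectories dirs = new InMemoryDirectories();
        dirs.nonEmpty.add("0_37"); // 0_51 was emptied after it was listed
        Set<String> listed = new HashSet<>(Arrays.asList("0_37", "0_51"));
        Set<String> tracked = lockStillNonEmpty(dirs, listed);
        if (!tracked.equals(dirs.held)) {
            throw new IllegalStateException("tracked and held locks differ");
        }
        return dirs.held;
    }

    public static void main(String[] args) {
        System.out.println("Locks held after recheck: " + demo()); // prints [0_37]
    }
}
```

With the recheck in place, the set of tracked tasks and the set of held locks 
stay identical, so clearing `lockedTaskDirectories` later cannot strand a lock.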

 


> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16025
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16025
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>         Environment: Linux
>            Reporter: Sabit
>            Priority: Major
>         Attachments: Screenshot 1702750558363.png
>
>
> Hello,
>  
> We are encountering an issue where during rebalancing, we see streams threads 
> on one client get stuck in rebalancing. Upon enabling debug logs, we saw that 
> some tasks were having issues initializing due to failure to grab a lock in 
> the StateDirectory:
>  
> {{2023-12-14 22:51:57.352000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] 
> Failed to lock the state directory for task 0_51; will retry}}
>  
> Reading the comments for TaskManager, this seems like an expected case. 
> However, in our situation, the thread would get stuck on this permanently. 
> After looking at the logs, we came to understand that whenever tasks 0_51, 
> 0_37, 0_107, 0_219, or 0_93 were being assigned to this client, the assigned 
> threads would get stuck due to being unable to grab the lock. We took a heap 
> dump of this JVM and found that all of these tasks were locked by 
> StreamThread-21 (see attachment). Additionally, each of these task 
> directories exists on the client but is empty.
>  
> The sequence of events that led to this state is that initially, all of 
> these tasks were being processed on the impacted client, either as active or 
> standby tasks. We had one client drop out of the consumer group, so these 
> tasks were rebalanced away from the client. When we tried to bring up a new 
> client to replace the one that dropped out, the impacted client could not 
> initialize these 5 tasks it was initially processing. Sample of one timeline:
>  
> {{# Task moved away from the original consumer thread}}
> {{2023-12-13 22:45:58.240000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}
> {{2023-12-13 22:45:58.263000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}
> {{# Directory cleaned up}}
> {{2023-12-13 22:57:18.696000Z stream-thread 
> [i-0f1a5e7a42158e04b-CleanupThread] Deleting obsolete state directory 0_51 
> for task 0_51 as 680455ms has elapsed (cleanup delay is 600000ms).}}
> {{# Cannot initialize task when it is re-assigned to this client}}
> {{2023-12-14 22:51:57.352000Z stream-thread 
> [i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
> stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] 
> Failed to lock the state directory for task 0_51; will retry}}
>  
> Reading through the StateDirectory, it wasn't immediately obvious how we 
> could arrive in a situation where the task is locked by a thread it had not 
> yet been assigned to, while the directory was cleaned up but is now empty 
> instead of being deleted. We didn't observe any filesystem issues on this 
> client around this time either.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
