[ https://issues.apache.org/jira/browse/KAFKA-16025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sabit updated KAFKA-16025:
--------------------------
    Description: 
Hello,

 

We are encountering an issue where, during rebalancing, streams threads on one 
client get stuck in the rebalancing state. Upon enabling debug logs, we saw that 
some tasks could not be initialized because the assigned thread failed to grab a 
lock in the StateDirectory:

 

{{2023-12-14 22:51:57.352000Z stream-thread 
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed 
to lock the state directory for task 0_51; will retry}}

 

We were able to reproduce this behavior reliably on 3.4.0. The following 
sequence triggers the bug (a simplified sketch of the resulting orphaned lock 
follows the list of steps).

Assume a streams consumer group with 5 instances (A, B, C, D, E), each running 
5 threads (1-5), where the application uses stateful tasks that have state 
stores on disk. There are 10 active tasks and 10 standby tasks.
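
For reference, a configuration along these lines would produce that shape of 
deployment. This is only an illustrative sketch (the application id, bootstrap 
servers, and paths are placeholders rather than our production settings), but 
the keys are the standard Kafka Streams configs involved, including the 
`state.cleanup.delay.ms` that matters later:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: the shape of a deployment like the one described above
// (several stream threads per instance, standby replicas, on-disk state, and
// the default state cleanup delay). Values are placeholders, not our settings.
public class ScenarioConfigSketch {
    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-stateful-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);            // threads 1-5 per instance
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);          // standby tasks alongside actives
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");
        props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 600_000L); // state.cleanup.delay.ms
        return props;
    }
}
{code}

With that setup, the reproduction steps are: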
 # Instance A is deactivated
 # As an example, let's say task 0_1, previously on instance B, moves to 
instance C
 # Task 0_1 leaves behind its state directory on Instance B's disk, currently 
unused, and no lock for it exists in Instance B's StateDirectory in-memory lock 
tracker
 # Instance A is re-activated
 # Streams thread 1 on Instance B is asked to re-join the consumer group due to 
a new member being added
 # As part of re-joining, thread 1 lists non-empty state directories in order 
to report the offsets it has in its state stores as part of its metadata. 
Thread 1 sees that the directory for 0_1 is not empty.
 # The cleanup thread on instance B runs. It locks state store 0_1, sees that 
the directory for 0_1 was last modified more than `state.cleanup.delay.ms` ago, 
deletes it, and unlocks it successfully
 # Thread 1 takes a lock on directory 0_1 because it was found non-empty 
before, unaware that the cleanup thread has run between the time of the check 
and the lock. It tracks this lock in its own in-memory store, in addition to 
StateDirectory's in-memory lock store
 # Thread 1 successfully joins the consumer group
 # After every consumer in the group joins the group, assignments are 
calculated, and then every consumer calls sync group to receive the new 
assignments
 # Thread 1 on Instance B calls sync group but gets an error - the group 
coordinator has triggered a new rebalance and all members must rejoin the group
 # Thread 1 again lists non-empty state directories in order to report the 
offsets it has in its state stores as part of its metadata. Prior to doing so, 
it clears its in-memory store tracking the locks it has taken for the purpose 
of gathering rebalance metadata
 # Thread 1 no longer takes a lock on 0_1, as its directory is now empty
 # However, the lock on 0_1 owned by Thread 1 remains in the StateDirectory
 # All consumers re-join and sync successfully, receiving their new assignments
 # Thread 2 on Instance B is assigned task 0_1
 # Thread 2 cannot take a lock on 0_1 in the StateDirectory because it is still 
being held by Thread 1
 # Thread 2 remains in the rebalancing state, and cannot make progress on task 
0_1 or on any other tasks assigned to it.
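
To make the race concrete, here is a minimal, self-contained simulation of the 
lock bookkeeping described above. It is not the actual StateDirectory code; the 
names and data structures are stand-ins (a task-to-owner map for 
StateDirectory's in-memory lock tracker and a private set for the thread's own 
tracking), but it shows how clearing only the private set in step 12 leaves the 
shared lock orphaned:

{code:java}
import java.util.*;

// Simplified simulation of the sequence above; not the real Kafka Streams
// internals. "stateDirLocks" stands in for StateDirectory's in-memory
// task-to-owner lock map, "thread1OwnedLocks" for Thread 1's private tracking.
public class OrphanedLockSketch {
    static final Map<String, String> stateDirLocks = new HashMap<>();

    static boolean lock(String task, String thread) {
        String owner = stateDirLocks.putIfAbsent(task, thread);
        return owner == null || owner.equals(thread);
    }

    static void unlock(String task, String thread) {
        stateDirLocks.remove(task, thread); // removes only if this thread owns the entry
    }

    public static void main(String[] args) {
        Set<String> thread1OwnedLocks = new HashSet<>();

        // Steps 6-7: Thread 1 sees 0_1 as non-empty; the cleanup thread then
        // locks 0_1, deletes the directory (older than state.cleanup.delay.ms),
        // and unlocks it.
        lock("0_1", "CleanupThread");
        unlock("0_1", "CleanupThread");

        // Step 8: Thread 1 locks 0_1 based on its stale "non-empty" observation
        // and records the lock in its private set as well as the shared map.
        if (lock("0_1", "StreamThread-1")) {
            thread1OwnedLocks.add("0_1");
        }

        // Step 12: on the forced re-join, Thread 1 clears only its private set;
        // nothing releases the entry in the shared StateDirectory map.
        thread1OwnedLocks.clear();

        // Steps 16-17: Thread 2 is assigned 0_1 but cannot take the lock.
        System.out.println("Thread 2 acquired 0_1: " + lock("0_1", "StreamThread-2")); // false
        System.out.println("Orphaned lock owner: " + stateDirLocks.get("0_1"));        // StreamThread-1
    }
}
{code}

In the real system the orphaned entry has the same effect: any thread on that 
instance that is later assigned task 0_1 fails to lock the directory and 
retries indefinitely.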

  was:
Hello,

 

We are encountering an issue where, during rebalancing, streams threads on one 
client get stuck in the rebalancing state. Upon enabling debug logs, we saw that 
some tasks could not be initialized because the assigned thread failed to grab a 
lock in the StateDirectory:

 

{{2023-12-14 22:51:57.352000Z stream-thread 
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed 
to lock the state directory for task 0_51; will retry}}

 

Reading the comments for TaskManager, this seems like an expected case; 
however, in our situation, the thread would get stuck on this permanently. 
After looking at the logs, we came to understand that whenever tasks 0_51, 
0_37, 0_107, 0_219, or 0_93 were assigned to this client, the assigned threads 
would get stuck due to being unable to grab the lock. We took a heap dump of 
this JVM and found that all of these tasks were locked by StreamThread-21 (see 
attachment). Additionally, each of these task directories exists on the client 
but is an empty directory.
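
For completeness, the empty-directory observation can be confirmed directly 
from the state directory on disk. A small sketch (the state directory path 
below is a placeholder) that lists task directories which still exist but 
contain no files:

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.util.stream.Stream;

// Sketch only: list task directories under the Streams state dir that exist
// but are empty. The state dir path below is a placeholder.
public class EmptyTaskDirs {
    public static void main(String[] args) throws IOException {
        Path appStateDir = Paths.get("/var/lib/kafka-streams/example-stateful-app");
        try (Stream<Path> taskDirs = Files.list(appStateDir)) {
            taskDirs.filter(Files::isDirectory)
                    .filter(EmptyTaskDirs::isEmpty)
                    .forEach(d -> System.out.println("empty task directory: " + d.getFileName()));
        }
    }

    private static boolean isEmpty(Path dir) {
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        } catch (IOException e) {
            return false;
        }
    }
}
{code}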

 

The sequence of events that led to this state is that initially, all of these 
tasks were being processed on the impacted client, either as active or standby 
tasks. One client dropped out of the consumer group, so these tasks were 
rebalanced away from the impacted client. When we tried to bring up a new 
client to replace the one that dropped out, the impacted client could not 
initialize the 5 tasks it had originally been processing. Sample of one 
timeline:

 

{{# Task moved away from the original consumer thread}}

{{2023-12-13 22:45:58.240000Z stream-thread 
[i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Suspended running}}

{{2023-12-13 22:45:58.263000Z stream-thread 
[i-0f1a5e7a42158e04b-StreamThread-32] standby-task [0_51] Closed clean}}

{{# Directory cleaned up}}

{{2023-12-13 22:57:18.696000Z stream-thread [i-0f1a5e7a42158e04b-CleanupThread] 
Deleting obsolete state directory 0_51 for task 0_51 as 680455ms has elapsed 
(cleanup delay is 600000ms).}}

{{# Cannot initialize task when it is re-assigned to this client}}

{{2023-12-14 22:51:57.352000Z stream-thread 
[i-0f1a5e7a42158e04b-StreamThread-14] Could not initialize task 0_51 since: 
stream-thread [i-0f1a5e7a42158e04b-StreamThread-14] standby-task [0_51] Failed 
to lock the state directory for task 0_51; will retry}}
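
The deletion itself is just the cleanup thread noticing that the unlocked 
directory has not been modified for longer than `state.cleanup.delay.ms`. A 
tiny sketch of that comparison, using the numbers from the log line above 
(this is not the actual Kafka Streams code):

{code:java}
import java.time.Duration;

// Sketch of the check implied by the CleanupThread log line above; an
// unlocked task directory whose last modification is older than
// state.cleanup.delay.ms is considered obsolete and deleted.
public class CleanupDelaySketch {
    public static void main(String[] args) {
        Duration cleanupDelay = Duration.ofMillis(600_000);      // state.cleanup.delay.ms
        Duration sinceLastModified = Duration.ofMillis(680_455); // "680455ms has elapsed"

        if (sinceLastModified.compareTo(cleanupDelay) > 0) {
            System.out.println("Deleting obsolete state directory 0_51 for task 0_51 as "
                    + sinceLastModified.toMillis() + "ms has elapsed (cleanup delay is "
                    + cleanupDelay.toMillis() + "ms).");
        }
    }
}
{code}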

 

Reading through the StateDirectory, it wasn't immediately obvious how we could 
arrive in a situation where the task is locked by a thread it had not yet been 
assigned to, while the directory was cleaned up but now exists as an empty 
directory instead of being deleted. We didn't observe any filesystem issues on 
this client around that time either.

 


> Streams StateDirectory has orphaned locks after rebalancing, blocking future 
> rebalancing
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16025
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16025
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>         Environment: Linux
>            Reporter: Sabit
>            Priority: Major
>         Attachments: Screenshot 1702750558363.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
