[ 
https://issues.apache.org/jira/browse/KAFKA-20505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-20505.
------------------------------------
    Resolution: Fixed

> Deadlock in KIP-932 share path: 
> `SharePartition.rollbackOrProcessStateUpdates` completes future inside write 
> lock, reenters `DelayedOperation` lock held by request handler
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-20505
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20505
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 4.2.0
>            Reporter: Travis Bischel
>            Assignee: Apoorv Mittal
>            Priority: Blocker
>             Fix For: 4.3.0, 4.2.1
>
>         Attachments: DEADLOCK_EXTRACT.txt, DEADLOCK_THREADDUMP.log
>
>
> I had Claude dig in overnight to figure out why my tests against a local 
> 3-node cluster were hanging, after ruling out problems within my own test. 
> The following is the full analysis it came up with:
> —
> The JVM's own deadlock detector (\{{kill -3}} SIGQUIT thread dump) reports a 
> cross-thread deadlock between two locks in the share-fetch + 
> share-acknowledge path on a single-broker KRaft 4.2.0 cluster. Once it fires, 
> every \{{ShareFetch}} and \{{ShareAcknowledge}} against the affected broker 
> hangs until the session times out. The broker process stays alive, and the 
> JVM reports the deadlock among its own threads.
> h2. Reproduction
> * Single broker, 4.2.0, KRaft mode, \{{network_mode: host}}, \{{-Xmx1g}}.
> * Relevant non-default broker configs:
> {code}
> share.coordinator.state.topic.replication.factor=1
> share.coordinator.state.topic.num.partitions=5
> share.coordinator.state.topic.min.isr=1
> group.coordinator.rebalance.protocols=classic,consumer,share
> {code}
> * Driver: franz-go \{{TestShareGroupETL}} (heavy share-group ETL, rebalances, 
> ack/release/reject mix):
> {code}
> KGO_TEST_RF=1 go test -v -run TestShareGroupETL -timeout 2m
> {code}
> * Occurs reliably within 6 consecutive iterations on a freshly started 
> cluster. First one or two runs pass, then a subsequent run makes no progress; 
> client gets \{{context deadline exceeded}} on every \{{ShareFetch}} to the 
> broker.
> * Also reproduces on a 3-node cluster; eliminating replication (single 
> broker) does not prevent it.
> h2. JVM deadlock (verbatim from \{{kill -3 <pid>}})
> {noformat}
> Found one Java-level deadlock:
> =============================
> "PersisterStateManager":
>   waiting for ownable synchronizer 0x00000000d9b000a0, (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync),
>   which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
>   waiting for ownable synchronizer 0x00000000d9b2d3b8, (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
>   which is held by "PersisterStateManager"
> Java stack information for the threads listed above:
> ===================================================
> "PersisterStateManager":
>     at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
>     - parking to wait for  <0x00000000d9b000a0> (a 
> java.util.concurrent.locks.ReentrantLock$NonfairSync)
>     at java.util.concurrent.locks.LockSupport.park(...)
>     at java.util.concurrent.locks.ReentrantLock.lock(...)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:133)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
>     at 
> kafka.server.ReplicaManager.completeDelayedShareFetchRequest(ReplicaManager.scala:376)
>     at 
> kafka.server.share.SharePartitionManager.lambda$releaseSession$0(SharePartitionManager.java:395)
>     at java.util.ArrayList.forEach(...)
>     at 
> kafka.server.share.SharePartitionManager.releaseSession(SharePartitionManager.java:376)
>     at 
> kafka.server.KafkaApis.$anonfun$handleShareAcknowledgeRequest$4(KafkaApis.scala:3597)
>     at java.util.concurrent.CompletableFuture.uniHandle(...)
>     at java.util.concurrent.CompletableFuture.postComplete(...)
>     at java.util.concurrent.CompletableFuture.complete(...)
>     at 
> kafka.server.share.SharePartitionManager.lambda$acknowledge$1(SharePartitionManager.java:321)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
>     at java.util.concurrent.CompletableFuture.postComplete(...)
>     at 
> kafka.server.share.SharePartition.lambda$rollbackOrProcessStateUpdates$1(SharePartition.java:2520)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
>     at java.util.concurrent.CompletableFuture.postComplete(...)
>     at 
> kafka.server.share.SharePartition.lambda$writeShareGroupState$0(SharePartition.java:2797)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
>     at java.util.concurrent.CompletableFuture.postComplete(...)
>     at 
> org.apache.kafka.server.share.persister.PersisterStateManager$WriteStateHandler.handleRequestResponse(PersisterStateManager.java:806)
>     at 
> org.apache.kafka.server.share.persister.PersisterStateManager$PersisterStateManagerHandler.onComplete(PersisterStateManager.java:366)
>     at 
> org.apache.kafka.server.share.persister.PersisterStateManager$SendThread.lambda$generateRequests$5(PersisterStateManager.java:1555)
>     at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
>     at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:669)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:661)
>     at 
> org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:110)
>     at 
> org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137)
>     at 
> org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
> "data-plane-kafka-request-handler-4":
>     at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
>     - parking to wait for  <0x00000000d9b2d3b8> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>     at java.util.concurrent.locks.LockSupport.park(...)
>     at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(...)
>     at 
> kafka.server.share.SharePartition.partitionState(SharePartition.java:2435)
>     at 
> kafka.server.share.SharePartition.stateNotActive(SharePartition.java:1615)
>     at 
> kafka.server.share.SharePartition.maybeAcquireFetchLock(SharePartition.java:1501)
>     at 
> kafka.server.share.DelayedShareFetch.lambda$acquirablePartitions$0(DelayedShareFetch.java:409)
>     at java.util.LinkedHashMap.forEach(...)
>     at 
> kafka.server.share.DelayedShareFetch.acquirablePartitions(DelayedShareFetch.java:406)
>     at 
> kafka.server.share.DelayedShareFetch.tryComplete(DelayedShareFetch.java:341)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:136)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
>     at 
> org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
>     at 
> kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:909)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:504)
>     at 
> kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:900)
>     at 
> org.apache.kafka.server.DelayedActionQueue.tryCompleteActions(DelayedActionQueue.java:45)
>     at 
> kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:602)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:258)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:163)
> Found 1 deadlock.
> {noformat}
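
The same detection the thread dump performs can also be queried in-process. The following is a minimal sketch, not broker code: it assumes a JVM that supports ownable-synchronizer monitoring (HotSpot does), and the class name `DeadlockProbe`, the thread names, and the two stand-in locks (`opLock` for `DelayedOperation.lock`, `stateLock` for the `SharePartition` lock) are hypothetical. It reproduces the opposite acquisition orders from the dump above and polls `ThreadMXBean.findDeadlockedThreads()`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class DeadlockProbe {

    /** Ids of threads the JVM considers deadlocked; empty if none. */
    static long[] deadlockedThreadIds() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Covers object monitors and ownable synchronizers such as ReentrantLock.
        long[] ids = mx.findDeadlockedThreads();
        return ids == null ? new long[0] : ids;
    }

    /** Builds the two-lock cycle on daemon threads and reports whether it is detected. */
    static boolean provokeAndDetect() {
        ReentrantLock opLock = new ReentrantLock();                      // stand-in (hypothetical)
        ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock(); // stand-in (hypothetical)
        CountDownLatch bothHoldFirst = new CountDownLatch(2);

        Thread writer = new Thread(() -> {
            stateLock.writeLock().lock();   // first lock: write lock
            arriveAndWait(bothHoldFirst);
            opLock.lock();                  // second lock: never granted
        }, "demo-writer");
        Thread handler = new Thread(() -> {
            opLock.lock();                  // first lock: operation lock
            arriveAndWait(bothHoldFirst);
            stateLock.readLock().lock();    // second lock: never granted
        }, "demo-handler");
        // Daemon threads so the stuck pair does not keep the JVM alive.
        writer.setDaemon(true);
        handler.setDaemon(true);
        writer.start();
        handler.start();

        // Poll for up to about five seconds while both threads park.
        for (int attempt = 0; attempt < 100; attempt++) {
            long[] ids = deadlockedThreadIds();
            if (contains(ids, writer.getId()) && contains(ids, handler.getId())) {
                return true;
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    private static void arriveAndWait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static boolean contains(long[] ids, long id) {
        for (long candidate : ids) {
            if (candidate == id) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + provokeAndDetect());
    }
}
```

A probe like this only reports that a cycle exists; the stack traces in the dump are still needed to see which call paths took the locks.
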
> h2. Root cause
> \{{SharePartition.rollbackOrProcessStateUpdates}} completes the returned 
> \{{CompletableFuture}} *while still holding the \{{SharePartition}} write 
> lock*. The \{{whenComplete}}/\{{thenCompose}} chain on that future runs the 
> downstream share-acknowledge completion path 
> (\{{SharePartitionManager.acknowledge}}'s continuation -> 
> \{{handleShareAcknowledgeRequest$4}} -> 
> \{{SharePartitionManager.releaseSession}} -> 
> \{{ReplicaManager.completeDelayedShareFetchRequest}} -> 
> \{{DelayedOperationPurgatory.checkAndComplete}} -> 
> \{{Watchers.tryCompleteWatched}} -> \{{DelayedOperation.safeTryComplete}}) on 
> the same thread, still holding the write lock.
> At the same moment, a request-handler thread inside 
> \{{ReplicaManager.tryCompleteActions}} has already entered 
> \{{DelayedOperation.safeTryComplete}} (line 136 in that file) -- taking the 
> \{{DelayedOperation.lock}} -- and then calls through 
> \{{DelayedShareFetch.tryComplete}} -> \{{acquirablePartitions}} -> 
> \{{SharePartition.maybeAcquireFetchLock}} -> \{{stateNotActive}} -> 
> \{{partitionState}}, which takes the \{{SharePartition}} *read* lock.
> Lock ordering:
> * *Persister callback*: \{{SharePartition.writeLock}} (already held) -> 
> \{{DelayedOperation.lock}} (via \{{completeDelayedShareFetchRequest}}). 
> *BLOCKS.*
> * *Request handler*: \{{DelayedOperation.lock}} (already held) -> 
> \{{SharePartition.readLock}} (via \{{DelayedShareFetch.tryComplete}}). 
> *BLOCKS.*
> Both orderings exist, so any interleaving in which each thread holds its 
> first lock while requesting the second deadlocks the broker.
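
The root-cause claim rests on one `CompletableFuture` property: a non-async `whenComplete` stage registered on an incomplete future runs on the thread that calls `complete`, at the point of the call. The sketch below illustrates that in isolation. It is not the `SharePartition` code; the class `CompletionUnderLockDemo` and its method are hypothetical names, and the callback is a stand-in for the caller-registered continuation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class CompletionUnderLockDemo {

    /**
     * Returns true if the whenComplete callback ran while the completing
     * thread still held the write lock.
     */
    static boolean callbackSeesWriteLock(boolean completeInsideLock) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicBoolean heldDuringCallback = new AtomicBoolean(false);

        // Stand-in for a continuation registered by the caller (hypothetical).
        future.whenComplete((value, error) ->
            heldDuringCallback.set(lock.isWriteLockedByCurrentThread()));

        lock.writeLock().lock();
        try {
            if (completeInsideLock) {
                // The callback executes right here, on this thread, under the lock.
                future.complete(null);
            }
        } finally {
            lock.writeLock().unlock();
        }
        if (!completeInsideLock) {
            // Completing after unlock: the callback runs with no lock held.
            future.complete(null);
        }
        return heldDuringCallback.get();
    }

    public static void main(String[] args) {
        System.out.println("completed inside lock: " + callbackSeesWriteLock(true));   // true
        System.out.println("completed after unlock: " + callbackSeesWriteLock(false)); // false
    }
}
```

Any lock the callback then tries to take is acquired after the write lock, which is how the first ordering in the list above arises.
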
> h2. Exact line in 4.2.0
> \{{core/src/main/java/kafka/server/share/SharePartition.java}} -- 
> \{{rollbackOrProcessStateUpdates}}:
> {code:java}
> 2486:        lock.writeLock().lock();
> 2487:        try {
> ...              // state updates, cacheStateUpdated = maybeUpdateCachedStateAndOffsets()
> 2520:            future.complete(null);          // <-- triggers the reentrant share-ack chain synchronously,
>                                                 //     STILL HOLDING THE WRITE LOCK.
> 2521:        } finally {
> 2522:            lock.writeLock().unlock();
> 2523:            // Maybe complete the delayed share fetch request if the state has been changed in cache
> 2524:            // which might have moved start offset ahead. Hence, the pending delayed share fetch
> 2525:            // request can be completed. The call should be made outside the lock to avoid deadlock.
> 2526:            maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
> 2527:        }
> {code}
> The comment at lines 2523-2525 recognizes the exact danger for 
> \{{maybeCompleteDelayedShareFetchRequest}}. The same reasoning applies to 
> \{{future.complete(null)}}: downstream \{{whenComplete}} stages are 
> registered by callers (here \{{SharePartitionManager.acknowledge}}) that do 
> call into \{{ReplicaManager.completeDelayedShareFetchRequest}}.
> h2. Trunk status
> Still present on trunk as of 2026-04-20, at 
> \{{core/src/main/java/kafka/server/share/SharePartition.java:2642}}:
> {code:java}
> 2640:                    // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
> 2641:                    cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
> 2642:                    future.complete(null);     // <-- still inside writeLock
> 2643:                } finally {
> 2644:                    lock.writeLock().unlock();
> 2645:                    // ... call should be made outside the lock to avoid deadlock.
> 2646:                    maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
> {code}
> h2. Related fixed issues (same bug class)
> * KAFKA-18129 (fixed 4.0.0): \{{SharePartition#maybeInitialize}} was 
> completing future inside write lock -- same pattern, different callsite. Fix 
> moved future completion outside lock.
> * KAFKA-18084 (fixed 4.0.0): added missing write locks in async callbacks in 
> SharePartition.
> * KAFKA-18154: deadlock in ShareConsumerTest.
> * KAFKA-18265 (fixed 4.2.0): SharePartition locking optimization.
> None of these touched \{{rollbackOrProcessStateUpdates}}'s 
> \{{future.complete(null)}} callsite.
> h2. Suggested fix
> Same remedy KAFKA-18129 applied to \{{maybeInitialize}}: move 
> \{{future.complete(null)}} (and the exceptional counterparts) to after the 
> \{{finally}} block that releases the write lock. For example:
> {code:java}
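> // Record the outcome under the lock; complete the future only after unlock.
> boolean cacheStateUpdated = false;
> Throwable completeException = null;
> lock.writeLock().lock();
> try {
>     // ... existing logic ...
>     cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
> } catch (Throwable t) {
>     // Defer exceptional completion until the write lock is released.
>     completeException = t;
> } finally {
>     lock.writeLock().unlock();
> }
> // No SharePartition lock is held here, so downstream callbacks may take
> // DelayedOperation.lock without inverting the lock order.
> if (completeException != null) {
>     future.completeExceptionally(completeException);
> } else {
>     future.complete(null);
> }
> maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);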
> {code}
> (Exact form left to the share maintainers' judgment -- same approach as 
> KAFKA-18129's PR #18053.)
> h2. Artifacts available on request
> (first two are attached)
> * \{{DEADLOCK_EXTRACT.txt}}: the 196-line JVM "Found 1 deadlock" output.
> * \{{DEADLOCK_THREADDUMP.log}}: full 159 KB thread dump.
> * \{{INVESTIGATION.md}}: notes on ruled-out hypotheses (GC, heap, 
> \{{__share_group_state}} sizing, inter-broker replication, kgo client).
> * GC log showing no pauses > 32 ms during the stall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
