[ https://issues.apache.org/jira/browse/KAFKA-20505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chia-Ping Tsai resolved KAFKA-20505.
------------------------------------
Resolution: Fixed
> Deadlock in KIP-932 share path:
> `SharePartition.rollbackOrProcessStateUpdates` completes future inside write
> lock, reenters `DelayedOperation` lock held by request handler
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-20505
> URL: https://issues.apache.org/jira/browse/KAFKA-20505
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 4.2.0
> Reporter: Travis Bischel
> Assignee: Apoorv Mittal
> Priority: Blocker
> Fix For: 4.3.0, 4.2.1
>
> Attachments: DEADLOCK_EXTRACT.txt, DEADLOCK_THREADDUMP.log
>
>
> I had Claude dig in overnight to figure out why my tests against a local 3-node
> cluster were hanging, after ruling out problems within my own test. The
> following is the full analysis it came up with:
> ----
> The JVM's own deadlock detector ({{kill -3}} SIGQUIT thread dump) reports a
> cross-thread deadlock between two locks in the share-fetch and
> share-acknowledge path on a single-broker KRaft 4.2.0 cluster. Once the deadlock
> occurs, every {{ShareFetch}} and {{ShareAcknowledge}} against the affected broker
> hangs until the session times out. The broker process stays alive, and the
> JVM reports the deadlock among its own threads.
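> The same check can also be run in-process through the standard {{java.lang.management.ThreadMXBean}} API. The following is a minimal sketch. It assumes a JVM (such as HotSpot) that supports ownable-synchronizer monitoring, which the {{ReentrantLock}}/{{ReentrantReadWriteLock}} cycle above requires. {{DeadlockProbe}} and {{describeDeadlocks}} are hypothetical names and are not part of Kafka.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Kafka): in-process equivalent of the
// "Found one Java-level deadlock" section of a kill -3 thread dump.
final class DeadlockProbe {
    private DeadlockProbe() {}

    /**
     * Returns one line per deadlocked thread, or an empty list if none.
     * findDeadlockedThreads() covers object monitors AND ownable synchronizers
     * (ReentrantLock, ReentrantReadWriteLock), unlike findMonitorDeadlockedThreads().
     */
    static List<String> describeDeadlocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock exists
        List<String> lines = new ArrayList<>();
        if (ids == null) {
            return lines;
        }
        // Ask for locked monitors and synchronizers so lock name/owner are populated.
        for (ThreadInfo info : mx.getThreadInfo(ids, true, true)) {
            if (info == null) {
                continue; // the thread exited between the two calls
            }
            lines.add("\"" + info.getThreadName() + "\" waiting for " + info.getLockName()
                    + ", held by \"" + info.getLockOwnerName() + "\"");
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> deadlocks = describeDeadlocks();
        System.out.println(deadlocks.isEmpty() ? "no deadlock" : String.join("\n", deadlocks));
    }
}
```

> On a broker in the state described here, the expected result would be two lines that mirror the {{PersisterStateManager}} / {{data-plane-kafka-request-handler-4}} pair in the dump below.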
> h2. Reproduction
> * Single broker, 4.2.0, KRaft mode, {{network_mode: host}}, {{-Xmx1g}}.
> * Relevant non-default broker configs:
> {code}
> share.coordinator.state.topic.replication.factor=1
> share.coordinator.state.topic.num.partitions=5
> share.coordinator.state.topic.min.isr=1
> group.coordinator.rebalance.protocols=classic,consumer,share
> {code}
> * Driver: franz-go {{TestShareGroupETL}} (heavy share-group ETL with rebalances
> and a mix of ack/release/reject):
> {code}
> KGO_TEST_RF=1 go test -v -run TestShareGroupETL -timeout 2m
> {code}
> * Occurs reliably within 6 consecutive iterations on a freshly started
> cluster. The first one or two runs pass; a subsequent run then makes no progress,
> and the client gets {{context deadline exceeded}} on every {{ShareFetch}} to the
> broker.
> * Also reproduces on a 3-node cluster; eliminating replication (single
> broker) does not prevent it.
> h2. JVM deadlock (verbatim from {{kill -3 <pid>}})
> {noformat}
> Found one Java-level deadlock:
> =============================
> "PersisterStateManager":
> waiting for ownable synchronizer 0x00000000d9b000a0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
> which is held by "data-plane-kafka-request-handler-4"
> "data-plane-kafka-request-handler-4":
> waiting for ownable synchronizer 0x00000000d9b2d3b8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
> which is held by "PersisterStateManager"
> Java stack information for the threads listed above:
> ===================================================
> "PersisterStateManager":
> at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> - parking to wait for <0x00000000d9b000a0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(...)
> at java.util.concurrent.locks.ReentrantLock.lock(...)
> at org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:133)
> at org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
> at org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
> at kafka.server.ReplicaManager.completeDelayedShareFetchRequest(ReplicaManager.scala:376)
> at kafka.server.share.SharePartitionManager.lambda$releaseSession$0(SharePartitionManager.java:395)
> at java.util.ArrayList.forEach(...)
> at kafka.server.share.SharePartitionManager.releaseSession(SharePartitionManager.java:376)
> at kafka.server.KafkaApis.$anonfun$handleShareAcknowledgeRequest$4(KafkaApis.scala:3597)
> at java.util.concurrent.CompletableFuture.uniHandle(...)
> at java.util.concurrent.CompletableFuture.postComplete(...)
> at java.util.concurrent.CompletableFuture.complete(...)
> at kafka.server.share.SharePartitionManager.lambda$acknowledge$1(SharePartitionManager.java:321)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
> at java.util.concurrent.CompletableFuture.postComplete(...)
> at kafka.server.share.SharePartition.lambda$rollbackOrProcessStateUpdates$1(SharePartition.java:2520)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
> at java.util.concurrent.CompletableFuture.postComplete(...)
> at kafka.server.share.SharePartition.lambda$writeShareGroupState$0(SharePartition.java:2797)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
> at java.util.concurrent.CompletableFuture.postComplete(...)
> at org.apache.kafka.server.share.persister.PersisterStateManager$WriteStateHandler.handleRequestResponse(PersisterStateManager.java:806)
> at org.apache.kafka.server.share.persister.PersisterStateManager$PersisterStateManagerHandler.onComplete(PersisterStateManager.java:366)
> at org.apache.kafka.server.share.persister.PersisterStateManager$SendThread.lambda$generateRequests$5(PersisterStateManager.java:1555)
> at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
> at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:669)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:661)
> at org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:110)
> at org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137)
> at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
> "data-plane-kafka-request-handler-4":
> at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
> - parking to wait for <0x00000000d9b2d3b8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
> at java.util.concurrent.locks.LockSupport.park(...)
> at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(...)
> at kafka.server.share.SharePartition.partitionState(SharePartition.java:2435)
> at kafka.server.share.SharePartition.stateNotActive(SharePartition.java:1615)
> at kafka.server.share.SharePartition.maybeAcquireFetchLock(SharePartition.java:1501)
> at kafka.server.share.DelayedShareFetch.lambda$acquirablePartitions$0(DelayedShareFetch.java:409)
> at java.util.LinkedHashMap.forEach(...)
> at kafka.server.share.DelayedShareFetch.acquirablePartitions(DelayedShareFetch.java:406)
> at kafka.server.share.DelayedShareFetch.tryComplete(DelayedShareFetch.java:341)
> at org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:136)
> at org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
> at org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
> at kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:909)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:504)
> at kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:900)
> at org.apache.kafka.server.DelayedActionQueue.tryCompleteActions(DelayedActionQueue.java:45)
> at kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:602)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:258)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:163)
> Found 1 deadlock.
> {noformat}
> h2. Root cause
> {{SharePartition.rollbackOrProcessStateUpdates}} completes the returned
> {{CompletableFuture}} *while still holding the {{SharePartition}} write
> lock*. {{CompletableFuture.complete}} runs already-registered, non-async
> dependent stages synchronously on the completing thread. As a result, the
> {{whenComplete}}/{{thenCompose}} chain on that future runs the
> downstream share-acknowledge completion path
> ({{SharePartitionManager.acknowledge}}'s continuation ->
> {{handleShareAcknowledgeRequest$4}} ->
> {{SharePartitionManager.releaseSession}} ->
> {{ReplicaManager.completeDelayedShareFetchRequest}} ->
> {{DelayedOperationPurgatory.checkAndComplete}} ->
> {{Watchers.tryCompleteWatched}} -> {{DelayedOperation.safeTryComplete}}) on
> the same thread, while that thread still holds the write lock.
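> This behavior does not depend on Kafka. A non-async dependent stage registered before completion runs on whichever thread calls {{complete}}, inside whatever locks that thread already holds. The standalone sketch below uses a hypothetical class name, and a plain {{ReentrantReadWriteLock}} stands in for the {{SharePartition}} lock. It shows a {{whenComplete}} callback observing that its own thread holds the write lock:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the 4.2.0 pattern (not Kafka code).
final class CompleteUnderLockDemo {
    private CompleteUnderLockDemo() {}

    /**
     * Registers a dependent stage first (as a caller such as
     * SharePartitionManager.acknowledge would), then completes the future
     * while holding the write lock. Returns true if the callback ran on the
     * completing thread with the write lock still held.
     */
    static boolean callbackRunsUnderWriteLock() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CompletableFuture<Void> future = new CompletableFuture<>();
        Thread completer = Thread.currentThread();
        AtomicBoolean sawLockOnSameThread = new AtomicBoolean(false);

        // Non-async stage: executed by whichever thread completes the future.
        future.whenComplete((ignored, error) ->
                sawLockOnSameThread.set(Thread.currentThread() == completer
                        && lock.isWriteLockedByCurrentThread()));

        lock.writeLock().lock();
        try {
            future.complete(null); // the callback above runs right here, synchronously
        } finally {
            lock.writeLock().unlock();
        }
        return sawLockOnSameThread.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackRunsUnderWriteLock()); // prints true
    }
}
```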
> At the same moment, a request-handler thread inside
> {{ReplicaManager.tryCompleteActions}} has already entered
> {{DelayedOperation.safeTryComplete}} (line 136 of {{DelayedOperation.java}}), taking
> the {{DelayedOperation.lock}}. It then calls through
> {{DelayedShareFetch.tryComplete}} -> {{acquirablePartitions}} ->
> {{SharePartition.maybeAcquireFetchLock}} -> {{stateNotActive}} ->
> {{partitionState}}, which takes the {{SharePartition}} *read* lock.
> Lock ordering:
> * *Persister callback*: {{SharePartition.writeLock}} (already held) ->
> {{DelayedOperation.lock}} (via {{completeDelayedShareFetchRequest}}).
> *BLOCKS.*
> * *Request handler*: {{DelayedOperation.lock}} (already held) ->
> {{SharePartition.readLock}} (via {{DelayedShareFetch.tryComplete}}).
> *BLOCKS.*
> Because both acquisition orders exist, the broker deadlocks on any interleaving
> in which each thread already holds its first lock when it requests its second.
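> The two acquisition orders can be reproduced outside Kafka. The sketch below uses a hypothetical class. A {{ReentrantLock}} stands in for {{DelayedOperation.lock}}, and a {{ReentrantReadWriteLock}} stands in for the {{SharePartition}} lock. Latches force the interleaving described above. A {{tryLock}} with a 200 ms timeout (an arbitrary value chosen for the demo) replaces the blocking {{lock()}}, so the program reports the cycle instead of hanging:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the two lock orders (not Kafka code).
final class LockOrderInversionDemo {
    private LockOrderInversionDemo() {}

    /** True if neither thread can get its second lock while holding its first. */
    static boolean inversionBlocksBothThreads() {
        ReentrantLock delayedOperationLock = new ReentrantLock();            // ~ DelayedOperation.lock
        ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock(); // ~ SharePartition lock
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTriedSecond = new CountDownLatch(2);
        AtomicBoolean persisterGotSecond = new AtomicBoolean(true);
        AtomicBoolean handlerGotSecond = new AtomicBoolean(true);

        // Persister callback: write lock -> DelayedOperation lock.
        Thread persister = new Thread(() -> attempt(partitionLock.writeLock(), delayedOperationLock,
                bothHoldFirst, bothTriedSecond, persisterGotSecond), "persister");
        // Request handler: DelayedOperation lock -> read lock.
        Thread handler = new Thread(() -> attempt(delayedOperationLock, partitionLock.readLock(),
                bothHoldFirst, bothTriedSecond, handlerGotSecond), "handler");
        persister.start();
        handler.start();
        try {
            persister.join();
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !persisterGotSecond.get() && !handlerGotSecond.get();
    }

    private static void attempt(Lock first, Lock second, CountDownLatch bothHoldFirst,
                                CountDownLatch bothTriedSecond, AtomicBoolean gotSecond) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await(); // both threads now hold their first lock
            // A plain second.lock() would block forever here: the deadlock.
            boolean acquired = second.tryLock(200, TimeUnit.MILLISECONDS);
            gotSecond.set(acquired);
            if (acquired) {
                second.unlock();
            }
            bothTriedSecond.countDown();
            bothTriedSecond.await(); // keep the first lock until the other thread has tried
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(inversionBlocksBothThreads()); // prints true
    }
}
```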
> h2. Exact line in 4.2.0
> {{core/src/main/java/kafka/server/share/SharePartition.java}} --
> {{rollbackOrProcessStateUpdates}}:
> {code:java}
> 2486: lock.writeLock().lock();
> 2487: try {
> ... // state updates, cacheStateUpdated = maybeUpdateCachedStateAndOffsets()
> 2520: future.complete(null); // <-- triggers the reentrant share-ack chain synchronously,
>                             // STILL HOLDING THE WRITE LOCK.
> 2521: } finally {
> 2522: lock.writeLock().unlock();
> 2523: // Maybe complete the delayed share fetch request if the state has been changed in cache
> 2524: // which might have moved start offset ahead. Hence, the pending delayed share fetch
> 2525: // request can be completed. The call should be made outside the lock to avoid deadlock.
> 2526: maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
> {code}
> The comment at lines 2523-2525 identifies exactly this danger for
> {{maybeCompleteDelayedShareFetchRequest}}. The same reasoning applies to
> {{future.complete(null)}}. Its downstream {{whenComplete}} stages are
> registered by callers (here {{SharePartitionManager.acknowledge}}) whose
> continuations go on to call {{ReplicaManager.completeDelayedShareFetchRequest}}.
> h2. Trunk status
> Still present on trunk as of 2026-04-20, at
> {{core/src/main/java/kafka/server/share/SharePartition.java:2642}}:
> {code:java}
> 2640: // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
> 2641: cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
> 2642: future.complete(null); // <-- still inside writeLock
> 2643: } finally {
> 2644: lock.writeLock().unlock();
> 2645: // ... call should be made outside the lock to avoid deadlock.
> 2646: maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
> {code}
> h2. Related fixed issues (same bug class)
> * KAFKA-18129 (fixed 4.0.0): {{SharePartition#maybeInitialize}} was
> completing its future inside the write lock -- same pattern, different callsite.
> The fix moved future completion outside the lock.
> * KAFKA-18084 (fixed 4.0.0): added missing write locks in async callbacks in
> SharePartition.
> * KAFKA-18154: deadlock in ShareConsumerTest.
> * KAFKA-18265 (fixed 4.2.0): SharePartition locking optimization.
> None of these touched {{rollbackOrProcessStateUpdates}}'s
> {{future.complete(null)}} callsite.
> h2. Suggested fix
> Apply the same remedy KAFKA-18129 applied to {{maybeInitialize}}: move
> {{future.complete(null)}} (and its exceptional counterparts) out of the
> write-locked region, after the {{finally}} block that calls
> {{lock.writeLock().unlock()}}. For example:
> {code:java}
> boolean cacheStateUpdated = false;
> Throwable completeException = null;
> lock.writeLock().lock();
> try {
> // ... existing logic ...
> cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
> } catch (Throwable t) {
> completeException = t;
> } finally {
> lock.writeLock().unlock();
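> }
> // Complete the future only after releasing the write lock, so dependent
> // stages registered by callers never run while it is held.
> if (completeException != null) {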
> future.completeExceptionally(completeException);
> } else {
> future.complete(null);
> }
> maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
> {code}
> (Exact form left to the share maintainers' judgment -- same approach as
> KAFKA-18129's PR #18053.)
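> The standalone model below follows the suggested shape. The class and method names are hypothetical, and a {{Runnable}} stands in for the existing state-update logic. It checks that dependent stages still run synchronously on the completing thread, but only after the write lock has been released, on both the normal and the exceptional path:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the suggested fix (not Kafka code).
final class CompleteAfterUnlockDemo {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    /** Mutates state under the write lock, then completes the future after releasing it. */
    CompletableFuture<Void> update(Runnable stateUpdate, CompletableFuture<Void> future) {
        Throwable completeException = null;
        lock.writeLock().lock();
        try {
            stateUpdate.run(); // stands in for "... existing logic ..."
        } catch (Throwable t) {
            completeException = t;
        } finally {
            lock.writeLock().unlock();
        }
        // Dependent stages still run synchronously here, but no longer under the lock.
        if (completeException != null) {
            future.completeExceptionally(completeException);
        } else {
            future.complete(null);
        }
        return future;
    }

    /** Returns true if a dependent stage saw the write lock held (i.e. the bug). */
    static boolean callbackSawWriteLock(boolean failUpdate) {
        CompleteAfterUnlockDemo partition = new CompleteAfterUnlockDemo();
        CompletableFuture<Void> future = new CompletableFuture<>();
        // Starts true so that a callback which never runs also fails the check.
        AtomicBoolean sawLock = new AtomicBoolean(true);
        future.whenComplete((ignored, error) ->
                sawLock.set(partition.lock.isWriteLockedByCurrentThread()));
        Runnable stateUpdate = failUpdate
                ? () -> { throw new IllegalStateException("state update failed"); }
                : () -> { };
        partition.update(stateUpdate, future);
        return sawLock.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackSawWriteLock(false) + " " + callbackSawWriteLock(true)); // prints "false false"
    }
}
```

> Completion stays synchronous, so callers see their continuations run on the same thread and in the same order as before. The only difference is that those continuations no longer inherit the write lock.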
> h2. Artifacts available on request
> (the first two are attached)
> * {{DEADLOCK_EXTRACT.txt}}: the 196-line JVM "Found 1 deadlock" output.
> * {{DEADLOCK_THREADDUMP.log}}: full 159 KB thread dump.
> * {{INVESTIGATION.md}}: notes on ruled-out hypotheses (GC, heap,
> {{__share_group_state}} sizing, inter-broker replication, kgo client).
> * GC log showing no pauses > 32 ms during the stall.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)