Travis Bischel created KAFKA-20505:
--------------------------------------
Summary: Deadlock in KIP-932 share path:
`SharePartition.rollbackOrProcessStateUpdates` completes future inside write
lock, reenters `DelayedOperation` lock held by request handler
Key: KAFKA-20505
URL: https://issues.apache.org/jira/browse/KAFKA-20505
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 4.2.0, 4.2.1
Reporter: Travis Bischel
Attachments: DEADLOCK_EXTRACT.txt, DEADLOCK_THREADDUMP.log
I had Claude dig in overnight to figure out why my tests against a local
3-node cluster were hanging, after ruling out problems within my own test. The
following is the full analysis it came up with:
---
The JVM's own deadlock detector (thread dump via `kill -3`, i.e. SIGQUIT)
reports a cross-thread deadlock between two locks in the share-fetch +
share-acknowledge path on a single-broker KRaft 4.2.0 cluster. Once it fires,
every `ShareFetch` and `ShareAcknowledge` against the affected broker hangs
until the session times out; the broker process stays alive, and the JVM itself
reports the deadlock among its threads.
### Reproduction
- Single broker, 4.2.0, KRaft mode, `network_mode: host`, `-Xmx1g`.
- Relevant non-default broker configs:
```
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.num.partitions=5
share.coordinator.state.topic.min.isr=1
group.coordinator.rebalance.protocols=classic,consumer,share
```
- Driver: franz-go `TestShareGroupETL` (heavy share-group ETL, rebalances,
ack/release/reject mix), currently in my branch
[1295|https://github.com/twmb/franz-go/pull/1295] but will be merged to master
soon:
```
KGO_TEST_RF=1 go test -v -run TestShareGroupETL -timeout 2m
```
- Occurs reliably within 6 consecutive iterations on a freshly started cluster.
The first one or two runs pass, then a subsequent run makes no progress; the
client gets `context deadline exceeded` on every `ShareFetch` to the broker.
- Also reproduces on a 3-node cluster; eliminating replication (single broker)
does not prevent it.
### JVM deadlock (verbatim from `kill -3 <pid>`)
```
Found one Java-level deadlock:
=============================
"PersisterStateManager":
waiting for ownable synchronizer 0x00000000d9b000a0, (a
java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "data-plane-kafka-request-handler-4"
"data-plane-kafka-request-handler-4":
waiting for ownable synchronizer 0x00000000d9b2d3b8, (a
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
which is held by "PersisterStateManager"
Java stack information for the threads listed above:
===================================================
"PersisterStateManager":
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000000d9b000a0> (a
java.util.concurrent.locks.ReentrantLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(...)
at java.util.concurrent.locks.ReentrantLock.lock(...)
at
org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:133)
at
org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
at
org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
at
kafka.server.ReplicaManager.completeDelayedShareFetchRequest(ReplicaManager.scala:376)
at
kafka.server.share.SharePartitionManager.lambda$releaseSession$0(SharePartitionManager.java:395)
at java.util.ArrayList.forEach(...)
at
kafka.server.share.SharePartitionManager.releaseSession(SharePartitionManager.java:376)
at
kafka.server.KafkaApis.$anonfun$handleShareAcknowledgeRequest$4(KafkaApis.scala:3597)
at java.util.concurrent.CompletableFuture.uniHandle(...)
at java.util.concurrent.CompletableFuture.postComplete(...)
at java.util.concurrent.CompletableFuture.complete(...)
at
kafka.server.share.SharePartitionManager.lambda$acknowledge$1(SharePartitionManager.java:321)
at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
at java.util.concurrent.CompletableFuture.postComplete(...)
at
kafka.server.share.SharePartition.lambda$rollbackOrProcessStateUpdates$1(SharePartition.java:2520)
at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
at java.util.concurrent.CompletableFuture.postComplete(...)
at
kafka.server.share.SharePartition.lambda$writeShareGroupState$0(SharePartition.java:2797)
at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
at java.util.concurrent.CompletableFuture.postComplete(...)
at
org.apache.kafka.server.share.persister.PersisterStateManager$WriteStateHandler.handleRequestResponse(PersisterStateManager.java:806)
at
org.apache.kafka.server.share.persister.PersisterStateManager$PersisterStateManagerHandler.onComplete(PersisterStateManager.java:366)
at
org.apache.kafka.server.share.persister.PersisterStateManager$SendThread.lambda$generateRequests$5(PersisterStateManager.java:1555)
at
org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
at
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:669)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:661)
at
org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:110)
at
org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137)
at
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)
"data-plane-kafka-request-handler-4":
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x00000000d9b2d3b8> (a
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(...)
at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(...)
at
kafka.server.share.SharePartition.partitionState(SharePartition.java:2435)
at
kafka.server.share.SharePartition.stateNotActive(SharePartition.java:1615)
at
kafka.server.share.SharePartition.maybeAcquireFetchLock(SharePartition.java:1501)
at
kafka.server.share.DelayedShareFetch.lambda$acquirablePartitions$0(DelayedShareFetch.java:409)
at java.util.LinkedHashMap.forEach(...)
at
kafka.server.share.DelayedShareFetch.acquirablePartitions(DelayedShareFetch.java:406)
at
kafka.server.share.DelayedShareFetch.tryComplete(DelayedShareFetch.java:341)
at
org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:136)
at
org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
at
org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
at
kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:909)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:504)
at
kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:900)
at
org.apache.kafka.server.DelayedActionQueue.tryCompleteActions(DelayedActionQueue.java:45)
at kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:602)
at kafka.server.KafkaApis.handle(KafkaApis.scala:258)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:163)
Found 1 deadlock.
```
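The detection that `kill -3` prints can also be queried from inside a JVM through `java.lang.management.ThreadMXBean`. The following is a minimal sketch, not broker code: the class `DeadlockProbe` and the thread names `demo-persister` and `demo-handler` are hypothetical, and two plain `ReentrantLock`s stand in for the broker's locks. It deadlocks two daemon threads on purpose, asks the JVM which threads it considers deadlocked, then interrupts them to clean up.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

final class DeadlockProbe {

    /** Names of threads the JVM reports as deadlocked on monitors or ownable synchronizers. */
    static List<String> deadlockedThreadNames() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        // Returns null when no deadlock is found; may throw UnsupportedOperationException
        // on a JVM that cannot monitor ownable synchronizers.
        long[] ids = bean.findDeadlockedThreads();
        List<String> names = new ArrayList<>();
        if (ids == null) {
            return names;
        }
        for (ThreadInfo info : bean.getThreadInfo(ids)) {
            if (info != null) {
                names.add(info.getThreadName());
            }
        }
        return names;
    }

    /** Deadlocks two daemon threads deliberately, probes, then interrupts both to clean up. */
    static List<String> probeDeliberateDeadlock() {
        Lock a = new ReentrantLock();
        Lock b = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        Thread t1 = worker("demo-persister", a, b, bothHoldFirst);
        Thread t2 = worker("demo-handler", b, a, bothHoldFirst);
        t1.start();
        t2.start();
        List<String> found = new ArrayList<>();
        try {
            // Poll for up to roughly five seconds until both threads are reported.
            for (int i = 0; i < 100 && found.size() < 2; i++) {
                Thread.sleep(50);
                found = deadlockedThreadNames();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            t1.interrupt();
            t2.interrupt();
        }
        return found;
    }

    private static Thread worker(String name, Lock first, Lock second, CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            first.lock();
            try {
                bothHoldFirst.countDown();
                bothHoldFirst.await();       // both threads now hold their first lock
                second.lockInterruptibly();  // opposite order in each thread: blocks until interrupted
                second.unlock();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();
            }
        }, name);
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + probeDeliberateDeadlock());
    }
}
```

`findDeadlockedThreads` covers `java.util.concurrent` locks as well as monitors, which matters here because both locks in the dump above are ownable synchronizers rather than `synchronized` monitors.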
### Root cause
`SharePartition.rollbackOrProcessStateUpdates` completes the returned
`CompletableFuture` **while still holding the `SharePartition` write lock**.
The dependent stages registered on that future (the `uniWhenComplete` and
`uniHandle` frames in the stack above) run the downstream share-acknowledge
completion path (`SharePartitionManager.acknowledge`'s continuation →
`handleShareAcknowledgeRequest$4` → `SharePartitionManager.releaseSession` →
`ReplicaManager.completeDelayedShareFetchRequest` →
`DelayedOperationPurgatory.checkAndComplete` → `Watchers.tryCompleteWatched` →
`DelayedOperation.safeTryComplete`) synchronously on the completing thread,
which still holds the write lock.
At the same moment, a request-handler thread inside
`ReplicaManager.tryCompleteActions` has already entered
`DelayedOperation.safeTryComplete` (line 136 in that file) — taking the
`DelayedOperation.lock` — and then calls through
`DelayedShareFetch.tryComplete` → `acquirablePartitions` →
`SharePartition.maybeAcquireFetchLock` → `stateNotActive` → `partitionState`,
which takes the `SharePartition` **read** lock.
Lock ordering:
- **Persister callback**: `SharePartition.writeLock` (already held) →
`DelayedOperation.lock` (via `completeDelayedShareFetchRequest`). **BLOCKS.**
- **Request handler**: `DelayedOperation.lock` (already held) →
`SharePartition.readLock` (via `DelayedShareFetch.tryComplete`). **BLOCKS.**
Both orderings exist, so any interleaving that catches both in the critical
section deadlocks the broker.
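The inversion can be reproduced in isolation. The sketch below is an illustration under stated assumptions, not Kafka code: `LockOrderInversionDemo`, `partitionLock`, and `operationLock` are hypothetical stand-ins for the `SharePartition` read/write lock and `DelayedOperation.lock`. It uses `tryLock` with a timeout instead of `lock()` so the program terminates rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class LockOrderInversionDemo {

    /** Returns true when neither thread could take its second lock, i.e. the inversion was hit. */
    static boolean bothSecondAcquisitionsTimeOut() {
        ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock(); // stand-in for the SharePartition lock
        ReentrantLock operationLock = new ReentrantLock();                   // stand-in for DelayedOperation.lock
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] gotSecond = new boolean[2];

        // Persister callback order: write lock first, then the operation lock.
        Thread persister = new Thread(
            () -> attempt(partitionLock.writeLock(), operationLock, bothHoldFirst, bothTried, gotSecond, 0),
            "persister-callback");
        // Request handler order: operation lock first, then the read lock.
        Thread handler = new Thread(
            () -> attempt(operationLock, partitionLock.readLock(), bothHoldFirst, bothTried, gotSecond, 1),
            "request-handler");
        persister.start();
        handler.start();
        try {
            persister.join();
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !gotSecond[0] && !gotSecond[1];
    }

    private static void attempt(Lock first, Lock second, CountDownLatch bothHoldFirst,
                                CountDownLatch bothTried, boolean[] gotSecond, int slot) {
        first.lock();
        try {
            bothHoldFirst.countDown();
            bothHoldFirst.await(); // both threads now hold their first lock
            // Bounded wait; with a plain lock() this is where each thread would park forever.
            boolean acquired = second.tryLock(200, TimeUnit.MILLISECONDS);
            gotSecond[slot] = acquired;
            if (acquired) {
                second.unlock();
            }
            bothTried.countDown();
            bothTried.await(); // keep the first lock until both attempts have finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println("inversion hit: " + bothSecondAcquisitionsTimeOut());
    }
}
```

Because each thread keeps its first lock until both attempts have finished, both `tryLock` calls are expected to time out. Replacing them with `lock()` would give the same two-thread cycle the thread dump shows.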
### Exact line in 4.2.0
`core/src/main/java/kafka/server/share/SharePartition.java` —
`rollbackOrProcessStateUpdates`:
```java
2486: lock.writeLock().lock();
2487: try {
...       // state updates, cacheStateUpdated = maybeUpdateCachedStateAndOffsets()
2520:     future.complete(null); // <-- triggers the reentrant share-ack chain synchronously,
                                 //     STILL HOLDING THE WRITE LOCK.
2521: } finally {
2522:     lock.writeLock().unlock();
2523:     // Maybe complete the delayed share fetch request if the state has been changed in cache
2524:     // which might have moved start offset ahead. Hence, the pending delayed share fetch
2525:     // request can be completed. The call should be made outside the lock to avoid deadlock.
2526:     maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
2527: }
```
The comment at lines 2523-2525 recognizes the exact danger for
`maybeCompleteDelayedShareFetchRequest`. The same reasoning applies to
`future.complete(null)`: downstream `whenComplete` stages are registered by
callers (here `SharePartitionManager.acknowledge`) that do call into
`ReplicaManager.completeDelayedShareFetchRequest`.
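The mechanism is standard `CompletableFuture` behavior: a dependent stage registered before completion runs synchronously on the thread that calls `complete`. A minimal sketch (the class `CompleteInsideLockDemo` and its method are hypothetical and stand in for the broker code) records whether a caller-registered callback observes the write lock as held by its own thread:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class CompleteInsideLockDemo {

    /** Returns whether a caller-registered callback ran while the completing thread held the write lock. */
    static boolean callbackRanUnderWriteLock(boolean completeInsideLock) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CompletableFuture<Void> future = new CompletableFuture<>();
        boolean[] heldDuringCallback = new boolean[1];

        // A caller registers its stage before completion, the way an acknowledge caller would.
        future.whenComplete((v, t) -> heldDuringCallback[0] = lock.isWriteLockedByCurrentThread());

        lock.writeLock().lock();
        try {
            if (completeInsideLock) {
                future.complete(null); // the callback runs right here, on this thread, under the lock
            }
        } finally {
            lock.writeLock().unlock();
        }
        if (!completeInsideLock) {
            future.complete(null);     // the callback runs after the lock has been released
        }
        return heldDuringCallback[0];
    }

    public static void main(String[] args) {
        System.out.println("complete inside lock -> callback under lock: " + callbackRanUnderWriteLock(true));
        System.out.println("complete after unlock -> callback under lock: " + callbackRanUnderWriteLock(false));
    }
}
```

Whatever the callback goes on to do, including taking other locks, it does with the write lock still held in the first case and without it in the second.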
### Trunk status
Still present on trunk as of 2026-04-20, at
`core/src/main/java/kafka/server/share/SharePartition.java:2642`:
```java
2640:     // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
2641:     cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
2642:     future.complete(null); // <-- still inside writeLock
2643: } finally {
2644:     lock.writeLock().unlock();
2645:     // ... call should be made outside the lock to avoid deadlock.
2646:     maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
```
### Related fixed issues (same bug class)
- KAFKA-18129 (fixed 4.0.0): `SharePartition#maybeInitialize` was completing
future inside write lock — same pattern, different callsite. Fix moved future
completion outside lock.
- KAFKA-18084 (fixed 4.0.0): added missing write locks in async callbacks in
SharePartition.
- KAFKA-18154: deadlock in ShareConsumerTest.
- KAFKA-18265 (fixed 4.2.0): SharePartition locking optimization.
None of these touched `rollbackOrProcessStateUpdates`'s `future.complete(null)`
callsite.
### Suggested fix
Same remedy KAFKA-18129 applied to `maybeInitialize`: move
`future.complete(null)` (and the exceptional counterparts) out of the locked
region, so that they run only after the `finally` block has called
`lock.writeLock().unlock()`. For example:
```java
boolean cacheStateUpdated = false;
Throwable completeException = null;
lock.writeLock().lock();
try {
    // ... existing logic ...
    cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
} catch (Throwable t) {
    // Remember the failure; the future is completed only after the lock is released.
    completeException = t;
} finally {
    lock.writeLock().unlock();
}
// Lock released: stages registered by callers now run without the write lock held.
if (completeException != null) {
    future.completeExceptionally(completeException);
} else {
    future.complete(null);
}
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
```
(Exact form left to the share maintainers' judgment — same approach as
KAFKA-18129's PR #18053.)
### Artifacts available on request
- `DEADLOCK_EXTRACT.txt`: the 196-line JVM `Found 1 deadlock` output.
- `DEADLOCK_THREADDUMP.log`: full 159 KB thread dump.
- `INVESTIGATION.md`: notes on ruled-out hypotheses (GC, heap,
`__share_group_state` sizing, inter-broker replication, kgo client).
- GC log showing no pauses > 32 ms during the stall.