Travis Bischel created KAFKA-20505:
--------------------------------------

             Summary: Deadlock in KIP-932 share path: 
`SharePartition.rollbackOrProcessStateUpdates` completes future inside write 
lock, reenters `DelayedOperation` lock held by request handler
                 Key: KAFKA-20505
                 URL: https://issues.apache.org/jira/browse/KAFKA-20505
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 4.2.0, 4.2.1
            Reporter: Travis Bischel
         Attachments: DEADLOCK_EXTRACT.txt, DEADLOCK_THREADDUMP.log

I had Claude dig in overnight to figure out why my tests against a local 
3-node cluster were hanging, after ruling out problems within my own tests. The 
following is the full analysis it came up with:

---

The JVM's own deadlock detector (`kill -3` SIGQUIT thread dump) reports a 
cross-thread deadlock between two locks in the share-fetch + share-acknowledge 
path on a single-broker KRaft 4.2.0 cluster. Once it fires, every `ShareFetch` 
and `ShareAcknowledge` against the affected broker hangs until the session 
times out; the broker process remains alive with the JVM detecting the deadlock 
in its own threads.

### Reproduction

- Single broker, 4.2.0, KRaft mode, `network_mode: host`, `-Xmx1g`.
- Relevant non-default broker configs:
  ```
  share.coordinator.state.topic.replication.factor=1
  share.coordinator.state.topic.num.partitions=5
  share.coordinator.state.topic.min.isr=1
  group.coordinator.rebalance.protocols=classic,consumer,share
  ```
- Driver: franz-go `TestShareGroupETL` (heavy share-group ETL, rebalances, 
ack/release/reject mix), currently in my branch 
[1295|https://github.com/twmb/franz-go/pull/1295] and due to be merged to 
master soon:
  ```
  KGO_TEST_RF=1 go test -v -run TestShareGroupETL -timeout 2m
  ```
- Occurs reliably within 6 consecutive iterations on a freshly started cluster. 
The first one or two runs pass, then a subsequent run makes no progress; the 
client gets `context deadline exceeded` on every `ShareFetch` to the broker.
- Also reproduces on 3-node cluster; eliminating replication (single broker) 
does not prevent it.

### JVM deadlock (verbatim from `kill -3 <pid>`)

```
Found one Java-level deadlock:
=============================
"PersisterStateManager":
  waiting for ownable synchronizer 0x00000000d9b000a0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "data-plane-kafka-request-handler-4"

"data-plane-kafka-request-handler-4":
  waiting for ownable synchronizer 0x00000000d9b2d3b8, (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync),
  which is held by "PersisterStateManager"

Java stack information for the threads listed above:
===================================================
"PersisterStateManager":
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for  <0x00000000d9b000a0> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(...)
    at java.util.concurrent.locks.ReentrantLock.lock(...)
    at org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:133)
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
    at kafka.server.ReplicaManager.completeDelayedShareFetchRequest(ReplicaManager.scala:376)
    at kafka.server.share.SharePartitionManager.lambda$releaseSession$0(SharePartitionManager.java:395)
    at java.util.ArrayList.forEach(...)
    at kafka.server.share.SharePartitionManager.releaseSession(SharePartitionManager.java:376)
    at kafka.server.KafkaApis.$anonfun$handleShareAcknowledgeRequest$4(KafkaApis.scala:3597)
    at java.util.concurrent.CompletableFuture.uniHandle(...)
    at java.util.concurrent.CompletableFuture.postComplete(...)
    at java.util.concurrent.CompletableFuture.complete(...)
    at kafka.server.share.SharePartitionManager.lambda$acknowledge$1(SharePartitionManager.java:321)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
    at java.util.concurrent.CompletableFuture.postComplete(...)
    at kafka.server.share.SharePartition.lambda$rollbackOrProcessStateUpdates$1(SharePartition.java:2520)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
    at java.util.concurrent.CompletableFuture.postComplete(...)
    at kafka.server.share.SharePartition.lambda$writeShareGroupState$0(SharePartition.java:2797)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(...)
    at java.util.concurrent.CompletableFuture.postComplete(...)
    at org.apache.kafka.server.share.persister.PersisterStateManager$WriteStateHandler.handleRequestResponse(PersisterStateManager.java:806)
    at org.apache.kafka.server.share.persister.PersisterStateManager$PersisterStateManagerHandler.onComplete(PersisterStateManager.java:366)
    at org.apache.kafka.server.share.persister.PersisterStateManager$SendThread.lambda$generateRequests$5(PersisterStateManager.java:1555)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:154)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:669)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:661)
    at org.apache.kafka.server.util.InterBrokerSendThread.pollOnce(InterBrokerSendThread.java:110)
    at org.apache.kafka.server.util.InterBrokerSendThread.doWork(InterBrokerSendThread.java:137)
    at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136)

"data-plane-kafka-request-handler-4":
    at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
    - parking to wait for  <0x00000000d9b2d3b8> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
    at java.util.concurrent.locks.LockSupport.park(...)
    at java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(...)
    at kafka.server.share.SharePartition.partitionState(SharePartition.java:2435)
    at kafka.server.share.SharePartition.stateNotActive(SharePartition.java:1615)
    at kafka.server.share.SharePartition.maybeAcquireFetchLock(SharePartition.java:1501)
    at kafka.server.share.DelayedShareFetch.lambda$acquirablePartitions$0(DelayedShareFetch.java:409)
    at java.util.LinkedHashMap.forEach(...)
    at kafka.server.share.DelayedShareFetch.acquirablePartitions(DelayedShareFetch.java:406)
    at kafka.server.share.DelayedShareFetch.tryComplete(DelayedShareFetch.java:341)
    at org.apache.kafka.server.purgatory.DelayedOperation.safeTryComplete(DelayedOperation.java:136)
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperationPurgatory.java:342)
    at org.apache.kafka.server.purgatory.DelayedOperationPurgatory.checkAndComplete(DelayedOperationPurgatory.java:193)
    at kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$2(ReplicaManager.scala:909)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:504)
    at kafka.server.ReplicaManager.$anonfun$addCompletePurgatoryAction$1(ReplicaManager.scala:900)
    at org.apache.kafka.server.DelayedActionQueue.tryCompleteActions(DelayedActionQueue.java:45)
    at kafka.server.ReplicaManager.tryCompleteActions(ReplicaManager.scala:602)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:258)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:163)

Found 1 deadlock.
```
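For anyone who wants to check for this condition without sending SIGQUIT, a comparable check is exposed in-process through `ThreadMXBean.findDeadlockedThreads()`, which also covers ownable synchronizers such as `ReentrantLock`. The following is only a minimal sketch: the class name, thread names and the artificially provoked two-lock deadlock are all hypothetical and have nothing to do with the broker code.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical illustration only; none of these names exist in Kafka.
class DeadlockProbeSketch {

    /** Names of threads the JVM currently considers deadlocked (empty if none). */
    static String[] deadlockedThreadNames() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads(); // null when no deadlock is found
        if (ids == null) {
            return new String[0];
        }
        ThreadInfo[] infos = mx.getThreadInfo(ids);
        String[] names = new String[infos.length];
        for (int i = 0; i < infos.length; i++) {
            names[i] = infos[i] == null ? "<exited>" : infos[i].getThreadName();
        }
        return names;
    }

    /** Starts two daemon threads that take two locks in opposite order and block forever. */
    static void startDeadlockedPair() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        spawn("sketch-A", a, b, bothHoldFirst);
        spawn("sketch-B", b, a, bothHoldFirst);
    }

    private static void spawn(String name, ReentrantLock first, ReentrantLock second,
                              CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            first.lock();
            bothHoldFirst.countDown();
            try {
                bothHoldFirst.await(); // make sure the other thread holds its first lock
            } catch (InterruptedException e) {
                return;
            }
            second.lock(); // never returns: the other thread holds this lock
        }, name);
        t.setDaemon(true); // lets the JVM exit despite the stuck threads
        t.start();
    }

    /** Provokes the deadlock, then polls the detector until it reports or the timeout passes. */
    static String[] provokeAndDetect(long timeoutMs) {
        startDeadlockedPair();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        String[] names = deadlockedThreadNames();
        while (names.length == 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            names = deadlockedThreadNames();
        }
        return names;
    }

    public static void main(String[] args) {
        for (String name : provokeAndDetect(5000)) {
            System.out.println("deadlocked: " + name);
        }
    }
}
```

Against a real broker the same MXBean would have to be queried over JMX or from code running inside the broker JVM; that wiring is not shown here.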

### Root cause

`SharePartition.rollbackOrProcessStateUpdates` completes the returned 
`CompletableFuture` **while still holding the `SharePartition` write lock**. 
The `whenComplete`/`thenCompose` chain on that future runs the downstream 
share-acknowledge completion path (`SharePartitionManager.acknowledge`'s 
continuation → `handleShareAcknowledgeRequest$4` → 
`SharePartitionManager.releaseSession` → 
`ReplicaManager.completeDelayedShareFetchRequest` → 
`DelayedOperationPurgatory.checkAndComplete` → `Watchers.tryCompleteWatched` → 
`DelayedOperation.safeTryComplete`) on the same thread, still holding the write 
lock.

At the same moment, a request-handler thread inside 
`ReplicaManager.tryCompleteActions` has already entered 
`DelayedOperation.safeTryComplete` (line 136 in that file) — taking the 
`DelayedOperation.lock` — and then calls through 
`DelayedShareFetch.tryComplete` → `acquirablePartitions` → 
`SharePartition.maybeAcquireFetchLock` → `stateNotActive` → `partitionState`, 
which takes the `SharePartition` **read** lock.

Lock ordering:
- **Persister callback**: `SharePartition.writeLock` (already held) → 
`DelayedOperation.lock` (via `completeDelayedShareFetchRequest`). **BLOCKS.**
- **Request handler**: `DelayedOperation.lock` (already held) → 
`SharePartition.readLock` (via `DelayedShareFetch.tryComplete`). **BLOCKS.**

Both orderings exist, so any interleaving that catches both in the critical 
section deadlocks the broker.
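The inversion described above can be reproduced in isolation. The sketch below is a hypothetical stand-in, not broker code: `operationLock` plays the role of `DelayedOperation.lock` and `partitionLock` plays the role of the `SharePartition` read/write lock. It uses timed `tryLock` calls instead of `lock()` so that it reports the conflict rather than hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration of the two conflicting lock orders; not Kafka code.
class LockOrderInversionSketch {

    /** Returns {persisterGotOperationLock, handlerGotReadLock}. */
    static boolean[] run() {
        ReentrantLock operationLock = new ReentrantLock();                    // stands in for DelayedOperation.lock
        ReentrantReadWriteLock partitionLock = new ReentrantReadWriteLock();  // stands in for the SharePartition lock
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        CountDownLatch bothTried = new CountDownLatch(2);
        boolean[] acquired = new boolean[2];

        // Persister callback order: write lock first, then the operation lock.
        Thread persister = contender("persister-callback", partitionLock.writeLock(), operationLock,
                bothHoldFirst, bothTried, acquired, 0);
        // Request handler order: operation lock first, then the read lock.
        Thread handler = contender("request-handler", operationLock, partitionLock.readLock(),
                bothHoldFirst, bothTried, acquired, 1);
        persister.start();
        handler.start();
        try {
            persister.join();
            handler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acquired;
    }

    private static Thread contender(String name, Lock first, Lock second,
                                    CountDownLatch bothHoldFirst, CountDownLatch bothTried,
                                    boolean[] acquired, int slot) {
        return new Thread(() -> {
            first.lock();
            try {
                bothHoldFirst.countDown();
                bothHoldFirst.await();             // both threads now hold their first lock
                boolean got = second.tryLock(200, TimeUnit.MILLISECONDS);
                if (got) {
                    second.unlock();
                }
                acquired[slot] = got;
                bothTried.countDown();
                bothTried.await();                 // keep holding the first lock until both have tried
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();
            }
        }, name);
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("persister got operation lock: " + result[0]);
        System.out.println("handler got read lock: " + result[1]);
    }
}
```

Both values come back `false`: each thread times out waiting for the lock the other one holds. With plain `lock()` in place of the timed `tryLock`, neither thread would ever return, which is the state the thread dump shows.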

### Exact line in 4.2.0

`core/src/main/java/kafka/server/share/SharePartition.java` — 
`rollbackOrProcessStateUpdates`:

```java
2486:        lock.writeLock().lock();
2487:        try {
... // state updates, cacheStateUpdated = maybeUpdateCachedStateAndOffsets()
2520:            future.complete(null);          // <-- triggers the reentrant share-ack chain synchronously,
                                                //     STILL HOLDING THE WRITE LOCK.
2521:        } finally {
2522:            lock.writeLock().unlock();
2523:            // Maybe complete the delayed share fetch request if the state has been changed in cache
2524:            // which might have moved start offset ahead. Hence, the pending delayed share fetch
2525:            // request can be completed. The call should be made outside the lock to avoid deadlock.
2526:            maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
2527:        }
```

The comment at lines 2523-2525 recognizes the exact danger for 
`maybeCompleteDelayedShareFetchRequest`. The same reasoning applies to 
`future.complete(null)`: downstream `whenComplete` stages are registered by 
callers (here `SharePartitionManager.acknowledge`) that do call into 
`ReplicaManager.completeDelayedShareFetchRequest`.
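The underlying `CompletableFuture` behavior is easy to confirm in isolation: a non-async `whenComplete` stage registered before completion runs on whichever thread calls `complete`, inside that call. The sketch below uses a hypothetical class and a stand-in lock, not the `SharePartition` code, and records whether the callback observed the write lock as held by its own thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration; the lock stands in for the SharePartition lock.
class CompleteUnderLockSketch {

    /** Returns true if the downstream callback ran while its thread held the write lock. */
    static boolean callbackSawLock(boolean completeInsideLock) {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CompletableFuture<Void> future = new CompletableFuture<>();
        boolean[] sawLock = new boolean[1];

        // Registered by a "caller" before completion, like the stage added in acknowledge.
        future.whenComplete((value, error) -> sawLock[0] = lock.isWriteLockedByCurrentThread());

        lock.writeLock().lock();
        try {
            if (completeInsideLock) {
                future.complete(null); // callback runs right here, on this thread, lock held
            }
        } finally {
            lock.writeLock().unlock();
        }
        if (!completeInsideLock) {
            future.complete(null);     // callback runs after the lock has been released
        }
        return sawLock[0];
    }

    public static void main(String[] args) {
        System.out.println("complete inside lock, callback saw lock: " + callbackSawLock(true));
        System.out.println("complete after unlock, callback saw lock: " + callbackSawLock(false));
    }
}
```

The first call returns `true` and the second `false`, so anything a caller hangs off the future inherits the write lock whenever completion happens inside the locked region.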

### Trunk status

Still present on trunk as of 2026-04-20, at 
`core/src/main/java/kafka/server/share/SharePartition.java:2642`:

```java
2640:                    // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
2641:                    cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
2642:                    future.complete(null);     // <-- still inside writeLock
2643:                } finally {
2644:                    lock.writeLock().unlock();
2645:                    // ... call should be made outside the lock to avoid deadlock.
2646:                    maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
```

### Related fixed issues (same bug class)

- KAFKA-18129 (fixed 4.0.0): `SharePartition#maybeInitialize` was completing 
future inside write lock — same pattern, different callsite. Fix moved future 
completion outside lock.
- KAFKA-18084 (fixed 4.0.0): added missing write locks in async callbacks in 
SharePartition.
- KAFKA-18154: deadlock in ShareConsumerTest.
- KAFKA-18265 (fixed 4.2.0): SharePartition locking optimization.

None of these touched `rollbackOrProcessStateUpdates`'s `future.complete(null)` 
callsite.

### Suggested fix

Same remedy KAFKA-18129 applied to `maybeInitialize`: move 
`future.complete(null)` (and the exceptional counterparts) out of the locked 
region, so they run only after `lock.writeLock().unlock()` in the `finally` 
block. For example:

```java
boolean cacheStateUpdated = false;
Throwable completeException = null;
lock.writeLock().lock();
try {
    // ... existing logic ...
    cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
} catch (Throwable t) {
    completeException = t;
} finally {
    lock.writeLock().unlock();
}
if (completeException != null) {
    future.completeExceptionally(completeException);
} else {
    future.complete(null);
}
maybeCompleteDelayedShareFetchRequest(cacheStateUpdated);
```

(Exact form left to the share maintainers' judgment — same approach as 
KAFKA-18129's PR #18053.)

### Artifacts available on request

- `DEADLOCK_EXTRACT.txt`: the 196-line JVM `Found 1 deadlock` output.
- `DEADLOCK_THREADDUMP.log`: full 159 KB thread dump.
- `INVESTIGATION.md`: notes on ruled-out hypotheses (GC, heap, 
`__share_group_state` sizing, inter-broker replication, kgo client).
- GC log showing no pauses > 32 ms during the stall.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
