This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 92169b8f08c KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing (#19914)
92169b8f08c is described below
commit 92169b8f08c21826612a0aba01e5eda3464923c7
Author: YuChia Ma <[email protected]>
AuthorDate: Mon Sep 29 23:06:56 2025 +0800
KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when coordinator is missing (#19914)
Problem: When the AsyncConsumer is closing, CoordinatorRequestManager stops
looking for the coordinator: its poll() method returns EMPTY once the
closing flag is set. If the coordinator is unknown at that point, commitAsync()
and other coordinator-dependent operations can never complete, so close()
hangs until its timeout expires.
Solution:
Added a targeted closing check to the poll() method of
CommitRequestManager:
- The check applies only when the coordinator is unknown and the consumer
is closing; poll() still returns EMPTY in that case, as it does whenever
the coordinator is unknown.
- When this condition is met, all pending (unsent) commit requests are
proactively failed with a CommitFailedException instead of waiting for a
coordinator that will never be found.
- This lets the consumer keep using the coordinator during shutdown when
it is available, while preventing close() from hanging indefinitely when
the coordinator is unreachable.
Reviewers: PoAn Yang <[email protected]>, Andrew Schofield
<[email protected]>, TengYao Chi <[email protected]>, Kirk True
<[email protected]>, Jhen-Yung Hsu <[email protected]>, Lan Ding
<[email protected]>, TaiJuWu <[email protected]>, Ken Huang
<[email protected]>, KuoChe <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/PlaintextConsumerCommitTest.java | 37 +++++++++++++++++++
.../consumer/internals/CommitRequestManager.java | 8 +++++
.../internals/CommitRequestManagerTest.java | 41 ++++++++++++++++++++++
3 files changed, 86 insertions(+)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index b5bd27cf41b..c00d1ddab90 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.BeforeEach;
@@ -452,6 +453,40 @@ public class PlaintextConsumerCommitTest {
}
}
+ /**
+ * Tests closing the consumer while an async commit request has been issued but not yet sent.
+ * During close, the consumer can no longer find the coordinator, so the pending commit must fail.
+ */
+ @ClusterTest
+ public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose()
throws InterruptedException {
+ try (Producer<byte[], byte[]> producer = cluster.producer();
+ var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+ ) {
+ sendRecords(producer, tp, 3, System.currentTimeMillis());
+ consumer.assign(List.of(tp));
+
+ var callback = new CountConsumerCommitCallback();
+
+ // Shut down all brokers (including the coordinator) before committing so that the commit cannot find the coordinator.
+ cluster.brokerIds().forEach(cluster::shutdownBroker);
+
+ TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(),
"All brokers should be shut down");
+
+ consumer.poll(Duration.ofMillis(500));
+ consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)),
callback);
+
+ long startTime = System.currentTimeMillis();
+ consumer.close(CloseOptions.timeout(Duration.ofMillis(500)));
+ long closeDuration = System.currentTimeMillis() - startTime;
+
+ assertTrue(closeDuration < 1000, "The closing process for the
consumer was too long: " + closeDuration + " ms");
+ assertTrue(callback.lastError.isPresent());
+ assertEquals(CommitFailedException.class,
callback.lastError.get().getClass());
+ assertEquals("Failed to commit offsets: Coordinator unknown and
consumer is closing", callback.lastError.get().getMessage());
+ assertEquals(1, callback.exceptionCount);
+ }
+ }
+
// TODO: This only works in the new consumer, but should be fixed for the
old consumer as well
@ClusterTest
public void testCommitAsyncCompletedBeforeConsumerCloses() throws
InterruptedException {
@@ -575,6 +610,7 @@ public class PlaintextConsumerCommitTest {
private static class CountConsumerCommitCallback implements
OffsetCommitCallback {
private int successCount = 0;
+ private int exceptionCount = 0;
private Optional<Exception> lastError = Optional.empty();
@Override
@@ -582,6 +618,7 @@ public class PlaintextConsumerCommitTest {
if (exception == null) {
successCount += 1;
} else {
+ exceptionCount += 1;
lastError = Optional.of(exception);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index fe4d3806f2a..6aae084fd47 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -181,6 +181,14 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
// poll when the coordinator node is known and fatal error is not
present
if (coordinatorRequestManager.coordinator().isEmpty()) {
pendingRequests.maybeFailOnCoordinatorFatalError();
+
+ if (closing && pendingRequests.hasUnsentRequests()) {
+ CommitFailedException exception = new CommitFailedException(
+ "Failed to commit offsets: Coordinator unknown and
consumer is closing");
+ pendingRequests.drainPendingCommits()
+ .forEach(request ->
request.future().completeExceptionally(exception));
+ }
+
return EMPTY;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 26d39715d27..afbb81eb53f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -1494,6 +1494,47 @@ public class CommitRequestManagerTest {
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
}
+ @Test
+ public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+ CommitRequestManager commitRequestManager = create(true, 100);
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new
TopicPartition("topic", 1),
+ new OffsetAndMetadata(0));
+
+ var commitFuture = commitRequestManager.commitAsync(offsets);
+
+ commitRequestManager.signalClose();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+ when(coordinatorRequestManager.fatalError())
+ .thenReturn(Optional.of(new GroupAuthorizationException("Fatal
error")));
+
+ assertEquals(NetworkClientDelegate.PollResult.EMPTY,
commitRequestManager.poll(time.milliseconds()));
+
+ assertTrue(commitFuture.isCompletedExceptionally());
+
+ TestUtils.assertFutureThrows(GroupAuthorizationException.class,
commitFuture, "Fatal error");
+ }
+
+ @Test
+ public void testPollWithClosingAndPendingRequests() {
+ CommitRequestManager commitRequestManager = create(true, 100);
+
+ Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new
TopicPartition("topic", 1),
+ new OffsetAndMetadata(0));
+
+ var commitFuture = commitRequestManager.commitAsync(offsets);
+
+ commitRequestManager.signalClose();
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
+
+ assertEquals(NetworkClientDelegate.PollResult.EMPTY,
commitRequestManager.poll(time.milliseconds()));
+
+ assertTrue(commitFuture.isCompletedExceptionally());
+
+ TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture,
+ "Failed to commit offsets: Coordinator unknown and consumer is
closing");
+ }
+
// Supplies (error, isRetriable)
private static Stream<Arguments> partitionDataErrorSupplier() {
return Stream.of(