This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92169b8f08c KAFKA-19357 AsyncConsumer#close hangs as commitAsync never 
completes when coordinator is missing (#19914)
92169b8f08c is described below

commit 92169b8f08c21826612a0aba01e5eda3464923c7
Author: YuChia Ma <[email protected]>
AuthorDate: Mon Sep 29 23:06:56 2025 +0800

    KAFKA-19357 AsyncConsumer#close hangs as commitAsync never completes when 
coordinator is missing (#19914)
    
    Problem: When AsyncConsumer is closing, CoordinatorRequestManager stops
    looking for the coordinator: its poll() method returns EMPTY once the
    closing flag is set. This prevents commitAsync() and other
    coordinator-dependent operations from completing, causing close() to
    hang until the timeout expires.
    
    Solution:
    Added a targeted closing check to the poll() method of
    CommitRequestManager:
    - When the coordinator is unknown and the consumer is closing, poll()
    still returns EMPTY
    - In that case, it also proactively fails all pending commit
    requests with CommitFailedException
    - This allows coordinator lookup to continue when the coordinator is
    available during shutdown, while preventing indefinite hangs when the
    coordinator is unreachable
    
    Reviewers: PoAn Yang <[email protected]>, Andrew Schofield
     <[email protected]>, TengYao Chi <[email protected]>, Kirk True
     <[email protected]>, Jhen-Yung Hsu <[email protected]>, Lan Ding
     <[email protected]>, TaiJuWu <[email protected]>, Ken Huang
     <[email protected]>, KuoChe <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../consumer/PlaintextConsumerCommitTest.java      | 37 +++++++++++++++++++
 .../consumer/internals/CommitRequestManager.java   |  8 +++++
 .../internals/CommitRequestManagerTest.java        | 41 ++++++++++++++++++++++
 3 files changed, 86 insertions(+)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
index b5bd27cf41b..c00d1ddab90 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCommitTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
 import org.apache.kafka.common.test.api.Type;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.BeforeEach;
 
@@ -452,6 +453,40 @@ public class PlaintextConsumerCommitTest {
         }
     }
 
+    /**
+     * Tests closing the consumer while an async commit is still pending. All brokers are shut down
+     * first, so the coordinator can never be found and close() must fail the commit instead of hanging.
+     */
+    @ClusterTest
+    public void testCommitAsyncFailsWhenCoordinatorUnavailableDuringClose() 
throws InterruptedException {
+        try (Producer<byte[], byte[]> producer = cluster.producer();
+             var consumer = createConsumer(GroupProtocol.CONSUMER, false)
+        ) {
+            sendRecords(producer, tp, 3, System.currentTimeMillis());
+            consumer.assign(List.of(tp));
+
+            var callback = new CountConsumerCommitCallback();
+
+            // Shut down all brokers (including the coordinator) before committing so that the 
commit will fail to find the coordinator.
+            cluster.brokerIds().forEach(cluster::shutdownBroker);
+
+            TestUtils.waitForCondition(() -> cluster.aliveBrokers().isEmpty(), 
"All brokers should be shut down");
+
+            consumer.poll(Duration.ofMillis(500));
+            consumer.commitAsync(Map.of(tp, new OffsetAndMetadata(1L)), 
callback);
+
+            long startTime = System.currentTimeMillis();
+            consumer.close(CloseOptions.timeout(Duration.ofMillis(500))); // must not hang on the pending commit
+            long closeDuration = System.currentTimeMillis() - startTime;
+
+            assertTrue(closeDuration < 1000, "The closing process for the 
consumer was too long: " + closeDuration + " ms");
+            assertTrue(callback.lastError.isPresent()); // the pending commit was failed during close
+            assertEquals(CommitFailedException.class, 
callback.lastError.get().getClass());
+            assertEquals("Failed to commit offsets: Coordinator unknown and 
consumer is closing", callback.lastError.get().getMessage());
+            assertEquals(1, callback.exceptionCount); // callback saw exactly one failure
+        }
+    }
+
     // TODO: This only works in the new consumer, but should be fixed for the 
old consumer as well
     @ClusterTest
     public void testCommitAsyncCompletedBeforeConsumerCloses() throws 
InterruptedException {
@@ -575,6 +610,7 @@ public class PlaintextConsumerCommitTest {
 
     private static class CountConsumerCommitCallback implements 
OffsetCommitCallback {
         private int successCount = 0;
+        private int exceptionCount = 0;
         private Optional<Exception> lastError = Optional.empty();
 
         @Override
@@ -582,6 +618,7 @@ public class PlaintextConsumerCommitTest {
             if (exception == null) {
                 successCount += 1;
             } else {
+                exceptionCount += 1;
                 lastError = Optional.of(exception);
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index fe4d3806f2a..6aae084fd47 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -181,6 +181,14 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         // poll when the coordinator node is known and fatal error is not 
present
         if (coordinatorRequestManager.coordinator().isEmpty()) {
             pendingRequests.maybeFailOnCoordinatorFatalError();
+
+            if (closing && pendingRequests.hasUnsentRequests()) {
+                CommitFailedException exception = new CommitFailedException(
+                        "Failed to commit offsets: Coordinator unknown and 
consumer is closing");
+                pendingRequests.drainPendingCommits()
+                        .forEach(request -> 
request.future().completeExceptionally(exception));
+            }
+
             return EMPTY;
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 26d39715d27..afbb81eb53f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -1494,6 +1494,47 @@ public class CommitRequestManagerTest {
         
assertTrue(commitRequestManager.pendingRequests.unsentOffsetCommits.isEmpty());
     }
 
+    @Test
+    public void testPollWithFatalErrorDuringCoordinatorIsEmptyAndClosing() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new 
TopicPartition("topic", 1),
+                new OffsetAndMetadata(0));
+
+        var commitFuture = commitRequestManager.commitAsync(offsets);
+
+        commitRequestManager.signalClose(); // the manager is now closing
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); // coordinator unknown
+        when(coordinatorRequestManager.fatalError())
+                .thenReturn(Optional.of(new GroupAuthorizationException("Fatal 
error")));
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, 
commitRequestManager.poll(time.milliseconds()));
+
+        assertTrue(commitFuture.isCompletedExceptionally()); // fatal-error check runs before the closing check
+
+        TestUtils.assertFutureThrows(GroupAuthorizationException.class, 
commitFuture, "Fatal error");
+    }
+
+    @Test
+    public void testPollWithClosingAndPendingRequests() {
+        CommitRequestManager commitRequestManager = create(true, 100);
+
+        Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new 
TopicPartition("topic", 1),
+                new OffsetAndMetadata(0));
+
+        var commitFuture = commitRequestManager.commitAsync(offsets);
+
+        commitRequestManager.signalClose(); // the manager is now closing
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); // coordinator unknown
+
+        assertEquals(NetworkClientDelegate.PollResult.EMPTY, 
commitRequestManager.poll(time.milliseconds()));
+
+        assertTrue(commitFuture.isCompletedExceptionally()); // pending commit failed rather than left waiting
+
+        TestUtils.assertFutureThrows(CommitFailedException.class, commitFuture,
+                "Failed to commit offsets: Coordinator unknown and consumer is 
closing");
+    }
+
     // Supplies (error, isRetriable)
     private static Stream<Arguments> partitionDataErrorSupplier() {
         return Stream.of(

Reply via email to