This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f922ff6d1fb KAFKA-19259: Async consumer fetch intermittent delays on console consumer (#19980)
f922ff6d1fb is described below

commit f922ff6d1fb3d62d10a5fe051bde3a8ef5d68a9a
Author: Kirk True <[email protected]>
AuthorDate: Fri Sep 5 07:50:47 2025 -0700

    KAFKA-19259: Async consumer fetch intermittent delays on console consumer (#19980)
    
    There’s a difference in the two consumers’ `pollForFetches()` methods in
    this case: `ClassicKafkaConsumer` doesn't block waiting for data in the
    fetch buffer, but `AsyncKafkaConsumer` does.
    
    In `ClassicKafkaConsumer.pollForFetches()`, after enqueuing the `FETCH`
    request, the consumer makes a call to `ConsumerNetworkClient.poll()`. In
    most cases `poll()` returns almost immediately because it successfully
    sent the `FETCH` request. So even when the `pollTimeout` value is, e.g.,
    3000 ms, the call to `ConsumerNetworkClient.poll()` doesn't block that
    long waiting for a response.
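
    A rough sketch of that classic-consumer path, paraphrased from the
    behavior described above (not the literal source; the poll-condition
    lambda shape here is an assumption):

        // ClassicKafkaConsumer.pollForFetches(), paraphrased: the network
        // poll takes a condition so it returns as soon as fetch data is
        // buffered, and in practice it returns right after the FETCH
        // request is written to the socket rather than waiting out the
        // full pollTimeout.
        Timer pollTimer = time.timer(pollTimeout);
        client.poll(pollTimer, () -> !fetcher.hasAvailableFetches());
        timer.update(pollTimer.currentTimeMs());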
    
    After sending out a `FETCH` request, `AsyncKafkaConsumer` then calls
    `FetchBuffer.awaitNotEmpty()` and proceeds to block there for the full
    length of the timeout. In some cases, the response to the `FETCH` comes
    back with no results, which doesn't unblock
    `FetchBuffer.awaitNotEmpty()`. Because the application thread is still
    waiting for data in the buffer, it remains blocked, which prevents any
    further `FETCH` requests from being sent and causes the long pauses in
    the console consumer.
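
    The fix, visible in the `AbstractFetch`/`FetchBuffer` diff below, is
    to signal the waiting application thread on every `FETCH` response,
    even an empty one, so it can wake up and issue the next request. A
    minimal, self-contained sketch of that wakeup pattern (illustrative
    names, not the actual Kafka classes):

        import java.util.concurrent.TimeUnit;
        import java.util.concurrent.locks.Condition;
        import java.util.concurrent.locks.Lock;
        import java.util.concurrent.locks.ReentrantLock;

        class ResponseSignal {
            private final Lock lock = new ReentrantLock();
            private final Condition responded = lock.newCondition();
            private boolean woken = false;

            // Application thread: block until a response arrives or the
            // timeout elapses, then clear the flag for the next wait.
            void awaitResponse(long timeoutMs) throws InterruptedException {
                lock.lock();
                try {
                    long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
                    while (!woken && remainingNs > 0L)
                        remainingNs = responded.awaitNanos(remainingNs);
                    woken = false;
                } finally {
                    lock.unlock();
                }
            }

            // Network thread: called for *every* FETCH response, even one
            // that carried no records, so the waiter never stalls.
            void onResponse() {
                lock.lock();
                try {
                    woken = true;
                    responded.signalAll();
                } finally {
                    lock.unlock();
                }
            }
        }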
    
    Reviewers: Lianet Magrans <[email protected]>, Andrew Schofield <[email protected]>
---
 .../clients/consumer/PlaintextConsumerTest.java    | 71 ++++++++++++++++++++++
 .../clients/consumer/internals/AbstractFetch.java  |  9 +++
 .../consumer/internals/AsyncKafkaConsumer.java     |  2 +-
 .../clients/consumer/internals/FetchBuffer.java    | 32 +++++-----
 .../consumer/internals/FetchBufferTest.java        |  2 +-
 5 files changed, 96 insertions(+), 20 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
index c69c9c35fd4..13e681cfdda 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java
@@ -63,6 +63,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +110,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 @ClusterTestDefaults(
     types = {Type.KRAFT},
@@ -1593,6 +1595,75 @@ public class PlaintextConsumerTest {
         }
     }
 
+    @ClusterTest
+    public void testClassicConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CLASSIC);
+    }
+
+    @ClusterTest
+    public void testAsyncConsumerStallBetweenPoll() throws Exception {
+        testStallBetweenPoll(GroupProtocol.CONSUMER);
+    }
+
+    /**
+     * This test is to prove that the intermittent stalling that has been experienced when using the asynchronous
+     * consumer, as filed under KAFKA-19259, has been fixed.
+     *
+     * <p/>
+     *
+     * The basic idea is to have one thread that produces a record every 500 ms and the main thread that consumes
+     * records without pausing between polls for much more than the produce delay. In the test case filed in
+     * KAFKA-19259, the consumer sometimes pauses for up to 5-10 seconds despite records being produced every second.
+     */
+    private void testStallBetweenPoll(GroupProtocol groupProtocol) throws Exception {
+        var testTopic = "stall-test-topic";
+        var numPartitions = 6;
+        cluster.createTopic(testTopic, numPartitions, (short) BROKER_COUNT);
+
+        // The producer must produce slowly to tickle the scenario.
+        var produceDelay = 500;
+
+        var executor = Executors.newScheduledThreadPool(1);
+
+        try (var producer = cluster.producer()) {
+            // Start a thread running that produces records at a relative trickle.
+            executor.scheduleWithFixedDelay(
+                () -> producer.send(new ProducerRecord<>(testTopic, TestUtils.randomBytes(64))),
+                0,
+                produceDelay,
+                TimeUnit.MILLISECONDS
+            );
+
+            Map<String, Object> consumerConfig = Map.of(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
+
+            // Assign a tolerance for how much time is allowed to pass between Consumer.poll() calls given that there
+            // should be *at least* one record to read every second.
+            var pollDelayTolerance = 2000;
+
+            try (Consumer<byte[], byte[]> consumer = cluster.consumer(consumerConfig)) {
+                consumer.subscribe(List.of(testTopic));
+
+                // This is here to allow the consumer time to settle the group membership/assignment.
+                awaitNonEmptyRecords(consumer, new TopicPartition(testTopic, 0));
+
+                // Keep track of the last time the poll is invoked to ensure the deltas between invocations don't
+                // exceed the delay threshold defined above.
+                var beforePoll = System.currentTimeMillis();
+                consumer.poll(Duration.ofSeconds(5));
+                consumer.poll(Duration.ofSeconds(5));
+                var afterPoll = System.currentTimeMillis();
+                var pollDelay = afterPoll - beforePoll;
+
+                if (pollDelay > pollDelayTolerance)
+                    fail("Detected a stall of " + pollDelay + " ms between Consumer.poll() invocations despite a Producer producing records every " + produceDelay + " ms");
+            } finally {
+                executor.shutdownNow();
+                // Wait for any active tasks to terminate to ensure consumer is not closed while being used from another thread
+                assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS), "Executor did not terminate");
+            }
+        }
+    }
+
     private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
         Consumer<byte[], byte[]> consumer,
         TopicPartition tp
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 9d96712a473..e3e52f7525d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -147,6 +147,7 @@ public abstract class AbstractFetch implements Closeable {
      * @param data {@link FetchSessionHandler.FetchRequestData} that represents the session data
      * @param resp {@link ClientResponse} from which the {@link FetchResponse} will be retrieved
      */
+    @SuppressWarnings("NPathComplexity")
     protected void handleFetchSuccess(final Node fetchTarget,
                                      final FetchSessionHandler.FetchRequestData data,
                                       final ClientResponse resp) {
@@ -174,6 +175,8 @@ public abstract class AbstractFetch implements Closeable {
             final Set<TopicPartition> partitions = new HashSet<>(responseData.keySet());
             final FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, partitions);
 
+            boolean needsWakeup = true;
+
             Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
             for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                 TopicPartition partition = entry.getKey();
@@ -220,8 +223,14 @@ public abstract class AbstractFetch implements Closeable {
                         metricAggregator,
                         fetchOffset);
                 fetchBuffer.add(completedFetch);
+                needsWakeup = false;
             }
 
+            // "Wake" the fetch buffer on any response, even if it's empty, to allow the consumer to not block
+            // indefinitely waiting on the fetch buffer to get data.
+            if (needsWakeup)
+                fetchBuffer.wakeup();
+
             if (!partitionsWithUpdatedLeaderInfo.isEmpty()) {
                 List<Node> leaderNodes = new ArrayList<>();
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5512c962606..93ed987ddc0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1811,7 +1811,7 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
         // use of a shorter, dedicated "pollTimer" here which updates "timer" so that calling method (poll) will
         // correctly handle the overall timeout.
         try {
-            fetchBuffer.awaitNotEmpty(pollTimer);
+            fetchBuffer.awaitWakeup(pollTimer);
         } catch (InterruptException e) {
             log.trace("Interrupt during fetch", e);
             throw e;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
index 23adf9c9afa..6cf5bc301b3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchBuffer.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -51,7 +52,7 @@ public class FetchBuffer implements AutoCloseable {
     private final Logger log;
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Lock lock;
-    private final Condition notEmptyCondition;
+    private final Condition blockingCondition;
     private final IdempotentCloser idempotentCloser = new IdempotentCloser();
 
     private final AtomicBoolean wokenup = new AtomicBoolean(false);
@@ -62,7 +63,7 @@ public class FetchBuffer implements AutoCloseable {
         this.log = logContext.logger(FetchBuffer.class);
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.lock = new ReentrantLock();
-        this.notEmptyCondition = lock.newCondition();
+        this.blockingCondition = lock.newCondition();
     }
 
     /**
@@ -95,13 +96,7 @@ public class FetchBuffer implements AutoCloseable {
     }
 
     void add(CompletedFetch completedFetch) {
-        try {
-            lock.lock();
-            completedFetches.add(completedFetch);
-            notEmptyCondition.signalAll();
-        } finally {
-            lock.unlock();
-        }
+        addAll(List.of(completedFetch));
     }
 
     void addAll(Collection<CompletedFetch> completedFetches) {
@@ -111,7 +106,8 @@ public class FetchBuffer implements AutoCloseable {
         try {
             lock.lock();
             this.completedFetches.addAll(completedFetches);
-            notEmptyCondition.signalAll();
+            wokenup.set(true);
+            blockingCondition.signalAll();
         } finally {
             lock.unlock();
         }
@@ -154,23 +150,23 @@ public class FetchBuffer implements AutoCloseable {
     }
 
     /**
-     * Allows the caller to await presence of data in the buffer. The method will block, returning only
+     * Allows the caller to await a response from the broker for requested data. The method will block, returning only
      * under one of the following conditions:
      *
      * <ol>
-     *     <li>The buffer was already non-empty on entry</li>
-     *     <li>The buffer was populated during the wait</li>
+     *     <li>The buffer was already woken</li>
+     *     <li>The buffer was woken during the wait</li>
      *     <li>The remaining time on the {@link Timer timer} elapsed</li>
      *     <li>The thread was interrupted</li>
      * </ol>
      *
      * @param timer Timer that provides time to wait
      */
-    void awaitNotEmpty(Timer timer) {
+    void awaitWakeup(Timer timer) {
         try {
             lock.lock();
 
-            while (isEmpty() && !wokenup.compareAndSet(true, false)) {
+            while (!wokenup.compareAndSet(true, false)) {
                 // Update the timer before we head into the loop in case it took a while to get the lock.
                 timer.update();
 
@@ -185,7 +181,7 @@ public class FetchBuffer implements AutoCloseable {
                     break;
                 }
 
-                if (!notEmptyCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
+                if (!blockingCondition.await(timer.remainingMs(), TimeUnit.MILLISECONDS)) {
                     break;
                 }
             }
@@ -198,10 +194,10 @@ public class FetchBuffer implements AutoCloseable {
     }
 
     void wakeup() {
-        wokenup.set(true);
         try {
             lock.lock();
-            notEmptyCondition.signalAll();
+            wokenup.set(true);
+            blockingCondition.signalAll();
         } finally {
             lock.unlock();
         }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
index 9d6b0c2da88..5b2f6d6f48e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchBufferTest.java
@@ -177,7 +177,7 @@ public class FetchBufferTest {
         try (FetchBuffer fetchBuffer = new FetchBuffer(logContext)) {
             final Thread waitingThread = new Thread(() -> {
                 final Timer timer = time.timer(Duration.ofMinutes(1));
-                fetchBuffer.awaitNotEmpty(timer);
+                fetchBuffer.awaitWakeup(timer);
             });
             waitingThread.start();
             fetchBuffer.wakeup();
