This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fdece9c358f MINOR:  Keep pendingTask as WakeupFuture if currentTask is 
completed already.  (#21586)
fdece9c358f is described below

commit fdece9c358f3c60a83f086d8adac9749d0e45fba
Author: Nikita Shupletsov <[email protected]>
AuthorDate: Mon Mar 9 15:42:22 2026 -0700

    MINOR:  Keep pendingTask as WakeupFuture if currentTask is completed 
already.  (#21586)
    
    System tests that use VerifiableConsumer are flaky because
    VerifiableConsumer doesn't shut down on request in certain situations.
    There is a race condition in the commitSync method: the future that we
    set as the active task on the wakeupTrigger can already be completed
    by the time we set it, which leads to the wakeup request never being
    fulfilled. Added a check in setActiveTask for whether completing the
    received task exceptionally actually took effect; if the task was
    already completed, the pending task is kept as a WakeupFuture so that
    the wakeup request is not lost.
    
    Also added logging when a shutdown is requested, to make debugging
    easier.
    
    Reviewers: Kirk True <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../clients/consumer/internals/WakeupTrigger.java  |  8 ++++--
 .../consumer/internals/WakeupTriggerTest.java      | 33 ++++++++++++++++++++++
 tests/kafkatest/services/verifiable_consumer.py    |  2 ++
 .../org/apache/kafka/tools/VerifiableConsumer.java |  9 ++++++
 4 files changed, 50 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
index 7893cf29f23..b7d8c001798 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java
@@ -78,8 +78,12 @@ public class WakeupTrigger {
             if (task == null) {
                 return new ActiveFuture(currentTask);
             } else if (task instanceof WakeupFuture) {
-                currentTask.completeExceptionally(new WakeupException());
-                return null;
+                boolean wasTriggered = currentTask.completeExceptionally(new 
WakeupException());
+
+                // If the Future was *already* completed when we invoke completeExceptionally, the WakeupException
+                // will be ignored. If it was already completed, we then need to return a new WakeupFuture so that the
+                // next call to setActiveTask will throw the WakeupException.
+                return wasTriggered ? null : new WakeupFuture();
             } else if (task instanceof DisabledWakeups) {
                 return task;
             }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
index 518f1cc6978..ad3a5071480 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java
@@ -219,6 +219,39 @@ public class WakeupTriggerTest {
         assertThrows(WakeupException.class, () -> 
wakeupTrigger.maybeTriggerWakeup());
     }
 
+    @Test
+    public void 
testExceptionTriggeredWhenTaskAsynchronouslyCompletedBeforeSet() {
+        final CompletableFuture<Void> task = new CompletableFuture<>();
+        task.complete(null);
+        wakeupTrigger.wakeup();
+        wakeupTrigger.setActiveTask(task);
+        assertNotNull(wakeupTrigger.getPendingTask());
+        assertInstanceOf(WakeupTrigger.WakeupFuture.class, 
wakeupTrigger.getPendingTask());
+        assertThrows(WakeupException.class, () -> 
wakeupTrigger.maybeTriggerWakeup());
+    }
+
+    @Test
+    public void testExceptionTriggeredWhenTaskAsynchronouslyFailedBeforeSet() {
+        final CompletableFuture<Void> task = new CompletableFuture<>();
+        task.completeExceptionally(new RuntimeException("Simulated error"));
+        wakeupTrigger.wakeup();
+        wakeupTrigger.setActiveTask(task);
+        assertNotNull(wakeupTrigger.getPendingTask());
+        assertInstanceOf(WakeupTrigger.WakeupFuture.class, 
wakeupTrigger.getPendingTask());
+        assertThrows(WakeupException.class, () -> 
wakeupTrigger.maybeTriggerWakeup());
+    }
+
+    @Test
+    public void 
testExceptionTriggeredWhenTaskAsynchronouslyCancelledBeforeSet() {
+        final CompletableFuture<Void> task = new CompletableFuture<>();
+        task.cancel(true);
+        wakeupTrigger.wakeup();
+        wakeupTrigger.setActiveTask(task);
+        assertNotNull(wakeupTrigger.getPendingTask());
+        assertInstanceOf(WakeupTrigger.WakeupFuture.class, 
wakeupTrigger.getPendingTask());
+        assertThrows(WakeupException.class, () -> 
wakeupTrigger.maybeTriggerWakeup());
+    }
+
     private void assertWakeupExceptionIsThrown(final CompletableFuture<?> 
future) {
         assertTrue(future.isCompletedExceptionally());
         assertInstanceOf(WakeupException.class,
diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index dd05b693638..7ae3ee3c9dc 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -352,6 +352,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
                         handler.handle_partitions_revoked(event, node, 
self.logger)
                     elif name == "partitions_assigned":
                         handler.handle_partitions_assigned(event, node, 
self.logger)
+                    elif name == "shutdown_requested":
+                        self.logger.debug("Shutdown has been requested")
                     else:
                         self.logger.debug("%s: ignoring unknown event: %s" % 
(str(node.account), event))
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index d1e51a6ae78..25c82c713b1 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -259,6 +259,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
     public void close() {
         boolean interrupted = false;
         try {
+            printJson(new ShutdownRequested());
             consumer.wakeup();
             while (true) {
                 try {
@@ -295,6 +296,14 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
         }
     }
 
+    private static class ShutdownRequested extends ConsumerEvent {
+
+        @Override
+        public String name() {
+            return "shutdown_requested";
+        }
+    }
+
     private static class ShutdownComplete extends ConsumerEvent {
 
         @Override

Reply via email to