philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392968241


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##########
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-    private final AtomicReference<Wakeupable> pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+    private final static Object FAIL_NEXT_MARKER = new Object();
+    private final AtomicReference<Object> activeTask = new 
AtomicReference<>(null);
 
     /**
-     * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
-     * would know wakeup was previously called.
-     * <p>
-     * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
-     * If the current task has already been woken-up, do nothing.
+     * Wakeup a pending task.
+     *
+     * <p/>
+     *
+     * There are three cases that can happen when this method is invoked:
+     *
+     * <ul>
+     *     <li>
+     *         If there is no <em>active</em> task from a previous call to 
{@code setActiveTask}, we set an
+     *         internal indicator that the <em>next</em> attempt to start a 
long-running task (via a call to
+     *         {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+     *     </li>
+     *     <li>
+     *         If there is an <em>active</em> task (i.e. there was a previous 
call to
+     *         {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+     *         {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the <em>active</em> task.
+     *     </li>
+     *     <li>
+     *         If there is already a pending wakeup from a previous call to 
{@link Consumer#wakeup()}, do nothing.
+     *         We keep the internal state as is so that the future calls to 
{@link #setActiveTask(CompletableFuture)}
+     *         will fail as expected.
+     *     </li>
+     * </ul>
      */
-    public void wakeup() {
-        pendingTask.getAndUpdate(task -> {
-            if (task == null) {
-                return new WakeupFuture();
-            } else if (task instanceof ActiveFuture) {
-                ActiveFuture active = (ActiveFuture) task;
-                active.future().completeExceptionally(new WakeupException());
+    void wakeup() {
+        activeTask.getAndUpdate(existingTask -> {
+            if (existingTask == null) {
+                // If there isn't an existing task, return our marker, so that 
the subsequent call will
+                // know wakeup was previously called.
+                return FAIL_NEXT_MARKER;
+            } else if (existingTask instanceof CompletableFuture<?>) {
+                // If there is an existing "active" task, complete it with 
WakeupException.
+                CompletableFuture<?> active = (CompletableFuture<?>) 
existingTask;
+                active.completeExceptionally(new WakeupException());
+
+                // We return a null here to effectively unset the "active" 
task.
                 return null;
             } else {
-                return task;
+                // This is the case where the existing task is the wakeup 
marker. So the user has apparently
+                // called Consumer.wakeup() more than once.
+                return existingTask;
             }
         });
     }
 
     /**
-     *     If there is no pending task, set the pending task active.
-     *     If wakeup was called before setting an active task, the current 
task will complete exceptionally with
-     *     WakeupException right
-     *     away.
-     *     if there is an active task, throw exception.
-     * @param currentTask
-     * @param <T>
-     * @return
+     * This method should be called before executing a blocking operation in 
the {@link Consumer}. This will
+     * store an internal reference to the given <em>active</em> {@link 
CompletableFuture task} that can be
+     * {@link CompletableFuture#completeExceptionally(Throwable) forcibly 
failed} if the user invokes the
+     * {@link Consumer#wakeup()} call before or during its execution.
+     *
+     * <p/>
+     *
+     * There are three cases that can happen when this method is invoked:
+     *
+     * <ul>
+     *     <li>
+     *         If there is no <em>active</em> task from a previous call to 
{@code setActiveTask} <em>and</em> no
+     *         previous calls to {@link #wakeup()}, set the given {@link 
CompletableFuture} as the
+     *         <em>active</em> task.
+     *     </li>
+     *     <li>
+     *         If there was a previous call to {@link #wakeup()}, the given 
{@link CompletableFuture task} will fail
+     *         via {@link CompletableFuture#completeExceptionally(Throwable)} 
and the <em>active</em> task will be

Review Comment:
   Similarly, should we mention here that the task fails with a `WakeupException`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to