philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392962733


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##########
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-    private final AtomicReference<Wakeupable> pendingTask = new AtomicReference<>(null);
+class WakeupTrigger {
+
+    private final static Object FAIL_NEXT_MARKER = new Object();
+    private final AtomicReference<Object> activeTask = new AtomicReference<>(null);
 
     /**
-     * Wakeup a pending task.  If there isn't any pending task, return a WakeupFuture, so that the subsequent call
-     * would know wakeup was previously called.
-     * <p>
-     * If there are active tasks, complete it with WakeupException, then unset pending task (return null here.
-     * If the current task has already been woken-up, do nothing.
+     * Wakeup a pending task.
+     *
+     * <p/>
+     *
+     * There are three cases that can happen when this method is invoked:
+     *
+     * <ul>
+     *     <li>
+     *         If there is no <em>active</em> task from a previous call to {@code setActiveTask}, we set an
+     *         internal indicator that the <em>next</em> attempt to start a long-running task (via a call to
+     *         {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}.
+     *     </li>
+     *     <li>
+     *         If there is an <em>active</em> task (i.e. there was a previous call to
+     *         {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via
+     *         {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the <em>active</em> task.
+     *     </li>
+     *     <li>
+     *         If there is already an pending wakeup from a previous call to {@link Consumer#wakeup()}, do nothing.

Review Comment:
   Typo: "an pending" should be "a pending".

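   For readers following the Javadoc, the three cases it lists could be sketched roughly as below. This is only an illustration under assumptions, not the code in this PR: `WakeupTriggerSketch` and `SketchWakeupException` are hypothetical stand-ins (the latter replaces Kafka's `WakeupException` so the snippet compiles on its own), and the body of `setActiveTask` is a guess based on the description in the diff.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the three wakeup cases; not the actual WakeupTrigger.
class WakeupTriggerSketch {

    // Stand-in for org.apache.kafka.common.errors.WakeupException (assumption).
    static class SketchWakeupException extends RuntimeException { }

    private static final Object FAIL_NEXT_MARKER = new Object();
    private final AtomicReference<Object> activeTask = new AtomicReference<>(null);

    void wakeup() {
        Object previous = activeTask.getAndUpdate(task -> {
            if (task == null)
                return FAIL_NEXT_MARKER;   // case 1: no active task, fail the next one
            if (task == FAIL_NEXT_MARKER)
                return task;               // case 3: a wakeup is already pending, do nothing
            return null;                   // case 2: clear the active task
        });

        // Case 2 continued: fail the task that was active. This is done outside
        // the update function because that function may be retried.
        if (previous instanceof CompletableFuture)
            ((CompletableFuture<?>) previous).completeExceptionally(new SketchWakeupException());
    }

    <T> CompletableFuture<T> setActiveTask(CompletableFuture<T> task) {
        // Consume a pending wakeup if there is one, otherwise register the task.
        Object previous = activeTask.getAndUpdate(
            current -> current == FAIL_NEXT_MARKER ? null : task);

        if (previous == FAIL_NEXT_MARKER)
            task.completeExceptionally(new SketchWakeupException());
        return task;
    }
}
```

   In this sketch a second `wakeup()` with no task in between leaves the marker untouched, so only one subsequent task is failed.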


