Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2024-02-16 Thread via GitHub


kirktrue closed pull request #14752: MINOR: WakeupTrigger cleanup
URL: https://github.com/apache/kafka/pull/14752


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2024-02-12 Thread via GitHub


github-actions[bot] commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1940369577

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1810767025

   Hi @kirktrue Thanks for taking time to clean up the task. I left some 
comments there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392968241


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.
+ * 
+ * 
+ * If there is already an pending wakeup from a previous call to 
{@link Consumer#wakeup()}, do nothing.
+ * We keep the internal state as is so that the future calls to 
{@link #setActiveTask(CompletableFuture)}
+ * will fail as expected.
+ * 
+ * 
  */
-public void wakeup() {
-pendingTask.getAndUpdate(task -> {
-if (task == null) {
-return new WakeupFuture();
-} else if (task instanceof ActiveFuture) {
-ActiveFuture active = (ActiveFuture) task;
-active.future().completeExceptionally(new WakeupException());
+void wakeup() {
+activeTask.getAndUpdate(existingTask -> {
+if (existingTask == null) {
+// If there isn't an existing task, return our marker, so that 
the subsequent call will
+// know wakeup was previously called.
+return FAIL_NEXT_MARKER;
+} else if (existingTask instanceof CompletableFuture) {
+// If there is an existing "active" task, complete it with 
WakeupException.
+CompletableFuture active = (CompletableFuture) 
existingTask;
+active.completeExceptionally(new WakeupException());
+
+// We return a null here to effectively unset the "active" 
task.
 return null;
 } else {
-return task;
+// This is the case where the existing task is the wakeup 
marker. So the user has apparently
+// called Consumer.wakeup() more than once.
+return existingTask;
 }
 });
 }
 
 /**
- * If there is no pending task, set the pending task active.
- * If wakeup was called before setting an active task, the current 
task will complete exceptionally with
- * WakeupException right
- * away.
- * if there is an active task, throw exception.
- * @param currentTask
- * @param 
- * @return
+ * This method should be called before execution a blocking operation in 
the {@link Consumer}. This will
+ * store an internal reference to the given active {@link 
CompletableFuture task} that can be
+ * {@link CompletableFuture#completeExceptionally(Throwable) forcibly 
failed} if the user invokes the
+ * {@link Consumer#wakeup()} call before or during its execution.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask} and no
+ * previous calls to {@link #wakeup()}, set the given {@link 
CompletableFuture} as the
+ * active task.
+ * 
+ * 
+ * If there was a previous call to {@link #wakeup()}, the given 
{@link CompletableFuture task} will fail
+ * via {@link CompletableFuture#completeExceptionally(Throwable)} 
and the active task will be

Review Comment:
   similarly - should we ment

Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392964486


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.

Review Comment:
   should we mention about WakeupException - something like active task 
compelted exceptionally via WakeupException



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-14 Thread via GitHub


philipnee commented on code in PR #14752:
URL: https://github.com/apache/kafka/pull/14752#discussion_r1392962733


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java:
##
@@ -24,85 +25,133 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Ensures blocking APIs can be woken up by the consumer.wakeup().
+ * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}.
  */
-public class WakeupTrigger {
-private final AtomicReference pendingTask = new 
AtomicReference<>(null);
+class WakeupTrigger {
+
+private final static Object FAIL_NEXT_MARKER = new Object();
+private final AtomicReference activeTask = new 
AtomicReference<>(null);
 
 /**
- * Wakeup a pending task.  If there isn't any pending task, return a 
WakeupFuture, so that the subsequent call
- * would know wakeup was previously called.
- * 
- * If there are active tasks, complete it with WakeupException, then unset 
pending task (return null here.
- * If the current task has already been woken-up, do nothing.
+ * Wakeup a pending task.
+ *
+ * 
+ *
+ * There are three cases that can happen when this method is invoked:
+ *
+ * 
+ * 
+ * If there is no active task from a previous call to 
{@code setActiveTask}, we set an
+ * internal indicator that the next attempt to start a 
long-running task (via a call to
+ * {@link #setActiveTask(CompletableFuture)}) will fail with a 
{@link WakeupException}.
+ * 
+ * 
+ * If there is an active task (i.e. there was a previous 
call to
+ * {@link #setActiveTask(CompletableFuture)} for a long-running 
task), fail it via
+ * {@link CompletableFuture#completeExceptionally(Throwable)} and 
then clear the active task.
+ * 
+ * 
+ * If there is already an pending wakeup from a previous call to 
{@link Consumer#wakeup()}, do nothing.

Review Comment:
   "a pending"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: WakeupTrigger cleanup [kafka]

2023-11-13 Thread via GitHub


kirktrue commented on PR #14752:
URL: https://github.com/apache/kafka/pull/14752#issuecomment-1809303495

   @philipnee Would you tag this with `ctr` and review?
   
   This is a low priority clean up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org