Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
kirktrue closed pull request #14752: MINOR: WakeupTrigger cleanup URL: https://github.com/apache/kafka/pull/14752 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
github-actions[bot] commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1940369577 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1810767025 Hi @kirktrue Thanks for taking time to clean up the task. I left some comments there. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392968241 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. + * + * + * If there is already an pending wakeup from a previous call to {@link Consumer#wakeup()}, do nothing. + * We keep the internal state as is so that the future calls to {@link #setActiveTask(CompletableFuture)} + * will fail as expected. + * + * */ -public void wakeup() { -pendingTask.getAndUpdate(task -> { -if (task == null) { -return new WakeupFuture(); -} else if (task instanceof ActiveFuture) { -ActiveFuture active = (ActiveFuture) task; -active.future().completeExceptionally(new WakeupException()); +void wakeup() { +activeTask.getAndUpdate(existingTask -> { +if (existingTask == null) { +// If there isn't an existing task, return our marker, so that the subsequent call will +// know wakeup was previously called. +return FAIL_NEXT_MARKER; +} else if (existingTask instanceof CompletableFuture) { +// If there is an existing "active" task, complete it with WakeupException. +CompletableFuture active = (CompletableFuture) existingTask; +active.completeExceptionally(new WakeupException()); + +// We return a null here to effectively unset the "active" task. return null; } else { -return task; +// This is the case where the existing task is the wakeup marker. So the user has apparently +// called Consumer.wakeup() more than once. +return existingTask; } }); } /** - * If there is no pending task, set the pending task active. - * If wakeup was called before setting an active task, the current task will complete exceptionally with - * WakeupException right - * away. - * if there is an active task, throw exception. - * @param currentTask - * @param - * @return + * This method should be called before execution a blocking operation in the {@link Consumer}. This will + * store an internal reference to the given active {@link CompletableFuture task} that can be + * {@link CompletableFuture#completeExceptionally(Throwable) forcibly failed} if the user invokes the + * {@link Consumer#wakeup()} call before or during its execution. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask} and no + * previous calls to {@link #wakeup()}, set the given {@link CompletableFuture} as the + * active task. + * + * + * If there was a previous call to {@link #wakeup()}, the given {@link CompletableFuture task} will fail + * via {@link CompletableFuture#completeExceptionally(Throwable)} and the active task will be Review Comment: similarly - should we ment
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392964486 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. Review Comment: should we mention about WakeupException - something like active task compelted exceptionally via WakeupException -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
philipnee commented on code in PR #14752: URL: https://github.com/apache/kafka/pull/14752#discussion_r1392962733 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java: ## @@ -24,85 +25,133 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Ensures blocking APIs can be woken up by the consumer.wakeup(). + * Ensures blocking APIs can be woken up by {@link Consumer#wakeup()}. */ -public class WakeupTrigger { -private final AtomicReference pendingTask = new AtomicReference<>(null); +class WakeupTrigger { + +private final static Object FAIL_NEXT_MARKER = new Object(); +private final AtomicReference activeTask = new AtomicReference<>(null); /** - * Wakeup a pending task. If there isn't any pending task, return a WakeupFuture, so that the subsequent call - * would know wakeup was previously called. - * - * If there are active tasks, complete it with WakeupException, then unset pending task (return null here. - * If the current task has already been woken-up, do nothing. + * Wakeup a pending task. + * + * + * + * There are three cases that can happen when this method is invoked: + * + * + * + * If there is no active task from a previous call to {@code setActiveTask}, we set an + * internal indicator that the next attempt to start a long-running task (via a call to + * {@link #setActiveTask(CompletableFuture)}) will fail with a {@link WakeupException}. + * + * + * If there is an active task (i.e. there was a previous call to + * {@link #setActiveTask(CompletableFuture)} for a long-running task), fail it via + * {@link CompletableFuture#completeExceptionally(Throwable)} and then clear the active task. + * + * + * If there is already an pending wakeup from a previous call to {@link Consumer#wakeup()}, do nothing. Review Comment: "a pending" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: WakeupTrigger cleanup [kafka]
kirktrue commented on PR #14752: URL: https://github.com/apache/kafka/pull/14752#issuecomment-1809303495 @philipnee Would you tag this with `ctr` and review? This is a low priority clean up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org