mjsax commented on code in PR #21365:
URL: https://github.com/apache/kafka/pull/21365#discussion_r2757103115
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -193,7 +193,9 @@ public synchronized void removeTask(final Task taskToRemove) {
throw new IllegalStateException("Attempted to remove a task that is not closed or suspended: " + taskId);
}
- if (taskToRemove.isActive()) {
+ if (pendingTasksToInit.contains(taskToRemove)) {
Review Comment:
I see -- it's because we add all pending tasks for closing above:
```
for (Task pendingTask : tasks.pendingTasksToInit()) {
```
So these tasks are in CREATED state when we get them, but we first CLOSE them and then remove them. I am just wondering whether this violates the invariant that all tasks in `pendingTasksToInit` _should_ be in CREATED state (I'm not 100% sure this is an actual contract we follow, but it would make sense IMHO). If so, should we use `drainPendingTasksToInit` instead of `pendingTasksToInit`, since we intend to close these tasks? That way we would not need to remove these tasks here, as `drainPendingTasksToInit` would already do the removal for us.
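To make the snapshot-vs-drain difference concrete, here is a minimal sketch under a simplified, hypothetical model: `SimpleTask`, `SimpleTasks`, `State`, and `invariantHoldsAfterClosing` are illustrative names, not Kafka's real `Task`/`Tasks` classes, and the assumed contract is the one questioned above (every task in `pendingTasksToInit` is CREATED).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified model: SimpleTask/SimpleTasks are NOT Kafka's Task/Tasks classes.
public class PendingTasksSketch {

    enum State { CREATED, CLOSED }

    static final class SimpleTask {
        final String id;
        State state = State.CREATED;

        SimpleTask(final String id) {
            this.id = id;
        }
    }

    static final class SimpleTasks {
        private final Set<SimpleTask> pendingTasksToInit = new HashSet<>();

        void addPendingTaskToInit(final SimpleTask task) {
            pendingTasksToInit.add(task);
        }

        // Snapshot only: the returned tasks remain registered as pending.
        Set<SimpleTask> pendingTasksToInit() {
            return Set.copyOf(pendingTasksToInit);
        }

        // Returns the pending tasks and removes them from the registry in one step.
        Set<SimpleTask> drainPendingTasksToInit() {
            final Set<SimpleTask> drained = new HashSet<>(pendingTasksToInit);
            pendingTasksToInit.clear();
            return drained;
        }

        // The assumed invariant: every pending task is still CREATED.
        boolean allPendingAreCreated() {
            return pendingTasksToInit.stream().allMatch(t -> t.state == State.CREATED);
        }
    }

    // Closes all pending tasks, fetched via snapshot or drain, and reports whether the invariant still holds.
    static boolean invariantHoldsAfterClosing(final boolean useDrain) {
        final SimpleTasks tasks = new SimpleTasks();
        tasks.addPendingTaskToInit(new SimpleTask("0_0"));
        tasks.addPendingTaskToInit(new SimpleTask("1_0"));
        final Set<SimpleTask> toClose = useDrain ? tasks.drainPendingTasksToInit() : tasks.pendingTasksToInit();
        for (final SimpleTask task : toClose) {
            task.state = State.CLOSED;
        }
        return tasks.allPendingAreCreated();
    }

    public static void main(final String[] args) {
        System.out.println("snapshot: " + invariantHoldsAfterClosing(false)); // false: CLOSED tasks stay pending
        System.out.println("drain:    " + invariantHoldsAfterClosing(true));  // true: nothing is left pending
    }
}
```

With the snapshot, the closed tasks linger in the pending set until something like `removeTask` cleans them up; with the drain, they are gone before they are closed, so the invariant is never observably broken.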
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) {
// TODO: change type to `StreamTask`
final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));
activeTasks.addAll(tasks.activeTasks());
+ final Set<Task> standbyTasks = new TreeSet<>(Comparator.comparing(Task::id));
+ standbyTasks.addAll(tasks.standbyTasks());
+ standbyTasks.addAll(tasks.standbyTasks());
+ for (Task pendingTask : tasks.pendingTasksToInit()) {
+ if (pendingTask.isActive()) {
+ activeTasks.add(pendingTask);
+ } else {
+ standbyTasks.add(pendingTask);
+ }
+ }
Review Comment:
Thanks for updating the PR description. It says "shutdown during rebalance when active task become standby tasks", but it seems this can go either way. Is the PR actually fixing both directions?
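The loop in the diff routes each pending task by `isActive()`, so it handles pending tasks of either kind. A minimal sketch of that routing, assuming a hypothetical `SimpleTask` stand-in that exposes only `id()` and `isActive()` (the task ids and the `partition`/`demo` helpers are illustrative, not Kafka code):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical stand-in for Kafka's Task: only id() and isActive() matter for this sketch.
public class ShutdownPartitionSketch {

    static final class SimpleTask {
        private final String id;
        private final boolean active;

        SimpleTask(final String id, final boolean active) {
            this.id = id;
            this.active = active;
        }

        String id() {
            return id;
        }

        boolean isActive() {
            return active;
        }
    }

    static Set<SimpleTask> sortedById() {
        return new TreeSet<>(Comparator.comparing(SimpleTask::id));
    }

    // Same shape as the diff: start from registered tasks, then route each pending task by isActive().
    static List<Set<SimpleTask>> partition(final List<SimpleTask> active,
                                           final List<SimpleTask> standby,
                                           final List<SimpleTask> pending) {
        final Set<SimpleTask> activeTasks = sortedById();
        activeTasks.addAll(active);
        final Set<SimpleTask> standbyTasks = sortedById();
        standbyTasks.addAll(standby);
        for (final SimpleTask pendingTask : pending) {
            if (pendingTask.isActive()) {
                activeTasks.add(pendingTask);
            } else {
                standbyTasks.add(pendingTask);
            }
        }
        return List.of(activeTasks, standbyTasks);
    }

    static String ids(final Set<SimpleTask> tasks) {
        return tasks.stream().map(SimpleTask::id).collect(Collectors.toList()).toString();
    }

    // One pending task of each kind: 0_1 is pending as a standby, 0_2 is pending as an active task.
    static String demo() {
        final List<Set<SimpleTask>> result = partition(
            List.of(new SimpleTask("0_0", true)),
            List.of(new SimpleTask("0_3", false)),
            List.of(new SimpleTask("0_1", false), new SimpleTask("0_2", true)));
        return "active=" + ids(result.get(0)) + " standby=" + ids(result.get(1));
    }

    public static void main(final String[] args) {
        System.out.println(demo()); // active=[0_0, 0_2] standby=[0_1, 0_3]
    }
}
```

Because both branches are exercised, a description that mentions only one direction would undersell what this loop covers.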
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
Review Comment:
Cf. my other comment (https://github.com/apache/kafka/pull/21365/changes#r2757103115) -- maybe we need to add a new member to `TasksRegistry` to track "pending tasks" that got closed?
I would like to hear from @lucasbru about this, as he knows this part of the code better. I am just worried about creating a mess here (maybe I am wrong). Maybe it's also OK to just "reuse" `pendingTasksToInit` to also track these tasks _after_ they got CLOSED. I'm just not sure about it.
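For comparison, the "new member" alternative might look roughly like this. It is a hypothetical sketch only: `SimpleRegistry`, `closedPendingTasks`, `closePendingTask`, and `removeClosedPendingTask` are invented names, not part of Kafka's `TasksRegistry`. The idea is that a pending task moves to its own set at the moment it is closed, so `pendingTasksToInit` keeps holding only CREATED tasks.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: closedPendingTasks and all other names here are illustrative, not Kafka APIs.
public class ClosedPendingTasksSketch {

    enum State { CREATED, CLOSED }

    static final class SimpleTask {
        final String id;
        State state = State.CREATED;

        SimpleTask(final String id) {
            this.id = id;
        }
    }

    static final class SimpleRegistry {
        private final Set<SimpleTask> pendingTasksToInit = new HashSet<>();
        // Hypothetical new member: pending tasks that were closed before being initialized.
        private final Set<SimpleTask> closedPendingTasks = new HashSet<>();

        void addPendingTaskToInit(final SimpleTask task) {
            pendingTasksToInit.add(task);
        }

        // Moves a task out of pendingTasksToInit as it is closed, so that set only ever holds CREATED tasks.
        void closePendingTask(final SimpleTask task) {
            if (!pendingTasksToInit.remove(task)) {
                throw new IllegalStateException("Not a pending task: " + task.id);
            }
            task.state = State.CLOSED;
            closedPendingTasks.add(task);
        }

        // A removeTask()-style method could check this set instead of pendingTasksToInit.
        boolean removeClosedPendingTask(final SimpleTask task) {
            return closedPendingTasks.remove(task);
        }

        int pendingCount() {
            return pendingTasksToInit.size();
        }

        int closedPendingCount() {
            return closedPendingTasks.size();
        }
    }

    public static void main(final String[] args) {
        final SimpleRegistry registry = new SimpleRegistry();
        final SimpleTask task = new SimpleTask("0_0");
        registry.addPendingTaskToInit(task);
        registry.closePendingTask(task);
        System.out.println(registry.pendingCount() + " pending, " + registry.closedPendingCount() + " closed-pending");
        registry.removeClosedPendingTask(task);
        System.out.println(registry.closedPendingCount() + " closed-pending after removal");
    }
}
```

The trade-off is one more collection to keep consistent, versus overloading `pendingTasksToInit` with tasks that are no longer CREATED.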
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1523,7 +1532,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
final boolean clean,
final AtomicReference<RuntimeException> firstException) {
if (!clean) {
- return activeTaskIterable();
+ return activeTasksToClose;
Review Comment:
> even without my change it's already a bit incorrect

I see -- so it's not a problem currently, but only because of ordering (i.e., "luck"); if the ordering changes, it would break. Thanks for clarifying.
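This sketch does not model the specific ordering being discussed. It only illustrates, assuming the registry's active-task view excludes pending tasks (as the shutdown diff, which adds pending tasks separately, suggests), why returning the caller's `activeTasksToClose` removes the dependency on registry contents. Task ids stand in for tasks, and `REGISTERED_ACTIVE`, `dirtyCloseOld`, and `dirtyCloseNew` are hypothetical names.

```java
import java.util.Collection;
import java.util.List;

// Hypothetical simplification of the `if (!clean)` branch changed in the diff. Task ids stand in for tasks.
public class UncleanCloseSketch {

    // Stand-in for the registry's active tasks: pending tasks are NOT in here.
    static final List<String> REGISTERED_ACTIVE = List.of("0_0", "0_1");

    // Old shape (like `return activeTaskIterable();`): ignores the argument and re-reads the registry.
    static Collection<String> dirtyCloseOld(final Collection<String> activeTasksToClose) {
        return REGISTERED_ACTIVE;
    }

    // New shape (like `return activeTasksToClose;`): hands back exactly what the caller asked to close.
    static Collection<String> dirtyCloseNew(final Collection<String> activeTasksToClose) {
        return activeTasksToClose;
    }

    public static void main(final String[] args) {
        // 0_2 models a pending active task that shutdown added to the set of tasks to close.
        final List<String> toClose = List.of("0_0", "0_1", "0_2");
        System.out.println("old: " + dirtyCloseOld(toClose)); // old: [0_0, 0_1]
        System.out.println("new: " + dirtyCloseNew(toClose)); // new: [0_0, 0_1, 0_2]
    }
}
```

The old shape is only correct when the registry happens to hold exactly the tasks being closed; the new shape is correct by construction.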