mjsax commented on code in PR #21365:
URL: https://github.com/apache/kafka/pull/21365#discussion_r2757103115


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -193,7 +193,9 @@ public synchronized void removeTask(final Task taskToRemove) {
             throw new IllegalStateException("Attempted to remove a task that is not closed or suspended: " + taskId);
         }
 
-        if (taskToRemove.isActive()) {
+        if (pendingTasksToInit.contains(taskToRemove)) {

Review Comment:
   I see -- it's because we add all pending tasks for closing above:
   ```
    for (Task pendingTask : tasks.pendingTasksToInit()) {
   ```
   
   So these tasks are in CREATED when we get them, but we first CLOSE them and then remove them. I am just wondering whether this violates the invariant that all tasks in `pendingTasksToInit` _should_ be in CREATED state (not 100% sure this is an actual contract we follow, but it would make sense IMHO). I also wonder whether we should use `drainPendingTasksToInit` instead of `pendingTasksToInit`, because we intend to close these tasks. That would avoid the need to remove these tasks here, as `drainPendingTasksToInit` would already do the removal for us.
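A minimal sketch of the difference between the two accessors. It uses a hypothetical, simplified stand-in for `Tasks` with string IDs instead of `Task` objects. The method bodies are assumptions, not the actual Kafka Streams implementation:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the `Tasks` registry; task IDs are
// plain strings here instead of `Task` objects.
class PendingTasksSketch {
    private final Set<String> pendingTasksToInit = new HashSet<>();

    void addPendingTaskToInit(final String taskId) {
        pendingTasksToInit.add(taskId);
    }

    // "Peek" semantics: the caller gets a copy and the registry keeps the tasks,
    // so a separate removal step is still required after closing them.
    Set<String> pendingTasksToInit() {
        return Set.copyOf(pendingTasksToInit);
    }

    // "Drain" semantics: the tasks are returned and removed in one step, so the
    // caller owns them and nothing is left behind in the registry.
    Set<String> drainPendingTasksToInit() {
        final Set<String> drained = new HashSet<>(pendingTasksToInit);
        pendingTasksToInit.clear();
        return drained;
    }

    boolean hasPendingTasksToInit() {
        return !pendingTasksToInit.isEmpty();
    }
}
```

With drain semantics the shutdown loop would own the drained tasks outright. `removeTask()` would then presumably not need the `pendingTasksToInit.contains(...)` branch for them.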



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1414,10 +1414,19 @@ void shutdown(final boolean clean) {
         // TODO: change type to `StreamTask`
         final Set<Task> activeTasks = new TreeSet<>(Comparator.comparing(Task::id));
         activeTasks.addAll(tasks.activeTasks());
+        final Set<Task> standbyTasks = new TreeSet<>(Comparator.comparing(Task::id));
+        standbyTasks.addAll(tasks.standbyTasks());
+        for (Task pendingTask : tasks.pendingTasksToInit()) {
+            if (pendingTask.isActive()) {
+                activeTasks.add(pendingTask);
+            } else {
+                standbyTasks.add(pendingTask);
+            }
+        }

Review Comment:
   Thanks for updating the PR description. It says "shutdown during rebalance when active task become standby tasks", but it seems to go either way, and the PR is actually fixing both directions?
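The diff routes each pending task by its own `isActive()` flag, regardless of the role it had before the rebalance. Below is a minimal sketch of that partitioning. It uses a hypothetical `SketchTask` record in place of `Task` and plain lists in place of the registry:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical minimal task type standing in for Kafka Streams' `Task`.
record SketchTask(String id, boolean isActive) { }

class ShutdownPartitionSketch {
    // Ordered by task ID, mirroring the TreeSet/Comparator used in the diff.
    final Set<SketchTask> activeTasks = new TreeSet<>(Comparator.comparing(SketchTask::id));
    final Set<SketchTask> standbyTasks = new TreeSet<>(Comparator.comparing(SketchTask::id));

    ShutdownPartitionSketch(final List<SketchTask> registered, final List<SketchTask> pendingTasksToInit) {
        for (final SketchTask task : registered) {
            addByRole(task);
        }
        // Pending tasks are routed purely by their current role, so a task that
        // moved active -> standby and one that moved standby -> active are both
        // picked up for closing.
        for (final SketchTask pendingTask : pendingTasksToInit) {
            addByRole(pendingTask);
        }
    }

    private void addByRole(final SketchTask task) {
        if (task.isActive()) {
            activeTasks.add(task);
        } else {
            standbyTasks.add(task);
        }
    }
}
```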



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########


Review Comment:
   Cf. my comment below (https://github.com/apache/kafka/pull/21365/changes#r2757103115). Maybe we need to add a new member to `TasksRegistry` to hold "pending tasks" that got closed?
   
   I would like to hear from @lucasbru about this, as he knows this part of the code better. I am just worried about creating a mess here (maybe I am wrong). Maybe it's also ok to just "reuse" `pendingTasksToInit` to also track these tasks _after_ they got CLOSED. Just not sure about it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1523,7 +1532,7 @@ private Collection<Task> tryCloseCleanActiveTasks(final Collection<Task> activeT
                                                       final boolean clean,
                                                       final AtomicReference<RuntimeException> firstException) {
         if (!clean) {
-            return activeTaskIterable();
+            return activeTasksToClose;

Review Comment:
   > even without my change it's already a bit incorrect
   
   I see -- so it's not a problem currently, but only because of ordering (i.e., "luck"). If the ordering changes, it would break. Thanks for clarifying.
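A minimal sketch of why returning the caller-supplied collection does not depend on ordering. It assumes that `activeTasksToClose` is the set built in `shutdown()`, which after this PR also contains pending active tasks. A hypothetical list stands in for the registry, and the method names are illustrative only:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical simplification of the dirty-close branch (`clean == false`);
// task IDs are strings and the registry is a plain list.
class DirtyCloseSketch {
    // Stands in for whatever the registry happens to hold at call time.
    final List<String> registryActiveTasks = new ArrayList<>();

    // Old shape: ignores its argument and re-reads the registry, so the result
    // depends on what was added to or removed from it before this call.
    Collection<String> fromRegistry(final Collection<String> activeTasksToClose) {
        return new ArrayList<>(registryActiveTasks);
    }

    // New shape: returns exactly the tasks the caller asked to close.
    Collection<String> fromCaller(final Collection<String> activeTasksToClose) {
        return activeTasksToClose;
    }
}
```

In this sketch a pending task such as `1_0` that was never placed in the registry list is only visible through the caller's collection. This shows how a registry-based result can silently depend on when tasks get registered.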



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
