cadonna commented on code in PR #12466:
URL: https://github.com/apache/kafka/pull/12466#discussion_r942289824


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -298,13 +298,12 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
             logPrefix
         );
 
-        final LinkedHashMap<TaskId, RuntimeException> taskCloseExceptions = new LinkedHashMap<>();
         final Map<TaskId, Set<TopicPartition>> activeTasksToCreate = new HashMap<>(activeTasks);
         final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate = new HashMap<>(standbyTasks);
         final Map<Task, Set<TopicPartition>> tasksToRecycle = new HashMap<>();
         final Set<Task> tasksToCloseClean = new TreeSet<>(Comparator.comparing(Task::id));
 
-        tasks.purgePendingTasks(activeTasks.keySet(), standbyTasks.keySet());
+        tasks.purgePendingTasksToCreate(activeTasks.keySet(), standbyTasks.keySet());

Review Comment:
   Do we really need to purge the pending tasks here?
   Wouldn't it be clearer to simply clear the pending tasks to create?
   We add the pending tasks again on lines 318 and 319 anyway.
   Maybe we could even clear the pending tasks to create first and then immediately call the following here:
   
   ```
   tasks.addPendingActiveTasksToCreate(pendingTasksToCreate(activeTasksToCreate));
   tasks.addPendingStandbyTasksToCreate(pendingTasksToCreate(standbyTasksToCreate));
   ```
   
   This way, the pending tasks to create are removed from `activeTasksToCreate` and `standbyTasksToCreate`, which is OK because the pending tasks have never been created, so they will never be iterated over in `classifyTasks*()`.
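
   To illustrate the idea, here is a minimal, self-contained sketch. It is not the code in this PR: the class `PendingTasksSketch`, the `String` task IDs, the `clearPendingTasksToCreate()` method, and the `cannotCreateYet` predicate are all hypothetical stand-ins, and the real `pendingTasksToCreate()` may decide differently which tasks are pending. It only shows the pattern of clearing the pending map and then moving the not-yet-creatable entries out of the to-create map in one step.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class PendingTasksSketch {
    // Hypothetical stand-in for the pending-to-create bookkeeping kept in Tasks.
    final Map<String, Set<String>> pendingActiveTasksToCreate = new HashMap<>();

    // Drops everything left over from a previous assignment (assumed method name).
    void clearPendingTasksToCreate() {
        pendingActiveTasksToCreate.clear();
    }

    void addPendingActiveTasksToCreate(final Map<String, Set<String>> pending) {
        pendingActiveTasksToCreate.putAll(pending);
    }

    // Hypothetical helper: removes every entry that cannot be created yet
    // from tasksToCreate and returns those entries as the new pending set.
    static Map<String, Set<String>> pendingTasksToCreate(final Map<String, Set<String>> tasksToCreate,
                                                         final Predicate<String> cannotCreateYet) {
        final Map<String, Set<String>> pending = new HashMap<>();
        final Iterator<Map.Entry<String, Set<String>>> iter = tasksToCreate.entrySet().iterator();
        while (iter.hasNext()) {
            final Map.Entry<String, Set<String>> entry = iter.next();
            if (cannotCreateYet.test(entry.getKey())) {
                pending.put(entry.getKey(), entry.getValue());
                iter.remove(); // the entry no longer appears in the to-create map
            }
        }
        return pending;
    }
}
```

   Because the removed entries belong to tasks that do not exist yet, a later loop over the existing tasks never looks them up in the to-create map.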



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -442,26 +440,25 @@ private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>
         classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
         for (final Task task : stateUpdater.getTasks()) {
             final TaskId taskId = task.id();
+            final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
             if (activeTasksToCreate.containsKey(taskId)) {
                 if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
                     if (!task.inputPartitions().equals(topicPartitions)) {
-                        tasks.addPendingTaskThatNeedsInputPartitionsUpdate(taskId);
+                        tasks.addPendingTaskToUpdateInputPartitions(taskId, topicPartitions);

Review Comment:
   Thank you for adding the topic partitions as a parameter.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -269,7 +271,6 @@ Collection<Task> tasks(final Collection<TaskId> taskIds) {
         return tasks;
     }
 
-    // TODO: change return type to `StreamTask`

Review Comment:
   Why did you remove the TODO?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java:
##########
@@ -23,13 +23,16 @@
 
 import java.util.Collections;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+

Review Comment:
   nit: remove blank line



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java:
##########
@@ -71,6 +72,8 @@ public class ActiveTaskCreatorTest {
     private StateDirectory stateDirectory;
     @Mock(type = MockType.NICE)
     private ChangelogReader changeLogReader;
+    @Mock(type = MockType.NICE)
+    private ProcessorStateManager stateManager;

Review Comment:
   This mock (`stateManager`) is never used.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -43,9 +43,7 @@
 class Tasks {
     private final Logger log;
 
-    // TODO: change type to `StreamTask`
     private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
-    // TODO: change type to `StandbyTask`
     private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();

Review Comment:
   Why did you remove the TODOs?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -641,9 +641,9 @@ boolean tryToCompleteRestoration(final long now, final java.util.function.Consum
                 }
             }
         } else {
-            for (final Task task : tasks.drainPendingTaskToRestore()) {
-                stateUpdater.add(task);
-            }
+            addTaskstoStateUpdater();
+
+            handleRemovedTasksFromStateUpdater();

Review Comment:
   Could you add some unit tests for this code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
