Nikita-Shupletsov commented on code in PR #20767:
URL: https://github.com/apache/kafka/pull/20767#discussion_r2497788821


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -208,11 +208,7 @@ private void performActionsOnTasks() {
                             addTask(taskAndAction.task());
                             break;
                         case REMOVE:
-                            if (taskAndAction.futureForRemove() == null) {
-                                removeTask(taskAndAction.taskId());
-                            } else {
-                                removeTask(taskAndAction.taskId(), taskAndAction.futureForRemove());
-                            }
+                            removeTask(taskAndAction.taskId(), taskAndAction.futureForRemove());

Review Comment:
   The only place where we create remove tasks is here: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java#L858
   and we always provide a future there.
   
   Initially there was no future, but then 
https://github.com/apache/kafka/commit/366aeab488c996e5012c2934996c4370637e6d5f 
   introduced futures and kept both the old and the new approach. As the old 
   approach is not used anywhere, and there is no way to receive a 
   TaskAndAction object from the outside, `futureForRemove()` is never null 
   for a REMOVE action, so the null check can be dropped.
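
   To illustrate the argument, here is a minimal, self-contained sketch. It is 
   not the actual Kafka code: `RemoveActionSketch`, the `String` task id, the 
   `String` future result and the `requireNonNull` guard are all assumptions 
   made for the example. It only shows that when a private constructor plus a 
   single factory method is the sole way to build a REMOVE action, the 
   consumer can call the two-argument `removeTask` unconditionally.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class RemoveActionSketch {

    // Hypothetical stand-in for TaskAndAction; field types are simplified assumptions.
    static final class TaskAndAction {
        enum Action { ADD, REMOVE }

        private final String taskId;
        private final Action action;
        private final CompletableFuture<String> futureForRemove;

        // Private constructor: callers outside this file must use the factory below.
        private TaskAndAction(String taskId, Action action, CompletableFuture<String> futureForRemove) {
            this.taskId = taskId;
            this.action = action;
            this.futureForRemove = futureForRemove;
        }

        // The only way to create a REMOVE action in this sketch; a future is mandatory.
        // The null guard is an assumption of the sketch, not a claim about the real class.
        static TaskAndAction createRemoveTask(String taskId, CompletableFuture<String> future) {
            Objects.requireNonNull(taskId, "taskId");
            Objects.requireNonNull(future, "future");
            return new TaskAndAction(taskId, Action.REMOVE, future);
        }

        String taskId() { return taskId; }
        Action action() { return action; }
        CompletableFuture<String> futureForRemove() { return futureForRemove; }
    }

    // Simplified removal: completes the future so the caller learns the outcome.
    static void removeTask(String taskId, CompletableFuture<String> future) {
        future.complete("removed " + taskId);
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        TaskAndAction remove = TaskAndAction.createRemoveTask("0_1", future);

        // No null check needed: the factory guarantees a non-null future.
        removeTask(remove.taskId(), remove.futureForRemove());
        System.out.println(future.join());
    }
}
```

   With this shape, a `futureForRemove() == null` branch in the consumer would 
   be unreachable, which is the reasoning behind removing it in this change.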



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
