Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-16 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1603565420


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1750,6 +1761,7 @@ Map allTasks() {
 if (stateUpdater != null) {
 final Map ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
 ret.putAll(tasks.allTasksPerId());
+
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));

Review Comment:
   Here the PR: https://github.com/apache/kafka/pull/15978



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-16 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1603147808


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1750,6 +1761,7 @@ Map allTasks() {
 if (stateUpdater != null) {
 final Map ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
 ret.putAll(tasks.allTasksPerId());
+
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));

Review Comment:
   @chia7712 Thanks for notifying me. I will have a look.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-15 Thread via GitHub


chia7712 commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1602161837


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1750,6 +1761,7 @@ Map allTasks() {
 if (stateUpdater != null) {
 final Map ret = 
stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
 ret.putAll(tasks.allTasksPerId());
+
ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id,
 x -> x)));

Review Comment:
   @cadonna Could you please take a look at 
https://issues.apache.org/jira/browse/KAFKA-16774?
   
   It seems this change could make 
StreamThreadTest#shouldCloseAllTaskProducersOnCloseIfEosEnabled failed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-13 Thread via GitHub


cadonna merged PR #15882:
URL: https://github.com/apache/kafka/pull/15882


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593995260


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
 }
 
 private void handleTasksInStateUpdater(final Map> activeTasksToCreate,
-   final Map> standbyTasksToCreate) {
+   final Map> standbyTasksToCreate,
+   final Map> tasksToRecycle,
+   final Set 
tasksToCloseCleanFromStateUpdater,
+   final Set 
tasksToCloseDirtyFromStateUpdater,
+   final Map 
failedTasks) {
+final Map> newInputPartitions = new 
HashMap<>();
+final Map> standbyInputPartitions = new 
HashMap<>();
+final Map> activeInputPartitions = new 
HashMap<>();
+final Map> 
futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+final Map> 
futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForTasksToClose = new LinkedHashMap<>();
 for (final Task task : stateUpdater.getTasks()) {
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-final Set inputPartitions = 
activeTasksToCreate.get(taskId);
-if (task.isActive() && 
!task.inputPartitions().equals(inputPartitions)) {
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, 
inputPartitions);
-} else {
-tasks.addPendingTaskToUpdateInputPartitions(taskId, 
inputPartitions);
-stateUpdater.remove(taskId);
-}
-} else if (task.isActive()) {
-if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-log.info(
-"We were planning on suspending a task {} because 
it was revoked " +
-"The task got reassigned to this thread, so we 
cancel suspending " +
-"of the task, but add it back to the state 
updater, since we do not know " +
-"if it is fully restored yet.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-log.info(
-"We were planning on closing task {} because we 
lost one of its partitions." +
-"The task got reassigned to this thread, so cancel 
closing  of the task, but add it back to the " +
-"state updater, since we may have to catch up on 
the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
+if (task.isActive()) {
+if 
(!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+final 
CompletableFuture future = 
stateUpdater.removeWithFuture(taskId);
+futuresForUpdatingInputPartitions.put(taskId, future);
+newInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 } else {
-removeTaskToRecycleFromStateUpdater(taskId, 
inputPartitions);
+final CompletableFuture 
future = stateUpdater.removeWithFuture(taskId);
+futuresForStandbyTasksToRecycle.put(taskId, future);
+activeInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
 if (task.isActive()) {
-removeTaskToRecycleFromStateUpdater(taskId, 
standbyTasksToCreate.get(taskId));
-} else {
-if (tasks.removePendingTaskToRecycle(taskId) != null) {
-log.info(
-"We were planning on recycling standby task {} to 
an active task." +
-"The task got reassigned to this thread as a 
standby task, so cancel recycling of the task, " +
-"but add it back to the state updater, since 
we may have to catch up on the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
+final Compl

Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


lucasbru commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593900987


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
 }
 
 private void handleTasksInStateUpdater(final Map> activeTasksToCreate,
-   final Map> standbyTasksToCreate) {
+   final Map> standbyTasksToCreate,
+   final Map> tasksToRecycle,
+   final Set 
tasksToCloseCleanFromStateUpdater,
+   final Set 
tasksToCloseDirtyFromStateUpdater,
+   final Map 
failedTasks) {
+final Map> newInputPartitions = new 
HashMap<>();
+final Map> standbyInputPartitions = new 
HashMap<>();
+final Map> activeInputPartitions = new 
HashMap<>();
+final Map> 
futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+final Map> 
futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForTasksToClose = new LinkedHashMap<>();
 for (final Task task : stateUpdater.getTasks()) {
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-final Set inputPartitions = 
activeTasksToCreate.get(taskId);
-if (task.isActive() && 
!task.inputPartitions().equals(inputPartitions)) {
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, 
inputPartitions);
-} else {
-tasks.addPendingTaskToUpdateInputPartitions(taskId, 
inputPartitions);
-stateUpdater.remove(taskId);
-}
-} else if (task.isActive()) {
-if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-log.info(
-"We were planning on suspending a task {} because 
it was revoked " +
-"The task got reassigned to this thread, so we 
cancel suspending " +
-"of the task, but add it back to the state 
updater, since we do not know " +
-"if it is fully restored yet.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-log.info(
-"We were planning on closing task {} because we 
lost one of its partitions." +
-"The task got reassigned to this thread, so cancel 
closing  of the task, but add it back to the " +
-"state updater, since we may have to catch up on 
the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
+if (task.isActive()) {
+if 
(!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+final 
CompletableFuture future = 
stateUpdater.removeWithFuture(taskId);
+futuresForUpdatingInputPartitions.put(taskId, future);
+newInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 } else {
-removeTaskToRecycleFromStateUpdater(taskId, 
inputPartitions);
+final CompletableFuture 
future = stateUpdater.removeWithFuture(taskId);
+futuresForStandbyTasksToRecycle.put(taskId, future);
+activeInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
 if (task.isActive()) {
-removeTaskToRecycleFromStateUpdater(taskId, 
standbyTasksToCreate.get(taskId));
-} else {
-if (tasks.removePendingTaskToRecycle(taskId) != null) {
-log.info(
-"We were planning on recycling standby task {} to 
an active task." +
-"The task got reassigned to this thread as a 
standby task, so cancel recycling of the task, " +
-"but add it back to the state updater, since 
we may have to catch up on the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
+final Comp

Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593836743


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -365,17 +365,47 @@ public void handleAssignment(final Map> activeTasks,
 // 1. for tasks that are already owned, just update input partitions / 
resume and skip re-creating them
 // 2. for tasks that have changed active/standby status, just recycle 
and skip re-creating them
 // 3. otherwise, close them since they are no longer owned
+final Map failedTasks = new 
LinkedHashMap<>();
 if (stateUpdater == null) {
 handleTasksWithoutStateUpdater(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
 } else {
-handleTasksWithStateUpdater(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+final Map> 
tasksToRecycleFromStateUpdater = new HashMap<>();

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


cadonna commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593835933


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
 }
 
 private void handleTasksInStateUpdater(final Map> activeTasksToCreate,
-   final Map> standbyTasksToCreate) {
+   final Map> standbyTasksToCreate,
+   final Map> tasksToRecycle,
+   final Set 
tasksToCloseCleanFromStateUpdater,
+   final Set 
tasksToCloseDirtyFromStateUpdater,
+   final Map 
failedTasks) {
+final Map> newInputPartitions = new 
HashMap<>();
+final Map> standbyInputPartitions = new 
HashMap<>();
+final Map> activeInputPartitions = new 
HashMap<>();
+final Map> 
futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+final Map> 
futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForTasksToClose = new LinkedHashMap<>();
 for (final Task task : stateUpdater.getTasks()) {
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-final Set inputPartitions = 
activeTasksToCreate.get(taskId);
-if (task.isActive() && 
!task.inputPartitions().equals(inputPartitions)) {
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, 
inputPartitions);
-} else {
-tasks.addPendingTaskToUpdateInputPartitions(taskId, 
inputPartitions);
-stateUpdater.remove(taskId);
-}
-} else if (task.isActive()) {
-if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-log.info(
-"We were planning on suspending a task {} because 
it was revoked " +
-"The task got reassigned to this thread, so we 
cancel suspending " +
-"of the task, but add it back to the state 
updater, since we do not know " +
-"if it is fully restored yet.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-log.info(
-"We were planning on closing task {} because we 
lost one of its partitions." +
-"The task got reassigned to this thread, so cancel 
closing  of the task, but add it back to the " +
-"state updater, since we may have to catch up on 
the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
+if (task.isActive()) {
+if 
(!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+final 
CompletableFuture future = 
stateUpdater.removeWithFuture(taskId);
+futuresForUpdatingInputPartitions.put(taskId, future);
+newInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 } else {
-removeTaskToRecycleFromStateUpdater(taskId, 
inputPartitions);
+final CompletableFuture 
future = stateUpdater.removeWithFuture(taskId);
+futuresForStandbyTasksToRecycle.put(taskId, future);
+activeInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
 if (task.isActive()) {
-removeTaskToRecycleFromStateUpdater(taskId, 
standbyTasksToCreate.get(taskId));
-} else {
-if (tasks.removePendingTaskToRecycle(taskId) != null) {
-log.info(
-"We were planning on recycling standby task {} to 
an active task." +
-"The task got reassigned to this thread as a 
standby task, so cancel recycling of the task, " +
-"but add it back to the state updater, since 
we may have to catch up on the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
+final Compl

Re: [PR] KAFKA-10199: Handle assignment with new remove operation in state updater [kafka]

2024-05-08 Thread via GitHub


lucasbru commented on code in PR #15882:
URL: https://github.com/apache/kafka/pull/15882#discussion_r1593738288


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -544,69 +585,78 @@ private void handleReassignedActiveTask(final Task task,
 }
 
 private void handleTasksInStateUpdater(final Map> activeTasksToCreate,
-   final Map> standbyTasksToCreate) {
+   final Map> standbyTasksToCreate,
+   final Map> tasksToRecycle,
+   final Set 
tasksToCloseCleanFromStateUpdater,
+   final Set 
tasksToCloseDirtyFromStateUpdater,
+   final Map 
failedTasks) {
+final Map> newInputPartitions = new 
HashMap<>();
+final Map> standbyInputPartitions = new 
HashMap<>();
+final Map> activeInputPartitions = new 
HashMap<>();
+final Map> 
futuresForUpdatingInputPartitions = new LinkedHashMap<>();
+final Map> 
futuresForActiveTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForStandbyTasksToRecycle = new LinkedHashMap<>();
+final Map> 
futuresForTasksToClose = new LinkedHashMap<>();
 for (final Task task : stateUpdater.getTasks()) {
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-final Set inputPartitions = 
activeTasksToCreate.get(taskId);
-if (task.isActive() && 
!task.inputPartitions().equals(inputPartitions)) {
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-
tasks.addPendingTaskToCloseReviveAndUpdateInputPartitions(taskId, 
inputPartitions);
-} else {
-tasks.addPendingTaskToUpdateInputPartitions(taskId, 
inputPartitions);
-stateUpdater.remove(taskId);
-}
-} else if (task.isActive()) {
-if (tasks.removePendingActiveTaskToSuspend(taskId)) {
-log.info(
-"We were planning on suspending a task {} because 
it was revoked " +
-"The task got reassigned to this thread, so we 
cancel suspending " +
-"of the task, but add it back to the state 
updater, since we do not know " +
-"if it is fully restored yet.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
-if (tasks.removePendingTaskToCloseClean(taskId)) {
-log.info(
-"We were planning on closing task {} because we 
lost one of its partitions." +
-"The task got reassigned to this thread, so cancel 
closing  of the task, but add it back to the " +
-"state updater, since we may have to catch up on 
the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
+if (task.isActive()) {
+if 
(!task.inputPartitions().equals(activeTasksToCreate.get(taskId))) {
+final 
CompletableFuture future = 
stateUpdater.removeWithFuture(taskId);
+futuresForUpdatingInputPartitions.put(taskId, future);
+newInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 } else {
-removeTaskToRecycleFromStateUpdater(taskId, 
inputPartitions);
+final CompletableFuture 
future = stateUpdater.removeWithFuture(taskId);
+futuresForStandbyTasksToRecycle.put(taskId, future);
+activeInputPartitions.put(taskId, 
activeTasksToCreate.get(taskId));
 }
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
 if (task.isActive()) {
-removeTaskToRecycleFromStateUpdater(taskId, 
standbyTasksToCreate.get(taskId));
-} else {
-if (tasks.removePendingTaskToRecycle(taskId) != null) {
-log.info(
-"We were planning on recycling standby task {} to 
an active task." +
-"The task got reassigned to this thread as a 
standby task, so cancel recycling of the task, " +
-"but add it back to the state updater, since 
we may have to catch up on the changelog.",
-taskId);
-tasks.addPendingTaskToAddBack(taskId);
-}
+final Comp