[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970973727

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
-    private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                               final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                               final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                               final Set<Task> tasksToCloseClean) {
-        classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        if (task.state() == State.SUSPENDED) {
+            task.resume();
+            moveTaskFromTasksRegistryToStateUpdater(task);
+        }

Review Comment:
Yes, within a single rebalance: with cooperative rebalancing, revocation and assignment happen at the same time, i.e. at the end of the rebalance, instead of revocation happening at the beginning and assignment at the end. So for a revoked partition (and hence task), we know for certain that it is going to be reassigned under the cooperative protocol.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
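The timing difference described in this review comment can be illustrated with a small Java sketch. It is a simplified, hypothetical model of one member's assignment, not Kafka's consumer code: partitions are plain strings, and `eagerRevoked`, `cooperativeRevoked` and `revokedThenReassigned` are illustrative names. Under the eager protocol everything the member owns is revoked when the rebalance starts, so a partition can be revoked and then handed back to the same member at the end of that rebalance (presumably the situation in which `handleReAssignedActiveTask` finds a task in `SUSPENDED` state). Under the cooperative protocol the revoked set is only computed once the new assignment is known, as owned minus assigned, so it cannot overlap the new assignment.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of one member's view of a single rebalance.
// Partition names are plain strings; this is not Kafka's consumer implementation.
final class RebalanceTiming {

    // Eager: everything owned is revoked when the rebalance starts,
    // before the new assignment is known.
    static Set<String> eagerRevoked(final Set<String> owned) {
        return new HashSet<>(owned);
    }

    // Cooperative: revocation is computed once the new assignment is known,
    // as the partitions owned before the rebalance minus those assigned after it.
    static Set<String> cooperativeRevoked(final Set<String> owned, final Set<String> assigned) {
        final Set<String> revoked = new HashSet<>(owned);
        revoked.removeAll(assigned);
        return revoked;
    }

    // Partitions that were revoked and then handed back to the same member
    // in the same rebalance.
    static Set<String> revokedThenReassigned(final Set<String> revoked, final Set<String> assigned) {
        final Set<String> both = new HashSet<>(revoked);
        both.retainAll(assigned);
        return both;
    }

    public static void main(final String[] args) {
        final Set<String> owned = Set.of("t1-0", "t1-1");
        final Set<String> assigned = Set.of("t1-0", "t1-2");
        System.out.println("eager: " + revokedThenReassigned(eagerRevoked(owned), assigned));
        System.out.println("cooperative: "
            + revokedThenReassigned(cooperativeRevoked(owned, assigned), assigned));
    }
}
```

Running `main` prints `eager: [t1-0]` and then `cooperative: []`: only the eager model revokes `t1-0` and gives it back to the same member within one rebalance.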
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970078065

## streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java: ##
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final Map
     @Override
     public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
-        return pendingTasksToRecycle.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
     @Override
     public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToRecycle.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
     }
     @Override
     public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
-        return pendingTasksToUpdateInputPartitions.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
     @Override
     public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
     }
     @Override
     public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
     @Override
     public void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty());
     }
     @Override
     public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
     @Override
     public void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
+    }
+
+    @Override
+    public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+    }
+
+    private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
+        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
+        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);

Review Comment:
nit: why not put the `!` inside, as `pendingUpdateAction != null && pendingUpdateAction.getAction() == action`?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                      final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                      final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                      final Set<Task> tasksToCloseClean) {
+    private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                             final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                             final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                             final Set<Task> tasksToCloseClean) {
+        handleRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+        handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+    }
+
+    private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater"
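The `Tasks.java` change replaces the separate pending collections with a single `pendingUpdateActions` map. A minimal Java sketch of that idea, under simplifying assumptions: task IDs and partitions are plain strings, and `PendingActions`, its nested `PendingUpdateAction` class and the trimmed-down method set are hypothetical stand-ins, not the real Kafka classes. It uses the condition form suggested in the nit, which is equivalent to the original `!(a == null || a.getAction() != action)` by De Morgan's law. Because each task ID maps to at most one action, `put` overwrites any earlier pending action for the same task.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for the pending-action bookkeeping in Tasks.java.
// Task IDs and partitions are plain strings; only two of the actions are wired up.
final class PendingActions {

    enum Action { RECYCLE, UPDATE_INPUT_PARTITIONS, CLOSE_DIRTY, CLOSE_CLEAN, SUSPEND }

    static final class PendingUpdateAction {
        final Action action;
        final Set<String> inputPartitions; // null for actions that carry no partitions

        PendingUpdateAction(final Action action, final Set<String> inputPartitions) {
            this.action = action;
            this.inputPartitions = inputPartitions;
        }
    }

    // At most one pending action per task ID.
    private final Map<String, PendingUpdateAction> pendingUpdateActions = new HashMap<>();

    void addPendingTaskToRecycle(final String taskId, final Set<String> inputPartitions) {
        // put() replaces any earlier pending action for the same task.
        pendingUpdateActions.put(taskId, new PendingUpdateAction(Action.RECYCLE, inputPartitions));
    }

    void addPendingActiveTaskToSuspend(final String taskId) {
        pendingUpdateActions.put(taskId, new PendingUpdateAction(Action.SUSPEND, null));
    }

    // Returns the input partitions if the task was pending recycle, otherwise null.
    Set<String> removePendingTaskToRecycle(final String taskId) {
        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
            return pendingUpdateActions.remove(taskId).inputPartitions;
        }
        return null;
    }

    boolean removePendingActiveTaskToSuspend(final String taskId) {
        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
            pendingUpdateActions.remove(taskId);
            return true;
        }
        return false;
    }

    // Form suggested in the nit; equivalent to
    // !(pendingUpdateAction == null || pendingUpdateAction.action != action).
    private boolean containsTaskIdWithAction(final String taskId, final Action action) {
        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
        return pendingUpdateAction != null && pendingUpdateAction.action == action;
    }
}
```

For example, calling `addPendingTaskToRecycle("0_2", ...)` and then `addPendingActiveTaskToSuspend("0_2")` leaves only the suspend action: `removePendingTaskToRecycle("0_2")` returns `null` and `removePendingActiveTaskToSuspend("0_2")` returns `true`.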
guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r967380695

## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ##
@@ -250,32 +250,502 @@ public void shouldClassifyExistingTasksWithoutStateUpdater() {
     }
     @Test
-    public void shouldClassifyExistingTasksWithStateUpdater() {
-        final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-        final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, mkSet(t2p2)).build();
-        final StandbyTask standbyTaskToClose = standbyTask(taskId04, mkSet(t2p0)).build();
-        final StreamTask restoringActiveTaskToRecycle = statefulTask(taskId03, mkSet(t1p3)).build();
-        final StreamTask restoringActiveTaskToClose = statefulTask(taskId01, mkSet(t1p1)).build();
-        final Map> standbyTasks =
-            mkMap(mkEntry(standbyTaskToRecycle.id(), standbyTaskToRecycle.changelogPartitions()));
-        final Map> restoringActiveTasks = mkMap(
-            mkEntry(restoringActiveTaskToRecycle.id(), restoringActiveTaskToRecycle.changelogPartitions())
+    public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {

Review Comment:
I made a brief pass over the tests and did not spot any obvious issues. Great job on improving the coverage!

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set<TopicPartition> remain
         for (final Task restoringTask : stateUpdater.getTasks()) {
             if (restoringTask.isActive()) {
                 if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-                    tasks.addPendingTaskToCloseClean(restoringTask.id());
+                    tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
I think we should remove the task from the other pending sets if it is in any of them, since, when handling tasks removed or restored from the state updater, checking the other pending sets (https://github.com/apache/kafka/pull/12600/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR804) takes precedence, right? I was thinking about the following steps:

T1: A first rebalance completes in which a task was recycled to active; it was added to pending-recycle, and `stateUpdater.remove` was called.
T2: A new rebalance starts, in which we call `handleRevocation`; this task is revoked, so we add it to the pending-suspend set as well.
T3: Finally, the task is returned from the state updater. We would check pending-recycle first and trigger that logic, which would be wrong.

More generally, I think that, as a principle, a task should be in only one of the pending sets in `Tasks`. So whenever we add a task to a pending set, we should make sure it is not in any other pending set.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final long now,
         for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) {
             Set<TopicPartition> inputPartitions;
             if ((inputPartitions = tasks.removePendingTaskToRecycle(task.id())) != null) {
-                recycleTask(task, inputPartitions, tasksToCloseDirty, taskExceptions);
+                recycleTaskFromStateUpdater(task, inputPartitions, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseClean(task.id())) {
                 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
             } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
                 tasksToCloseDirty.add(task);
             } else if ((inputPartitions = tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
                 task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
                 transitRestoredTaskToRunning(task, now, offsetResetter);
+            } else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+                task.suspend();
+                tasks.addTask(task);

Review Comment:
Ah right, that makes sense. We'd still need the pending suspend tasks then.
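The T1 to T3 sequence in the review comment can be reproduced with a hypothetical model of separate pending sets that are checked in the same order as the `if`/`else` chain in `handleRestoredTasksFromStateUpdater` (recycle, close clean, close dirty, update input partitions, suspend). `PendingSets`, `Pending` and `onTaskReturned` are illustrative names only, and task IDs are plain strings. With `exclusive = false` the task ends up in both the recycle and the suspend set, and the recycle check wins; with `exclusive = true`, adding a task to one set first removes it from all the others, following the "one pending set per task" principle, so the later suspend request wins.

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of separate pending sets. They are checked in enum declaration
// order, mirroring the if/else chain in handleRestoredTasksFromStateUpdater.
final class PendingSets {

    enum Pending { RECYCLE, CLOSE_CLEAN, CLOSE_DIRTY, UPDATE_INPUT_PARTITIONS, SUSPEND }

    private final Map<Pending, Set<String>> sets = new EnumMap<>(Pending.class);
    private final boolean exclusive;

    PendingSets(final boolean exclusive) {
        this.exclusive = exclusive;
        for (final Pending pending : Pending.values()) {
            sets.put(pending, new HashSet<>());
        }
    }

    void add(final String taskId, final Pending pending) {
        if (exclusive) {
            // "One pending set per task": drop the ID from every set before adding it.
            for (final Set<String> set : sets.values()) {
                set.remove(taskId);
            }
        }
        sets.get(pending).add(taskId);
    }

    // Action taken when the task comes back from the state updater, or null if none.
    Pending onTaskReturned(final String taskId) {
        for (final Pending pending : Pending.values()) {
            if (sets.get(pending).remove(taskId)) {
                return pending;
            }
        }
        return null;
    }

    public static void main(final String[] args) {
        for (final boolean exclusive : new boolean[] {false, true}) {
            final PendingSets pendingSets = new PendingSets(exclusive);
            pendingSets.add("0_1", Pending.RECYCLE); // T1: recycled, stateUpdater.remove called
            pendingSets.add("0_1", Pending.SUSPEND); // T2: revoked in the next rebalance
            // T3: the task is returned from the state updater
            System.out.println("exclusive=" + exclusive + " -> " + pendingSets.onTaskReturned("0_1"));
        }
    }
}
```

Running `main` prints `exclusive=false -> RECYCLE` followed by `exclusive=true -> SUSPEND`. The single `pendingUpdateActions` map in the `Tasks.java` diff gets the same exclusivity structurally, since `put` overwrites the previous entry for a task ID.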