[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970973727


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final 
Map> 
activeTasksToCreate,
-  final Map> 
standbyTasksToCreate,
-  final Map> 
tasksToRecycle,
-  final Set tasksToCloseClean) {
+private void handleTasksWithStateUpdater(final Map> activeTasksToCreate,
+ final Map> standbyTasksToCreate,
+ final Map> tasksToRecycle,
+ final Set 
tasksToCloseClean) {
+handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+}
+
+private void handleRunningAndSuspendedTasks(final Map> activeTasksToCreate,
+final Map> standbyTasksToCreate,
+final Map> tasksToRecycle,
+final Set 
tasksToCloseClean) {
 for (final Task task : tasks.allTasks()) {
+if (!task.isActive()) {
+throw new IllegalStateException("Standby tasks should only be 
managed by the state updater");
+}
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-if (task.isActive()) {
-final Set topicPartitions = 
activeTasksToCreate.get(taskId);
-if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-}
-task.resume();
-} else {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-}
+handleReAssignedActiveTask(task, 
activeTasksToCreate.get(taskId));
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
-if (!task.isActive()) {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-} else {
-tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-}
+tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
 standbyTasksToCreate.remove(taskId);
 } else {
 tasksToCloseClean.add(task);
 }
 }
 }
 
-private void classifyTasksWithStateUpdater(final Map> activeTasksToCreate,
-   final Map> standbyTasksToCreate,
-   final Map> tasksToRecycle,
-   final Set 
tasksToCloseClean) {
-classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, 
tasksToRecycle, tasksToCloseClean);
+private void handleReAssignedActiveTask(final Task task,
+final Set 
inputPartitions) {
+if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+}
+if (task.state() == State.SUSPENDED) {
+task.resume();
+moveTaskFromTasksRegistryToStateUpdater(task);
+}

Review Comment:
   Yes, within a single rebalance: with cooperative rebalancing, revocation and 
assignment happen at the same time, i.e. at the end of the rebalance, instead 
of revocation happening at the beginning and assignment at the end. So with 
cooperative rebalancing, we know that a revoked partition (and hence its task) 
is definitely going to be reassigned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-13 Thread GitBox


guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970078065


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final 
Map
 
 @Override
 public Set removePendingTaskToRecycle(final TaskId taskId) 
{
-return pendingTasksToRecycle.remove(taskId);
+if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+return pendingUpdateActions.remove(taskId).getInputPartitions();
+}
+return null;
 }
 
 @Override
 public void addPendingTaskToRecycle(final TaskId taskId, final 
Set inputPartitions) {
-pendingTasksToRecycle.put(taskId, inputPartitions);
+pendingUpdateActions.put(taskId, 
PendingUpdateAction.createRecycleTask(inputPartitions));
 }
 
 @Override
 public Set removePendingTaskToUpdateInputPartitions(final 
TaskId taskId) {
-return pendingTasksToUpdateInputPartitions.remove(taskId);
+if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+return pendingUpdateActions.remove(taskId).getInputPartitions();
+}
+return null;
 }
 
 @Override
 public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, 
final Set inputPartitions) {
-pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+pendingUpdateActions.put(taskId, 
PendingUpdateAction.createUpdateInputPartition(inputPartitions));
 }
 
 @Override
 public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-return pendingTasksToCloseDirty.remove(taskId);
+if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+pendingUpdateActions.remove(taskId);
+return true;
+}
+return false;
 }
 
 @Override
 public void addPendingTaskToCloseDirty(final TaskId taskId) {
-pendingTasksToCloseDirty.add(taskId);
+pendingUpdateActions.put(taskId, 
PendingUpdateAction.createCloseDirty());
 }
 
 @Override
 public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-return pendingTasksToCloseClean.remove(taskId);
+if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+pendingUpdateActions.remove(taskId);
+return true;
+}
+return false;
 }
 
 @Override
 public void addPendingTaskToCloseClean(final TaskId taskId) {
-pendingTasksToCloseClean.add(taskId);
+pendingUpdateActions.put(taskId, 
PendingUpdateAction.createCloseClean());
+}
+
+@Override
+public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+pendingUpdateActions.remove(taskId);
+return true;
+}
+return false;
+}
+
+@Override
+public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+}
+
+private boolean containsTaskIdWithAction(final TaskId taskId, final Action 
action) {
+final PendingUpdateAction pendingUpdateAction = 
pendingUpdateActions.get(taskId);
+return !(pendingUpdateAction == null || 
pendingUpdateAction.getAction() != action);

Review Comment:
   nit: why not push the `!` inside, i.e. `pendingUpdateAction != null && 
pendingUpdateAction.getAction() == action`?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final 
Map> 
activeTasksToCreate,
-  final Map> 
standbyTasksToCreate,
-  final Map> 
tasksToRecycle,
-  final Set tasksToCloseClean) {
+private void handleTasksWithStateUpdater(final Map> activeTasksToCreate,
+ final Map> standbyTasksToCreate,
+ final Map> tasksToRecycle,
+ final Set 
tasksToCloseClean) {
+handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+}
+
+private void handleRunningAndSuspendedTasks(final Map> activeTasksToCreate,
+final Map> standbyTasksToCreate,
+final Map> tasksToRecycle,
+final Set 
tasksToCloseClean) {
 for (final Task task : tasks.allTasks()) {
+if (!task.isActive()) {
+throw new IllegalStateException("Standby tasks should only be 
managed by the state updater"

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-09 Thread GitBox


guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r967380695


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -250,32 +250,502 @@ public void 
shouldClassifyExistingTasksWithoutStateUpdater() {
 }
 
 @Test
-public void shouldClassifyExistingTasksWithStateUpdater() {
-final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true);
-final StandbyTask standbyTaskToRecycle = standbyTask(taskId02, 
mkSet(t2p2)).build();
-final StandbyTask standbyTaskToClose = standbyTask(taskId04, 
mkSet(t2p0)).build();
-final StreamTask restoringActiveTaskToRecycle = statefulTask(taskId03, 
mkSet(t1p3)).build();
-final StreamTask restoringActiveTaskToClose = statefulTask(taskId01, 
mkSet(t1p1)).build();
-final Map> standbyTasks =
-mkMap(mkEntry(standbyTaskToRecycle.id(), 
standbyTaskToRecycle.changelogPartitions()));
-final Map> restoringActiveTasks = mkMap(
-mkEntry(restoringActiveTaskToRecycle.id(), 
restoringActiveTaskToRecycle.changelogPartitions())
+public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {

Review Comment:
   I made a brief pass over the tests and did not spot any obvious issues. Great 
job on improving the coverage!



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final 
Set remain
 for (final Task restoringTask : stateUpdater.getTasks()) {
 if (restoringTask.isActive()) {
 if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-tasks.addPendingTaskToCloseClean(restoringTask.id());
+
tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   I think we should remove the task from the other pending sets if it is in 
any of them, since when handling tasks removed / restored from the state 
updater, the other pending sets are checked first 
(https://github.com/apache/kafka/pull/12600/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR804), 
right?
   
   I was thinking about the following steps:
   
   T1: a first rebalance completes in which a task is recycled to active; it is 
added to pending-recycle, and `stateUpdater.remove` is called.
   T2: a new rebalance starts, we call `handleRevocation`, this task is revoked, 
and we add it to the pending-suspend set as well.
   T3: finally, the task is returned from the state updater; we would check 
pending-recycle first and trigger the recycle logic, which would be wrong.
   
   More generally, I think as a principle a task should only be in one of the 
pending sets in `Tasks`. So whenever we add a task to a pending set, we should 
make sure it is not in any of the other pending sets.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -817,14 +867,17 @@ private boolean handleRestoredTasksFromStateUpdater(final 
long now,
 for (final Task task : stateUpdater.drainRestoredActiveTasks(timeout)) 
{
 Set inputPartitions;
 if ((inputPartitions = 
tasks.removePendingTaskToRecycle(task.id())) != null) {
-recycleTask(task, inputPartitions, tasksToCloseDirty, 
taskExceptions);
+recycleTaskFromStateUpdater(task, inputPartitions, 
tasksToCloseDirty, taskExceptions);
 } else if (tasks.removePendingTaskToCloseClean(task.id())) {
 closeTaskClean(task, tasksToCloseDirty, taskExceptions);
 } else if (tasks.removePendingTaskToCloseDirty(task.id())) {
 tasksToCloseDirty.add(task);
 } else if ((inputPartitions = 
tasks.removePendingTaskToUpdateInputPartitions(task.id())) != null) {
 task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
 transitRestoredTaskToRunning(task, now, offsetResetter);
+} else if (tasks.removePendingActiveTaskToSuspend(task.id())) {
+task.suspend();
+tasks.addTask(task);

Review Comment:
   Ah right, that makes sense. We'd still need the pending suspend tasks then.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org