This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7782741262c KAFKA-10199: Change to RUNNING if no pending task to 
recycle exist (#14145)
7782741262c is described below

commit 7782741262c08e5735f7c8e09727ec37cb5f7f02
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Aug 4 09:07:58 2023 +0200

    KAFKA-10199: Change to RUNNING if no pending task to recycle exist (#14145)
    
    A stream thread should only change to RUNNING if there are no
    active tasks in restoration in the state updater and if there
    are no pending tasks to recycle.
    
    There are situations in which a stream thread might only have
    standby tasks that are recycled to active tasks after a rebalance.
    In such situations, the stream thread might be faster in checking
    active tasks in restoration than the state updater is in removing
    the standby tasks to recycle from the state updater. If that happens,
    the stream thread changes to RUNNING although it should wait until
    the standby tasks are recycled to active tasks and restored.
    
    Reviewers: Walker Carlson <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../streams/processor/internals/TaskManager.java   |  2 +-
 .../kafka/streams/processor/internals/Tasks.java   |  5 ++++
 .../streams/processor/internals/TasksRegistry.java |  2 ++
 .../processor/internals/TaskManagerTest.java       | 32 +++++++++++++++++++++-
 .../streams/processor/internals/TasksTest.java     | 25 +++++++++++++++++
 5 files changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9152d20721f..13d01dd94ae 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -763,7 +763,7 @@ public class TaskManager {
         if (stateUpdater.restoresActiveTasks()) {
             handleRestoredTasksFromStateUpdater(now, offsetResetter);
         }
-        return !stateUpdater.restoresActiveTasks();
+        return !stateUpdater.restoresActiveTasks() && 
!tasks.hasPendingTasksToRecycle();
     }
 
     private void recycleTaskFromStateUpdater(final Task task,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 7b3f7860fb7..d809bc8338e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -114,6 +114,11 @@ class Tasks implements TasksRegistry {
         pendingUpdateActions.put(taskId, 
PendingUpdateAction.createRecycleTask(inputPartitions));
     }
 
+    @Override
+    public boolean hasPendingTasksToRecycle() {
+        return pendingUpdateActions.values().stream().anyMatch(action -> 
action.getAction() == Action.RECYCLE);
+    }
+
     @Override
     public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final 
TaskId taskId) {
         if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index c93c4f145c6..a04cd0fa6f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -37,6 +37,8 @@ public interface TasksRegistry {
 
     Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId);
 
+    boolean hasPendingTasksToRecycle();
+
     void addPendingTaskToRecycle(final TaskId taskId, final 
Set<TopicPartition> inputPartitions);
 
     Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId 
taskId);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3c626a8adba..63d2a4b631f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -885,7 +885,7 @@ public class TaskManagerTest {
         
when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true);
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+        taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, 
true);
         replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -894,6 +894,7 @@ public class TaskManagerTest {
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(tasks).addTask(statefulTask);
     }
+
     @Test
     public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
         final StreamTask taskToRecycle0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
@@ -947,6 +948,35 @@ public class TaskManagerTest {
         Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions);
     }
 
+    @Test
+    public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
+        when(stateUpdater.restoresActiveTasks()).thenReturn(true);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
+    }
+
+    @Test
+    public void 
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringButPendingTasksToRecycle()
 {
+        when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.hasPendingTasksToRecycle()).thenReturn(true);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        assertFalse(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
+    }
+
+    @Test
+    public void 
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTasksToRecycle()
 {
+        when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        when(tasks.hasPendingTasksToRecycle()).thenReturn(false);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+        assertTrue(taskManager.checkStateUpdater(time.milliseconds(), 
noOpResetter));
+    }
+
     @Test
     public void 
shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend()
 {
         final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index d303eb4f603..8ff3bea570b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -45,7 +45,10 @@ public class TasksTest {
     private final static TopicPartition TOPIC_PARTITION_B_1 = new 
TopicPartition("topicB", 1);
     private final static TaskId TASK_0_0 = new TaskId(0, 0);
     private final static TaskId TASK_0_1 = new TaskId(0, 1);
+    private final static TaskId TASK_0_2 = new TaskId(0, 2);
     private final static TaskId TASK_1_0 = new TaskId(1, 0);
+    private final static TaskId TASK_1_1 = new TaskId(1, 1);
+    private final static TaskId TASK_1_2 = new TaskId(1, 2);
 
     private final Tasks tasks = new Tasks(new LogContext());
 
@@ -122,6 +125,28 @@ public class TasksTest {
         assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
     }
 
+    @Test
+    public void shouldVerifyIfPendingTaskToRecycleExist() {
+        assertFalse(tasks.hasPendingTasksToRecycle());
+        tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+        assertTrue(tasks.hasPendingTasksToRecycle());
+
+        tasks.addPendingTaskToRecycle(TASK_1_0, mkSet(TOPIC_PARTITION_A_1));
+        assertTrue(tasks.hasPendingTasksToRecycle());
+
+        tasks.addPendingTaskToCloseClean(TASK_0_1);
+        tasks.addPendingTaskToCloseDirty(TASK_0_2);
+        tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1, 
mkSet(TOPIC_PARTITION_B_0));
+        tasks.addPendingActiveTaskToSuspend(TASK_1_2);
+        assertTrue(tasks.hasPendingTasksToRecycle());
+
+        tasks.removePendingTaskToRecycle(TASK_0_0);
+        assertTrue(tasks.hasPendingTasksToRecycle());
+
+        tasks.removePendingTaskToRecycle(TASK_1_0);
+        assertFalse(tasks.hasPendingTasksToRecycle());
+    }
+
     @Test
     public void shouldAddAndRemovePendingTaskToUpdateInputPartitions() {
         final Set<TopicPartition> expectedInputPartitions = 
mkSet(TOPIC_PARTITION_A_0);

Reply via email to