This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7782741262c KAFKA-10199: Change to RUNNING if no pending task to
recycle exist (#14145)
7782741262c is described below
commit 7782741262c08e5735f7c8e09727ec37cb5f7f02
Author: Bruno Cadonna <[email protected]>
AuthorDate: Fri Aug 4 09:07:58 2023 +0200
KAFKA-10199: Change to RUNNING if no pending task to recycle exist (#14145)
A stream thread should only change to RUNNING if there are no
active tasks in restoration in the state updater and if there
are no pending tasks to recycle.
There are situations in which a stream thread might only have
standby tasks that are recycled to active tasks after a rebalance.
In such situations, the stream thread might check for active
tasks in restoration faster than the state updater removes the
standby tasks to recycle. If that happens,
the stream thread changes to RUNNING although it should wait until
the standby tasks are recycled to active tasks and restored.
Reviewers: Walker Carlson <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/internals/TaskManager.java | 2 +-
.../kafka/streams/processor/internals/Tasks.java | 5 ++++
.../streams/processor/internals/TasksRegistry.java | 2 ++
.../processor/internals/TaskManagerTest.java | 32 +++++++++++++++++++++-
.../streams/processor/internals/TasksTest.java | 25 +++++++++++++++++
5 files changed, 64 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9152d20721f..13d01dd94ae 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -763,7 +763,7 @@ public class TaskManager {
if (stateUpdater.restoresActiveTasks()) {
handleRestoredTasksFromStateUpdater(now, offsetResetter);
}
- return !stateUpdater.restoresActiveTasks();
+ return !stateUpdater.restoresActiveTasks() &&
!tasks.hasPendingTasksToRecycle();
}
private void recycleTaskFromStateUpdater(final Task task,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
index 7b3f7860fb7..d809bc8338e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
@@ -114,6 +114,11 @@ class Tasks implements TasksRegistry {
pendingUpdateActions.put(taskId,
PendingUpdateAction.createRecycleTask(inputPartitions));
}
+ @Override
+ public boolean hasPendingTasksToRecycle() {
+ return pendingUpdateActions.values().stream().anyMatch(action ->
action.getAction() == Action.RECYCLE);
+ }
+
@Override
public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final
TaskId taskId) {
if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
index c93c4f145c6..a04cd0fa6f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TasksRegistry.java
@@ -37,6 +37,8 @@ public interface TasksRegistry {
Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId);
+ boolean hasPendingTasksToRecycle();
+
void addPendingTaskToRecycle(final TaskId taskId, final
Set<TopicPartition> inputPartitions);
Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId
taskId);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 3c626a8adba..63d2a4b631f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -885,7 +885,7 @@ public class TaskManagerTest {
when(tasks.removePendingActiveTaskToSuspend(statefulTask.id())).thenReturn(true);
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+ taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks,
true);
replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
@@ -894,6 +894,7 @@ public class TaskManagerTest {
Mockito.verify(statefulTask).suspend();
Mockito.verify(tasks).addTask(statefulTask);
}
+
@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
final StreamTask taskToRecycle0 = statefulTask(taskId00,
taskId00ChangelogPartitions)
@@ -947,6 +948,35 @@ public class TaskManagerTest {
Mockito.verify(stateUpdater).add(taskToUpdateInputPartitions);
}
+ @Test
+ public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
+ when(stateUpdater.restoresActiveTasks()).thenReturn(true);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
+ }
+
+ @Test
+ public void
shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringButPendingTasksToRecycle()
{
+ when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.hasPendingTasksToRecycle()).thenReturn(true);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ assertFalse(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
+ }
+
+ @Test
+ public void
shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTasksToRecycle()
{
+ when(stateUpdater.restoresActiveTasks()).thenReturn(false);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ when(tasks.hasPendingTasksToRecycle()).thenReturn(false);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
+
+ assertTrue(taskManager.checkStateUpdater(time.milliseconds(),
noOpResetter));
+ }
+
@Test
public void
shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend()
{
final StreamTask task = statefulTask(taskId00,
taskId00ChangelogPartitions)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
index d303eb4f603..8ff3bea570b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TasksTest.java
@@ -45,7 +45,10 @@ public class TasksTest {
private final static TopicPartition TOPIC_PARTITION_B_1 = new
TopicPartition("topicB", 1);
private final static TaskId TASK_0_0 = new TaskId(0, 0);
private final static TaskId TASK_0_1 = new TaskId(0, 1);
+ private final static TaskId TASK_0_2 = new TaskId(0, 2);
private final static TaskId TASK_1_0 = new TaskId(1, 0);
+ private final static TaskId TASK_1_1 = new TaskId(1, 1);
+ private final static TaskId TASK_1_2 = new TaskId(1, 2);
private final Tasks tasks = new Tasks(new LogContext());
@@ -122,6 +125,28 @@ public class TasksTest {
assertNull(tasks.removePendingTaskToRecycle(TASK_0_0));
}
+ @Test
+ public void shouldVerifyIfPendingTaskToRecycleExist() {
+ assertFalse(tasks.hasPendingTasksToRecycle());
+ tasks.addPendingTaskToRecycle(TASK_0_0, mkSet(TOPIC_PARTITION_A_0));
+ assertTrue(tasks.hasPendingTasksToRecycle());
+
+ tasks.addPendingTaskToRecycle(TASK_1_0, mkSet(TOPIC_PARTITION_A_1));
+ assertTrue(tasks.hasPendingTasksToRecycle());
+
+ tasks.addPendingTaskToCloseClean(TASK_0_1);
+ tasks.addPendingTaskToCloseDirty(TASK_0_2);
+ tasks.addPendingTaskToUpdateInputPartitions(TASK_1_1,
mkSet(TOPIC_PARTITION_B_0));
+ tasks.addPendingActiveTaskToSuspend(TASK_1_2);
+ assertTrue(tasks.hasPendingTasksToRecycle());
+
+ tasks.removePendingTaskToRecycle(TASK_0_0);
+ assertTrue(tasks.hasPendingTasksToRecycle());
+
+ tasks.removePendingTaskToRecycle(TASK_1_0);
+ assertFalse(tasks.hasPendingTasksToRecycle());
+ }
+
@Test
public void shouldAddAndRemovePendingTaskToUpdateInputPartitions() {
final Set<TopicPartition> expectedInputPartitions =
mkSet(TOPIC_PARTITION_A_0);