This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b043ca20742 KAFKA-19683: Remove dead tests and modify tests in
TaskManagerTest [1/N] (#20501)
b043ca20742 is described below
commit b043ca207424b201bdce8224870925a4b0eb0628
Author: Shashank <[email protected]>
AuthorDate: Tue Sep 16 11:46:20 2025 -0700
KAFKA-19683: Remove dead tests and modify tests in TaskManagerTest [1/N]
(#20501)
This is the first part of cleaning up of the tests in `TaskManagerTest`
- Removed dead tests
- Added new tests as suggested earlier
Reviewers: Lucas Brutschy <[email protected]>
---
.../processor/internals/TaskManagerTest.java | 101 ++++++++-------------
1 file changed, 39 insertions(+), 62 deletions(-)
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 26a1523131b..8d83d1e99fa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -152,6 +152,7 @@ public class TaskManagerTest {
private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
private final TopicPartition t2p2 = new TopicPartition(topic2, 1);
private final TopicPartition t1p1changelog = new
TopicPartition("changelog", 1);
+ private final TopicPartition t1p1changelog2 = new
TopicPartition("changelog2", 1);
private final Set<TopicPartition> taskId01Partitions = Set.of(t1p1);
private final Set<TopicPartition> taskId01ChangelogPartitions =
Set.of(t1p1changelog);
private final Map<TaskId, Set<TopicPartition>> taskId01Assignment =
singletonMap(taskId01, taskId01Partitions);
@@ -218,6 +219,10 @@ public class TaskManagerTest {
taskManager =
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
}
+ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final TasksRegistry tasks) {
+ return setUpTaskManager(processingMode, tasks, false);
+ }
+
private TaskManager setUpTaskManager(final ProcessingMode processingMode,
final boolean stateUpdaterEnabled) {
return setUpTaskManager(processingMode, null, stateUpdaterEnabled,
false);
}
@@ -249,52 +254,6 @@ public class TaskManagerTest {
return taskManager;
}
- @Test
- public void shouldClassifyExistingTasksWithoutStateUpdater() {
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, false);
- final Map<TaskId, Set<TopicPartition>> runningActiveTasks =
mkMap(mkEntry(taskId01, Set.of(t1p1)));
- final Map<TaskId, Set<TopicPartition>> standbyTasks =
mkMap(mkEntry(taskId02, Set.of(t2p2)));
- final Map<TaskId, Set<TopicPartition>> restoringActiveTasks =
mkMap(mkEntry(taskId03, Set.of(t1p3)));
- final Map<TaskId, Set<TopicPartition>> activeTasks = new
HashMap<>(runningActiveTasks);
- activeTasks.putAll(restoringActiveTasks);
- handleAssignment(runningActiveTasks, standbyTasks,
restoringActiveTasks);
-
- taskManager.handleAssignment(activeTasks, standbyTasks);
-
- verifyNoInteractions(stateUpdater);
- }
-
- @Test
- public void
shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater()
{
- final StandbyTask standbyTask = standbyTask(taskId03,
taskId03ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId03Partitions).build();
-
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask,
taskId03Partitions);
- verify(standbyTask,
never()).updateInputPartitions(eq(taskId03Partitions), any());
- }
-
- @Test
- public void
shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater()
{
- final StandbyTask standbyTask = standbyTask(taskId03,
taskId03ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId03Partitions).build();
-
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask,
taskId04Partitions);
- verify(standbyTask).updateInputPartitions(eq(taskId04Partitions),
any());
- }
-
- private void
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task
standbyTask,
-
final Set<TopicPartition> newInputPartition) {
- final TasksRegistry tasks = mock(TasksRegistry.class);
- when(tasks.allTasks()).thenReturn(Set.of(standbyTask));
- final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-
- taskManager.handleAssignment(
- Collections.emptyMap(),
- mkMap(mkEntry(standbyTask.id(), newInputPartition))
- );
-
- verify(standbyTask).resume();
- }
@Test
public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
@@ -1853,14 +1812,20 @@ public class TaskManagerTest {
}
@Test
- public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws
Exception {
- final Map<TopicPartition, Long> changelogOffsets = mkMap(
- mkEntry(new TopicPartition("changelog", 0), Task.LATEST_OFFSET),
- mkEntry(new TopicPartition("changelog", 1), Task.LATEST_OFFSET)
- );
- final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
Task.LATEST_OFFSET));
+ public void shouldComputeOffsetSumForRunningStatefulTask() {
+ final StreamTask runningStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
+ .inState(State.RUNNING).build();
+ final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
+ when(runningStatefulTask.changelogOffsets())
+ .thenReturn(mkMap(mkEntry(t1p0changelog,
changelogOffsetOfRunningTask)));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+ when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00,
runningStatefulTask)));
- computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+ assertThat(
+ taskManager.taskOffsetSums(),
+ is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
+ );
}
@Test
@@ -1911,14 +1876,14 @@ public class TaskManagerTest {
}
@Test
- public void
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask()
{
final StreamTask runningStatefulTask = statefulTask(taskId00,
taskId00ChangelogPartitions)
.inState(State.RUNNING).build();
final StreamTask restoringStatefulTask = statefulTask(taskId01,
taskId01ChangelogPartitions)
.inState(State.RESTORING).build();
final StandbyTask restoringStandbyTask = standbyTask(taskId02,
taskId02ChangelogPartitions)
.inState(State.RUNNING).build();
- final long changelogOffsetOfRunningTask = 42L;
+ final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
final long changelogOffsetOfRestoringStatefulTask = 24L;
final long changelogOffsetOfRestoringStandbyTask = 84L;
when(runningStatefulTask.changelogOffsets())
@@ -1943,14 +1908,26 @@ public class TaskManagerTest {
}
@Test
- public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws
Exception {
- final Map<TopicPartition, Long> changelogOffsets = mkMap(
- mkEntry(new TopicPartition("changelog", 0),
OffsetCheckpoint.OFFSET_UNKNOWN),
- mkEntry(new TopicPartition("changelog", 1), 10L)
- );
- final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00,
10L));
+ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
+ final StreamTask restoringStatefulTask = statefulTask(taskId01,
taskId01ChangelogPartitions)
+ .inState(State.RESTORING).build();
+ final long changelogOffsetOfRestoringStandbyTask = 84L;
+ when(restoringStatefulTask.changelogOffsets())
+ .thenReturn(mkMap(
+ mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
+ mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
+ ));
+ final TasksRegistry tasks = mock(TasksRegistry.class);
+ final TaskManager taskManager =
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+ when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01,
restoringStatefulTask)));
+ when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
- computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+ assertThat(
+ taskManager.taskOffsetSums(),
+ is(mkMap(
+ mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
+ ))
+ );
}
private void computeOffsetSumAndVerify(final Map<TopicPartition, Long>
changelogOffsets,