This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b043ca20742 KAFKA-19683: Remove dead tests and modify tests in TaskManagerTest [1/N] (#20501)
b043ca20742 is described below

commit b043ca207424b201bdce8224870925a4b0eb0628
Author: Shashank <[email protected]>
AuthorDate: Tue Sep 16 11:46:20 2025 -0700

    KAFKA-19683: Remove dead tests and modify tests in TaskManagerTest [1/N] (#20501)
    
    This is the first part of the cleanup of the tests in `TaskManagerTest`:
    - Removed dead tests
    - Added new tests as suggested earlier
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../processor/internals/TaskManagerTest.java       | 101 ++++++++-------------
 1 file changed, 39 insertions(+), 62 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 26a1523131b..8d83d1e99fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -152,6 +152,7 @@ public class TaskManagerTest {
     private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
     private final TopicPartition t2p2 = new TopicPartition(topic2, 1);
     private final TopicPartition t1p1changelog = new 
TopicPartition("changelog", 1);
+    private final TopicPartition t1p1changelog2 = new 
TopicPartition("changelog2", 1);
     private final Set<TopicPartition> taskId01Partitions = Set.of(t1p1);
     private final Set<TopicPartition> taskId01ChangelogPartitions = 
Set.of(t1p1changelog);
     private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = 
singletonMap(taskId01, taskId01Partitions);
@@ -218,6 +219,10 @@ public class TaskManagerTest {
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
     }
 
+    private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final TasksRegistry tasks) {
+        return setUpTaskManager(processingMode, tasks, false);
+    }
+
     private TaskManager setUpTaskManager(final ProcessingMode processingMode, 
final boolean stateUpdaterEnabled) {
         return setUpTaskManager(processingMode, null, stateUpdaterEnabled, 
false);
     }
@@ -249,52 +254,6 @@ public class TaskManagerTest {
         return taskManager;
     }
 
-    @Test
-    public void shouldClassifyExistingTasksWithoutStateUpdater() {
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, false);
-        final Map<TaskId, Set<TopicPartition>> runningActiveTasks = 
mkMap(mkEntry(taskId01, Set.of(t1p1)));
-        final Map<TaskId, Set<TopicPartition>> standbyTasks = 
mkMap(mkEntry(taskId02, Set.of(t2p2)));
-        final Map<TaskId, Set<TopicPartition>> restoringActiveTasks = 
mkMap(mkEntry(taskId03, Set.of(t1p3)));
-        final Map<TaskId, Set<TopicPartition>> activeTasks = new 
HashMap<>(runningActiveTasks);
-        activeTasks.putAll(restoringActiveTasks);
-        handleAssignment(runningActiveTasks, standbyTasks, 
restoringActiveTasks);
-
-        taskManager.handleAssignment(activeTasks, standbyTasks);
-
-        verifyNoInteractions(stateUpdater);
-    }
-
-    @Test
-    public void 
shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater()
 {
-        final StandbyTask standbyTask = standbyTask(taskId03, 
taskId03ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId03Partitions).build();
-        
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, 
taskId03Partitions);
-        verify(standbyTask, 
never()).updateInputPartitions(eq(taskId03Partitions), any());
-    }
-
-    @Test
-    public void 
shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater()
 {
-        final StandbyTask standbyTask = standbyTask(taskId03, 
taskId03ChangelogPartitions)
-            .inState(State.RUNNING)
-            .withInputPartitions(taskId03Partitions).build();
-        
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(standbyTask, 
taskId04Partitions);
-        verify(standbyTask).updateInputPartitions(eq(taskId04Partitions), 
any());
-    }
-
-    private void 
updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(final Task 
standbyTask,
-                                                                               
    final Set<TopicPartition> newInputPartition) {
-        final TasksRegistry tasks = mock(TasksRegistry.class);
-        when(tasks.allTasks()).thenReturn(Set.of(standbyTask));
-        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);
-
-        taskManager.handleAssignment(
-            Collections.emptyMap(),
-            mkMap(mkEntry(standbyTask.id(), newInputPartition))
-        );
-
-        verify(standbyTask).resume();
-    }
 
     @Test
     public void shouldLockAllTasksOnCorruptionWithProcessingThreads() {
@@ -1853,14 +1812,20 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws 
Exception {
-        final Map<TopicPartition, Long> changelogOffsets = mkMap(
-            mkEntry(new TopicPartition("changelog", 0), Task.LATEST_OFFSET),
-            mkEntry(new TopicPartition("changelog", 1), Task.LATEST_OFFSET)
-        );
-        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
Task.LATEST_OFFSET));
+    public void shouldComputeOffsetSumForRunningStatefulTask() {
+        final StreamTask runningStatefulTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RUNNING).build();
+        final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
+        when(runningStatefulTask.changelogOffsets())
+            .thenReturn(mkMap(mkEntry(t1p0changelog, 
changelogOffsetOfRunningTask)));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
 
-        computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+        assertThat(
+            taskManager.taskOffsetSums(),
+            is(mkMap(mkEntry(taskId00, changelogOffsetOfRunningTask)))
+        );
     }
 
     @Test
@@ -1911,14 +1876,14 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void 
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+    public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() 
{
         final StreamTask runningStatefulTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
             .inState(State.RUNNING).build();
         final StreamTask restoringStatefulTask = statefulTask(taskId01, 
taskId01ChangelogPartitions)
             .inState(State.RESTORING).build();
         final StandbyTask restoringStandbyTask = standbyTask(taskId02, 
taskId02ChangelogPartitions)
             .inState(State.RUNNING).build();
-        final long changelogOffsetOfRunningTask = 42L;
+        final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
         final long changelogOffsetOfRestoringStatefulTask = 24L;
         final long changelogOffsetOfRestoringStandbyTask = 84L;
         when(runningStatefulTask.changelogOffsets())
@@ -1943,14 +1908,26 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws 
Exception {
-        final Map<TopicPartition, Long> changelogOffsets = mkMap(
-            mkEntry(new TopicPartition("changelog", 0), 
OffsetCheckpoint.OFFSET_UNKNOWN),
-            mkEntry(new TopicPartition("changelog", 1), 10L)
-        );
-        final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
10L));
+    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
+        final StreamTask restoringStatefulTask = statefulTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RESTORING).build();
+        final long changelogOffsetOfRestoringStandbyTask = 84L;
+        when(restoringStatefulTask.changelogOffsets())
+            .thenReturn(mkMap(
+                mkEntry(t1p1changelog, changelogOffsetOfRestoringStandbyTask),
+                mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN)
+            ));
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+        when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
+        when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
-        computeOffsetSumAndVerify(changelogOffsets, expectedOffsetSums);
+        assertThat(
+            taskManager.taskOffsetSums(),
+            is(mkMap(
+                mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
+            ))
+        );
     }
 
     private void computeOffsetSumAndVerify(final Map<TopicPartition, Long> 
changelogOffsets,

Reply via email to