shashankhs11 commented on code in PR #20944:
URL: https://github.com/apache/kafka/pull/20944#discussion_r2560620857


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4360,55 +4326,85 @@ public void shouldHaveRemainingPartitionsUncleared() {
 
     @Test
     public void 
shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
-        final StateMachineTask migratedTask01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new TaskMigratedException("t1 close exception", new 
RuntimeException());
-            }
-        };
+        final StandbyTask migratedTask01 = standbyTask(taskId01, 
taskId01ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId01Partitions)
+            .build();
+        final StandbyTask migratedTask02 = standbyTask(taskId02, 
taskId02ChangelogPartitions)
+            .inState(State.RUNNING)
+            .withInputPartitions(taskId02Partitions)
+            .build();
 
-        final StateMachineTask migratedTask02 = new StateMachineTask(taskId02, 
taskId02Partitions, false, stateManager) {
-            @Override
-            public void suspend() {
-                super.suspend();
-                throw new TaskMigratedException("t2 close exception", new 
RuntimeException());
-            }
-        };
-        taskManager.addTask(migratedTask01);
-        taskManager.addTask(migratedTask02);
+        doThrow(new TaskMigratedException("t1 close exception", new 
RuntimeException()))
+            .when(migratedTask01).suspend();
+        doThrow(new TaskMigratedException("t2 close exception", new 
RuntimeException()))
+            .when(migratedTask02).suspend();
+
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
+
+        when(stateUpdater.tasks()).thenReturn(Set.of(migratedTask01, 
migratedTask02));
+
+        // mock futures for removing tasks from StateUpdater
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future01 = new 
CompletableFuture<>();
+        when(stateUpdater.remove(taskId01)).thenReturn(future01);
+        future01.complete(new StateUpdater.RemovedTaskResult(migratedTask01));
+
+        final CompletableFuture<StateUpdater.RemovedTaskResult> future02 = new 
CompletableFuture<>();
+        when(stateUpdater.remove(taskId02)).thenReturn(future02);
+        future02.complete(new StateUpdater.RemovedTaskResult(migratedTask02));
 
         final TaskMigratedException thrown = assertThrows(
             TaskMigratedException.class,
             () -> taskManager.handleAssignment(emptyMap(), emptyMap())
         );
-        // The task map orders tasks based on topic group id and partition, so 
here
-        // t1 should always be the first.
+
+        // iteration order here is non-deterministic due to hashset.
+        // The last exception encountered wins, so we accept either
+        // t1 or t2's exception message.
         assertThat(
             thrown.getMessage(),
-            equalTo("t2 close exception; it means all tasks belonging to this 
thread should be migrated.")
+            anyOf(
+                equalTo("t1 close exception; it means all tasks belonging to 
this thread should be migrated."),
+                equalTo("t2 close exception; it means all tasks belonging to 
this thread should be migrated.")
+            )

Review Comment:
   - I reverted this test entirely and rewrote it in a new PR: #20992
   - I also added back `setupTaskManagerWithoutStateUpdater` in 76311ce
   - I will do a cleanup of this file in another PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to