clolov commented on code in PR #13681:
URL: https://github.com/apache/kafka/pull/13681#discussion_r1191109013


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
           when(activeTaskCreator.createActiveTaskFromStandby(task01, 
taskId01Partitions,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -879,13 +880,10 @@ public void shouldCloseTasksRemovedFromStateUpdater() {
         
when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true);
         
when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
-        replay(activeTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator);
+        Mockito.verify(activeTaskCreator, 
times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
           
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -960,10 +958,8 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks())
             .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, 
taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        
expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
+        
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), 
Mockito.eq(taskId01Partitions),

Review Comment:
   ```suggestion
           when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, 
taskId01Partitions,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(task01Converted);
         expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), 
eq(taskId00Partitions)))
             .andStubReturn(task00Converted);
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator, 
times(1)).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   ```suggestion
           
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());
   ```
   The default check of verify is times(1) as far as I know.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2162,11 +2140,12 @@ public Map<TopicPartition, OffsetAndMetadata> 
prepareCommit() {
 
         // handleAssignment
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby));
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId01Assignment)))
+                .thenReturn(singleton(runningNonCorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(singleton(runningNonCorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2202,14 +2181,15 @@ public void 
shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActive, uncorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActive, uncorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2412,15 +2394,15 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));

Review Comment:
   ```suggestion
               .thenReturn(asList(revokedActiveTask, 
unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2750,11 +2726,10 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3087,13 +3053,11 @@ public Set<TopicPartition> changelogPartitions() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall().andThrow(new RuntimeException("whatever"));
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new RuntimeException("whatever"))
+                
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
               
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3016,14 +2985,10 @@ public void closeDirty() {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(task00, task01, task02, task03));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(4);
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(task00, task01, task02, task03));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02, task03));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2822,16 +2797,14 @@ public void 
shouldCommitAllNeededTasksOnHandleRevocation() {
         );
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andReturn(asList(task00, task01, task02));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3120,8 +3084,9 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(exception.getCause().getMessage(), is("whatever"));
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
           
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3484,14 +3435,14 @@ public void shouldCommitActiveAndStandbyTasks() {
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2082,17 +2062,16 @@ public void 
shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonCorruptedTask));

Review Comment:
   Could you keep the indentation consistent with the rest of the file?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -960,10 +958,8 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
         when(stateUpdater.drainRemovedTasks())
             .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, 
taskToUpdateInputPartitions));
         when(stateUpdater.restoresActiveTasks()).thenReturn(true);
-        
expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(convertedTask1);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().times(2);
+        
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), 
Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(convertedTask1);

Review Comment:
   ```suggestion
                   consumer)).thenReturn(convertedTask1);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),
+                Mockito.eq(consumer))).thenReturn(task01Converted);

Review Comment:
   ```suggestion
                   consumer)).thenReturn(task01Converted);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3528,14 +3479,14 @@ public void shouldCommitProvidedTasksIfNeeded() {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andStubReturn(Arrays.asList(task00, task01, task02));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(Arrays.asList(task00, task01, task02));

Review Comment:
   ```suggestion
               .thenReturn(Arrays.asList(task00, task01, task02));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3587,12 +3537,12 @@ public void 
shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw
         makeTaskFolders(taskId00.toString(), task01.toString());
         expectLockObtainedFor(taskId00, taskId01);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```
   You do not need to specify this multiple times. If you specify it once 
Mockito should take care of continuing to return the same mock on every 
invocation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2125,12 +2104,12 @@ public void 
shouldNotCommitNonRunningNonCorruptedTasks() {
         assignment.putAll(taskId01Assignment);
 
         // `handleAssignment`
-        expect(activeTaskCreator.createTasks(anyObject(), eq(assignment)))
-            .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2330,7 +2311,8 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActiveTask, 
uncorruptedActiveTask));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActiveTask, 
uncorruptedActiveTask));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2254,7 +2234,8 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
         final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
         assignment.putAll(taskId00Assignment);
         assignment.putAll(taskId01Assignment);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(asList(corruptedActive, uncorruptedActive));

Review Comment:
   ```suggestion
               .thenReturn(asList(corruptedActive, uncorruptedActive));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4493,9 +4441,9 @@ public void 
shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        expect(activeTaskCreator.threadProducer())
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.threadProducer())
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3920,9 +3869,10 @@ public void shouldProcessActiveTasks() {
         assignment.put(taskId01, taskId01Partitions);
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(Arrays.asList(task00, task01));
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment)))
+                .thenReturn(Arrays.asList(task00, task01));

Review Comment:
   ```suggestion
               .thenReturn(Arrays.asList(task00, task01));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void 
shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)

Review Comment:
   You do not need to specify this 3 times. If you specify it once Mockito 
takes care of returning the same result on every invocation.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)

Review Comment:
   ```suggestion
               .thenReturn(producer)
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3171,7 +3134,7 @@ public Set<TopicPartition> changelogPartitions() {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         // the active task creator should also get closed (so that it closes 
the thread producer if applicable)
-        verify(activeTaskCreator);
+        
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00));

Review Comment:
   ```suggestion
           
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void 
shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -539,18 +542,15 @@ public void shouldCreateActiveTaskDuringAssignment() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         final Set<Task> createdTasks = mkSet(activeTaskToBeCreated);
-        expect(activeTaskCreator.createTasks(consumer, mkMap(
-            mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions())))
-        ).andReturn(createdTasks);
+        final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap(
+                mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions()));

Review Comment:
   ```suggestion
               mkEntry(activeTaskToBeCreated.id(), 
activeTaskToBeCreated.inputPartitions()));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2476,10 +2458,9 @@ public void markChangelogAsCorrupted(final 
Collection<TopicPartition> partitions
 
         expectRestoreToBeCompleted(consumer);
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));

Review Comment:
   ```suggestion
               .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, 
unrevokedActiveTaskWithoutCommitNeeded));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4635,19 +4582,18 @@ public void shouldConvertStandbyTaskToActiveTask() {
         final StreamTask activeTask = mock(StreamTask.class);
         when(activeTask.id()).thenReturn(taskId00);
         when(activeTask.inputPartitions()).thenReturn(taskId00Partitions);
-
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
         
expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask));
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), 
eq(taskId00Partitions), anyObject())).andReturn(activeTask);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(Collections.emptyMap()))).andReturn(Collections.emptySet());
+        
when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), 
Mockito.eq(taskId00Partitions), any()))
+                .thenReturn(activeTask);

Review Comment:
   ```suggestion
               .thenReturn(activeTask);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1266,16 +1259,15 @@ public void 
shouldHandleExceptionThrownDuringClosingTaskProducerInCloseCleanRest
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpCloseCleanRestoredTask(statefulTask, tasks);
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
-        expectLastCall().andThrow(new RuntimeException("Something happened"));
-        replay(activeTaskCreator);
+        final TaskId taskId = statefulTask.id();
+        doThrow(new RuntimeException("Something happened"))
+                
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);

Review Comment:
   ```suggestion
               
.when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4448,10 +4396,10 @@ public void 
shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
 
         final StreamsProducer producer = mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3137,13 +3102,11 @@ public Set<TopicPartition> changelogPartitions() {
             }
         };
 
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignment))).andStubReturn(singletonList(task00));
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00));
-        expectLastCall();
-        activeTaskCreator.closeThreadProducerIfNeeded();
-        expectLastCall().andThrow(new RuntimeException("whatever"));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignment))).thenReturn(singletonList(task00));
+        doThrow(new 
RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded();
         
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());
-        replay(activeTaskCreator, standbyTaskCreator);
+        
expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList());

Review Comment:
   Isn't this the same as line 3107? If it is, is there a reason we need a copy 
of it? If not, can we get rid of it?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void 
shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,

Review Comment:
   ```suggestion
               consumer,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void 
shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,

Review Comment:
   ```suggestion
               consumer,
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -767,20 +770,19 @@ public void 
shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
         final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
         when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose));
-        
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id());
-        expect(activeTaskCreator.createTasks(
-            consumer,
-            mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
-        )).andReturn(emptySet());
         
expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet());
-        replay(activeTaskCreator, standbyTaskCreator);
+        replay(standbyTaskCreator);
 
         taskManager.handleAssignment(
             mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions())),
             Collections.emptyMap()
         );
 
-        verify(activeTaskCreator, standbyTaskCreator);
+        verify(standbyTaskCreator);
+        Mockito.verify(activeTaskCreator).createTasks(
+                consumer,
+                mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))

Review Comment:
   ```suggestion
               mkMap(mkEntry(activeTaskToCreate.id(), 
activeTaskToCreate.inputPartitions()))
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3428,10 +3380,10 @@ public void 
shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
     public void shouldInitializeNewActiveTasks() {
         final StateMachineTask task00 = new StateMachineTask(taskId00, 
taskId00Partitions, true);
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() {
         
when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions);
         
when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions);
         taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-            .andStubReturn(task01Converted);
-        activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-        expectLastCall().once();
+        when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), 
Mockito.eq(taskId01Partitions),

Review Comment:
   If none of the things are captors or things which allow you to generalise 
you don't need to use matchers.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() {
     @Test
     public void shouldCommitViaProducerIfEosAlphaEnabled() {
         final StreamsProducer producer = EasyMock.mock(StreamsProducer.class);
-        
expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class)))
-            .andReturn(producer)
-            .andReturn(producer);
+        when(activeTaskCreator.streamsProducerForTask(any(TaskId.class)))
+                .thenReturn(producer)
+                .thenReturn(producer);

Review Comment:
   ```suggestion
               .thenReturn(producer);
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3876,14 +3825,14 @@ public void 
shouldMaybeCommitAllActiveTasksThatNeedCommit() {
         );
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(assignmentActive)))
-            .andStubReturn(asList(task00, task01, task02, task03));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
+                .thenReturn(asList(task00, task01, task02, task03));

Review Comment:
   ```suggestion
               .thenReturn(asList(task00, task01, task02, task03));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4058,10 +4008,10 @@ public boolean process(final long wallClockTime) {
         };
 
         expectRestoreToBeCompleted(consumer);
-        expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment)))
-            .andStubReturn(singletonList(task00));
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment)))
+                .thenReturn(singletonList(task00));

Review Comment:
   ```suggestion
               .thenReturn(singletonList(task00));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to