clolov commented on code in PR #13681: URL: https://github.com/apache/kafka/pull/13681#discussion_r1191109013
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(task01Converted); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().once(); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), Review Comment: ```suggestion when(activeTaskCreator.createActiveTaskFromStandby(task01, taskId01Partitions, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -879,13 +880,10 @@ public void shouldCloseTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToCloseClean(task00.id())).thenReturn(true); when(tasks.removePendingTaskToCloseClean(task01.id())).thenReturn(true); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().once(); - replay(activeTaskCreator); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - verify(activeTaskCreator); + Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any()); Review Comment: ```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any()); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -960,10 +958,8 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { when(stateUpdater.drainRemovedTasks()) .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(convertedTask1); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().times(2); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), Mockito.eq(taskId01Partitions), Review Comment: ```suggestion when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(task01Converted); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().once(); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), + Mockito.eq(consumer))).thenReturn(task01Converted); expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions))) .andStubReturn(task00Converted); - replay(activeTaskCreator, standbyTaskCreator); + replay(standbyTaskCreator); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); - verify(activeTaskCreator, standbyTaskCreator); + verify(standbyTaskCreator); + Mockito.verify(activeTaskCreator, times(1)).closeAndRemoveTaskProducerIfNeeded(any()); Review Comment: ```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any()); ``` The default check of verify is times(1) as far as I know. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2162,11 +2140,12 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { // handleAssignment expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andStubReturn(singleton(corruptedStandby)); - expect(activeTaskCreator.createTasks(anyObject(), eq(taskId01Assignment))).andStubReturn(singleton(runningNonCorruptedActive)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId01Assignment))) + .thenReturn(singleton(runningNonCorruptedActive)); Review Comment: ```suggestion .thenReturn(singleton(runningNonCorruptedActive)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { assignment.putAll(taskId01Assignment); // `handleAssignment` - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) - .andStubReturn(asList(corruptedTask, nonCorruptedTask)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedTask, nonCorruptedTask)); Review Comment: ```suggestion .thenReturn(asList(corruptedTask, nonCorruptedTask)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2202,14 +2181,15 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.putAll(taskId00Assignment); assignment.putAll(taskId01Assignment); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedActive, uncorruptedActive)); Review Comment: ```suggestion .thenReturn(asList(corruptedActive, uncorruptedActive)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { assignment.putAll(taskId01Assignment); // `handleAssignment` - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) - .andStubReturn(asList(corruptedTask, nonCorruptedTask)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedTask, nonCorruptedTask)); Review Comment: ```suggestion .thenReturn(asList(corruptedTask, nonCorruptedTask)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2412,15 +2394,15 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded)); Review Comment: ```suggestion .thenReturn(asList(revokedActiveTask, unrevokedActiveTaskWithCommitNeeded, unrevokedActiveTaskWithoutCommitNeeded)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2750,11 +2726,10 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo ); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) - .andReturn(asList(task00, task01, task02)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(asList(task00, task01, task02)); Review Comment: ```suggestion .thenReturn(asList(task00, task01, task02)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3087,13 +3053,11 @@ public Set<TopicPartition> changelogPartitions() { final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00)); - expectLastCall().andThrow(new RuntimeException("whatever")); - activeTaskCreator.closeThreadProducerIfNeeded(); - expectLastCall(); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00)); + doThrow(new RuntimeException("whatever")) + .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00)); Review Comment: ```suggestion .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3016,14 +2985,10 @@ public void closeDirty() { } }; - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) - .andStubReturn(asList(task00, task01, task02, task03)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().times(4); - activeTaskCreator.closeThreadProducerIfNeeded(); - expectLastCall(); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(task00, task01, task02, task03)); Review Comment: ```suggestion .thenReturn(asList(task00, task01, task02, task03)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2822,16 +2797,14 @@ public void shouldCommitAllNeededTasksOnHandleRevocation() { ); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) - .andReturn(asList(task00, task01, task02)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(taskId00); - expectLastCall(); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(asList(task00, task01, task02)); Review Comment: ```suggestion .thenReturn(asList(task00, task01, task02)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3120,8 +3084,9 @@ public Set<TopicPartition> changelogPartitions() { assertThat(exception.getCause().getMessage(), is("whatever")); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00)); Review Comment: ```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3484,14 +3435,14 @@ public void shouldCommitActiveAndStandbyTasks() { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) - .andStubReturn(singletonList(task00)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))) + .thenReturn(singletonList(task00)); Review Comment: ```suggestion .thenReturn(singletonList(task00)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2082,17 +2062,16 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { assignment.putAll(taskId01Assignment); // `handleAssignment` - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) - .andStubReturn(asList(corruptedTask, nonCorruptedTask)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedTask, nonCorruptedTask)); Review Comment: Could you keep the indentation consistent with the rest of the file? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) Review Comment: ```suggestion .thenReturn(producer) ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -960,10 +958,8 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { when(stateUpdater.drainRemovedTasks()) .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(convertedTask1); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().times(2); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(taskToRecycle1), Mockito.eq(taskId01Partitions), + Mockito.eq(consumer))).thenReturn(convertedTask1); Review Comment: ```suggestion consumer)).thenReturn(convertedTask1); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(task01Converted); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().once(); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), + Mockito.eq(consumer))).thenReturn(task01Converted); Review Comment: ```suggestion consumer)).thenReturn(task01Converted); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3528,14 +3479,14 @@ public void shouldCommitProvidedTasksIfNeeded() { ); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) - .andStubReturn(Arrays.asList(task00, task01, task02)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(Arrays.asList(task00, task01, task02)); Review Comment: ```suggestion .thenReturn(Arrays.asList(task00, task01, task02)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3587,12 +3537,12 @@ public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throw makeTaskFolders(taskId00.toString(), task01.toString()); expectLockObtainedFor(taskId00, taskId01); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) - .andStubReturn(singletonList(task00)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))) + .thenReturn(singletonList(task00)); Review Comment: ```suggestion .thenReturn(singletonList(task00)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) + .thenReturn(producer); Review Comment: ```suggestion ``` You do not need to specify this multiple times. If you specify it once Mockito should take care of continuing to return the same mock on every invocation. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2125,12 +2104,12 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() { assignment.putAll(taskId01Assignment); // `handleAssignment` - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))) - .andStubReturn(asList(corruptedTask, nonRunningNonCorruptedTask)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask)); Review Comment: ```suggestion .thenReturn(asList(corruptedTask, nonRunningNonCorruptedTask)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) Review Comment: ```suggestion .thenReturn(producer) ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2330,7 +2311,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.putAll(taskId00Assignment); assignment.putAll(taskId01Assignment); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActiveTask, uncorruptedActiveTask)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask)); Review Comment: ```suggestion .thenReturn(asList(corruptedActiveTask, uncorruptedActiveTask)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2254,7 +2234,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); assignment.putAll(taskId00Assignment); assignment.putAll(taskId01Assignment); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(asList(corruptedActive, uncorruptedActive)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(asList(corruptedActive, uncorruptedActive)); Review Comment: ```suggestion .thenReturn(asList(corruptedActive, uncorruptedActive)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4493,9 +4441,9 @@ public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); final StreamsProducer producer = mock(StreamsProducer.class); - expect(activeTaskCreator.threadProducer()) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.threadProducer()) + .thenReturn(producer) + .thenReturn(producer); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3920,9 +3869,10 @@ public void shouldProcessActiveTasks() { assignment.put(taskId01, taskId01Partitions); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(Arrays.asList(task00, task01)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))) + .thenReturn(Arrays.asList(task00, task01)); Review Comment: ```suggestion .thenReturn(Arrays.asList(task00, task01)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false); final StreamsProducer producer = mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) + .thenReturn(producer) Review Comment: You do not need to specify this 3 times. If you specify it once Mockito takes care of returning the same result on every invocation. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) Review Comment: ```suggestion .thenReturn(producer) ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3171,7 +3134,7 @@ public Set<TopicPartition> changelogPartitions() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) - verify(activeTaskCreator); + Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(Mockito.eq(taskId00)); Review Comment: ```suggestion Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false); final StreamsProducer producer = mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) + .thenReturn(producer) Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -539,18 +542,15 @@ public void shouldCreateActiveTaskDuringAssignment() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); final Set<Task> createdTasks = mkSet(activeTaskToBeCreated); - expect(activeTaskCreator.createTasks(consumer, mkMap( - mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions()))) - ).andReturn(createdTasks); + final Map<TaskId, Set<TopicPartition>> tasksToBeCreated = mkMap( + mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions())); Review Comment: ```suggestion mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions())); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2476,10 +2458,9 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))).andReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); Review Comment: ```suggestion .thenReturn(asList(revokedActiveTask, unrevokedActiveTask, unrevokedActiveTaskWithoutCommitNeeded)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4635,19 +4582,18 @@ public void shouldConvertStandbyTaskToActiveTask() { final StreamTask activeTask = mock(StreamTask.class); when(activeTask.id()).thenReturn(taskId00); when(activeTask.inputPartitions()).thenReturn(taskId00Partitions); - - expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); expect(standbyTaskCreator.createTasks(eq(taskId00Assignment))).andReturn(singletonList(standbyTask)); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(standbyTask), eq(taskId00Partitions), anyObject())).andReturn(activeTask); - expect(activeTaskCreator.createTasks(anyObject(), eq(Collections.emptyMap()))).andReturn(Collections.emptySet()); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(standbyTask), Mockito.eq(taskId00Partitions), any())) + .thenReturn(activeTask); Review Comment: ```suggestion .thenReturn(activeTask); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1266,16 +1259,15 @@ public void shouldHandleExceptionThrownDuringClosingTaskProducerInCloseCleanRest .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id()); - expectLastCall().andThrow(new RuntimeException("Something happened")); - replay(activeTaskCreator); + final TaskId taskId = statefulTask.id(); + doThrow(new RuntimeException("Something happened")) + .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId); Review Comment: ```suggestion .when(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4448,10 +4396,10 @@ public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() { final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false); final StreamsProducer producer = mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) + .thenReturn(producer) + .thenReturn(producer); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3137,13 +3102,11 @@ public Set<TopicPartition> changelogPartitions() { } }; - expect(activeTaskCreator.createTasks(anyObject(), eq(assignment))).andStubReturn(singletonList(task00)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(eq(taskId00)); - expectLastCall(); - activeTaskCreator.closeThreadProducerIfNeeded(); - expectLastCall().andThrow(new RuntimeException("whatever")); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignment))).thenReturn(singletonList(task00)); + doThrow(new RuntimeException("whatever")).when(activeTaskCreator).closeThreadProducerIfNeeded(); expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); - replay(activeTaskCreator, standbyTaskCreator); + expect(standbyTaskCreator.createTasks(eq(emptyMap()))).andStubReturn(emptyList()); Review Comment: Isn't this the same as line 3107? If it is, is there a reason we need a copy of it? If not, can we get rid of it? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id()); - expect(activeTaskCreator.createTasks( - consumer, - mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())) - )).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); - replay(activeTaskCreator, standbyTaskCreator); + replay(standbyTaskCreator); taskManager.handleAssignment( mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())), Collections.emptyMap() ); - verify(activeTaskCreator, standbyTaskCreator); + verify(standbyTaskCreator); + Mockito.verify(activeTaskCreator).createTasks( + consumer, Review Comment: ```suggestion consumer, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id()); - expect(activeTaskCreator.createTasks( - consumer, - mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())) - )).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); - replay(activeTaskCreator, standbyTaskCreator); + replay(standbyTaskCreator); taskManager.handleAssignment( mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())), Collections.emptyMap() ); - verify(activeTaskCreator, standbyTaskCreator); + verify(standbyTaskCreator); + Mockito.verify(activeTaskCreator).createTasks( + consumer, Review Comment: ```suggestion consumer, ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -767,20 +770,19 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(mkSet(activeTaskToClose)); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(activeTaskToClose.id()); - expect(activeTaskCreator.createTasks( - consumer, - mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())) - )).andReturn(emptySet()); expect(standbyTaskCreator.createTasks(Collections.emptyMap())).andReturn(emptySet()); - replay(activeTaskCreator, standbyTaskCreator); + replay(standbyTaskCreator); taskManager.handleAssignment( mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())), Collections.emptyMap() ); - verify(activeTaskCreator, standbyTaskCreator); + verify(standbyTaskCreator); + Mockito.verify(activeTaskCreator).createTasks( + consumer, + mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())) Review Comment: ```suggestion mkMap(mkEntry(activeTaskToCreate.id(), activeTaskToCreate.inputPartitions())) ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3428,10 +3380,10 @@ public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() { public void shouldInitializeNewActiveTasks() { final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) - .andStubReturn(singletonList(task00)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))) + .thenReturn(singletonList(task00)); Review Comment: ```suggestion .thenReturn(singletonList(task00)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -845,17 +847,16 @@ public void shouldRecycleTasksRemovedFromStateUpdater() { when(tasks.removePendingTaskToRecycle(task00.id())).thenReturn(taskId00Partitions); when(tasks.removePendingTaskToRecycle(task01.id())).thenReturn(taskId01Partitions); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); - expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) - .andStubReturn(task01Converted); - activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); - expectLastCall().once(); + when(activeTaskCreator.createActiveTaskFromStandby(Mockito.eq(task01), Mockito.eq(taskId01Partitions), Review Comment: If none of the things are captors or things which allow you to generalise you don't need to use matchers. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3636,9 +3586,9 @@ public void shouldCommitViaConsumerIfEosDisabled() { @Test public void shouldCommitViaProducerIfEosAlphaEnabled() { final StreamsProducer producer = EasyMock.mock(StreamsProducer.class); - expect(activeTaskCreator.streamsProducerForTask(anyObject(TaskId.class))) - .andReturn(producer) - .andReturn(producer); + when(activeTaskCreator.streamsProducerForTask(any(TaskId.class))) + .thenReturn(producer) + .thenReturn(producer); Review Comment: ```suggestion .thenReturn(producer); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3876,14 +3825,14 @@ public void shouldMaybeCommitAllActiveTasksThatNeedCommit() { ); expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(assignmentActive))) - .andStubReturn(asList(task00, task01, task02, task03)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) + .thenReturn(asList(task00, task01, task02, task03)); Review Comment: ```suggestion .thenReturn(asList(task00, task01, task02, task03)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4058,10 +4008,10 @@ public boolean process(final long wallClockTime) { }; expectRestoreToBeCompleted(consumer); - expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))) - .andStubReturn(singletonList(task00)); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))) + .thenReturn(singletonList(task00)); Review Comment: ```suggestion .thenReturn(singletonList(task00)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org