cadonna commented on code in PR #13711: URL: https://github.com/apache/kafka/pull/13711#discussion_r1217778982
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1854,6 +1855,8 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { assertThat(task00.state(), is(Task.State.CLOSED)); assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); + + Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap()); Review Comment: Also here no verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1889,6 +1891,8 @@ public void closeClean() { is("Encounter unexpected fatal error for task 0_0") ); assertThat(thrown.getCause().getMessage(), is("KABOOM!")); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2232,6 +2238,8 @@ public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() { assertThat(uncorruptedActive.state(), is(State.RUNNING)); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1766,15 +1767,16 @@ public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Excep taskManager.handleRebalanceStart(singleton("topic")); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(closedTask)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator); + replay(activeTaskCreator); taskManager.handleAssignment(taskId00Assignment, emptyMap()); closedTask.suspend(); closedTask.closeClean(); assertThat(closedTask.state(), 
is(State.CLOSED)); assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: Also here, I do not think you need to verify. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2507,19 +2517,20 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions assertThat(revokedActiveTask.state(), is(State.SUSPENDED)); assertThat(unrevokedActiveTask.state(), is(State.CREATED)); assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2581,6 +2593,7 @@ public void shouldUpdateInputPartitionsAfterRebalance() { assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); verify(activeTaskCreator, consumer); + Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1988,6 +1991,8 @@ public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() { ); assertThat(thrown.getCause(), instanceOf(RuntimeException.class)); assertThat(thrown.getCause().getMessage(), is("KABOOM!")); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1740,13 +1740,14 @@ public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throw taskManager.handleRebalanceStart(singleton("topic")); final StateMachineTask uninitializedTask = new StateMachineTask(taskId00, taskId00Partitions, true); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singleton(uninitializedTask)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator); + replay(activeTaskCreator); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(uninitializedTask.state(), is(State.CREATED)); assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: I do not think you need this verification, since the call was not verified with EasyMock either (i.e., `verify()` was never called on the mock). ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2932,6 +2947,8 @@ public void shouldNotCommitCreatedTasksOnRevocationOrClosure() { taskManager.handleAssignment(emptyMap(), emptyMap()); assertThat(task00.state(), is(Task.State.CLOSED)); + + Mockito.verify(standbyTaskCreator, times(2)).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2880,6 +2893,8 @@ public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() { assertThat(task00.commitNeeded, is(true)); assertThat(task10.commitPrepared, is(false)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2067,6 +2072,8 @@ public void suspend() { verify(stateManager); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2956,6 +2972,8 @@ public void suspend() { assertThat(task00.state(), is(Task.State.SUSPENDED)); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2108,6 +2113,8 @@ public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions)); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2031,6 +2035,8 @@ public void postCommit(final boolean enforceCheckpoint) { verify(stateManager); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3122,6 +3140,8 @@ public Set<TopicPartition> changelogPartitions() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3307,6 +3327,8 @@ public void suspend() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2144,6 +2149,8 @@ public void shouldNotCommitNonRunningNonCorruptedTasks() { verify(activeTaskCreator); assertFalse(nonRunningNonCorruptedTask.commitPrepared); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3172,6 +3191,8 @@ public Set<TopicPartition> changelogPartitions() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3716,6 +3738,8 @@ public Map<TopicPartition, OffsetAndMetadata> prepareCommit() { final RuntimeException thrown = assertThrows(RuntimeException.class, () -> taskManager.commitAll()); assertThat(thrown.getMessage(), equalTo("opsh.")); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2295,6 +2302,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions assertThat(corruptedActive.state(), is(Task.State.CREATED)); assertThat(uncorruptedActive.state(), is(Task.State.CREATED)); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: See above ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2379,6 +2387,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions assertThat(corruptedTaskChangelogMarkedAsCorrupted.get(), is(true)); assertThat(uncorruptedTaskChangelogMarkedAsCorrupted.get(), is(true)); verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3441,6 +3462,8 @@ public void shouldInitializeNewActiveTasks() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // verifies that we actually resume the assignment at the end of restoration. verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3818,6 +3842,8 @@ public Map<TopicPartition, Long> purgeableOffsets() { taskManager.maybePurgeCommittedRecords(); verify(adminClient); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2433,6 +2442,8 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions assertThat(revokedActiveTask.state(), is(State.SUSPENDED)); assertThat(unrevokedActiveTaskWithCommitNeeded.state(), is(State.CREATED)); assertThat(unrevokedActiveTaskWithoutCommitNeeded.state(), is(State.RUNNING)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3958,6 +3983,8 @@ public void shouldProcessActiveTasks() { // check that if there's no records proccssible, we would stop early assertThat(taskManager.process(3, time), is(5)); assertThat(taskManager.process(3, time), is(0)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2556,6 +2567,8 @@ public void shouldAddNonResumedSuspendedTasks() { assertThat(task01.state(), is(Task.State.RUNNING)); verify(activeTaskCreator); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2606,6 +2618,7 @@ public void shouldAddNewActiveTasks() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); verify(activeTaskCreator); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2652,6 +2664,7 @@ public void initializeIfNeeded() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); verify(activeTaskCreator); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4075,6 +4102,8 @@ public boolean process(final long wallClockTime) { assertThat(exception.taskId().isPresent(), is(true)); assertThat(exception.taskId().get(), is(taskId00)); assertThat(exception.getCause().getMessage(), is("oops")); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4046,6 +4072,8 @@ public boolean process(final long wallClockTime) { task00.addRecords(partition, singletonList(getConsumerRecord(partition, 0L))); assertThrows(TaskMigratedException.class, () -> taskManager.process(1, time)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3780,6 +3803,8 @@ public Map<TopicPartition, Long> purgeableOffsets() { taskManager.maybePurgeCommittedRecords(); verify(adminClient); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4110,15 +4140,16 @@ public boolean maybePunctuateStreamTime() { expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer); + replay(activeTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertThrows(KafkaException.class, () -> taskManager.punctuate()); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2911,18 +2925,19 @@ public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() { taskManager.handleAssignment(assignmentActive, Collections.emptyMap()); assertThat(task00.commitNeeded, is(true)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4161,15 +4192,16 @@ public Set<TopicPartition> changelogPartitions() { }; expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer); + replay(activeTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); assertThat(task00.state(), is(Task.State.RESTORING)); // this could be a bit mysterious; we're verifying _no_ interactions on the consumer, // since the taskManager should _not_ resume the assignment while we're still in RESTORING verify(consumer); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2690,6 +2702,7 @@ public void completeRestoration(final java.util.function.Consumer<Set<TopicParti assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); verify(activeTaskCreator); Mockito.verify(changeLogReader).enforceRestoreActive(); + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4149,6 +4178,8 @@ public boolean maybePunctuateSystemTime() { // one for stream and one for system time assertThat(taskManager.punctuate(), equalTo(2)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4204,6 +4235,8 @@ public void shouldHaveRemainingPartitionsUncleared() { "tasks have been cleaned up by the handleAssignment callback.") ); } + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3070,6 +3087,8 @@ public void closeDirty() { assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); // the active task creator should also get closed (so that it closes the thread producer if applicable) verify(activeTaskCreator); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2700,18 +2713,19 @@ public void shouldSuspendActiveTasksDuringRevocation() { expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andReturn(singletonList(task00)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); consumer.commitSync(offsets); expectLastCall(); - replay(activeTaskCreator, standbyTaskCreator, consumer); + replay(activeTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); taskManager.handleRevocation(taskId00Partitions); assertThat(task00.state(), is(Task.State.SUSPENDED)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4088,15 +4117,16 @@ public boolean maybePunctuateStreamTime() { expectRestoreToBeCompleted(consumer); expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); - expect(standbyTaskCreator.createTasks(anyObject())).andStubReturn(Collections.emptySet()); - replay(activeTaskCreator, standbyTaskCreator, consumer); + replay(activeTaskCreator, consumer); taskManager.handleAssignment(taskId00Assignment, emptyMap()); assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertThrows(TaskMigratedException.class, () -> taskManager.punctuate()); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4595,6 +4627,8 @@ public void suspend() { assertThat(thrown.getCause().getMessage(), is("KABOOM!")); assertThat(task00.state(), is(Task.State.SUSPENDED)); assertThat(task01.state(), is(Task.State.SUSPENDED)); + + Mockito.verify(standbyTaskCreator).createTasks(Collections.emptyMap()); Review Comment: No verification needed. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -352,16 +356,17 @@ public void shouldRemoveUnusedStandbyTaskFromStateUpdater() { .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = Mockito.mock(TasksRegistry.class); final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); - when(stateUpdater.getTasks()).thenReturn(mkSet(standbyTaskToClose)); + final Set<Task> standbyTasks = mkSet(standbyTaskToClose); + when(stateUpdater.getTasks()).thenReturn(standbyTasks); Review Comment: nit: Why did you introduce variable `standbyTasks`? The variable is not used anywhere else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org