divijvaidya commented on code in PR #13873: URL: https://github.com/apache/kafka/pull/13873#discussion_r1234973245
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() { .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks); - consumer.resume(task.inputPartitions()); - replay(consumer); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); Mockito.verify(task).completeRestoration(noOpResetter); Mockito.verify(task).clearTaskTimeout(); Mockito.verify(tasks).addTask(task); - verify(consumer); + Mockito.verify(consumer).resume(task.inputPartitions()); Review Comment: could you please add `verifyNoMoreInteraction`for consumer mock here. Asking because looks like there should be more than one invocation of consumer.resume() in this test but we are only testing for one. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() { assertEquals(taskManager.notPausedTasks().size(), 0); } - private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) { + private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) { final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); - expect(consumer.assignment()).andReturn(assignment); - consumer.resume(assignment); - expectLastCall(); + when(consumer.assignment()).thenReturn(assignment); + } + + private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) { + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + Mockito.verify(consumer, atLeastOnce()).resume(assignment); + } + + private static void verifyResumeWasCalledWith(final Consumer<byte[], byte[]> consumer, Set<TopicPartition> assignment) { Review Comment: s/verifyResumeWasCalledWith /verifyResumeWasCalledWithAssignment ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() { assertEquals(taskManager.notPausedTasks().size(), 0); } - private static void expectRestoreToBeCompleted(final Consumer<byte[], byte[]> consumer) { + private static void expectAssignmentToBeCalled(final Consumer<byte[], byte[]> consumer) { final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); - expect(consumer.assignment()).andReturn(assignment); - consumer.resume(assignment); - expectLastCall(); + when(consumer.assignment()).thenReturn(assignment); + } + + private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> consumer) { + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + Mockito.verify(consumer, atLeastOnce()).resume(assignment); Review Comment: why are we checking for atLeastOnce and not the exact times? Isn't this relaxing constrains from what we were doing with easy mock? (same for verifyResumeWasCalledWith) ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -918,9 +915,6 @@ public void shouldHandleMultipleRemovedTasksFromStateUpdater() { .thenReturn(convertedTask1); when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions)) .thenReturn(convertedTask0); - expect(consumer.assignment()).andReturn(emptySet()).anyTimes(); Review Comment: was this an unnecessary stub? asking because I didn't see mockito stub for consumer.assignment(). ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() { // `handleAssignment` when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00)); - // `tryToCompleteRestoration` - expect(consumer.assignment()).andReturn(emptySet()); - consumer.resume(eq(emptySet())); - expectLastCall(); - - // `shutdown` - consumer.commitSync(Collections.emptyMap()); Review Comment: verify ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2688,20 +2614,8 @@ public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo .thenReturn(singletonList(task10)); final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId"); - expect(consumer.groupMetadata()).andReturn(groupMetadata); - producer.commitTransaction(expectedCommittedOffsets, groupMetadata); - expectLastCall(); - - task00.committedOffsets(); - EasyMock.expectLastCall(); - task01.committedOffsets(); - EasyMock.expectLastCall(); - task02.committedOffsets(); - EasyMock.expectLastCall(); - task10.committedOffsets(); - EasyMock.expectLastCall(); Review Comment: verify ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() { final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - expectRestoreToBeCompleted(consumer); + expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); - consumer.commitSync(offsets); Review Comment: verify this? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() { mkEntry(taskId05, taskId05Partitions) ); - expectRestoreToBeCompleted(consumer); + expectAssignmentToBeCalled(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(assignmentActive))) .thenReturn(Arrays.asList(task00, task01, task02)); when(standbyTaskCreator.createTasks(assignmentStandby)) .thenReturn(Arrays.asList(task03, task04, task05)); - consumer.commitSync(eq(emptyMap())); Review Comment: verify -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org