divijvaidya commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1234973245


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTransitionToRunningOfRestoredTask(task, tasks);
-        consumer.resume(task.inputPartitions());
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
         Mockito.verify(tasks).addTask(task);
-        verify(consumer);
+        Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   could you please add `verifyNoMoreInteraction`for consumer mock here. Asking 
because looks like there should be more than one invocation of 
consumer.resume() in this test but we are only testing for one.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], 
byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], 
byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> 
consumer) {
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);
+    }
+
+    private static void verifyResumeWasCalledWith(final Consumer<byte[], 
byte[]> consumer, Set<TopicPartition> assignment) {

Review Comment:
   s/verifyResumeWasCalledWith /verifyResumeWasCalledWithAssignment



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4521,11 +4385,18 @@ public void shouldListNotPausedTasks() {
         assertEquals(taskManager.notPausedTasks().size(), 0);
     }
 
-    private static void expectRestoreToBeCompleted(final Consumer<byte[], 
byte[]> consumer) {
+    private static void expectAssignmentToBeCalled(final Consumer<byte[], 
byte[]> consumer) {
         final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
-        expect(consumer.assignment()).andReturn(assignment);
-        consumer.resume(assignment);
-        expectLastCall();
+        when(consumer.assignment()).thenReturn(assignment);
+    }
+
+    private static void verifyResumeWasCalled(final Consumer<byte[], byte[]> 
consumer) {
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        Mockito.verify(consumer, atLeastOnce()).resume(assignment);

Review Comment:
   why are we checking for atLeastOnce and not the exact times? Isn't this 
relaxing constrains from what we were doing with easy mock?
   
   (same for verifyResumeWasCalledWith)



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, 
taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   was this an unnecessary stub? asking because I didn't see mockito stub for 
consumer.assignment().



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3219,17 +3128,6 @@ public void shouldCloseStandbyTasksOnShutdown() {
         // `handleAssignment`
         
when(standbyTaskCreator.createTasks(assignment)).thenReturn(singletonList(task00));
 
-        // `tryToCompleteRestoration`
-        expect(consumer.assignment()).andReturn(emptySet());
-        consumer.resume(eq(emptySet()));
-        expectLastCall();
-
-        // `shutdown`
-        consumer.commitSync(Collections.emptyMap());

Review Comment:
   verify



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2688,20 +2614,8 @@ public void 
shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEo
             .thenReturn(singletonList(task10));
 
         final ConsumerGroupMetadata groupMetadata = new 
ConsumerGroupMetadata("appId");
-        expect(consumer.groupMetadata()).andReturn(groupMetadata);
-        producer.commitTransaction(expectedCommittedOffsets, groupMetadata);
-        expectLastCall();
-
-        task00.committedOffsets();
-        EasyMock.expectLastCall();
-        task01.committedOffsets();
-        EasyMock.expectLastCall();
-        task02.committedOffsets();
-        EasyMock.expectLastCall();
-        task10.committedOffsets();
-        EasyMock.expectLastCall();

Review Comment:
   verify



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4050,12 +3928,8 @@ public void shouldHaveRemainingPartitionsUncleared() {
         final Map<TopicPartition, OffsetAndMetadata> offsets = 
singletonMap(t1p0, new OffsetAndMetadata(0L, null));
         task00.setCommittableOffsetsAndMetadata(offsets);
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        consumer.commitSync(offsets);

Review Comment:
   verify this?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -3418,16 +3311,12 @@ public void shouldCommitProvidedTasksIfNeeded() {
             mkEntry(taskId05, taskId05Partitions)
         );
 
-        expectRestoreToBeCompleted(consumer);
+        expectAssignmentToBeCalled(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(assignmentActive)))
             .thenReturn(Arrays.asList(task00, task01, task02));
         when(standbyTaskCreator.createTasks(assignmentStandby))
             .thenReturn(Arrays.asList(task03, task04, task05));
 
-        consumer.commitSync(eq(emptyMap()));

Review Comment:
   verify



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to