cadonna commented on code in PR #13873:
URL: https://github.com/apache/kafka/pull/13873#discussion_r1236586513


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void 
shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);

Review Comment:
   Here you need to add `Mockito.verifyNoInteractions(consumer)` because that 
was the intent of replaying a consumer without expectations and verifying it.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -886,11 +885,9 @@ public void 
shouldSuspendRevokedTaskRemovedFromStateUpdater() {
         when(stateUpdater.hasRemovedTasks()).thenReturn(true);
         when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
-        verify(consumer);
         Mockito.verify(statefulTask).suspend();
         Mockito.verify(tasks).addTask(statefulTask);
     }

Review Comment:
   nit (and probably my fault 🙂) 
   ```suggestion
       }
   
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1066,14 +1054,12 @@ public void 
shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
         final TaskManager taskManager = 
setUpTransitionToRunningOfRestoredTask(task, tasks);
         final TimeoutException timeoutException = new TimeoutException();
         doThrow(timeoutException).when(task).completeRestoration(noOpResetter);
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), 
Mockito.eq(timeoutException));
         Mockito.verify(tasks, never()).addTask(task);
         Mockito.verify(task, never()).clearTaskTimeout();
-        verify(consumer);

Review Comment:
   Also here, please add `Mockito.verifyNoInteractions(consumer)`.
   
   You also miss this verification in other places.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1046,15 +1036,13 @@ public void shouldTransitRestoredTaskToRunning() {
             .withInputPartitions(taskId00Partitions).build();
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTransitionToRunningOfRestoredTask(task, tasks);
-        consumer.resume(task.inputPartitions());
-        replay(consumer);
 
         taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
 
         Mockito.verify(task).completeRestoration(noOpResetter);
         Mockito.verify(task).clearTaskTimeout();
         Mockito.verify(tasks).addTask(task);
-        verify(consumer);
+        Mockito.verify(consumer).resume(task.inputPartitions());

Review Comment:
   I think, I cannot completely follow your reasoning here.  Why would you add 
`verifyNoMoreInteractions`? The important thing here is that the consumer 
resumes polling from the input partitions. However, I also see that with 
easymock this test verifies that `consumer.resume(task.inputPartitions())` is 
the only method called on the consumer mock. I am fine either way.  



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, 
taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-        consumer.resume(anyObject());
-        expectLastCall().anyTimes();

Review Comment:
   It seems I was a bit sloppy here. When a task is removed from the state 
updater, there should be no interactions with the consumer. Please remove the 
expecations and verify for no interactions with the consumer mock.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -918,9 +915,6 @@ public void 
shouldHandleMultipleRemovedTasksFromStateUpdater() {
             .thenReturn(convertedTask1);
         when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, 
taskId00Partitions))
             .thenReturn(convertedTask0);
-        expect(consumer.assignment()).andReturn(emptySet()).anyTimes();

Review Comment:
   Mockito returns an empty collection by default. @clolov Could you confirm if 
this was the reason you removed the stub?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to