cadonna commented on code in PR #12524:
URL: https://github.com/apache/kafka/pull/12524#discussion_r975189051


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() {
     @Test
     public void shouldUnregisterChangelogsDuringClose() {
         final ProcessorStateManager stateMgr = 
getStateManager(Task.TaskType.ACTIVE);
-        reset(storeMetadata);
-        final StateStore store = EasyMock.createMock(StateStore.class);
-        
expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition);
-        expect(storeMetadata.store()).andStubReturn(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-
-        context.uninitialize();
-        store.init((StateStoreContext) context, store);
-        replay(storeMetadata, context, store);
+        final StateStore store = mock(StateStore.class);
+        when(store.name()).thenReturn(persistentStoreName);
 
         stateMgr.registerStateStores(singletonList(store), context);
-        verify(context, store);
+        verify(context).uninitialize();
+        verify(store).init((StateStoreContext) context, store);
 
         stateMgr.registerStore(store, noopStateRestoreCallback, null);
         
assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition));
 
-        reset(store);
-        expect(store.name()).andStubReturn(persistentStoreName);
-        store.close();

Review Comment:
   You should account for this by putting `verify(store).close()` after line 
314. Or am I missing something?  



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
-        stateManager.flush();

Review Comment:
   You forgot to verify this call.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
         task = createStandbyTask();
 
         task.initializeIfNeeded();
         task.maybeCheckpoint(true);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();

Review Comment:
   You forgot to verify this call.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
         stateManager.close();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).close();
         stateManager.checkpoint();

Review Comment:
   Remove this call on the mock. In Mockito it does not make sense.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -247,23 +231,19 @@ public void 
shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
         assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
         task.maybeCheckpoint(false);  // this should not checkpoint
         assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
         stateManager.flush();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).flush();

Review Comment:
   You again call a method on the mock and immediately verify it. Maybe you 
just forgot to remove it here, since you added the verification on line 259. 
However, I do not understand why you use `atLeastOnce()`. According to the 
pre-migration code, `flush()` should be called only once. If you remove 
`stateManager.flush()` on line 239, you will see that it is indeed only called 
once.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -276,32 +256,21 @@ public void 
shouldFlushAndCheckpointStateManagerOnCommit() {
         task.prepareCommit();
         task.postCommit(false);  // this should not checkpoint
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).flush();
     }
 
     @Test
     public void shouldReturnStateManagerChangelogOffsets() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition,
 50L));
-        EasyMock.replay(stateManager);
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition,
 50L));
 
         task = createStandbyTask();
 
         assertEquals(Collections.singletonMap(partition, 50L), 
task.changelogOffsets());
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotFlushAndThrowOnCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.flush();

Review Comment:
   You forgot to verify this call at the end of the test with 
`verify(stateManager, never()).flush()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
         stateManager.close();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).close();
         stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 60L));
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
+        verify(stateManager, times(1)).checkpoint();

Review Comment:
   Please move this verification to after the call under test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() {
 
     @Test
     public void shouldAlwaysCheckpointStateIfEnforced() {
-        stateManager.flush();
-        EasyMock.expectLastCall().once();
-        stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
-        EasyMock.replay(stateManager);
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
 
         task = createStandbyTask();
 
         task.initializeIfNeeded();
         task.maybeCheckpoint(true);
 
-        EasyMock.verify(stateManager);
+        verify(stateManager).checkpoint();
     }
 
     @Test
     public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
-        stateManager.flush();

Review Comment:
   You forgot to verify this call.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -177,11 +179,6 @@ public void 
shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce
 
     @Test
     public void shouldTransitToRunningAfterInitialization() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.registerStateStores(EasyMock.anyObject(), 
EasyMock.anyObject());

Review Comment:
   I think there are still some misunderstandings about how EasyMock works. 
When you call a method on the mock, this call is registered in the mock. 
Registration means that the method is expected to be called during the call to 
the method under test. When you call `EasyMock.replay(mock)`, you end the 
registration phase. From then on, the mock stores the methods actually called 
on it and returns the return values specified in the stubbings (i.e., 
`expect(mock.method()).andReturn(...)`) when the stubbed method is called. After 
replaying the mock, you call the method under test. Finally, with 
`EasyMock.verify(mock)` you verify that only the registered method calls were 
performed during the call to the method under test.
   In this specific case, the call `registerStateStores(EasyMock.anyObject(), 
EasyMock.anyObject())` was registered on the mock `stateManager` as an expected 
call. With `EasyMock.verify(stateManager);` it is verified that method 
`registerStateStores()` was called on the mock.
   In your migration this verification disappeared.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();

Review Comment:
   Why this verification?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -247,23 +231,19 @@ public void 
shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
         assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
         task.maybeCheckpoint(false);  // this should not checkpoint
         assertEquals(Collections.singletonMap(partition, 11000L), 
task.offsetSnapshotSinceLastFlush);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldFlushAndCheckpointStateManagerOnCommit() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap());
         stateManager.flush();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).flush();
         stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 50L))
-                .andReturn(Collections.singletonMap(partition, 11000L))
-                .andReturn(Collections.singletonMap(partition, 11000L));
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
-        EasyMock.replay(stateManager);
+        verify(stateManager, times(1)).checkpoint();

Review Comment:
   This verification should be moved to the end of the test. You do not need to 
use `times(1)`; that is the default behavior. Do not forget to remove 
`stateManager.checkpoint();` on line 241. It does not make sense to call 
methods on the mocks in Mockito. Only in EasyMock is calling methods on the 
mock a way to specify the expected behavior.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java:
##########
@@ -360,14 +333,8 @@ public void shouldRecycleStoreAndReregisterChangelog() {
         
assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition));
         assertThat(stateMgr.getStore(persistentStoreName), equalTo(store));
 
-        reset(context, store);
-        context.uninitialize();

Review Comment:
   To migrate all verifications, you also need to migrate this to 
`verify(context).uninitialize()` after line 336. Note that I am not sure 
whether you also need to count the previous call to `context.uninitialize()` 
verified on line 326. That is, you might need to add `verify(context, 
times(2)).uninitialize()` instead.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -276,32 +256,21 @@ public void 
shouldFlushAndCheckpointStateManagerOnCommit() {
         task.prepareCommit();
         task.postCommit(false);  // this should not checkpoint
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).flush();
     }
 
     @Test
     public void shouldReturnStateManagerChangelogOffsets() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition,
 50L));
-        EasyMock.replay(stateManager);
+        
when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition,
 50L));
 
         task = createStandbyTask();
 
         assertEquals(Collections.singletonMap(partition, 50L), 
task.changelogOffsets());
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotFlushAndThrowOnCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
ProcessorStateException("KABOOM!")).anyTimes();
-        stateManager.flush();
-        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should 
not be called")).anyTimes();
-        stateManager.checkpoint();

Review Comment:
   You forgot to verify this call at the end of the test with 
`verify(stateManager, never()).checkpoint()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {

Review Comment:
   I think for this test you need to verify the calls on the mock with 
`InOrder` (see 
https://site.mockito.org/javadoc/current/org/mockito/InOrder.html). 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
         stateManager.close();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).close();
         stateManager.checkpoint();
-        EasyMock.expectLastCall().once();
-        EasyMock.expect(stateManager.changelogOffsets())
-                .andReturn(Collections.singletonMap(partition, 60L));
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();

Review Comment:
   You migrated this line neither to a stubbing nor to a verification.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
         stateManager.close();

Review Comment:
   Remove this call on the mock. In Mockito it does not make sense.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java:
##########
@@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() {
 
         final double expectedCloseTaskMetric = 1.0;
         verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, 
metricName);
-
-        EasyMock.verify(stateManager);
     }
 
     @Test
     public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
-        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
-        stateManager.close();
-        EasyMock.expectLastCall().andThrow(new 
RuntimeException("KABOOM!")).anyTimes();
-        EasyMock.replay(stateManager);
+        doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
 
         task = createStandbyTask();
         task.initializeIfNeeded();
 
         task.suspend();
         task.closeDirty();
 
-        EasyMock.verify(stateManager);
+        verify(stateManager, atLeastOnce()).close();
     }
 
     @Test
     public void shouldSuspendAndCommitBeforeCloseClean() {
         stateManager.close();
-        EasyMock.expectLastCall();
+        verify(stateManager, atLeastOnce()).close();

Review Comment:
   Please move this verification to after the call under test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to