cadonna commented on code in PR #12524: URL: https://github.com/apache/kafka/pull/12524#discussion_r975189051
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -310,48 +301,30 @@ public void shouldRestoreTimestampedStoreWithConverter() { @Test public void shouldUnregisterChangelogsDuringClose() { final ProcessorStateManager stateMgr = getStateManager(Task.TaskType.ACTIVE); - reset(storeMetadata); - final StateStore store = EasyMock.createMock(StateStore.class); - expect(storeMetadata.changelogPartition()).andStubReturn(persistentStorePartition); - expect(storeMetadata.store()).andStubReturn(store); - expect(store.name()).andStubReturn(persistentStoreName); - - context.uninitialize(); - store.init((StateStoreContext) context, store); - replay(storeMetadata, context, store); + final StateStore store = mock(StateStore.class); + when(store.name()).thenReturn(persistentStoreName); stateMgr.registerStateStores(singletonList(store), context); - verify(context, store); + verify(context).uninitialize(); + verify(store).init((StateStoreContext) context, store); stateMgr.registerStore(store, noopStateRestoreCallback, null); assertTrue(changelogReader.isPartitionRegistered(persistentStorePartition)); - reset(store); - expect(store.name()).andStubReturn(persistentStoreName); - store.close(); Review Comment: You should account for this by putting `verify(store).close()` after line 314. Or am I missing something? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() { @Test public void shouldAlwaysCheckpointStateIfEnforced() { - stateManager.flush(); Review Comment: You forgot to verify this call. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() { @Test public void shouldAlwaysCheckpointStateIfEnforced() { - stateManager.flush(); - EasyMock.expectLastCall().once(); - stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes(); - EasyMock.replay(stateManager); + when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap()); task = createStandbyTask(); task.initializeIfNeeded(); task.maybeCheckpoint(true); - EasyMock.verify(stateManager); + verify(stateManager).checkpoint(); } @Test public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { - stateManager.flush(); - EasyMock.expectLastCall().once(); - stateManager.checkpoint(); Review Comment: You forgot to verify this call. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { stateManager.close(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).close(); 
stateManager.checkpoint(); Review Comment: Remove this call on the mock. In Mockito it does not make sense. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -247,23 +231,19 @@ public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush); task.maybeCheckpoint(false); // this should not checkpoint assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush); - - EasyMock.verify(stateManager); } @Test public void shouldFlushAndCheckpointStateManagerOnCommit() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); + when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap()); stateManager.flush(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).flush(); Review Comment: You again call a function on the mock and immediately verify it. Maybe you just forgot to remove it here, since you added the verification on line 259. However, I do not understand why you use `atLeastOnce()`. It should be called only once according to the pre-migration code. If you remove `stateManager.flush()` on line 239, you will see that it is indeed only called once. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -276,32 +256,21 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() { task.prepareCommit(); task.postCommit(false); // this should not checkpoint - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).flush(); } @Test public void shouldReturnStateManagerChangelogOffsets() { - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); - EasyMock.replay(stateManager); + when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 50L)); task = createStandbyTask(); assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets()); - - EasyMock.verify(stateManager); } @Test public void shouldNotFlushAndThrowOnCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); - stateManager.flush(); Review Comment: You forgot to verify this call at the end of the test with `verify(stateManager, never()).flush()`. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { stateManager.close(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).close(); stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()) - .andReturn(Collections.singletonMap(partition, 60L)); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); - EasyMock.replay(stateManager); + verify(stateManager, times(1)).checkpoint(); Review Comment: Please move this verification to after the call under test. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -211,32 +205,22 @@ public void shouldThrowIfCommittingOnIllegalState() { @Test public void shouldAlwaysCheckpointStateIfEnforced() { - stateManager.flush(); - EasyMock.expectLastCall().once(); - stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes(); - EasyMock.replay(stateManager); + when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap()); task = createStandbyTask(); task.initializeIfNeeded(); task.maybeCheckpoint(true); - EasyMock.verify(stateManager); + verify(stateManager).checkpoint(); } @Test public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { - stateManager.flush(); Review Comment: You forgot to verify this call. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -177,11 +179,6 @@ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOExce @Test public void shouldTransitToRunningAfterInitialization() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject()); Review Comment: I think there are still some misunderstandings about how EasyMock works. When you call a method on the mock, this call is registered in the mock. Registration means that the method is expected to be called during the call of the method under test. When you call `EasyMock.replay(mock)` you end the registration phase. From now on the mock stores the methods actually called on it and returns the return values specified on the stubbings (i.e., `expect(mock).andReturn(...)`) when the stubbed method is called. After replaying the mock, you call the method under test. 
Finally, with `EasyMock.verify(mock)` you verify that only the registered method calls were performed during the call to the method under test. In this specific case, the call `registerStateStores(EasyMock.anyObject(), EasyMock.anyObject())` was registered on the mock `stateManager` as an expected call. With `EasyMock.verify(stateManager);` it is verified that method `registerStateStores()` was called on the mock. In your migration this verification disappeared. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); Review Comment: Why this verification? 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -247,23 +231,19 @@ public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() { assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush); task.maybeCheckpoint(false); // this should not checkpoint assertEquals(Collections.singletonMap(partition, 11000L), task.offsetSnapshotSinceLastFlush); - - EasyMock.verify(stateManager); } @Test public void shouldFlushAndCheckpointStateManagerOnCommit() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); + when(stateManager.changelogOffsets()).thenReturn(Collections.emptyMap()); stateManager.flush(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).flush(); stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()) - .andReturn(Collections.singletonMap(partition, 50L)) - .andReturn(Collections.singletonMap(partition, 11000L)) - .andReturn(Collections.singletonMap(partition, 11000L)); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); - EasyMock.replay(stateManager); + verify(stateManager, times(1)).checkpoint(); Review Comment: This verification should be moved to the end of the test. You do not need to use `times(1)`; that is the default behavior. Do not forget to remove `stateManager.checkpoint();` on line 241. It does not make sense to call methods on mocks in Mockito. Only in EasyMock is calling methods on the mock a way to specify the expected behavior. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java: ########## @@ -360,14 +333,8 @@ public void shouldRecycleStoreAndReregisterChangelog() { assertFalse(changelogReader.isPartitionRegistered(persistentStorePartition)); assertThat(stateMgr.getStore(persistentStoreName), equalTo(store)); - reset(context, store); - context.uninitialize(); Review Comment: To migrate all verifications, you also need to migrate this to `verify(context).uninitialize()` after line 336. Note that I am not sure whether you also need to count the previous call to `context.uninitialize()` verified on line 326. That is, you might need to add `verify(context, times(2)).uninitialize()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -276,32 +256,21 @@ public void shouldFlushAndCheckpointStateManagerOnCommit() { task.prepareCommit(); task.postCommit(false); // this should not checkpoint - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).flush(); } @Test public void shouldReturnStateManagerChangelogOffsets() { - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L)); - EasyMock.replay(stateManager); + when(stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(partition, 50L)); task = createStandbyTask(); assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets()); - - EasyMock.verify(stateManager); } @Test public void shouldNotFlushAndThrowOnCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); - stateManager.flush(); - EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes(); - stateManager.checkpoint(); Review Comment: You forgot to verify this call at the end of the test with 
`verify(stateManager, never()).checkpoint()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { Review Comment: I think for this test you need to verify the calls on the mock with `InOrder` (see https://site.mockito.org/javadoc/current/org/mockito/InOrder.html). 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { stateManager.close(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).close(); stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()) - .andReturn(Collections.singletonMap(partition, 60L)); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes(); Review Comment: You did not migrate this line to either a stubbing or a verification. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { stateManager.close(); Review Comment: Remove this call on the mock. In Mockito it does not make sense. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java: ########## @@ -313,36 +282,29 @@ public void shouldNotFlushAndThrowOnCloseDirty() { final double expectedCloseTaskMetric = 1.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - - EasyMock.verify(stateManager); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap()); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createStandbyTask(); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); - EasyMock.verify(stateManager); + verify(stateManager, atLeastOnce()).close(); } @Test public void shouldSuspendAndCommitBeforeCloseClean() { stateManager.close(); - EasyMock.expectLastCall(); + verify(stateManager, atLeastOnce()).close(); Review Comment: Please move this verification to after the call under test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org