cadonna commented on code in PR #14716: URL: https://github.com/apache/kafka/pull/14716#discussion_r1555453187
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -293,17 +288,12 @@ public void cleanup() throws IOException { task = null; } Utils.delete(BASE_DIR); - mockito.finishMocking(); } @Test - public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException { - stateDirectory = EasyMock.createNiceMock(StateDirectory.class); - EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()); - stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null); - EasyMock.expectLastCall(); - EasyMock.replay(stateDirectory, stateManager); + public void shouldThrowLockExceptionIfFailedToLockStateDirectory() { + stateDirectory = mock(StateDirectory.class); Review Comment: I am wondering whether we leak resources if we assign a mock to `stateDirectory` without first closing the existing state directory. In `setup()` an actual state directory is created with ``` stateDirectory = new StateDirectory(createConfig("100"), new MockTime(), true, false); ``` This issue existed before this PR, but we should fix it. You can either close the state directory before the mock is assigned or remove the creation of the state directory from `setup()` and create it in each test method that uses it. The same is true for the other occurrences in this test class. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -1912,42 +1823,32 @@ public void shouldThrowIfPostCommittingOnIllegalState() { @Test public void shouldSkipCheckpointingSuspendedCreatedTask() { - stateManager.checkpoint(); - EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint")); - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); - EasyMock.replay(stateManager, recordCollector); - task = createStatefulTask(createConfig("100"), true); task.suspend(); task.postCommit(true); + + verify(stateManager, never()).checkpoint(); } @Test public void shouldCheckpointForSuspendedTask() { - stateManager.checkpoint(); - EasyMock.expectLastCall().once(); - EasyMock.expect(stateManager.changelogOffsets()) - .andReturn(singletonMap(partition1, 1L)); - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes(); - EasyMock.replay(stateManager, recordCollector); + when(stateManager.changelogOffsets()) + .thenReturn(singletonMap(partition1, 1L)); Review Comment: nit: ```suggestion when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -411,25 +384,22 @@ public void seek(final TopicPartition partition, final long offset) { shouldNotSeek.set(new AssertionError("Should not seek")); + @SuppressWarnings("unchecked") final java.util.function.Consumer<Set<TopicPartition>> resetter = - EasyMock.mock(java.util.function.Consumer.class); - resetter.accept(Collections.singleton(partition1)); - EasyMock.expectLastCall(); - EasyMock.replay(resetter); + mock(java.util.function.Consumer.class); + doNothing().when(resetter).accept(Collections.singleton(partition1)); Review Comment: This should be a verification. However, there is an issue here. 
If I add it to the verifications with ```java verify(resetter).accept(Collections.singleton(partition1)); ``` the test fails. The reason is that when `accept()` is called, the argument is indeed `Collections.singleton(partition1)`, but after the call the collection is cleared: https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L955 By the time the call is verified, the argument has already changed. Apparently, Mockito stores only a reference to the argument in the invocation, not a copy. One way to solve this is the following: ```java final java.util.function.Consumer<Set<TopicPartition>> resetter = mock(java.util.function.Consumer.class); final Set<TopicPartition> partitionsAtCall = new HashSet<>(); doAnswer( invocation -> { partitionsAtCall.addAll(invocation.getArgument(0)); return null; } ).when(resetter).accept(Collections.singleton(partition1)); task.initializeIfNeeded(); task.completeRestoration(resetter); // because we mocked the `resetter` positions don't change assertThat(consumer.position(partition1), equalTo(5L)); assertThat(consumer.position(partition2), equalTo(15L)); assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1))); ``` You could also just keep your code and rely on `StrictStubs`. Either way, please add a comment to explain why we cannot simply verify the call. 
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2229,24 +2070,17 @@ public void shouldThrowOnCloseCleanFlushError() { final double expectedCloseTaskMetric = 0.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); - EasyMock.reset(stateManager); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.replay(stateManager); + verify(stateManager).flush(); + verify(stateManager).checkpoint(); + verify(stateManager, never()).close(); } @Test public void shouldThrowOnCloseCleanCheckpointError() { final long offset = 54300L; - EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()); - stateManager.checkpoint(); - EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes(); - stateManager.close(); - EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes(); - EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()) - .andReturn(singletonMap(partition1, offset)); - EasyMock.replay(recordCollector, stateManager); + doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).checkpoint(); + when(stateManager.changelogOffsets()) + .thenReturn(singletonMap(partition1, offset)); Review Comment: nit: ```suggestion when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, offset)); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -2265,36 +2099,22 @@ public void shouldThrowOnCloseCleanCheckpointError() { final double expectedCloseTaskMetric = 0.0; verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName); - EasyMock.verify(stateManager); - EasyMock.reset(stateManager); - 
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes(); - stateManager.close(); - EasyMock.expectLastCall(); - EasyMock.replay(stateManager); + verify(stateManager, never()).close(); } @Test public void shouldNotThrowFromStateManagerCloseInCloseDirty() { - stateManager.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes(); - EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes(); - EasyMock.replay(stateManager); + doThrow(new RuntimeException("KABOOM!")).when(stateManager).close(); task = createOptimizedStatefulTask(createConfig("100"), consumer); task.initializeIfNeeded(); task.suspend(); task.closeDirty(); Review Comment: Use the following to be more explicit: ```java assertDoesNotThrow(() -> task.closeDirty()); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java: ########## @@ -1958,64 +1859,58 @@ public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() { task.suspend(); task.postCommit(false); - EasyMock.verify(stateManager); + + verify(stateManager).checkpoint(); // checkpoint should only be called once Review Comment: Is this comment necessary? If you want to make it more explicit use ```java verify(stateManager, times(1)).checkpoint(); ``` instead. But I think, `verify(stateManager).checkpoint()` is clear enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org