cadonna commented on code in PR #14716:
URL: https://github.com/apache/kafka/pull/14716#discussion_r1555453187
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -293,17 +288,12 @@ public void cleanup() throws IOException {
task = null;
}
Utils.delete(BASE_DIR);
- mockito.finishMocking();
}
@Test
- public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
- stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
- EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
- stateManager.registerStore(stateStore, stateStore.stateRestoreCallback, null);
- EasyMock.expectLastCall();
- EasyMock.replay(stateDirectory, stateManager);
+ public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
+ stateDirectory = mock(StateDirectory.class);
Review Comment:
I am wondering whether we leak resources if we assign a mock to
`stateDirectory` without first closing the existing state directory.
In `setup()` an actual state directory is created with
```
stateDirectory = new StateDirectory(createConfig("100"), new MockTime(),
true, false);
```
This issue predates this PR, but we should fix it.
You can either close the state directory before the mock is assigned, or
remove the creation of the state directory from `setup()` and create it in each
test method that uses it.
The same is true for the other occurrences in this test class.
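To illustrate the concern, here is a minimal, self-contained sketch. It uses neither `StateDirectory` nor Mockito: `TrackedResource` and both helper methods are hypothetical names, and the sketch only assumes that the real object holds something that must be released via `close()`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class CloseBeforeReplaceSketch {

    /** Hypothetical stand-in for an object that holds a resource until close() is called. */
    static final class TrackedResource implements AutoCloseable {
        private final AtomicInteger openCount;
        private boolean closed = false;

        TrackedResource(final AtomicInteger openCount) {
            this.openCount = openCount;
            openCount.incrementAndGet();
        }

        @Override
        public void close() {
            if (!closed) {
                closed = true;
                openCount.decrementAndGet();
            }
        }
    }

    /** Overwrites the field without closing the first instance; returns how many stay open. */
    static int leakedAfterOverwrite() {
        final AtomicInteger open = new AtomicInteger();
        TrackedResource resource = new TrackedResource(open); // created in setup()
        resource = new TrackedResource(open);                 // replaced in the test (the mock, in the review)
        resource.close();                                     // cleanup only sees the replacement
        return open.get();
    }

    /** Closes the first instance before replacing it; returns how many stay open. */
    static int leakedAfterCloseThenOverwrite() {
        final AtomicInteger open = new AtomicInteger();
        TrackedResource resource = new TrackedResource(open);
        resource.close();                                     // release before replacing
        resource = new TrackedResource(open);
        resource.close();
        return open.get();
    }

    public static void main(final String[] args) {
        System.out.println("overwrite without close: " + leakedAfterOverwrite());
        System.out.println("close, then overwrite: " + leakedAfterCloseThenOverwrite());
    }
}
```

In the first variant one instance is never closed because no reference to it survives the reassignment; in the second nothing stays open.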
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1912,42 +1823,32 @@ public void shouldThrowIfPostCommittingOnIllegalState() {
@Test
public void shouldSkipCheckpointingSuspendedCreatedTask() {
- stateManager.checkpoint();
- EasyMock.expectLastCall().andThrow(new AssertionError("Should not have tried to checkpoint"));
- EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
- EasyMock.replay(stateManager, recordCollector);
-
task = createStatefulTask(createConfig("100"), true);
task.suspend();
task.postCommit(true);
+
+ verify(stateManager, never()).checkpoint();
}
@Test
public void shouldCheckpointForSuspendedTask() {
- stateManager.checkpoint();
- EasyMock.expectLastCall().once();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(singletonMap(partition1, 1L));
- EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
- EasyMock.replay(stateManager, recordCollector);
+ when(stateManager.changelogOffsets())
+ .thenReturn(singletonMap(partition1, 1L));
Review Comment:
nit:
```suggestion
when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, 1L));
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -411,25 +384,22 @@ public void seek(final TopicPartition partition, final long offset) {
shouldNotSeek.set(new AssertionError("Should not seek"));
+ @SuppressWarnings("unchecked")
final java.util.function.Consumer<Set<TopicPartition>> resetter =
- EasyMock.mock(java.util.function.Consumer.class);
- resetter.accept(Collections.singleton(partition1));
- EasyMock.expectLastCall();
- EasyMock.replay(resetter);
+ mock(java.util.function.Consumer.class);
+ doNothing().when(resetter).accept(Collections.singleton(partition1));
Review Comment:
This should be a verification. However, there is an issue here. If I add it
to the verifications with
```java
verify(resetter).accept(Collections.singleton(partition1));
```
the test fails.
The reason is that when `accept()` is called, the argument is indeed equal to
`Collections.singleton(partition1)`, but the collection is cleared right after
the call:
https://github.com/apache/kafka/blob/d144b7ee387308a59e52cbdabc7b66dd3b2926cc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L955
By the time the call is verified, the argument has changed. Apparently, Mockito
stores a reference to the argument in the invocation rather than a copy.
One way to solve this is the following:
```java
final java.util.function.Consumer<Set<TopicPartition>> resetter =
mock(java.util.function.Consumer.class);
final Set<TopicPartition> partitionsAtCall = new HashSet<>();
doAnswer(
invocation -> {
partitionsAtCall.addAll(invocation.getArgument(0));
return null;
}
).when(resetter).accept(Collections.singleton(partition1));
task.initializeIfNeeded();
task.completeRestoration(resetter);
// because we mocked the `resetter` positions don't change
assertThat(consumer.position(partition1), equalTo(5L));
assertThat(consumer.position(partition2), equalTo(15L));
assertThat(partitionsAtCall, equalTo(Collections.singleton(partition1)));
```
Alternatively, you could keep your code as is and rely on `StrictStubs`, which
should fail the test if the stubbing is never used.
Either way, please add a comment explaining why we cannot simply verify the
call.
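The effect described above can be reproduced without Mockito. The following is a minimal sketch with hand-written recorders. `RecordingByReference`, `RecordingByCopy`, and `resetAndClear` are hypothetical names, plain strings stand in for `TopicPartition`, and nothing here claims to mirror Mockito's internals beyond the "stores a reference" observation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

public class ArgumentCaptureSketch {

    /** Hand-written recorder that keeps only a reference to each argument. */
    static final class RecordingByReference implements Consumer<Set<String>> {
        final List<Set<String>> calls = new ArrayList<>();

        @Override
        public void accept(final Set<String> partitions) {
            calls.add(partitions); // no copy: later mutations by the caller stay visible
        }
    }

    /** Recorder that snapshots the argument while the call is in progress. */
    static final class RecordingByCopy implements Consumer<Set<String>> {
        final List<Set<String>> calls = new ArrayList<>();

        @Override
        public void accept(final Set<String> partitions) {
            calls.add(new HashSet<>(partitions)); // copy taken at call time
        }
    }

    /** Hypothetical caller that passes a set to the callback and clears it afterwards. */
    static void resetAndClear(final Consumer<Set<String>> resetter) {
        final Set<String> toReset = new HashSet<>(Collections.singleton("topic-0"));
        resetter.accept(toReset);
        toReset.clear();
    }

    public static void main(final String[] args) {
        final RecordingByReference byReference = new RecordingByReference();
        final RecordingByCopy byCopy = new RecordingByCopy();
        resetAndClear(byReference);
        resetAndClear(byCopy);
        System.out.println("by reference: " + byReference.calls.get(0)); // prints "by reference: []"
        System.out.println("by copy: " + byCopy.calls.get(0));           // prints "by copy: [topic-0]"
    }
}
```

The copying recorder plays the same role as the `doAnswer` approach: it snapshots the argument during the call, so a later assertion still sees what was actually passed.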
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2229,24 +2070,17 @@ public void shouldThrowOnCloseCleanFlushError() {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
- EasyMock.reset(stateManager);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.replay(stateManager);
+ verify(stateManager).flush();
+ verify(stateManager).checkpoint();
+ verify(stateManager, never()).close();
}
@Test
public void shouldThrowOnCloseCleanCheckpointError() {
final long offset = 54300L;
- EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap());
- stateManager.checkpoint();
- EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new AssertionError("Close should not be called!")).anyTimes();
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets())
- .andReturn(singletonMap(partition1, offset));
- EasyMock.replay(recordCollector, stateManager);
+ doThrow(new ProcessorStateException("KABOOM!")).when(stateManager).checkpoint();
+ when(stateManager.changelogOffsets())
+ .thenReturn(singletonMap(partition1, offset));
Review Comment:
nit:
```suggestion
when(stateManager.changelogOffsets()).thenReturn(singletonMap(partition1, offset));
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2265,36 +2099,22 @@ public void shouldThrowOnCloseCleanCheckpointError() {
final double expectedCloseTaskMetric = 0.0;
verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);
- EasyMock.verify(stateManager);
- EasyMock.reset(stateManager);
- EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
- stateManager.close();
- EasyMock.expectLastCall();
- EasyMock.replay(stateManager);
+ verify(stateManager, never()).close();
}
@Test
public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
- stateManager.close();
- EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
- EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.emptyMap()).anyTimes();
- EasyMock.replay(stateManager);
+ doThrow(new RuntimeException("KABOOM!")).when(stateManager).close();
task = createOptimizedStatefulTask(createConfig("100"), consumer);
task.initializeIfNeeded();
task.suspend();
task.closeDirty();
Review Comment:
Use the following to be more explicit:
```java
assertDoesNotThrow(() -> task.closeDirty());
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1958,64 +1859,58 @@ public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
task.suspend();
task.postCommit(false);
- EasyMock.verify(stateManager);
+
+ verify(stateManager).checkpoint(); // checkpoint should only be called once
Review Comment:
Is this comment necessary? If you want to make it more explicit, use
```java
verify(stateManager, times(1)).checkpoint();
```
instead.
However, I think `verify(stateManager).checkpoint()` is clear enough.