clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081139070
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
+ public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new
IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to
close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager,
stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
This is a fair point. I used `inOrder` because the original test wanted to
be strict. That being said, it wanted to be strict about the calls that were
actually made rather than the calls that weren't made. Does
```
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
final InOrder inOrder = inOrder(stateManager, stateDirectory);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
logger, "logPrefix:", true, false, stateManager,
stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
verify(stateManager, never()).close();
verify(stateManager, never()).baseDir();
verify(stateDirectory, never()).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
final InOrder inOrder = inOrder(stateManager, stateDirectory);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
verify(stateManager, never()).close();
verify(stateManager, never()).baseDir();
verify(stateDirectory, never()).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
```
look better to you?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]