clolov commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. 
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + verify(stateManager).close(); + verify(stateDirectory).unlock(taskId); } @Test public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); - Utils.delete(randomFile); + when(stateManager.baseDir()).thenReturn(randomFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + assertThrows(ProcessorStateException.class, () -> + StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - assertThrows(ProcessorStateException.class, () -> - StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { final File unknownFile = new File("/unknown/path"); - mockStatic(Utils.class); - - 
expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - - stateManager.close(); - expectLastCall(); - - expect(stateManager.baseDir()).andReturn(unknownFile); - Utils.delete(unknownFile); - expectLastCall().andThrow(new IOException("Deletion failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateManager.baseDir()).thenReturn(unknownFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed")); - ctrl.checkOrder(true); - ctrl.replay(); + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - replayAll(); + assertEquals(IOException.class, thrown.getCause().getClass()); + } - final ProcessorStateException thrown = assertThrows( - ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, - "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - assertEquals(IOException.class, thrown.getCause().getClass()); - - ctrl.verify(); + verify(stateManager).close(); + verify(stateDirectory).unlock(taskId); } @Test - public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() { - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(false); - - stateManager.close(); - expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!")); - - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); + public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(false); StateManagerUtil.closeStateManager( - logger, "logPrefix:", 
true, false, stateManager, stateDirectory, TaskType.ACTIVE); - } - - @Test - public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException { Review Comment: As in, it appears a bit strange to mock everything on the path to `Utils.delete` when this logic short-circuits as soon as `stateDirectory.lock(taskId)` returns false, i.e. when we use `expect(stateDirectory.lock(taskId)).andReturn(false);`. ``` try { if (stateDirectory.lock(id)) { <---- WE JUMP FROM HERE... try { stateMgr.close(); <---- WE VERIFY THIS IS NOT CALLED } catch (final ProcessorStateException e) { firstException.compareAndSet(null, e); } finally { try { if (wipeStateStore) { log.debug("Wiping state stores for {} task {}", taskType, id); // we can just delete the whole dir of the task, including the state store images and the checkpoint files, // and then we write an empty checkpoint file indicating that the previous close is graceful and we just // need to re-bootstrap the restoration from the beginning Utils.delete(stateMgr.baseDir()); <---- WE VERIFY THIS IS NOT CALLED } } finally { stateDirectory.unlock(id); <---- WE VERIFY THIS IS NOT CALLED } } } } catch (final IOException e) { final ProcessorStateException exception = new ProcessorStateException( String.format("%sFatal error while trying to close the state manager for task %s", logPrefix, id), e ); firstException.compareAndSet(null, exception); } <--- ...ALL THE WAY TO HERE ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org