[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); Review comment: It's because we we will write data to the .tmp file first and then swap to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with `ATOMIC_MOVE` option, which will try to replace the target file if exists. I cannot create `IOException` with this case. I added comments for this line to explain the reason. Thank you. ref: https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); Review comment: It's because we we will write data to the .tmp file first and then swap to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with `ATOMIC_MOVE` option, which will try to replace the target file if exists. I cannot create `IOException` case with this case. I added comments for this line to explain the reason. Thank you. ref: https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); Review comment: It's because we we will write data to the .tmp file first and then swap to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with `ATOMIC_MOVE` option, which will try to replace the target file if exists. I added comments for this line to explain the reason. Thank you. ref: https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); +file.createNewFile(); +// set the checkpoint tmp file to read-only to simulate the IOException situation +file.setWritable(false); + +try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { + +// checkpoint should fail due to the file is readonly +stateManager.checkpoint(); +assertThat(appender.getMessages(), hasItem(containsString( +"Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores"))); Review comment: I agree it'll be better to do the assertion after the try-block. But no, we can't move the assert out of the try-block because the `appender` is declared within try block. We can move the assert out of try-block if we don't use the try resource auto-close pattern, but I don't think it would be better. Also, we assert within try-block for the `appender` tests in other places. I think they are all for the same reason as I mentioned above. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); +file.createNewFile(); +// set the checkpoint tmp file to read-only to simulate the IOException situation +file.setWritable(false); + +try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { + +// checkpoint should fail due to the file is readonly +stateManager.checkpoint(); +assertThat(appender.getMessages(), hasItem(containsString( +"Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores"))); Review comment: No, we can't move the assert out of the try-block because the `appender` is declared within try block. We can move the assert out of try-block if we don't use the try resource auto-close pattern, but I don't think it would be better. Also, we assert within try-block for the `LogCaptureAppender` in other places. I think they are all for the same reason as I mentioned above. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws IOException { assertEquals(expected, offsets); } +@Test +public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws IOException { +final Map offsets = Collections.singletonMap(t1, 25L); +stateManager.initialize(); +stateManager.updateChangelogOffsets(offsets); + +final File file = new File(stateDirectory.globalStateDir(), StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp"); +file.createNewFile(); +// set the checkpoint tmp file to read-only to simulate the IOException situation +file.setWritable(false); + +try (final LogCaptureAppender appender = + LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) { + +// checkpoint should fail due to the file is readonly +stateManager.checkpoint(); +assertThat(appender.getMessages(), hasItem(containsString( +"Failed to write offset checkpoint file to " + checkpointFile.getPath() + " for global stores"))); Review comment: I agree it'll be better to do the assertion after the try-block. But no, we can't move the assert out of the try-block because the `appender` is declared within try block. We can move the assert out of try-block if we don't use the try resource auto-close pattern, but I don't think it would be better. Also, we assert within try-block for the `LogCaptureAppender` in other places. I think they are all for the same reason as I mentioned above. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint
showuon commented on a change in pull request #9121: URL: https://github.com/apache/kafka/pull/9121#discussion_r475358692 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java ## @@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws IOException { } } +@Test +public void shouldThrowIOExceptionWhenWritingToNotExistedFile() { +final Map offsetsToWrite = Collections.singletonMap( +new TopicPartition(topic, 0), 0L); + +// create a file with not existed path, and feed into OffsetCheckpoint Review comment: agree! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org