[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we we will write data to the .tmp file first and then swap 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with 
`ATOMIC_MOVE` option, which will try to replace the target file if exists. I 
cannot create `IOException` with this case. I added comments for this line to 
explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we we will write data to the .tmp file first and then swap 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with 
`ATOMIC_MOVE` option, which will try to replace the target file if exists. I 
cannot create `IOException` case with this case. I added comments for this line 
to explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475393747



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");

Review comment:
   It's because we we will write data to the .tmp file first and then swap 
to the CHECKPOINT_FILE. And in the swap action, we use `Files.move` with 
`ATOMIC_MOVE` option, which will try to replace the target file if exists. I 
added comments for this line to explain the reason. Thank you.
   ref: 
https://docs.oracle.com/javase/8/docs/api/java/nio/file/Files.html#move-java.nio.file.Path-java.nio.file.Path-java.nio.file.CopyOption...-





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   I agree it'll be better to do the assertion after the try-block. But no, 
we can't move the assert out of the try-block because the `appender` is 
declared within try block. We can move the assert out of try-block if we don't 
use the try resource auto-close pattern, but I don't think it would be better. 
   Also, we assert within try-block for the `appender` tests in other places. I 
think they are all for the same reason as I mentioned above. 
   Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   No, we can't move the assert out of the try-block because the `appender` 
is declared within try block. We can move the assert out of try-block if we 
don't use the try resource auto-close pattern, but I don't think it would be 
better. 
   Also, we assert within try-block for the `LogCaptureAppender` in other 
places. I think they are all for the same reason as I mentioned above. 
   Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-24 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475362316



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -180,6 +183,27 @@ public void shouldReadCheckpointOffsets() throws 
IOException {
 assertEquals(expected, offsets);
 }
 
+@Test
+public void shouldLogWarningMessageWhenIOExceptionInCheckPoint() throws 
IOException {
+final Map offsets = Collections.singletonMap(t1, 
25L);
+stateManager.initialize();
+stateManager.updateChangelogOffsets(offsets);
+
+final File file = new File(stateDirectory.globalStateDir(), 
StateManagerUtil.CHECKPOINT_FILE_NAME + ".tmp");
+file.createNewFile();
+// set the checkpoint tmp file to read-only to simulate the 
IOException situation
+file.setWritable(false);
+
+try (final LogCaptureAppender appender =
+ 
LogCaptureAppender.createAndRegister(GlobalStateManagerImpl.class)) {
+
+// checkpoint should fail due to the file is readonly
+stateManager.checkpoint();
+assertThat(appender.getMessages(), hasItem(containsString(
+"Failed to write offset checkpoint file to " + 
checkpointFile.getPath() + " for global stores")));

Review comment:
   I agree it'll be better to do the assertion after the try-block. But no, 
we can't move the assert out of the try-block because the `appender` is 
declared within try block. We can move the assert out of try-block if we don't 
use the try resource auto-close pattern, but I don't think it would be better. 
   Also, we assert within try-block for the `LogCaptureAppender` in other 
places. I think they are all for the same reason as I mentioned above. 
   Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #9121: KAFKA-10351: add tests for IOExceptions for GlobalStateManagerImpl/OffsetCheckpoint

2020-08-23 Thread GitBox


showuon commented on a change in pull request #9121:
URL: https://github.com/apache/kafka/pull/9121#discussion_r475358692



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
##
@@ -135,6 +136,20 @@ public void shouldThrowOnInvalidOffsetInWrite() throws 
IOException {
 }
 }
 
+@Test
+public void shouldThrowIOExceptionWhenWritingToNotExistedFile() {
+final Map offsetsToWrite = 
Collections.singletonMap(
+new TopicPartition(topic, 0), 0L);
+
+// create a file with not existed path, and feed into OffsetCheckpoint

Review comment:
   agree!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org