This is an automated email from the ASF dual-hosted git repository. jsancio pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7b669e8806c KAFKA-14273; Close file before atomic move (#14354) 7b669e8806c is described below commit 7b669e8806ce9d122233afeec03eb4e15bde808a Author: José Armando García Sancio <jsan...@users.noreply.github.com> AuthorDate: Thu Sep 7 16:17:03 2023 -0700 KAFKA-14273; Close file before atomic move (#14354) In the Windows OS atomic move are not allowed if the file has another open handle. E.g __cluster_metadata-0\quorum-state: The process cannot access the file because it is being used by another process at java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) at java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403) at java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293) at java.base/java.nio.file.Files.move(Files.java:1430) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949) at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932) at org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152) This is fixed by first closing the temporary quorum-state file before attempting to move it. Reviewers: Colin Patrick McCabe <cmcc...@apache.org> Co-Authored-By: Renaldo Baur Filho <renald...@gmail.com> --- .../org/apache/kafka/raft/FileBasedStateStore.java | 31 +++++++++++++--------- .../apache/kafka/raft/FileBasedStateStoreTest.java | 16 +++++++++++ 2 files changed, 35 insertions(+), 12 deletions(-) diff --git a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java index d567b15ece3..019fd147179 100644 --- a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java +++ b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java @@ -64,6 +64,7 @@ public class FileBasedStateStore implements QuorumStateStore { private final File stateFile; static final String DATA_VERSION = "data_version"; + static final short HIGHEST_SUPPORTED_VERSION = 0; public FileBasedStateStore(final File stateFile) { this.stateFile = stateFile; @@ -144,21 +145,27 @@ public class FileBasedStateStore implements QuorumStateStore { log.trace("Writing tmp quorum state {}", temp.getAbsolutePath()); - try (final FileOutputStream fileOutputStream = new FileOutputStream(temp); - final BufferedWriter writer = new BufferedWriter( - new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { - short version = state.highestSupportedVersion(); - - ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version); - jsonState.set(DATA_VERSION, new ShortNode(version)); - writer.write(jsonState.toString()); - writer.flush(); - fileOutputStream.getFD().sync(); + try { + try (final FileOutputStream fileOutputStream = new FileOutputStream(temp); + final BufferedWriter writer = new BufferedWriter( + new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8) + ) + ) { + ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, HIGHEST_SUPPORTED_VERSION); + jsonState.set(DATA_VERSION, new ShortNode(HIGHEST_SUPPORTED_VERSION)); + writer.write(jsonState.toString()); + writer.flush(); + fileOutputStream.getFD().sync(); + } Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath()); } catch (IOException e) { throw new UncheckedIOException( - String.format("Error while writing the Quorum status from the file %s", - stateFile.getAbsolutePath()), e); + String.format( + "Error while writing the Quorum status from the file %s", + stateFile.getAbsolutePath() + ), + e + ); } finally { // cleanup the temp file when the write finishes (either success or fail). deleteFileIfExists(temp); diff --git a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java index 841991f8d63..66bb9c3a15d 100644 --- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java @@ -19,7 +19,9 @@ package org.apache.kafka.raft; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.protocol.types.TaggedFields; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.raft.generated.QuorumStateData; import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.AfterEach; @@ -107,6 +109,20 @@ public class FileBasedStateStoreTest { assertCantReadQuorumStateVersion(jsonString); } + @Test + public void testSupportedVersion() { + // If the next few checks fail, please check that they are compatible with previous releases of KRaft + + // Check that FileBasedStateStore supports the latest version + assertEquals(FileBasedStateStore.HIGHEST_SUPPORTED_VERSION, QuorumStateData.HIGHEST_SUPPORTED_VERSION); + // Check that the supported versions haven't changed + assertEquals(0, QuorumStateData.HIGHEST_SUPPORTED_VERSION); + assertEquals(0, QuorumStateData.LOWEST_SUPPORTED_VERSION); + // For the latest version check that the number of tagged fields hasn't changed + TaggedFields taggedFields = (TaggedFields) QuorumStateData.SCHEMA_0.get(6).def.type; + assertEquals(0, taggedFields.numFields()); + } + public void assertCantReadQuorumStateVersion(String jsonString) throws IOException { final File stateFile = TestUtils.tempFile(); stateStore = new FileBasedStateStore(stateFile);