This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 522263d1956 KAFKA-14273; Close file before atomic move (#14354)
522263d1956 is described below

commit 522263d1956231506e1afb660738484c4ce41667
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Thu Sep 7 16:17:03 2023 -0700

    KAFKA-14273; Close file before atomic move (#14354)
    
    In the Windows OS atomic move are not allowed if the file has another open 
handle. E.g
    
    __cluster_metadata-0\quorum-state: The process cannot access the file 
because it is being used by another process
            at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
            at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
            at 
java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
            at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
            at java.base/java.nio.file.Files.move(Files.java:1430)
            at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
            at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
            at 
org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
    
    This is fixed by first closing the temporary quorum-state file before 
attempting to move it.
    
    Reviewers: Colin Patrick McCabe <cmcc...@apache.org>
    Co-Authored-By: Renaldo Baur Filho <renald...@gmail.com>
---
 .../org/apache/kafka/raft/FileBasedStateStore.java | 31 +++++++++++++---------
 .../apache/kafka/raft/FileBasedStateStoreTest.java | 16 +++++++++++
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java 
b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
index e403613ccd5..b359493e506 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
@@ -63,6 +63,7 @@ public class FileBasedStateStore implements QuorumStateStore {
     private final File stateFile;
 
     static final String DATA_VERSION = "data_version";
+    static final short HIGHEST_SUPPORTED_VERSION = 0;
 
     public FileBasedStateStore(final File stateFile) {
         this.stateFile = stateFile;
@@ -139,21 +140,27 @@ public class FileBasedStateStore implements 
QuorumStateStore {
 
         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-        try (final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8))) {
-            short version = state.highestSupportedVersion();
-
-            ObjectNode jsonState = (ObjectNode) 
QuorumStateDataJsonConverter.write(state, version);
-            jsonState.set(DATA_VERSION, new ShortNode(version));
-            writer.write(jsonState.toString());
-            writer.flush();
-            fileOutputStream.getFD().sync();
+        try {
+            try (final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
+                 final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8)
+                 )
+            ) {
+                ObjectNode jsonState = (ObjectNode) 
QuorumStateDataJsonConverter.write(state, HIGHEST_SUPPORTED_VERSION);
+                jsonState.set(DATA_VERSION, new 
ShortNode(HIGHEST_SUPPORTED_VERSION));
+                writer.write(jsonState.toString());
+                writer.flush();
+                fileOutputStream.getFD().sync();
+            }
             Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
         } catch (IOException e) {
             throw new UncheckedIOException(
-                String.format("Error while writing the Quorum status from the 
file %s",
-                    stateFile.getAbsolutePath()), e);
+                String.format(
+                    "Error while writing the Quorum status from the file %s",
+                    stateFile.getAbsolutePath()
+                ),
+                e
+            );
         } finally {
             // cleanup the temp file when the write finishes (either success 
or fail).
             deleteFileIfExists(temp);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java 
b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 5fa4f5c6880..d5a6570ef61 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.raft;
 
+import org.apache.kafka.common.protocol.types.TaggedFields;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.generated.QuorumStateData;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 
@@ -90,6 +92,20 @@ public class FileBasedStateStoreTest {
         assertFalse(stateFile.exists());
     }
 
+    @Test
+    public void testSupportedVersion() {
+        // If the next few checks fail, please check that they are compatible 
with previous releases of KRaft
+
+        // Check that FileBasedStateStore supports the latest version
+        assertEquals(FileBasedStateStore.HIGHEST_SUPPORTED_VERSION, 
QuorumStateData.HIGHEST_SUPPORTED_VERSION);
+        // Check that the supported versions haven't changed
+        assertEquals(0, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
+        assertEquals(0, QuorumStateData.LOWEST_SUPPORTED_VERSION);
+        // For the latest version check that the number of tagged fields 
hasn't changed
+        TaggedFields taggedFields = (TaggedFields) 
QuorumStateData.SCHEMA_0.get(6).def.type;
+        assertEquals(0, taggedFields.numFields());
+    }
+
     @AfterEach
     public void cleanup() throws IOException {
         if (stateStore != null) {

Reply via email to