This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7b669e8806c KAFKA-14273; Close file before atomic move (#14354)
7b669e8806c is described below

commit 7b669e8806ce9d122233afeec03eb4e15bde808a
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Thu Sep 7 16:17:03 2023 -0700

    KAFKA-14273; Close file before atomic move (#14354)
    
    On Windows, an atomic move is not allowed if the file has another open 
handle. For example:
    
    __cluster_metadata-0\quorum-state: The process cannot access the file 
because it is being used by another process
            at 
java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
            at 
java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
            at 
java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:403)
            at 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:293)
            at java.base/java.nio.file.Files.move(Files.java:1430)
            at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:949)
            at 
org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:932)
            at 
org.apache.kafka.raft.FileBasedStateStore.writeElectionStateToFile(FileBasedStateStore.java:152)
    
    This is fixed by closing the temporary quorum-state file before 
attempting to move it.
    
    Reviewers: Colin Patrick McCabe <cmcc...@apache.org>
    Co-Authored-By: Renaldo Baur Filho <renald...@gmail.com>
---
 .../org/apache/kafka/raft/FileBasedStateStore.java | 31 +++++++++++++---------
 .../apache/kafka/raft/FileBasedStateStoreTest.java | 16 +++++++++++
 2 files changed, 35 insertions(+), 12 deletions(-)

diff --git a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java 
b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
index d567b15ece3..019fd147179 100644
--- a/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
+++ b/raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
@@ -64,6 +64,7 @@ public class FileBasedStateStore implements QuorumStateStore {
     private final File stateFile;
 
     static final String DATA_VERSION = "data_version";
+    static final short HIGHEST_SUPPORTED_VERSION = 0;
 
     public FileBasedStateStore(final File stateFile) {
         this.stateFile = stateFile;
@@ -144,21 +145,27 @@ public class FileBasedStateStore implements 
QuorumStateStore {
 
         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-        try (final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8))) {
-            short version = state.highestSupportedVersion();
-
-            ObjectNode jsonState = (ObjectNode) 
QuorumStateDataJsonConverter.write(state, version);
-            jsonState.set(DATA_VERSION, new ShortNode(version));
-            writer.write(jsonState.toString());
-            writer.flush();
-            fileOutputStream.getFD().sync();
+        try {
+            try (final FileOutputStream fileOutputStream = new 
FileOutputStream(temp);
+                 final BufferedWriter writer = new BufferedWriter(
+                     new OutputStreamWriter(fileOutputStream, 
StandardCharsets.UTF_8)
+                 )
+            ) {
+                ObjectNode jsonState = (ObjectNode) 
QuorumStateDataJsonConverter.write(state, HIGHEST_SUPPORTED_VERSION);
+                jsonState.set(DATA_VERSION, new 
ShortNode(HIGHEST_SUPPORTED_VERSION));
+                writer.write(jsonState.toString());
+                writer.flush();
+                fileOutputStream.getFD().sync();
+            }
             Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
         } catch (IOException e) {
             throw new UncheckedIOException(
-                String.format("Error while writing the Quorum status from the 
file %s",
-                    stateFile.getAbsolutePath()), e);
+                String.format(
+                    "Error while writing the Quorum status from the file %s",
+                    stateFile.getAbsolutePath()
+                ),
+                e
+            );
         } finally {
             // cleanup the temp file when the write finishes (either success 
or fail).
             deleteFileIfExists(temp);
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java 
b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
index 841991f8d63..66bb9c3a15d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/FileBasedStateStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.raft;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.types.TaggedFields;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.raft.generated.QuorumStateData;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.AfterEach;
 
@@ -107,6 +109,20 @@ public class FileBasedStateStoreTest {
         assertCantReadQuorumStateVersion(jsonString);
     }
 
+    @Test
+    public void testSupportedVersion() {
+        // If the next few checks fail, please check that they are compatible 
with previous releases of KRaft
+
+        // Check that FileBasedStateStore supports the latest version
+        assertEquals(FileBasedStateStore.HIGHEST_SUPPORTED_VERSION, 
QuorumStateData.HIGHEST_SUPPORTED_VERSION);
+        // Check that the supported versions haven't changed
+        assertEquals(0, QuorumStateData.HIGHEST_SUPPORTED_VERSION);
+        assertEquals(0, QuorumStateData.LOWEST_SUPPORTED_VERSION);
+        // For the latest version check that the number of tagged fields 
hasn't changed
+        TaggedFields taggedFields = (TaggedFields) 
QuorumStateData.SCHEMA_0.get(6).def.type;
+        assertEquals(0, taggedFields.numFields());
+    }
+
     public void assertCantReadQuorumStateVersion(String jsonString) throws 
IOException {
         final File stateFile = TestUtils.tempFile();
         stateStore = new FileBasedStateStore(stateFile);

Reply via email to