Hangleton commented on code in PR #12331:
URL: https://github.com/apache/kafka/pull/12331#discussion_r1183474170


##########
raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java:
##########
@@ -139,16 +139,17 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
 
         log.trace("Writing tmp quorum state {}", temp.getAbsolutePath());
 
-        try (final FileOutputStream fileOutputStream = new FileOutputStream(temp);
-             final BufferedWriter writer = new BufferedWriter(
-                 new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+        final OpenOption[] options = {StandardOpenOption.WRITE,
+            StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE};
+
+        try (BufferedWriter writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8, options)) {
             short version = state.highestSupportedVersion();
 
             ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version);
             jsonState.set(DATA_VERSION, new ShortNode(version));
             writer.write(jsonState.toString());
             writer.flush();
-            fileOutputStream.getFD().sync();
+            writer.close();

Review Comment:
   Note that the try-with-resources statement also invokes `close`, so this explicit call is redundant.
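
   To illustrate the point, here is a minimal sketch, not code from the PR. `CountingWriter` and the two method names are hypothetical helpers, and a `StringWriter` stands in for the file. It counts how many times `close` reaches the underlying sink with and without the explicit call:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;

public class TryWithResourcesCloseDemo {

    /** Hypothetical in-memory sink that counts how often close() reaches it. */
    static final class CountingWriter extends StringWriter {
        int closeCount = 0;

        @Override
        public void close() throws IOException {
            closeCount++;
            super.close();
        }
    }

    /** Mirrors the shape of the patched code: explicit close() inside the try block. */
    static int closesWithExplicitClose() {
        CountingWriter sink = new CountingWriter();
        try (BufferedWriter writer = new BufferedWriter(sink)) {
            writer.write("state");
            writer.flush();
            writer.close(); // explicit call, as in the diff
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // On exit, try-with-resources calls close() again; BufferedWriter
        // treats the second call as a no-op.
        return sink.closeCount;
    }

    /** Same write, relying only on try-with-resources to close the writer. */
    static int closesWithoutExplicitClose() {
        CountingWriter sink = new CountingWriter();
        try (BufferedWriter writer = new BufferedWriter(sink)) {
            writer.write("state");
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.closeCount;
    }

    public static void main(String[] args) {
        System.out.println(closesWithExplicitClose());
        System.out.println(closesWithoutExplicitClose());
    }
}
```

   Both methods report a single close on the sink, which suggests the explicit `writer.close()` adds nothing beyond what the try block already does.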



##########
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##########
@@ -74,13 +74,28 @@ public CheckpointFile(File file,
 
     public void write(Collection<T> entries) throws IOException {
         synchronized (lock) {
+            final OpenOption[] options = {StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE, StandardOpenOption.SPARSE};
+
             // write to temp file and then swap with the existing file
-            try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
-                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-                CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);
-                checkpointWriteBuffer.write(entries);
+
+            try (BufferedWriter writer = Files.newBufferedWriter(tempPath, StandardCharsets.UTF_8, options)) {
+                // Write the version
+                writer.write(Integer.toString(version));
+                writer.newLine();
+
+                // Write the entries count
+                writer.write(Integer.toString(entries.size()));
+                writer.newLine();
+
+                // Write each entry on a new line.
+                for (T entry : entries) {
+                    writer.write(formatter.toString(entry));
+                    writer.newLine();
+                }
+
                 writer.flush();
-                fileOutputStream.getFD().sync();
+                writer.close();

Review Comment:
   Ditto.



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,13 +460,11 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
+            if (preallocate && !fileAlreadyExists) {

Review Comment:
   Why negate the condition and swap the statements from the if and else clauses?
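
   For reference, the new condition is the exact negation of the old one (De Morgan's law), which is why the branch bodies have to trade places. A small sketch that checks all four input combinations; the class and method names are illustrative only and do not appear in the PR:

```java
public class ConditionNegationDemo {

    /** Condition before the change. */
    static boolean oldCondition(boolean fileAlreadyExists, boolean preallocate) {
        return fileAlreadyExists || !preallocate;
    }

    /** Condition after the change. */
    static boolean newCondition(boolean fileAlreadyExists, boolean preallocate) {
        return preallocate && !fileAlreadyExists;
    }

    /** Returns true if the two conditions disagree on every input, i.e. one negates the other. */
    static boolean isExactNegation() {
        boolean[] values = {false, true};
        for (boolean exists : values) {
            for (boolean prealloc : values) {
                if (oldCondition(exists, prealloc) == newCondition(exists, prealloc)) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isExactNegation());
    }
}
```

   Since the two forms are logically equivalent once the bodies are swapped, the change only affects readability, so the original ordering could be kept to minimise the diff.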



##########
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##########
@@ -74,13 +74,28 @@ public CheckpointFile(File file,
 
     public void write(Collection<T> entries) throws IOException {
         synchronized (lock) {
+            final OpenOption[] options = {StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE, StandardOpenOption.SPARSE};

Review Comment:
   Should `SPARSE` always be set?
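
   As far as I can tell from the `StandardOpenOption` Javadoc, `SPARSE` is only a hint that applies together with `CREATE_NEW`, and it is ignored where the file system does not support sparse files. The sketch below is an assumption-laden illustration rather than a proposal: `SparseOptionDemo` and `writeAndReadBack` are hypothetical names, and a temporary directory stands in for the checkpoint location. It writes the same small file with and without the hint:

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SparseOptionDemo {

    /** Writes a tiny file, optionally with the SPARSE hint, and returns what was written. */
    static String writeAndReadBack(boolean sparse) {
        try {
            Path dir = Files.createTempDirectory("sparse-demo");
            Path file = dir.resolve("checkpoint.tmp");

            // CREATE_NEW is used here because the Javadoc ties the SPARSE hint to it.
            OpenOption[] options = sparse
                ? new OpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE}
                : new OpenOption[] {StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW};

            try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, options)) {
                writer.write("0");
                writer.newLine();
            }

            String content = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            Files.delete(file);
            Files.delete(dir);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeAndReadBack(true).equals(writeAndReadBack(false)));
    }
}
```

   The file content is the same either way, so for a small, sequentially written text file the hint does not look like it buys anything.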



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java:
##########
@@ -86,27 +87,42 @@ public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writa
 
     private void createAndAssignMmap() throws IOException {
         boolean newlyCreated = file.createNewFile();
-        RandomAccessFile raf;
+        FileChannel channel;
         if (writable)
-            raf = new RandomAccessFile(file, "rw");
+            channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE);
         else
-            raf = new RandomAccessFile(file, "r");
+            channel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
 
         try {
             /* pre-allocate the file if necessary */
             if (newlyCreated) {
                 if (maxIndexSize < entrySize())
                     throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize);
-                raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize()));
+
+                int size = roundDownToExactMultiple(maxIndexSize, entrySize());
+                Utils.preallocateFile(channel, size);
             }
 
-            long length = raf.length();
-            MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize());
+            long length = channel.size();
+            MappedByteBuffer mmap;
+
+            if (writable)

Review Comment:
   Why move this code from `createMappedBuffer`?



##########
server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java:
##########
@@ -74,13 +74,28 @@ public CheckpointFile(File file,
 
     public void write(Collection<T> entries) throws IOException {
         synchronized (lock) {
+            final OpenOption[] options = {StandardOpenOption.WRITE,
+                StandardOpenOption.CREATE, StandardOpenOption.SPARSE};
+
             // write to temp file and then swap with the existing file
-            try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile());
-                 BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
-                CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter);

Review Comment:
   Why move away from `CheckpointWriteBuffer`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.