Hangleton commented on code in PR #12331: URL: https://github.com/apache/kafka/pull/12331#discussion_r1183474170
########## raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java: ########## @@ -139,16 +139,17 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat log.trace("Writing tmp quorum state {}", temp.getAbsolutePath()); - try (final FileOutputStream fileOutputStream = new FileOutputStream(temp); - final BufferedWriter writer = new BufferedWriter( - new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { + final OpenOption[] options = {StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE}; + + try (BufferedWriter writer = Files.newBufferedWriter(temp.toPath(), StandardCharsets.UTF_8, options)) { short version = state.highestSupportedVersion(); ObjectNode jsonState = (ObjectNode) QuorumStateDataJsonConverter.write(state, version); jsonState.set(DATA_VERSION, new ShortNode(version)); writer.write(jsonState.toString()); writer.flush(); - fileOutputStream.getFD().sync(); + writer.close(); Review Comment: Note that the try-with-resources statement also invokes `close`, so this explicit call is redundant. 
########## server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java: ########## @@ -74,13 +74,28 @@ public CheckpointFile(File file, public void write(Collection<T> entries) throws IOException { synchronized (lock) { + final OpenOption[] options = {StandardOpenOption.WRITE, + StandardOpenOption.CREATE, StandardOpenOption.SPARSE}; + // write to temp file and then swap with the existing file - try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { - CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); - checkpointWriteBuffer.write(entries); + + try (BufferedWriter writer = Files.newBufferedWriter(tempPath, StandardCharsets.UTF_8, options)) { + // Write the version + writer.write(Integer.toString(version)); + writer.newLine(); + + // Write the entries count + writer.write(Integer.toString(entries.size())); + writer.newLine(); + + // Write each entry on a new line. + for (T entry : entries) { + writer.write(formatter.toString(entry)); + writer.newLine(); + } + writer.flush(); - fileOutputStream.getFD().sync(); + writer.close(); Review Comment: Ditto. ########## clients/src/main/java/org/apache/kafka/common/record/FileRecords.java: ########## @@ -461,13 +460,11 @@ private static FileChannel openChannel(File file, int initFileSize, boolean preallocate) throws IOException { if (mutable) { - if (fileAlreadyExists || !preallocate) { + if (preallocate && !fileAlreadyExists) { Review Comment: Why negate the condition and swap the statements from the if and else clauses? 
########## server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java: ########## @@ -74,13 +74,28 @@ public CheckpointFile(File file, public void write(Collection<T> entries) throws IOException { synchronized (lock) { + final OpenOption[] options = {StandardOpenOption.WRITE, + StandardOpenOption.CREATE, StandardOpenOption.SPARSE}; Review Comment: Should `SPARSE` always be set? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/AbstractIndex.java: ########## @@ -86,27 +87,42 @@ public AbstractIndex(File file, long baseOffset, int maxIndexSize, boolean writa private void createAndAssignMmap() throws IOException { boolean newlyCreated = file.createNewFile(); - RandomAccessFile raf; + FileChannel channel; if (writable) - raf = new RandomAccessFile(file, "rw"); + channel = FileChannel.open(file.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.SPARSE); else - raf = new RandomAccessFile(file, "r"); + channel = FileChannel.open(file.toPath(), StandardOpenOption.READ); try { /* pre-allocate the file if necessary */ if (newlyCreated) { if (maxIndexSize < entrySize()) throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize); - raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize())); + + int size = roundDownToExactMultiple(maxIndexSize, entrySize()); + Utils.preallocateFile(channel, size); } - long length = raf.length(); - MappedByteBuffer mmap = createMappedBuffer(raf, newlyCreated, length, writable, entrySize()); + long length = channel.size(); + MappedByteBuffer mmap; + + if (writable) Review Comment: Why move this code from `createMappedBuffer`? 
########## server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java: ########## @@ -74,13 +74,28 @@ public CheckpointFile(File file, public void write(Collection<T> entries) throws IOException { synchronized (lock) { + final OpenOption[] options = {StandardOpenOption.WRITE, + StandardOpenOption.CREATE, StandardOpenOption.SPARSE}; + // write to temp file and then swap with the existing file - try (FileOutputStream fileOutputStream = new FileOutputStream(tempPath.toFile()); - BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) { - CheckpointWriteBuffer<T> checkpointWriteBuffer = new CheckpointWriteBuffer<>(writer, version, formatter); Review Comment: Why move away from the `CheckpointWriteBuffer`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org