divijvaidya commented on code in PR #12331:
URL: https://github.com/apache/kafka/pull/12331#discussion_r908259484

##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                final OpenOption[] options = { StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
+                try (final SeekableByteChannel channel = Files.newByteChannel(file.toPath(), options)) {
+                    channel.position(initFileSize-Integer.BYTES);
+
+                    final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
+                    buffer.rewind();
+                    channel.write(buffer);
+                }
             }
+            /* A separate open call is needed even when having a RandomAccessFile

Review Comment:
   This comment might need modification now, since we are not using RandomAccessFile anymore.

##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                final OpenOption[] options = { StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
+                try (final SeekableByteChannel channel = Files.newByteChannel(file.toPath(), options)) {
+                    channel.position(initFileSize-Integer.BYTES);
+
+                    final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
+                    buffer.rewind();
+                    channel.write(buffer);
+                }

Review Comment:
   This block of code, which creates an empty file of a particular size, could be moved into a utility class that contains other file-handling helper methods, such as `org.apache.kafka.common.utils.Utils.java`.

   You could name it: `FileChannel Utils.createPreallocatedFile(Path, Size)`

##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                final OpenOption[] options = { StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };

Review Comment:
   nit: indentation

##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +461,15 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {

Review Comment:
   I might not have made myself clear with the previous comment. Let me try to clarify.

   `FileChannel` is an implementation of `SeekableByteChannel`. In the current code you are opening the file twice when the preallocate flag is on: once to fill it with dummy data and a second time to return a channel. We need to open it only once. You can open a single `FileChannel`, use it to write the dummy data, and return that same channel, so you won't have to open the file twice.

   The code change will look like:
   ```
   if (preallocate && !fileAlreadyExists) {
       final OpenOption[] options = { StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
       // Open the file once; the same channel is used to preallocate and is then returned.
       final FileChannel channel = FileChannel.open(file.toPath(), options);
       // Writing a few zero bytes at the tail extends the file to initFileSize.
       channel.position(initFileSize-Integer.BYTES);
       final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
       buffer.rewind();
       channel.write(buffer);
       //TODO: reset the position()
       return channel;
   } else {
       return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
   }
   ```

##########
core/src/main/scala/kafka/log/AbstractIndex.scala:
##########
@@ -108,22 +109,33 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
   @volatile
   protected var mmap: MappedByteBuffer = {
     val newlyCreated = file.createNewFile()
-    val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
-    try {
-      /* pre-allocate the file if necessary */
-      if(newlyCreated) {
-        if(maxIndexSize < entrySize)
-          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+    /* pre-allocate the file if necessary */
+    if(newlyCreated) {
+      if(maxIndexSize < entrySize)
+        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+      val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")

Review Comment:
   You can replace `RandomAccessFile` here with `FileChannel`, just as we did in the other place, which eliminates the need to open the file twice.
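
   For reference, the single-open preallocation discussed above could be sketched as a standalone helper. This is only an illustration under assumptions: `PreallocateSketch` and `createPreallocatedFile` are hypothetical names (the latter borrowed from the naming suggestion above), not existing Kafka APIs. It also swaps `position()` plus `write(buffer)` for the positional `FileChannel.write(ByteBuffer, long)` overload, which does not move the channel position, as one possible way to handle the position-reset TODO.

   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Files;
   import java.nio.file.OpenOption;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;

   class PreallocateSketch {

       // Hypothetical helper (not an existing Kafka method): creates a new file of
       // `size` bytes and returns the very channel that created it, so the file is
       // opened exactly once.
       static FileChannel createPreallocatedFile(Path path, int size) throws IOException {
           if (size < Integer.BYTES)
               throw new IllegalArgumentException("Invalid preallocation size: " + size);
           final OpenOption[] options = {
               StandardOpenOption.READ, StandardOpenOption.WRITE,
               StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE
           };
           final FileChannel channel = FileChannel.open(path, options);
           try {
               // Writing four zero bytes at the tail grows the file to `size`.
               final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
               buffer.rewind();
               long writeAt = (long) size - Integer.BYTES;
               while (buffer.hasRemaining()) {
                   // Positional write: the channel's own position stays at 0.
                   writeAt += channel.write(buffer, writeAt);
               }
               return channel;
           } catch (IOException e) {
               // Do not leak the channel if preallocation fails.
               channel.close();
               throw e;
           }
       }

       public static void main(String[] args) throws IOException {
           Path dir = Files.createTempDirectory("prealloc");
           Path file = dir.resolve("segment.log");
           try (FileChannel channel = createPreallocatedFile(file, 1024)) {
               // prints "1024 0"
               System.out.println(channel.size() + " " + channel.position());
           } finally {
               Files.deleteIfExists(file);
               Files.deleteIfExists(dir);
           }
       }
   }
   ```

   Whether the file is actually sparse on disk depends on the file system; `StandardOpenOption.SPARSE` is only a hint and is ignored where sparse files are not supported.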