divijvaidya commented on code in PR #12331:
URL: https://github.com/apache/kafka/pull/12331#discussion_r908259484


##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws 
IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                               final OpenOption[] options = { 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, 
StandardOpenOption.SPARSE };
+                               try (final SeekableByteChannel channel = 
Files.newByteChannel(file.toPath(), options)) {
+                                       
channel.position(initFileSize-Integer.BYTES);
+
+                                       final ByteBuffer buffer = 
ByteBuffer.allocate(Integer.BYTES).putInt(0);
+                                       buffer.rewind();
+                                       channel.write(buffer);
+                               }
             }
+            /* A separate open call is needed even when having a 
RandomAccessFile

Review Comment:
   this comment might need modification now since we are not using 
RandomAccessFile anymore.



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws 
IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                               final OpenOption[] options = { 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, 
StandardOpenOption.SPARSE };
+                               try (final SeekableByteChannel channel = 
Files.newByteChannel(file.toPath(), options)) {
+                                       
channel.position(initFileSize-Integer.BYTES);
+
+                                       final ByteBuffer buffer = 
ByteBuffer.allocate(Integer.BYTES).putInt(0);
+                                       buffer.rewind();
+                                       channel.write(buffer);
+                               }

Review Comment:
   This block of code to create an empty file of a particular size could be 
moved into utility classes which contain other file handling helper methods 
such as `org.apache.kafka.common.utils.Utils.java`
   
   You could name it: `FileChannel Utils.createPreallocatedFile(Path, Size)`



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +463,20 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws 
IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                               final OpenOption[] options = { 
StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW, 
StandardOpenOption.SPARSE };

Review Comment:
   nit
   indentation



##########
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java:
##########
@@ -461,14 +461,15 @@ private static FileChannel openChannel(File file,
                                            int initFileSize,
                                            boolean preallocate) throws 
IOException {
         if (mutable) {
-            if (fileAlreadyExists || !preallocate) {
-                return FileChannel.open(file.toPath(), 
StandardOpenOption.CREATE, StandardOpenOption.READ,
-                        StandardOpenOption.WRITE);
-            } else {
-                RandomAccessFile randomAccessFile = new RandomAccessFile(file, 
"rw");
-                randomAccessFile.setLength(initFileSize);
-                return randomAccessFile.getChannel();
+            if (preallocate && !fileAlreadyExists) {
+                try (RandomAccessFile randomAccessFile = new 
RandomAccessFile(file, "rw")) {

Review Comment:
   I might not have made myself clear with the previous comment. Let me try to 
clarify. `FileChannel` is an implementation of `SeekableByteChannel`. In the 
current code you are opening the file twice when preallocate flag is on. Once 
to fill it up with dummy data and second to return a channel. We need to do it 
only once.
   
   You can use `FileChannel` once to replace the data as well as return 
outside. You won't have to open twice.
   
   The code change will look like:
   ```
   if (preallocate && !fileAlreadyExists) {
     final OpenOption[] options = { StandardOpenOption.READ, 
StandardOpenOption.WRITE, 
     StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE };
     final FileChannel channel = FileChannel.open(file.toPath(), options))
     channel.position(initFileSize-Integer.BYTES);
     final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES).putInt(0);
     buffer.rewind();
     channel.write(buffer);
     //TODO: reset the position()
     return channel;
   } else {
     return FileChannel.open(file.toPath(), StandardOpenOption.CREATE, 
StandardOpenOption.READ,
                           StandardOpenOption.WRITE);
   }
   ```



##########
core/src/main/scala/kafka/log/AbstractIndex.scala:
##########
@@ -108,22 +109,33 @@ abstract class AbstractIndex(@volatile private var _file: 
File, val baseOffset:
   @volatile
   protected var mmap: MappedByteBuffer = {
     val newlyCreated = file.createNewFile()
-    val raf = if (writable) new RandomAccessFile(file, "rw") else new 
RandomAccessFile(file, "r")
-    try {
-      /* pre-allocate the file if necessary */
-      if(newlyCreated) {
-        if(maxIndexSize < entrySize)
-          throw new IllegalArgumentException("Invalid max index size: " + 
maxIndexSize)
+    /* pre-allocate the file if necessary */
+    if(newlyCreated) {
+      if(maxIndexSize < entrySize)
+        throw new IllegalArgumentException("Invalid max index size: " + 
maxIndexSize)
+      val raf = if (writable) new RandomAccessFile(file, "rw") else new 
RandomAccessFile(file, "r")

Review Comment:
   You can replace RandomAccessFile here with FileChannel just as we did at the 
other place and hence, eliminating the need for opening the file twice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to