junrao commented on a change in pull request #10405:
URL: https://github.com/apache/kafka/pull/10405#discussion_r602428120



##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -249,6 +266,7 @@ public void renameTo(File f) throws IOException {
         } finally {
             this.file = f;
         }
+        needFlushParentDir.set(true);

Review comment:
       Hmm, this seems problematic. For example, when we do log cleaning, the
steps are (1) write the cleaned data to a new segment with a .clean suffix; (2)
flush the new segment; (3) rename the .clean file to .swap; (4) rename .swap to
.log. No flush is called after the renames, so setting this flag will never
trigger a flush of the parent directory.
   
   One option is to add a method that explicitly flushes the parent directory
after a rename and to call it after step 4.
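   A minimal sketch of the cleaning sequence with an explicit directory flush after step 4, assuming a POSIX file system (opening a directory with `FileChannel.open` and calling `force` works on Linux and macOS but fails on Windows). The class and method names (`CleanerSwapSketch`, `flushDir`, `writeCleaned`, `swapIn`) are hypothetical and are not Kafka's API.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

final class CleanerSwapSketch {

    // Hypothetical helper: fsync a directory so that creates/renames inside it
    // are durable. Assumes a POSIX file system (fails on Windows).
    static void flushDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    // Steps (1) and (2): write the cleaned data to "<base>.clean" and flush it.
    static Path writeCleaned(Path dir, String base, byte[] data) throws IOException {
        Path clean = dir.resolve(base + ".clean");
        try (FileChannel ch = FileChannel.open(clean,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
            ch.force(true); // flushes the file's contents, not its directory entry
        }
        return clean;
    }

    // Steps (3) and (4): .clean -> .swap -> .log, followed by the explicit
    // parent-directory flush suggested after step (4).
    static Path swapIn(Path clean) throws IOException {
        Path dir = clean.getParent();
        String name = clean.getFileName().toString();
        String base = name.substring(0, name.length() - ".clean".length());
        Path swap = dir.resolve(base + ".swap");
        Path log = dir.resolve(base + ".log");
        Files.move(clean, swap, StandardCopyOption.ATOMIC_MOVE);
        Files.move(swap, log, StandardCopyOption.ATOMIC_MOVE);
        flushDir(dir); // without this, the renames may not survive a crash
        return log;
    }
}
```

   The segment flush in step (2) only covers the file's data; the two renames change the directory, which is why the extra `flushDir` call sits after step (4) rather than relying on a later segment flush.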
   
   Also, it seems that we need logic to flush the parent directory of the
topic-partition directory as well. This is needed when a topic partition is
added to or deleted from a broker, or when a partition is moved across disks in
JBOD. The latter has the following steps: (1) copy the log segments in the
topic-partition directory on one disk to a topic-partition-future directory on
another disk; (2) once the copying is done, rename topic-partition-future to
topic-partition. After step (2), it seems that we need to flush the parent
directory on both the old and the new disk.
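   The JBOD case can be sketched the same way. This assumes both disks use POSIX file systems, that the future directory already holds the copied segments, and that the old topic-partition directory is removed once the rename is done (the comment above does not spell that step out). `FutureLogSwapSketch`, `promoteFuture`, `flushDir` and `deleteRecursively` are hypothetical names.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FutureLogSwapSketch {

    // Hypothetical helper: fsync a directory (POSIX only, fails on Windows).
    static void flushDir(Path dir) throws IOException {
        try (FileChannel ch = FileChannel.open(dir, StandardOpenOption.READ)) {
            ch.force(true);
        }
    }

    static void deleteRecursively(Path dir) throws IOException {
        List<Path> paths;
        try (Stream<Path> s = Files.walk(dir)) {
            // Reverse order visits children before their parent directory.
            paths = s.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path p : paths) {
            Files.delete(p);
        }
    }

    // Step (2): rename "<tp>-future" to "<tp>" on the new disk, then flush the
    // parent (log) directory on BOTH disks. Deleting the old copy is an
    // assumption of this sketch.
    static Path promoteFuture(Path currentDir, Path futureDir) throws IOException {
        Path newLogDir = futureDir.getParent();
        Path oldLogDir = currentDir.getParent();
        Path target = newLogDir.resolve(currentDir.getFileName());
        Files.move(futureDir, target, StandardCopyOption.ATOMIC_MOVE);
        flushDir(newLogDir); // make the rename on the new disk durable

        deleteRecursively(currentDir);
        flushDir(oldLogDir); // make the removal on the old disk durable
        return target;
    }
}
```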

##########
File path: clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
##########
@@ -427,7 +445,7 @@ public static FileRecords open(File file,
                                    boolean preallocate) throws IOException {
         FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
         int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
-        return new FileRecords(file, channel, 0, end, false);
+        return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists);

Review comment:
       The condition `mutable && !fileAlreadyExists` doesn't seem complete. When
a broker is restarted, all existing log segments are opened with both mutable
and fileAlreadyExists set to true. However, segments beyond the recovery point
may not have been flushed yet. When they are flushed, we also need to flush the
parent directory.
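   One way to cover restarted brokers is to compute the initial flag from an extra input. The sketch below assumes a hypothetical `beyondRecoveryPoint` argument supplied by log recovery; `SegmentFlushSketch` and its methods are illustrative only, not the actual `FileRecords` API, and the directory fsync again assumes a POSIX file system.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;

final class SegmentFlushSketch implements AutoCloseable {
    private final Path file;
    private final FileChannel channel;
    private final AtomicBoolean needFlushParentDir;

    // Hypothetical rule: flush the parent directory for newly created segments
    // AND for reopened segments beyond the recovery point (hypothetical input),
    // which may never have been flushed before the restart.
    static boolean initialNeedFlushParentDir(boolean mutable, boolean fileAlreadyExists,
                                             boolean beyondRecoveryPoint) {
        return mutable && (!fileAlreadyExists || beyondRecoveryPoint);
    }

    SegmentFlushSketch(Path file, boolean mutable, boolean fileAlreadyExists,
                       boolean beyondRecoveryPoint) throws IOException {
        this.file = file;
        // Opened read-write regardless of `mutable`, for brevity.
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
        this.needFlushParentDir = new AtomicBoolean(
                initialNeedFlushParentDir(mutable, fileAlreadyExists, beyondRecoveryPoint));
    }

    boolean needsParentDirFlush() {
        return needFlushParentDir.get();
    }

    void flush() throws IOException {
        channel.force(true);
        // Clear the flag first so a concurrent rename can set it again;
        // restore it if the directory fsync fails.
        if (needFlushParentDir.getAndSet(false)) {
            try (FileChannel dir = FileChannel.open(file.getParent(), StandardOpenOption.READ)) {
                dir.force(true);
            } catch (IOException e) {
                needFlushParentDir.set(true);
                throw e;
            }
        }
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}
```

   Clearing the flag with `getAndSet(false)` before the directory fsync means a rename that races with `flush()` sets the flag again and is picked up by the next flush instead of being lost.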




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

