kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r759942807



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,25 +1498,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.takeSnapshot()
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+    scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
     newSegment
   }
 
   /**
    * Flush all local log segments
    *
-   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
    * we have to pass logEngOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
    * active segments, which is important to make sure we don't lose the empty index file during shutdown.

Review comment:
       IIUC the main reason we would like to force flush the active segment is slightly different.
   Instead of `which is important to make sure we don't lose the empty index file during shutdown.`, could the comment say `which is important to make sure we persist the active segment file during shutdown, particularly when it's empty.`?
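
   To illustrate the empty-active-segment case, here is a minimal sketch. It is a simplified model, not the actual Kafka code: `FlushSketch`, `Segment`, and `segmentsToFlush` are hypothetical names, and it assumes that a flush up to `offset` covers only segments whose base offset is strictly below `offset`.

```scala
object FlushSketch {
  // Hypothetical stand-in for a log segment; only the base offset matters here.
  final case class Segment(baseOffset: Long)

  // Assumption: a flush up to `offset` treats `offset` as an exclusive upper
  // bound, so it selects segments whose base offset is strictly below it.
  def segmentsToFlush(segments: Seq[Segment], offset: Long): Seq[Segment] =
    segments.filter(_.baseOffset < offset)

  val logEndOffset: Long = 100L

  // The last segment is the active one. It is empty, so its base offset
  // equals logEndOffset.
  val segments: Seq[Segment] = Seq(Segment(0L), Segment(50L), Segment(100L))

  // Passing logEndOffset leaves the empty active segment out.
  val withoutForce: Seq[Segment] = segmentsToFlush(segments, logEndOffset)

  // Passing logEndOffset + 1 includes the empty active segment.
  val withForce: Seq[Segment] = segmentsToFlush(segments, logEndOffset + 1)
}
```

   Under that assumption, only the `+ 1` variant selects the empty active segment, which is why the wording about persisting the active segment file seems closer to the intent.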

##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
   }
 
+  @Test
+  def testFlushingEmptyActiveSegments(): Unit = {

Review comment:
       Sounds good.

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,25 +1498,25 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.takeSnapshot()
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+    scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
     newSegment
   }
 
   /**
    * Flush all local log segments
    *
-   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
    * we have to pass logEngOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty

Review comment:
       s/logEngOffset/logEndOffset




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.