ccding commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r753568521



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1505,21 +1505,35 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Flush all local log segments
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flushUpToAndExcludingLogEndOffset(): Unit = flush(logEndOffset, false)
+
+  /**
+   * Flush all local log segments including possible empty active segment.
+   *
+   * We have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): 
Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty 
index file during shutdown.
+   *
+   * This function should only be called during shutdown.
+   */
+  def flushUpToAndIncludingLogEndOffset(): Unit = flush(logEndOffset, true)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery 
point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)
+
+  private def flush(offset: Long, includingOffset: Boolean): Unit = {
+    val flushOffset = if (includingOffset) offset + 1  else offset
+    val recoveryPoint = offset
+    maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $flushOffset") {
+      if (flushOffset > localLog.recoveryPoint) {
+        debug(s"Flushing log up to offset $flushOffset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to