hachikuji commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r762370413



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,28 +1498,44 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.takeSnapshot()
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+    scheduler.schedule("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
     newSegment
   }
 
   /**
    * Flush all local log segments
+   *
+   * @param forceFlushActiveSegment should be true during a clean shutdown, 
and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): 
Unit` function to flush empty
+   * active segments, which is important to make sure we persist the active 
segment file during shutdown, particularly
+   * when it's empty.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, 
forceFlushActiveSegment)

Review comment:
       @kowshik What I am suggesting is that we could do the inclusive flush in 
all cases in order to simplify the API if the performance benefit is minimal. 
In addition to the simpler API, it would fix the problem of losing track of the 
log end offset in the general case (not just for clean shutdown).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to