junrao commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r753539711



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1505,21 +1505,35 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Flush all local log segments
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flushUpToAndExcludingLogEndOffset(): Unit = flush(logEndOffset, false)

Review comment:
       We could just have a single method flushToLogEndOffset(boolean 
inclusive)?

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1505,21 +1505,35 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   /**
    * Flush all local log segments
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flushUpToAndExcludingLogEndOffset(): Unit = flush(logEndOffset, false)
+
+  /**
+   * Flush all local log segments including possible empty active segment.
+   *
+   * We have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): 
Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty 
index file during shutdown.
+   *
+   * This function should only be called during shutdown.
+   */
+  def flushUpToAndIncludingLogEndOffset(): Unit = flush(logEndOffset, true)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery 
point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)
+
+  private def flush(offset: Long, includingOffset: Boolean): Unit = {
+    val flushOffset = if (includingOffset) offset + 1  else offset
+    val recoveryPoint = offset
+    maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $flushOffset") {
+      if (flushOffset > localLog.recoveryPoint) {
+        debug(s"Flushing log up to offset $flushOffset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +

Review comment:
       Since recoveryPoint is not always flushOffset, could we log 
recoveryPoint too?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -212,7 +212,7 @@ final class KafkaMetadataLog private (
   }
 
   override def flush(): Unit = {
-    log.flush()
+    log.flushUpToAndExcludingLogEndOffset()

Review comment:
       It seems that the Raft log could have the same issue during a clean 
shutdown. The easiest fix is to flush it inclusively during close.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to