ccding commented on a change in pull request #11345: URL: https://github.com/apache/kafka/pull/11345#discussion_r753568521
########## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ########## @@ -1505,21 +1505,35 @@ class UnifiedLog(@volatile var logStartOffset: Long, /** * Flush all local log segments */ - def flush(): Unit = flush(logEndOffset) + def flushUpToAndExcludingLogEndOffset(): Unit = flush(logEndOffset, false) + + /** + * Flush all local log segments including possible empty active segment. + * + * We have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty + * active segments, which is important to make sure we don't lose the empty index file during shutdown. + * + * This function should only be called during shutdown. + */ + def flushUpToAndIncludingLogEndOffset(): Unit = flush(logEndOffset, true) /** * Flush local log segments for all offsets up to offset-1 * * @param offset The offset to flush up to (non-inclusive); the new recovery point */ - def flush(offset: Long): Unit = { - maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") { - if (offset > localLog.recoveryPoint) { - debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " + + def flush(offset: Long): Unit = flush(offset, false) + + private def flush(offset: Long, includingOffset: Boolean): Unit = { + val flushOffset = if (includingOffset) offset + 1 else offset + val recoveryPoint = offset + maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $flushOffset") { + if (flushOffset > localLog.recoveryPoint) { + debug(s"Flushing log up to offset $flushOffset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " + Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org