hachikuji commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r760521586



##########
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##########
@@ -443,7 +443,7 @@ private void onBecomeLeader(long currentTimeMs) {
     private void flushLeaderLog(LeaderState<T> state, long currentTimeMs) {
         // We update the end offset before flushing so that parked fetches can return sooner.
         updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
-        log.flush();
+        log.flush(false);

Review comment:
       This is probably ok. The way that kraft manages log retention is a bit 
different from normal partitions. In general, segments are only deleted once we 
have a snapshot which covers all of their data. That should mean that there is 
no risk today of losing the log end offset even if an unflushed empty segment 
is lost. In other words, even if all of the segments are lost, we should still 
have a snapshot to derive the log end offset from.
   
   cc @jsancio 
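
   As an illustration of the invariant described above, here is a hypothetical sketch (not the actual KafkaRaftClient code; the names `recoveredLogEndOffset`, `segmentsEndOffset`, and `latestSnapshotId` are made up for this example): even if every unflushed segment is lost, the log end offset can still be derived from the latest snapshot.

```scala
// Hypothetical sketch of the recovery invariant: if all segments are lost,
// the log end offset falls back to the offset of the latest snapshot.
case class OffsetAndEpoch(offset: Long, epoch: Int)

def recoveredLogEndOffset(
  segmentsEndOffset: Option[Long],         // None if every segment was lost
  latestSnapshotId: Option[OffsetAndEpoch] // snapshot covering the deleted data
): Long =
  (segmentsEndOffset, latestSnapshotId) match {
    case (Some(endOffset), _)   => endOffset       // the log itself survived
    case (None, Some(snapshot)) => snapshot.offset // derive it from the snapshot
    case (None, None)           => 0L              // nothing on disk: fresh log
  }
```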

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1498,28 +1498,44 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     producerStateManager.takeSnapshot()
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.schedule("flush-log", () => flush(newSegment.baseOffset))
+    scheduler.schedule("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
     newSegment
   }
 
   /**
    * Flush all local log segments
+   *
+   * @param forceFlushActiveSegment should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we persist the active segment file during shutdown, particularly
+   * when it's empty.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)

Review comment:
       The addition of this flag seems to be an optimization so that we are not 
forced to flush newly created (empty) segments. Do you have a sense for how 
valuable this optimization is? The additional noise it adds to the log API is a 
bit unfortunate, so it would be helpful to understand what we're getting in 
return.
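
To make the trade-off concrete, here is a rough sketch under assumed names (not the actual UnifiedLog code; `flushTargetOffset` is invented for illustration) of how the flag in the diff above maps to the offset handed to the lower-level flush: passing logEndOffset + 1 is what pulls an empty active segment, whose base offset equals logEndOffset, into the flush during a clean shutdown.

```scala
// Rough sketch of how forceFlushActiveSegment could change the flush target;
// flushUptoOffsetExclusive is exclusive, so an empty active segment whose base
// offset equals logEndOffset is only covered when we add one.
def flushTargetOffset(logEndOffset: Long, forceFlushActiveSegment: Boolean): Long =
  if (forceFlushActiveSegment) logEndOffset + 1 else logEndOffset
```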



