ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1372463913


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
     // we manually override the state offset here prior to taking the snapshot.
     producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-    producerStateManager.takeSnapshot()
+    // We avoid potentially-costly fsync call, since we acquire 
UnifiedLog#lock here
+    // which could block subsequent produces in the meantime.
+    // flush is done in the scheduler thread along with segment flushing below
+    val maybeSnapshot = producerStateManager.takeSnapshot(false)
     updateHighWatermarkWithLogEndOffset()
     // Schedule an asynchronous flush of the old segment
-    scheduler.scheduleOnce("flush-log", () => 
flushUptoOffsetExclusive(newSegment.baseOffset))
+    scheduler.scheduleOnce("flush-log", () => {
+      maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, 
"producer-snapshot"))

Review Comment:
   Yeah, I also noticed that. I'll fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to