ocadaruma commented on code in PR #14242:
URL: https://github.com/apache/kafka/pull/14242#discussion_r1372463913

##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1617,10 +1617,16 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
           // we manually override the state offset here prior to taking the snapshot.
           producerStateManager.updateMapEndOffset(newSegment.baseOffset)
-          producerStateManager.takeSnapshot()
+          // We avoid potentially-costly fsync call, since we acquire UnifiedLog#lock here
+          // which could block subsequent produces in the meantime.
+          // flush is done in the scheduler thread along with segment flushing below
+          val maybeSnapshot = producerStateManager.takeSnapshot(false)
           updateHighWatermarkWithLogEndOffset()
           // Schedule an asynchronous flush of the old segment
-          scheduler.scheduleOnce("flush-log", () => flushUptoOffsetExclusive(newSegment.baseOffset))
+          scheduler.scheduleOnce("flush-log", () => {
+            maybeSnapshot.ifPresent(f => Utils.flushFileQuietly(f.toPath, "producer-snapshot"))

Review Comment:
   Yeah, I also noticed that. I'll fix


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org