jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r523284354
########## File path: core/src/main/scala/kafka/log/LogCleaner.scala ########## @@ -711,6 +723,9 @@ private[log] class Cleaner(val id: Int, shallowOffsetOfMaxTimestamp = result.shallowOffsetOfMaxTimestamp, records = retained) throttler.maybeThrottle(outputBuffer.limit()) + if (newCanUpdateBaseOffset) + dest.updateBaseOffset(result.minOffset()) + newCanUpdateBaseOffset = false Review comment: This is much more elegant! A question I would have is what we do if we return from trying all the segments and we end up never making a new segment. Currently the behavior is to return an empty segment with the original baseOffset. I'd lean towards keeping the existing logic, but I suppose we could also simply use deleteSegments instead of replaceSegments in this case too. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org