hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r539719072
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR if (!retainedRecords.isEmpty()) { if (writeOriginalBatch) { batch.writeTo(bufferOutputStream); - filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false); + filterResult.updateRetainedBatchMetadata(batch, retainedRecords.get(0).offset(), retainedRecords.size(), false); Review comment: I believe that a call to `DeleteRecords` can advance the start offset to an arbitrary offset which could be in the middle of a batch. I do not think today that we have any hard guarantees that we won't return batches or even records that are lower than the current log start offset. It might be worth experimenting with this to clarify what the behavior is today. With that said, given the fact that we always return the whole batch anyway, my preference would probably be to try and keep the log start offset aligned on a batch boundary. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org