hachikuji commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r537776855
########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -386,6 +390,10 @@ public long maxOffset() { return maxOffset; } + public long minOffset() { Review comment: Can we modify some of the test cases in `MemoryRecordsTest` to ensure this is getting set correctly? It might also be useful to document what exactly the min offset is with respect to batches and records. ########## File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ########## @@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR if (!retainedRecords.isEmpty()) { if (writeOriginalBatch) { batch.writeTo(bufferOutputStream); - filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false); + filterResult.updateRetainedBatchMetadata(batch, retainedRecords.get(0).offset(), retainedRecords.size(), false); Review comment: Hmm... It seems a little inconsistent to use the offset of the first record. When the batch is empty, we use the base offset of the batch. Shouldn't we do the same here? Otherwise we will end up with some batches which have base offset smaller than the segment base offset. Note that the base offset is always preserved by compaction. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org