hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r537776855



##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -386,6 +390,10 @@ public long maxOffset() {
             return maxOffset;
         }
 
+        public long minOffset() {

Review comment:
       Can we modify some of the test cases in `MemoryRecordsTest` to ensure this is getting set correctly?

       It might also be useful to document what exactly the min offset is with respect to batches and records.
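
       As a rough illustration of the coverage being asked for, here is a minimal sketch of such a test, written in JUnit 4 style. It assumes the `minOffset()` accessor added by this PR and uses a hypothetical test class and method name; everything else (`MemoryRecords.builder`, `filterTo`, `RecordFilter`, `BufferSupplier.NO_CACHING`) is the existing clients API:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.BufferSupplier;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class FilterResultMinOffsetTest {

    @Test
    public void testMinOffsetSetWhenOriginalBatchRetained() {
        // Build a single v2 batch with base offset 5 containing two records (offsets 5 and 6).
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                CompressionType.NONE, TimestampType.CREATE_TIME, 5L);
        builder.append(10L, "k1".getBytes(), "v1".getBytes());
        builder.append(11L, "k2".getBytes(), "v2".getBytes());
        MemoryRecords records = builder.build();

        // Filter that retains every record, so the original batch is written through unchanged.
        MemoryRecords.RecordFilter retainAll = new MemoryRecords.RecordFilter() {
            @Override
            protected BatchRetention checkBatchRetention(RecordBatch batch) {
                return BatchRetention.DELETE_EMPTY;
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
                return true;
            }
        };

        ByteBuffer filtered = ByteBuffer.allocate(2048);
        MemoryRecords.FilterResult result = records.filterTo(new TopicPartition("topic", 0),
                retainAll, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

        // With everything retained, the first retained record's offset and the batch base
        // offset coincide, so 5 is the expected minimum under either interpretation of minOffset().
        assertEquals(5L, result.minOffset());
    }
}
```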

##########
File path: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##########
@@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable<MutableR
             if (!retainedRecords.isEmpty()) {
                 if (writeOriginalBatch) {
                     batch.writeTo(bufferOutputStream);
-                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
+                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.get(0).offset(), retainedRecords.size(), false);

Review comment:
       Hmm... It seems a little inconsistent to use the offset of the first record. When the batch is empty, we use the base offset of the batch. Shouldn't we do the same here? Otherwise we will end up with some batches that have a base offset smaller than the segment base offset. Note that the base offset is always preserved by compaction.
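
       To make the suggestion concrete, here is a minimal sketch of the direction described above, assuming `updateRetainedBatchMetadata` keeps the parameter order shown in the diff (this is not the final wording of the change):

```java
                if (writeOriginalBatch) {
                    batch.writeTo(bufferOutputStream);
                    // Report the original batch's base offset, mirroring the empty-batch path,
                    // rather than the offset of the first retained record.
                    filterResult.updateRetainedBatchMetadata(batch, batch.baseOffset(),
                        retainedRecords.size(), false);
                }
```

       That keeps the reported minimum aligned with the batch base offset, which compaction always preserves.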



