showuon commented on code in PR #15474:
URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -379,8 +381,11 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
                             && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                             && toMagic > RecordBatch.MAGIC_VALUE_V0) {
 
-                        if (record.timestamp() > maxTimestamp)
+                        if (record.timestamp() > maxTimestamp) {
                             maxTimestamp = record.timestamp();
+                            // The offset is only increased when it is a valid record
+                            offsetOfMaxTimestamp = initialOffset + validatedRecords.size();

Review Comment:
   Also set the correct offset of maxTimestamp while traversing the records.
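
   To illustrate the tracking logic the diff adds, here is a minimal standalone sketch. The types and names (`SimpleRecord`, `scan`, the sentinel `-1L`) are simplified stand-ins for the actual `LogValidator` internals, not the real Kafka classes:

   ```java
   import java.util.List;

   public class MaxTimestampTracker {
       // Simplified stand-in for a Kafka record: only the timestamp matters here.
       record SimpleRecord(long timestamp) {}

       // Returns {maxTimestamp, offsetOfMaxTimestamp}, mirroring the loop in the
       // diff: the offset of the max timestamp is the initial offset plus the
       // number of records validated before it.
       static long[] scan(long initialOffset, List<SimpleRecord> records) {
           long maxTimestamp = -1L;          // stands in for RecordBatch.NO_TIMESTAMP
           long offsetOfMaxTimestamp = -1L;
           int validated = 0;                // stands in for validatedRecords.size()
           for (SimpleRecord record : records) {
               if (record.timestamp() > maxTimestamp) {
                   maxTimestamp = record.timestamp();
                   // The offset is only advanced for a valid record, so the
                   // record being inspected sits at initialOffset + validated.
                   offsetOfMaxTimestamp = initialOffset + validated;
               }
               validated++;
           }
           return new long[] { maxTimestamp, offsetOfMaxTimestamp };
       }

       public static void main(String[] args) {
           long[] result = scan(100L, List.of(
                   new SimpleRecord(5L), new SimpleRecord(9L), new SimpleRecord(7L)));
           // The second record (offset 101) carries the max timestamp 9.
           System.out.println(result[0] + " " + result[1]);
       }
   }
   ```

   Updating `offsetOfMaxTimestamp` inside the same `if` that updates `maxTimestamp` keeps the two values consistent in a single pass, which is the point of the suggestion above.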



##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -263,13 +262,8 @@ public RecordsInfo info() {
         } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
             return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;

Review Comment:
   I don't understand why we should always set this to the last offset here. This will fail the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma could answer this?
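
   For context, a minimal sketch of the branch the diff removes (simplified stand-in types, not the actual `MemoryRecordsBuilder` code; `lastOffset` and the per-record offset follow the diff's naming):

   ```java
   public class ShallowOffsetExample {
       enum CompressionType { NONE, GZIP }

       static final byte MAGIC_VALUE_V2 = 2;   // stands in for RecordBatch.MAGIC_VALUE_V2

       // For compressed or magic-v2 batches the batch is addressed as a whole,
       // so the "shallow" offset of the max timestamp is the batch's last
       // offset; otherwise it is the offset of the record carrying the max
       // timestamp.
       static long shallowOffsetOfMaxTimestamp(CompressionType compressionType,
                                               byte magic,
                                               long lastOffset,
                                               long offsetOfMaxTimestampRecord) {
           if (compressionType != CompressionType.NONE || magic >= MAGIC_VALUE_V2)
               return lastOffset;
           return offsetOfMaxTimestampRecord;
       }

       public static void main(String[] args) {
           // Uncompressed v1 batch: the record's own offset is reported.
           System.out.println(shallowOffsetOfMaxTimestamp(
                   CompressionType.NONE, (byte) 1, 105L, 101L));
           // Compressed batch: the batch's last offset is reported instead.
           System.out.println(shallowOffsetOfMaxTimestamp(
                   CompressionType.GZIP, (byte) 1, 105L, 101L));
       }
   }
   ```

   Removing the branch collapses both cases onto `lastOffset`, which is why the getOffsetByMaxTimestamp test observes a different offset for uncompressed pre-v2 batches.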



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
