showuon commented on code in PR #15474: URL: https://github.com/apache/kafka/pull/15474#discussion_r1512604852
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java:
##########
@@ -379,8 +381,11 @@ public ValidationResult validateMessagesAndAssignOffsetsCompressed(LongRef offse
                         && batch.magic() > RecordBatch.MAGIC_VALUE_V0
                         && toMagic > RecordBatch.MAGIC_VALUE_V0) {
-                    if (record.timestamp() > maxTimestamp)
+                    if (record.timestamp() > maxTimestamp) {
                         maxTimestamp = record.timestamp();
+                        // The offset is only increased when it is a valid record
+                        offsetOfMaxTimestamp = initialOffset + validatedRecords.size();

Review Comment:
   Also set the correct offset of the max timestamp while traversing the records.

##########
clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java:
##########
@@ -263,13 +262,8 @@ public RecordsInfo info() {
         } else if (maxTimestamp == RecordBatch.NO_TIMESTAMP) {
             return new RecordsInfo(RecordBatch.NO_TIMESTAMP, lastOffset);
         } else {
-            long shallowOffsetOfMaxTimestamp;
-            // Use the last offset when dealing with record batches
-            if (compressionType != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2)
-                shallowOffsetOfMaxTimestamp = lastOffset;

Review Comment:
   I don't understand why we should always set this to the last offset here. This will fail the getOffsetByMaxTimestamp test. Is that expected? Maybe @ijuma could answer this?
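
To illustrate the first review comment (setting the offset of the max timestamp while traversing the records), here is a minimal standalone sketch. It is not the actual `LogValidator` code: the class `MaxTimestampOffsetSketch`, the method `assignOffsets`, the `Result` holder, and the `valid` flags are hypothetical names introduced only for this example, and it assumes that invalid records are simply skipped so that only valid records consume an offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Kafka code: tracks the max timestamp and the offset
// of the record that carries it while traversing a batch of records.
class MaxTimestampOffsetSketch {
    static final long NO_TIMESTAMP = -1L;

    // Simple holder for the two values being tracked (hypothetical type).
    static final class Result {
        final long maxTimestamp;
        final long offsetOfMaxTimestamp;

        Result(long maxTimestamp, long offsetOfMaxTimestamp) {
            this.maxTimestamp = maxTimestamp;
            this.offsetOfMaxTimestamp = offsetOfMaxTimestamp;
        }
    }

    // timestamps[i] is the timestamp of the i-th incoming record;
    // valid[i] says whether that record passes validation (assumption:
    // invalid records are skipped and do not consume an offset).
    static Result assignOffsets(long initialOffset, long[] timestamps, boolean[] valid) {
        long maxTimestamp = NO_TIMESTAMP;
        long offsetOfMaxTimestamp = -1L;
        List<Long> validatedRecords = new ArrayList<>();

        for (int i = 0; i < timestamps.length; i++) {
            if (!valid[i]) {
                continue; // skipped records do not advance the offset
            }
            if (timestamps[i] > maxTimestamp) {
                maxTimestamp = timestamps[i];
                // The offset is only increased when it is a valid record, so the
                // next offset is initialOffset plus the number validated so far.
                offsetOfMaxTimestamp = initialOffset + validatedRecords.size();
            }
            validatedRecords.add(timestamps[i]);
        }
        return new Result(maxTimestamp, offsetOfMaxTimestamp);
    }

    public static void main(String[] args) {
        Result r = assignOffsets(100L,
                new long[] {5L, 9L, 7L, 9L},
                new boolean[] {true, false, true, true});
        System.out.println("maxTimestamp=" + r.maxTimestamp
                + " offsetOfMaxTimestamp=" + r.offsetOfMaxTimestamp);
    }
}
```

Because the comparison is strict (`>`), a later record with an equal timestamp would not move the offset in this sketch; the offset of the first record with the max timestamp is kept.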