chia7712 commented on code in PR #16167:
URL: https://github.com/apache/kafka/pull/16167#discussion_r1680405766


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InvalidTimestampException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordValidationStats;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogValidatorTest {
+    private final Time time = Time.SYSTEM;
+    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
+    private final MetricsRecorder metricsRecorder = new MetricsRecorder();
+
+    static class MetricsRecorder implements LogValidator.MetricsRecorder {
+        public int recordInvalidMagicCount = 0;
+        public int recordInvalidOffsetCount = 0;
+        public int recordInvalidChecksumsCount = 0;
+        public int recordInvalidSequenceCount = 0;
+        public int recordNoKeyCompactedTopicCount = 0;
+
+        public void recordInvalidMagic() {
+            recordInvalidMagicCount += 1;
+        }
+
+        public void recordInvalidOffset() {
+            recordInvalidOffsetCount += 1;
+        }
+
+        public void recordInvalidSequence() {
+            recordInvalidSequenceCount += 1;
+        }
+
+        public void recordInvalidChecksums() {
+            recordInvalidChecksumsCount += 1;
+        }
+
+        public void recordNoKeyCompactedTopic() {
+            recordNoKeyCompactedTopicCount += 1;
+        }
+    }
+
+    @Test
+    public void testOnlyOneBatch() {
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build());
+    }
+
+    @Test
+    public void testAllowMultiBatch() {
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build());
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build());
+    }
+
+    @Test
+    public void testValidationOfBatchesWithNonSequentialInnerOffsets() {
+        Arrays.stream(RecordVersion.values()).forEach(version -> {
+            int numRecords = 20;
+            Compression compression = Compression.gzip().build();
+            MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords);
+
+            // Validation for v2 and above is strict for this case. For older formats, we fix invalid
+            // internal offsets by rewriting the batch.
+            if (version.value >= RecordBatch.MAGIC_VALUE_V2) {
+                assertThrows(InvalidRecordException.class, () ->
+                        validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression)
+                );
+            } else {
+                ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression);
+                List<Long> recordsResult = new ArrayList<>();
+                result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset()));
+                assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult);
+            }
+        });
+    }
+
+    @Test
+    public void testMisMatchMagic() {
+        Compression compress = Compression.gzip().build();
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress);
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress);
+    }
+
+    @Test
+    void testUncompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE);
+    }
+
+    @Test
+    void testRecompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build());
+    }
+
+    private void checkOnlyOneBatch(Byte magic, Compression sourceCompression,
+                                   Compression targetCompression) {
+        assertThrows(InvalidRecordException.class,
+                () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression),
+                        magic, sourceCompression.type(), targetCompression)
+        );
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV0() {

Review Comment:
   ditto
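   The "ditto" points back at the `CsvSource` suggestion raised on `testOnlyOneBatch` later in this review. As an illustration only, a minimal sketch of how the three `checkLogAppendTimeNonCompressed` wrappers in this file could collapse into one parameterized test; the literals 0/1/2 stand for `RecordBatch.MAGIC_VALUE_V0/V1/V2`, and `org.junit.jupiter.params.provider.CsvSource` would need to be imported:

   ```java
   @ParameterizedTest
   @CsvSource({"0", "1", "2"})   // 0/1/2 = RecordBatch.MAGIC_VALUE_V0/V1/V2
   public void testLogAppendTimeNonCompressed(byte magic) {
       // JUnit converts each CSV value to the byte parameter implicitly
       checkLogAppendTimeNonCompressed(magic);
   }
   ```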



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+    @Test
+    public void testAllowMultiBatch() {

Review Comment:
   ditto



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+    @Test
+    public void testLogAppendTimeNonCompressedV0() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV1() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV2() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+
+    private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) {
+        long offset = 1234567;
+        long producerId = 1324L;
+        short producerEpoch = 10;
+        int baseSequence = 984;
+        boolean isTransactional = true;
+        int partitionLeaderEpoch = 40;
+
+        ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+                baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(),
+                isTransactional, false);
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        assertThrows(InvalidRecordException.class, () -> new LogValidator(records,
+                topicPartition,
+                time,
+                sourceCompression,
+                targetCompression,
+                false,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                TimestampType.CREATE_TIME,
+                5000L,
+                5000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+    }
+
+    private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) {
+        assertThrows(RecordValidationException.class,
+                () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression));
+
+        assertTrue(metricsRecorder.recordInvalidMagicCount > 0);
+    }
+
+    @Test
+    void testNonCompressedV2() {

Review Comment:
   ditto



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+    @Test
+    public void testOnlyOneBatch() {

Review Comment:
   Could you please use `CsvSource` to rewrite them?
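   A minimal sketch of such a `CsvSource` rewrite for `testOnlyOneBatch`, assuming JUnit's implicit conversion of the CSV fields to `byte` and `CompressionType`, plus a small hypothetical `toCompression` helper that maps the enum onto the `Compression` instances already used in this test (0/1/2 stand for `RecordBatch.MAGIC_VALUE_V0/V1/V2`):

   ```java
   @ParameterizedTest
   @CsvSource({
           "0, GZIP, GZIP",
           "1, GZIP, GZIP",
           "2, GZIP, GZIP",
           "0, GZIP, NONE",
           "1, GZIP, NONE",
           "2, GZIP, NONE",
           "2, NONE, NONE",
           "2, NONE, GZIP"
   })
   public void testOnlyOneBatch(byte magic, CompressionType source, CompressionType target) {
       checkOnlyOneBatch(magic, toCompression(source), toCompression(target));
   }

   // Hypothetical helper: only GZIP and NONE are exercised by these cases
   private static Compression toCompression(CompressionType type) {
       return type == CompressionType.GZIP ? Compression.gzip().build() : Compression.NONE;
   }
   ```

   The same shape would cover `testAllowMultiBatch` with its four rows.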



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+    @Test
+    public void testMisMatchMagic() {

Review Comment:
   ditto
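   Applied here, the two `checkMismatchMagic` invocations become two CSV rows; a minimal sketch, again using byte literals in place of the `MAGIC_VALUE_V0`/`V1` constants:

   ```java
   @ParameterizedTest
   @CsvSource({"0, 1", "1, 0"})   // batchMagic, recordMagic
   public void testMisMatchMagic(byte batchMagic, byte recordMagic) {
       checkMismatchMagic(batchMagic, recordMagic, Compression.gzip().build());
   }
   ```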



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+    @Test
+    void testNonCompressedV2() {
+        checkNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testRecompressionV1() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testRecompressionV2() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV1() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV2() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    public void testCreateTimeUpConversionV1ToV2() {
+        long timestamp = System.currentTimeMillis();
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression);
+
+        LogValidator validator = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                RecordBatch.MAGIC_VALUE_V2,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+        LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(timestamp, batch);
+            assertEquals(timestamp, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+
+        assertEquals(timestamp, validatedResults.maxTimestampMs);
+        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2.");
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed");
+
+        verifyRecordValidationStats(
+                validatedResults.recordValidationStats,
+                3,
+                records,
+                true
+        );
+    }
+
+    private void checkCreateTimeUpConversionFromV0(byte toMagic) {
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression);
+        LogValidator logValidator = new LogValidator(records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                toMagic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+        LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch);
+            assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+        assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs,
+                "Max timestamp should be " + RecordBatch.NO_TIMESTAMP);
+        assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp);
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed");
+
+        verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true);
+    }
+
+    private void checkRecompression(byte magic) {
+        long now = System.currentTimeMillis();
+        // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
+        List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
+
+        long producerId;
+        short producerEpoch;
+        int baseSequence;
+        boolean isTransactional;
+        int partitionLeaderEpoch;
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            producerId = 1324L;
+            producerEpoch = 10;
+            baseSequence = 984;
+            isTransactional = true;
+            partitionLeaderEpoch = 40;
+        } else {
+            producerId = RecordBatch.NO_PRODUCER_ID;
+            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
+            baseSequence = RecordBatch.NO_SEQUENCE;
+            isTransactional = false;
+            partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
+        }
+
+        MemoryRecords records = MemoryRecords.withRecords(
+                magic,
+                0L,
+                Compression.NONE,
+                TimestampType.CREATE_TIME,
+                producerId,
+                producerEpoch,
+                baseSequence,
+                partitionLeaderEpoch,
+                isTransactional,
+                new SimpleRecord(timestampSeq.get(0), "hello".getBytes()),
+                new SimpleRecord(timestampSeq.get(1), "there".getBytes()),
+                new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes())
+        );
+
+        // V2 has single batch, and other versions have many single-record batches
+        assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator()));
+
+        LogValidator.ValidationResult validatingResults = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.NONE,
+                Compression.gzip().build(),
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                partitionLeaderEpoch,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0L),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatingResults.validatedRecords;
+
+        int i = 0;
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            maybeCheckBaseTimestamp(timestampSeq.get(0), batch);
+            assertEquals(batch.maxTimestamp(), batch.maxTimestamp());
+            assertEquals(producerEpoch, batch.producerEpoch());
+            assertEquals(producerId, batch.producerId());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+
+            for (Record record : batch) {
+                record.ensureValid();
+                assertEquals((long) timestampSeq.get(i), record.timestamp());
+                i++;
+            }
+        }
+
+        assertEquals(now + 1, validatingResults.maxTimestampMs,
+                "Max timestamp should be " + (now + 1));
+
+        // Both V2 and V1 have a single batch in the validated records when compression is enabled,
+        // and hence their shallow offsetOfMaxTimestamp is the last offset of the single batch
+        assertEquals(1, iteratorSize(validatedRecords.batches().iterator()));
+        assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
+        assertTrue(validatingResults.messageSizeMaybeChanged,
+                "Message size should have been changed");
+
+        verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true);
+    }
+
+    private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) {
+        List<LegacyRecord> records = new ArrayList<>();
+
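+        // Build legacy records whose magic (recordMagicValue) deliberately differs from the wrapper batch magic used below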
+        for (int id = 0; id < 20; id++) {
+            records.add(LegacyRecord.create(
+                    recordMagicValue,
+                    RecordBatch.NO_TIMESTAMP,
+                    Integer.toString(id).getBytes(),
+                    Integer.toString(id).getBytes()
+            ));
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max(
+                records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec,
+                TimestampType.CREATE_TIME, 0L);
+
+        AtomicLong offset = new AtomicLong(1234567);
+        records.forEach(record -> {
+            builder.appendUncheckedWithOffset(offset.get(), record);
+            offset.incrementAndGet();
+        });
+
+        return builder.build();
+    }
+
+    private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) {
+        List<SimpleRecord> records = IntStream.range(0, numRecords)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .collect(Collectors.toList());
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L);
+
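+        // Append every record with inner offset 0 so the batch ends up with non-sequential inner offsets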
+        records.forEach(record ->
+                assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record))
+        );
+
+        return builder.build();
+    }
+
+    private void checkAllowMultiBatch(Byte magic, Compression sourceCompression, Compression targetCompression) {
+        validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.type(), targetCompression);
+    }
+
+
+    private ValidationResult validateMessages(MemoryRecords records,
+                                              Byte magic,
+                                              CompressionType sourceCompressionType,
+                                              Compression targetCompressionType) {
+        MockTime mockTime = new MockTime(0L, 0L);
+        return new LogValidator(records,
+                topicPartition,
+                mockTime,
+                sourceCompressionType,
+                targetCompressionType,
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PRODUCER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.IBP_2_3_IV1
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier());
+    }
+
+    private MemoryRecords createTwoBatchedRecords(Byte magicValue,
+                                                  Compression codec) {
+        ByteBuffer buf = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
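+        // Write two back-to-back batches into the same buffer: offsets [0] and [1, 2]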
+        builder.append(10L, "1".getBytes(), "a".getBytes());
+        builder.close();
+        builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "2".getBytes(), "b".getBytes());
+        builder.append(12L, "3".getBytes(), "c".getBytes());
+        builder.close();
+
+        buf.flip();
+        return MemoryRecords.readableRecords(buf.slice());
+    }
+
+    private MemoryRecords createRecords(byte magicValue,
+                                        long timestamp,
+                                        Compression codec) {
+        List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes());
+        return createRecords(records, magicValue, timestamp, codec);
+    }
+
+    @Test
+    void testCompressedV1() {
+        checkCompressed(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    private void checkCompressed(byte magic) {
+        long now = System.currentTimeMillis();
+        // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
+        List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
+
+        long producerId;
+        short producerEpoch;
+        int baseSequence;
+        boolean isTransactional;
+        int partitionLeaderEpoch;
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            producerId = 1324L;
+            producerEpoch = 10;
+            baseSequence = 984;
+            isTransactional = true;
+            partitionLeaderEpoch = 40;
+        } else {
+            producerId = RecordBatch.NO_PRODUCER_ID;
+            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
+            baseSequence = RecordBatch.NO_SEQUENCE;
+            isTransactional = false;
+            partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
+        }
+
+        List<SimpleRecord> recordList = Arrays.asList(
+                new SimpleRecord(timestampSeq.get(0), "hello".getBytes()),
+                new SimpleRecord(timestampSeq.get(1), "there".getBytes()),
+                new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes())
+        );
+
+        MemoryRecords records = MemoryRecords.withRecords(
+                magic,
+                0L,
+                Compression.gzip().build(),
+                TimestampType.CREATE_TIME,
+                producerId,
+                producerEpoch,
+                baseSequence,
+                partitionLeaderEpoch,
+                isTransactional,
+                recordList.toArray(new SimpleRecord[0])
+        );
+
+        LogValidator validator = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                Compression.gzip().build(),
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                partitionLeaderEpoch,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+        LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        int i = 0;
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            maybeCheckBaseTimestamp(timestampSeq.get(0), batch);
+            assertEquals((long) timestampSeq.get(1), batch.maxTimestamp());
+            assertEquals(producerEpoch, batch.producerEpoch());
+            assertEquals(producerId, batch.producerId());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+
+            for (Record record : batch) {
+                record.ensureValid();
+                assertEquals((long) timestampSeq.get(i), record.timestamp());
+                i++;
+            }
+        }
+
+        assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1));
+
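+        // Source and target compression are both gzip and the magic is unchanged, so the batches are validated in place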
+        int expectedShallowOffsetOfMaxTimestamp = 2;
+        assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp,
+                "Shallow offset of max timestamp should be 2");
+        assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed");
+
+        verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true);
+    }
+
+    private MemoryRecords createRecords(List<byte[]> records,
+                                        byte magicValue,
+                                        long timestamp,
+                                        Compression codec) {
+        ByteBuffer buf = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
+
+        AtomicInteger offset = new AtomicInteger(0);
+        records.forEach(item ->
+                builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item));
+        return builder.build();
+    }
+
+    @Test
+    void testLogAppendTimeWithRecompressionV1() {
+        checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testLogAppendTimeWithRecompressionV2() {
+        checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testLogAppendTimeWithoutRecompressionV1() {
+        checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCompressedV2() {
+        checkCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testInvalidOffsetRangeAndRecordCount() {
+        // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+        validateRecordBatchWithCountOverrides(2, 3);
+
+        // Count and offset range are inconsistent or invalid
+        assertInvalidBatchCountOverrides(0, 3);
+        assertInvalidBatchCountOverrides(15, 3);
+        assertInvalidBatchCountOverrides(-3, 3);
+        assertInvalidBatchCountOverrides(2, -3);
+        assertInvalidBatchCountOverrides(2, 6);
+        assertInvalidBatchCountOverrides(2, 0);
+        assertInvalidBatchCountOverrides(-3, -2);
+
+        // Count and offset range are consistent, but do not match the actual number of records
+        assertInvalidBatchCountOverrides(5, 6);
+        assertInvalidBatchCountOverrides(1, 2);
+    }
+
+    @Test
+    void testLogAppendTimeWithoutRecompressionV2() {
+        checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testInvalidCreateTimeNonCompressedV1() {
+        long now = System.currentTimeMillis();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now - 1001L, Compression.NONE);
+        assertThrows(RecordValidationException.class, () -> new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.NONE,
+                Compression.NONE,
+                false,
+                RecordBatch.MAGIC_VALUE_V1,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+    }
+
+    @Test
+    public void testInvalidCreateTimeCompressedV1() {
+        long now = System.currentTimeMillis();
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(
+                RecordBatch.MAGIC_VALUE_V1,
+                now - 1001L,
+                compression
+        );
+
+        assertThrows(RecordValidationException.class, () ->
+                new LogValidator(
+                        records,
+                        new TopicPartition("topic", 0),
+                        time,
+                        CompressionType.GZIP,
+                        compression,
+                        false,
+                        RecordBatch.MAGIC_VALUE_V1,
+                        TimestampType.CREATE_TIME,
+                        1000L,
+                        1000L,
+                        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                        AppendOrigin.CLIENT,
+                        MetadataVersion.latestTesting()
+                ).validateMessagesAndAssignOffsets(
+                        PrimitiveRef.ofLong(0),
+                        metricsRecorder,
+                        RequestLocal.withThreadConfinedCaching().bufferSupplier()
+                )
+        );
+    }
+
+    @Test
+    public void testInvalidCreateTimeNonCompressedV2() {
+        long now = System.currentTimeMillis();
+        MemoryRecords records = createRecords(
+                RecordBatch.MAGIC_VALUE_V2,
+                now - 1001L,
+                Compression.NONE
+        );
+
+        assertThrows(RecordValidationException.class, () ->
+                new LogValidator(
+                        records,
+                        new TopicPartition("topic", 0),
+                        time,
+                        CompressionType.NONE,
+                        Compression.NONE,
+                        false,
+                        RecordBatch.MAGIC_VALUE_V2,
+                        TimestampType.CREATE_TIME,
+                        1000L,
+                        1000L,
+                        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                        AppendOrigin.CLIENT,
+                        MetadataVersion.latestTesting()
+                ).validateMessagesAndAssignOffsets(
+                        PrimitiveRef.ofLong(0),
+                        metricsRecorder,
+                        RequestLocal.withThreadConfinedCaching().bufferSupplier()
+                )
+        );
+    }
+
+    @Test
+    public void testInvalidChecksum() {

Review Comment:
   ditto



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InvalidTimestampException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordValidationStats;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogValidatorTest {
+    private final Time time = Time.SYSTEM;
+    private final TopicPartition topicPartition = new TopicPartition("topic", 0);
+    private final MetricsRecorder metricsRecorder = new MetricsRecorder();
+
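+    // Counting stub that records how many times each validation-failure callback is invoked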
+    static class MetricsRecorder implements LogValidator.MetricsRecorder {
+        public int recordInvalidMagicCount = 0;
+        public int recordInvalidOffsetCount = 0;
+        public int recordInvalidChecksumsCount = 0;
+        public int recordInvalidSequenceCount = 0;
+        public int recordNoKeyCompactedTopicCount = 0;
+
+        public void recordInvalidMagic() {
+            recordInvalidMagicCount += 1;
+        }
+
+        public void recordInvalidOffset() {
+            recordInvalidOffsetCount += 1;
+        }
+
+        public void recordInvalidSequence() {
+            recordInvalidSequenceCount += 1;
+        }
+
+        public void recordInvalidChecksums() {
+            recordInvalidChecksumsCount += 1;
+        }
+
+        public void recordNoKeyCompactedTopic() {
+            recordNoKeyCompactedTopicCount += 1;
+        }
+    }
+
+    @Test
+    public void testOnlyOneBatch() {
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build());
+    }
+
+    @Test
+    public void testAllowMultiBatch() {
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build());
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build());
+    }
+
+    @Test
+    public void testValidationOfBatchesWithNonSequentialInnerOffsets() {
+        Arrays.stream(RecordVersion.values()).forEach(version -> {
+            int numRecords = 20;
+            Compression compression = Compression.gzip().build();
+            MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords);
+
+            // Validation for v2 and above is strict for this case. For older formats, we fix invalid
+            // internal offsets by rewriting the batch.
+            if (version.value >= RecordBatch.MAGIC_VALUE_V2) {
+                assertThrows(InvalidRecordException.class, () ->
+                        validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression)
+                );
+            } else {
+                ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression);
+                List<Long> recordsResult = new ArrayList<>();
+                result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset()));
+                assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult);
+            }
+        });
+    }
+
+    @Test
+    public void testMisMatchMagic() {
+        Compression compress = Compression.gzip().build();
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress);
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress);
+    }
+
+    @Test
+    void testUncompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE);
+    }
+
+    @Test
+    void testRecompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build());
+    }
+
+    private void checkOnlyOneBatch(Byte magic, Compression sourceCompression,
+                                   Compression targetCompression) {
+        assertThrows(InvalidRecordException.class,
+                () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression),
+                        magic, sourceCompression.type(), targetCompression)
+        );
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV0() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV1() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV2() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+
+    private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) {
+        long offset = 1234567;
+        long producerId = 1324L;
+        short producerEpoch = 10;
+        int baseSequence = 984;
+        boolean isTransactional = true;
+        int partitionLeaderEpoch = 40;
+
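+        // Build a v2 batch header that claims offsets 0..5 but carries no records; the validator must reject it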
+        ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+                baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(),
+                isTransactional, false);
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        assertThrows(InvalidRecordException.class, () -> new LogValidator(records,
+                topicPartition,
+                time,
+                sourceCompression,
+                targetCompression,
+                false,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                TimestampType.CREATE_TIME,
+                5000L,
+                5000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+    }
+
+    private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) {
+        assertThrows(RecordValidationException.class,
+                () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression));
+
+        assertTrue(metricsRecorder.recordInvalidMagicCount > 0);
+    }
+
+    @Test
+    void testNonCompressedV2() {
+        checkNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testRecompressionV1() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testRecompressionV2() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV1() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV2() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    public void testCreateTimeUpConversionV1ToV2() {
+        long timestamp = System.currentTimeMillis();
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression);
+
+        LogValidator validator = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                RecordBatch.MAGIC_VALUE_V2,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+        LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(timestamp, batch);
+            assertEquals(timestamp, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+
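+        // Up-conversion from v1 to v2 rewrites the batches, so the message size may change and all three records count as converted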
+        assertEquals(timestamp, validatedResults.maxTimestampMs);
+        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset 
of max timestamp should be the last offset 2.");
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size 
should have been changed");
+
+        verifyRecordValidationStats(
+                validatedResults.recordValidationStats,
+                3,
+                records,
+                true
+        );
+    }
+
+    private void checkCreateTimeUpConversionFromV0(byte toMagic) {
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression);
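+        // V0 records carry no timestamps, so the up-converted batches should still report NO_TIMESTAMP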
+        LogValidator logValidator = new LogValidator(records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                toMagic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+        LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch);
+            assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+        assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs,
+                "Max timestamp should be " + RecordBatch.NO_TIMESTAMP);
+        assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp);
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size 
should have been changed");
+
+        verifyRecordValidationStats(validatedResults.recordValidationStats, 3, 
records, true);
+    }
+
+    @Test
+    void testInvalidOffsetRangeAndRecordCount() {

Review Comment:
   Could you please separate the different cases? for example:
   ```java
       @Test
       void testRecordBatchWithCountOverrides() {
            // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
           validateRecordBatchWithCountOverrides(2, 3);
       }
   
       @ParameterizedTest
       @CsvSource({"0,3", "15,3", "-3,3"})
       void testInconsistentCountAndOffset(int lastOffsetDelta, int count) {
           // Count and offset range are inconsistent or invalid
           assertInvalidBatchCountOverrides(lastOffsetDelta, count);
       }
   
       @ParameterizedTest
       @CsvSource({"5,6", "1,2"})
       void testUnmatchedNumberOfRecords(int lastOffsetDelta, int count) {
            // Count and offset range are consistent, but do not match the actual number of records
           assertInvalidBatchCountOverrides(lastOffsetDelta, count);
       }
   ```
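   (If you go this route, `org.junit.jupiter.params.provider.CsvSource` would also need to be added to the imports.)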



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import kafka.server.RequestLocal;
+
+import org.apache.kafka.common.InvalidRecordException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.compress.GzipCompression;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.InvalidTimestampException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordValidationStats;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.PrimitiveRef;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogValidatorTest {
+    private final Time time = Time.SYSTEM;
+    private final TopicPartition topicPartition = new TopicPartition("topic", 
0);
+    private final MetricsRecorder metricsRecorder = new MetricsRecorder();
+
+    static class MetricsRecorder implements LogValidator.MetricsRecorder {
+        public int recordInvalidMagicCount = 0;
+        public int recordInvalidOffsetCount = 0;
+        public int recordInvalidChecksumsCount = 0;
+        public int recordInvalidSequenceCount = 0;
+        public int recordNoKeyCompactedTopicCount = 0;
+
+        public void recordInvalidMagic() {
+            recordInvalidMagicCount += 1;
+        }
+
+        public void recordInvalidOffset() {
+            recordInvalidOffsetCount += 1;
+        }
+
+        public void recordInvalidSequence() {
+            recordInvalidSequenceCount += 1;
+        }
+
+        public void recordInvalidChecksums() {
+            recordInvalidChecksumsCount += 1;
+        }
+
+        public void recordNoKeyCompactedTopic() {
+            recordNoKeyCompactedTopicCount += 1;
+        }
+    }
+
+    @Test
+    public void testOnlyOneBatch() {
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, 
Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, 
Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, 
Compression.gzip().build(), Compression.gzip().build());
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, 
Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, 
Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, 
Compression.gzip().build(), Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, 
Compression.NONE);
+        checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, 
Compression.gzip().build());
+    }
+
+    @Test
+    public void testAllowMultiBatch() {
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, 
Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, 
Compression.NONE);
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, 
Compression.gzip().build());
+        checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, 
Compression.gzip().build());
+    }
+
+    @Test
+    public void testValidationOfBatchesWithNonSequentialInnerOffsets() {
+        Arrays.stream(RecordVersion.values()).forEach(version -> {
+            int numRecords = 20;
+            Compression compression = Compression.gzip().build();
+            MemoryRecords invalidRecords = 
recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords);
+
+            // Validation for v2 and above is strict for this case. For older 
formats, we fix invalid
+            // internal offsets by rewriting the batch.
+            if (version.value >= RecordBatch.MAGIC_VALUE_V2) {
+                assertThrows(InvalidRecordException.class, () ->
+                        validateMessages(invalidRecords, version.value, 
CompressionType.GZIP, compression)
+                );
+            } else {
+                ValidationResult result = validateMessages(invalidRecords, 
version.value, CompressionType.GZIP, compression);
+                List<Long> recordsResult = new ArrayList<>();
+                result.validatedRecords.records().forEach(s -> 
recordsResult.add(s.offset()));
+                assertEquals(LongStream.range(0, 
numRecords).boxed().collect(Collectors.toList()), recordsResult);
+            }
+        });
+    }
+
+    @Test
+    public void testMisMatchMagic() {
+        Compression compress = Compression.gzip().build();
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, 
RecordBatch.MAGIC_VALUE_V1, compress);
+        checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, 
RecordBatch.MAGIC_VALUE_V0, compress);
+    }
+
+    @Test
+    void testUncompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, 
Compression.NONE);
+    }
+
+    @Test
+    void testRecompressedBatchWithoutRecordsNotAllowed() {
+        testBatchWithoutRecordsNotAllowed(CompressionType.NONE, 
Compression.gzip().build());
+    }
+
+    private void checkOnlyOneBatch(Byte magic, Compression sourceCompression,
+                                   Compression targetCompression) {
+        assertThrows(InvalidRecordException.class,
+                () -> validateMessages(createTwoBatchedRecords(magic, 
sourceCompression),
+                        magic, sourceCompression.type(), targetCompression)
+        );
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV0() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV1() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    public void testLogAppendTimeNonCompressedV2() {
+        checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+
+    private void testBatchWithoutRecordsNotAllowed(CompressionType 
sourceCompression, Compression targetCompression) {
+        long offset = 1234567;
+        long producerId = 1324L;
+        short producerEpoch = 10;
+        int baseSequence = 984;
+        boolean isTransactional = true;
+        int partitionLeaderEpoch = 40;
+
+        ByteBuffer buffer = 
ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+        DefaultRecordBatch.writeEmptyHeader(buffer, 
RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch,
+                baseSequence, 0L, 5L, partitionLeaderEpoch, 
TimestampType.CREATE_TIME, System.currentTimeMillis(),
+                isTransactional, false);
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        assertThrows(InvalidRecordException.class, () -> new 
LogValidator(records,
+                topicPartition,
+                time,
+                sourceCompression,
+                targetCompression,
+                false,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                TimestampType.CREATE_TIME,
+                5000L,
+                5000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(offset), metricsRecorder, 
RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+    }
+
+    private void checkMismatchMagic(byte batchMagic, byte recordMagic, 
Compression compression) {
+        assertThrows(RecordValidationException.class,
+                () -> 
validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, 
compression), batchMagic, compression.type(), compression));
+
+        assertTrue(metricsRecorder.recordInvalidMagicCount > 0);
+    }
+
+    @Test
+    void testNonCompressedV2() {
+        checkNonCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testRecompressionV1() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testRecompressionV2() {
+        checkRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV1() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCreateTimeUpConversionV0ToV2() {
+        checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    public void testCreateTimeUpConversionV1ToV2() {
+        long timestamp = System.currentTimeMillis();
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, 
timestamp, compression);
+
+        LogValidator validator = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                RecordBatch.MAGIC_VALUE_V2,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+        LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(timestamp, batch);
+            assertEquals(timestamp, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+
+        assertEquals(timestamp, validatedResults.maxTimestampMs);
+        assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2.");
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed");
+
+        verifyRecordValidationStats(
+                validatedResults.recordValidationStats,
+                3,
+                records,
+                true
+        );
+    }
+
+    private void checkCreateTimeUpConversionFromV0(byte toMagic) {
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression);
+        LogValidator logValidator = new LogValidator(records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                compression,
+                false,
+                toMagic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+        LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch);
+            assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+            assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+            assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        }
+        assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs,
+                "Max timestamp should be " + RecordBatch.NO_TIMESTAMP);
+        assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp);
+        assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed");
+
+        verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true);
+    }
+
+    private void checkRecompression(byte magic) {
+        long now = System.currentTimeMillis();
+        // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
+        List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
+
+        long producerId;
+        short producerEpoch;
+        int baseSequence;
+        boolean isTransactional;
+        int partitionLeaderEpoch;
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            producerId = 1324L;
+            producerEpoch = 10;
+            baseSequence = 984;
+            isTransactional = true;
+            partitionLeaderEpoch = 40;
+        } else {
+            producerId = RecordBatch.NO_PRODUCER_ID;
+            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
+            baseSequence = RecordBatch.NO_SEQUENCE;
+            isTransactional = false;
+            partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
+        }
+
+        MemoryRecords records = MemoryRecords.withRecords(
+                magic,
+                0L,
+                Compression.NONE,
+                TimestampType.CREATE_TIME,
+                producerId,
+                producerEpoch,
+                baseSequence,
+                partitionLeaderEpoch,
+                isTransactional,
+                new SimpleRecord(timestampSeq.get(0), "hello".getBytes()),
+                new SimpleRecord(timestampSeq.get(1), "there".getBytes()),
+                new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes())
+        );
+
+        // V2 has a single batch, and other versions have many single-record batches
+        assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator()));
+
+        LogValidator.ValidationResult validatingResults = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.NONE,
+                Compression.gzip().build(),
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                partitionLeaderEpoch,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0L),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatingResults.validatedRecords;
+
+        int i = 0;
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            maybeCheckBaseTimestamp(timestampSeq.get(0), batch);
+            assertEquals(now + 1, batch.maxTimestamp());
+            assertEquals(producerEpoch, batch.producerEpoch());
+            assertEquals(producerId, batch.producerId());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+
+            for (Record record : batch) {
+                record.ensureValid();
+                assertEquals((long) timestampSeq.get(i), record.timestamp());
+                i++;
+            }
+        }
+
+        assertEquals(now + 1, validatingResults.maxTimestampMs,
+                "Max timestamp should be " + (now + 1));
+
+        // Both V2 and V1 have a single batch in the validated records when compression is enabled, and hence their shallow
+        // OffsetOfMaxTimestamp is the last offset of the single batch
+        assertEquals(1, iteratorSize(validatedRecords.batches().iterator()));
+        assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp);
+        assertTrue(validatingResults.messageSizeMaybeChanged,
+                "Message size should have been changed");
+
+        verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true);
+    }
+
+    private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) {
+        List<LegacyRecord> records = new ArrayList<>();
+
+        for (int id = 0; id < 20; id++) {
+            records.add(LegacyRecord.create(
+                    recordMagicValue,
+                    RecordBatch.NO_TIMESTAMP,
+                    Integer.toString(id).getBytes(),
+                    Integer.toString(id).getBytes()
+            ));
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max(
+                records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16));
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec,
+                TimestampType.CREATE_TIME, 0L);
+
+        AtomicLong offset = new AtomicLong(1234567);
+        records.forEach(record -> {
+            builder.appendUncheckedWithOffset(offset.get(), record);
+            offset.incrementAndGet();
+        });
+
+        return builder.build();
+    }
+
+    private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) {
+        List<SimpleRecord> records = IntStream.range(0, numRecords)
+                .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes()))
+                .collect(Collectors.toList());
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L);
+
+        records.forEach(record ->
+                assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record))
+        );
+
+        return builder.build();
+    }
+
+    private void checkAllowMultiBatch(Byte magic, Compression sourceCompression, Compression targetCompression) {
+        validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.type(), targetCompression);
+    }
+
+
+    private ValidationResult validateMessages(MemoryRecords records,
+                                              Byte magic,
+                                              CompressionType sourceCompressionType,
+                                              Compression targetCompressionType) {
+        MockTime mockTime = new MockTime(0L, 0L);
+        return new LogValidator(records,
+                topicPartition,
+                mockTime,
+                sourceCompressionType,
+                targetCompressionType,
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.IBP_2_3_IV1
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier());
+    }
+
+    private MemoryRecords createTwoBatchedRecords(Byte magicValue,
+                                                  Compression codec) {
+        ByteBuffer buf = ByteBuffer.allocate(2048);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
+        builder.append(10L, "1".getBytes(), "a".getBytes());
+        builder.close();
+        builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L);
+        builder.append(11L, "2".getBytes(), "b".getBytes());
+        builder.append(12L, "3".getBytes(), "c".getBytes());
+        builder.close();
+
+        buf.flip();
+        return MemoryRecords.readableRecords(buf.slice());
+    }
+
+    private MemoryRecords createRecords(byte magicValue,
+                                        long timestamp,
+                                        Compression codec) {
+        List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes());
+        return createRecords(records, magicValue, timestamp, codec);
+    }
+
+    @Test
+    void testCompressedV1() {
+        checkCompressed(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    private void checkCompressed(byte magic) {
+        long now = System.currentTimeMillis();
+        // set the timestamp of seq(1) (i.e. offset 1) as the max timestamp
+        List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now);
+
+        long producerId;
+        short producerEpoch;
+        int baseSequence;
+        boolean isTransactional;
+        int partitionLeaderEpoch;
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            producerId = 1324L;
+            producerEpoch = 10;
+            baseSequence = 984;
+            isTransactional = true;
+            partitionLeaderEpoch = 40;
+        } else {
+            producerId = RecordBatch.NO_PRODUCER_ID;
+            producerEpoch = RecordBatch.NO_PRODUCER_EPOCH;
+            baseSequence = RecordBatch.NO_SEQUENCE;
+            isTransactional = false;
+            partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
+        }
+
+        List<SimpleRecord> recordList = Arrays.asList(
+                new SimpleRecord(timestampSeq.get(0), "hello".getBytes()),
+                new SimpleRecord(timestampSeq.get(1), "there".getBytes()),
+                new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes())
+        );
+
+        MemoryRecords records = MemoryRecords.withRecords(
+                magic,
+                0L,
+                Compression.gzip().build(),
+                TimestampType.CREATE_TIME,
+                producerId,
+                producerEpoch,
+                baseSequence,
+                partitionLeaderEpoch,
+                isTransactional,
+                recordList.toArray(new SimpleRecord[0])
+        );
+
+        LogValidator validator = new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.GZIP,
+                Compression.gzip().build(),
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                partitionLeaderEpoch,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+        LogValidator.ValidationResult validatedResults = 
validator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        );
+
+        MemoryRecords validatedRecords = validatedResults.validatedRecords;
+
+        int i = 0;
+        for (RecordBatch batch : validatedRecords.batches()) {
+            assertTrue(batch.isValid());
+            assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+            maybeCheckBaseTimestamp(timestampSeq.get(0), batch);
+            assertEquals(now + 1, batch.maxTimestamp());
+            assertEquals(producerEpoch, batch.producerEpoch());
+            assertEquals(producerId, batch.producerId());
+            assertEquals(baseSequence, batch.baseSequence());
+            assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+
+            for (Record record : batch) {
+                record.ensureValid();
+                assertEquals((long) timestampSeq.get(i), record.timestamp());
+                i++;
+            }
+        }
+
+        assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1));
+
+        int expectedShallowOffsetOfMaxTimestamp = 2;
+        assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2");
+        assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed");
+
+        verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true);
+    }
+
+    private MemoryRecords createRecords(List<byte[]> records,
+                                        byte magicValue,
+                                        long timestamp,
+                                        Compression codec) {
+        ByteBuffer buf = ByteBuffer.allocate(512);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L);
+
+        AtomicInteger offset = new AtomicInteger(0);
+        records.forEach(item ->
+                builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item));
+        return builder.build();
+    }
+
+    @Test
+    void testLogAppendTimeWithRecompressionV1() {
+        checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testLogAppendTimeWithRecompressionV2() {
+        checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testLogAppendTimeWithoutRecompressionV1() {
+        checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCompressedV2() {
+        checkCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testInvalidOffsetRangeAndRecordCount() {
+        // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
+        validateRecordBatchWithCountOverrides(2, 3);
+
+        // Count and offset range are inconsistent or invalid
+        assertInvalidBatchCountOverrides(0, 3);
+        assertInvalidBatchCountOverrides(15, 3);
+        assertInvalidBatchCountOverrides(-3, 3);
+        assertInvalidBatchCountOverrides(2, -3);
+        assertInvalidBatchCountOverrides(2, 6);
+        assertInvalidBatchCountOverrides(2, 0);
+        assertInvalidBatchCountOverrides(-3, -2);
+
+        // Count and offset range are consistent, but do not match the actual number of records
+        assertInvalidBatchCountOverrides(5, 6);
+        assertInvalidBatchCountOverrides(1, 2);
+    }
+
+    @Test
+    void testLogAppendTimeWithoutRecompressionV2() {
+        checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2);
+    }
+
+    @Test
+    void testInvalidCreateTimeNonCompressedV1() {
+        long now = System.currentTimeMillis();
+        MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now - 1001L,
+                Compression.NONE);
+        assertThrows(RecordValidationException.class, () -> new LogValidator(
+                records,
+                topicPartition,
+                time,
+                CompressionType.NONE,
+                Compression.NONE,
+                false,
+                RecordBatch.MAGIC_VALUE_V1,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        ).validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+    }
+
+    @Test
+    public void testInvalidCreateTimeCompressedV1() {
+        long now = System.currentTimeMillis();
+        Compression compression = Compression.gzip().build();
+        MemoryRecords records = createRecords(
+                RecordBatch.MAGIC_VALUE_V1,
+                now - 1001L,
+                compression
+        );
+
+        assertThrows(RecordValidationException.class, () ->
+                new LogValidator(
+                        records,
+                        new TopicPartition("topic", 0),
+                        time,
+                        CompressionType.GZIP,
+                        compression,
+                        false,
+                        RecordBatch.MAGIC_VALUE_V1,
+                        TimestampType.CREATE_TIME,
+                        1000L,
+                        1000L,
+                        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                        AppendOrigin.CLIENT,
+                        MetadataVersion.latestTesting()
+                ).validateMessagesAndAssignOffsets(
+                        PrimitiveRef.ofLong(0),
+                        metricsRecorder,
+                        RequestLocal.withThreadConfinedCaching().bufferSupplier()
+                )
+        );
+    }
+
+    @Test
+    public void testInvalidCreateTimeNonCompressedV2() {
+        long now = System.currentTimeMillis();
+        MemoryRecords records = createRecords(
+                RecordBatch.MAGIC_VALUE_V2,
+                now - 1001L,
+                Compression.NONE
+        );
+
+        assertThrows(RecordValidationException.class, () ->
+                new LogValidator(
+                        records,
+                        new TopicPartition("topic", 0),
+                        time,
+                        CompressionType.NONE,
+                        Compression.NONE,
+                        false,
+                        RecordBatch.MAGIC_VALUE_V2,
+                        TimestampType.CREATE_TIME,
+                        1000L,
+                        1000L,
+                        RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                        AppendOrigin.CLIENT,
+                        MetadataVersion.latestTesting()
+                ).validateMessagesAndAssignOffsets(
+                        PrimitiveRef.ofLong(0),
+                        metricsRecorder,
+                        RequestLocal.withThreadConfinedCaching().bufferSupplier()
+                )
+        );
+    }
+
+    @Test
+    public void testInvalidChecksum() {
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), CompressionType.GZIP);
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), CompressionType.GZIP);
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), CompressionType.LZ4);
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.lz4().build(), CompressionType.LZ4);
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), CompressionType.SNAPPY);
+        checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), CompressionType.SNAPPY);
+    }
+
+    private void checkInvalidChecksum(byte magic, Compression compression, CompressionType type) {
+        LegacyRecord record = LegacyRecord.create(magic, 0L, null, "hello".getBytes());
+        ByteBuffer buf = record.buffer();
+
+        // corrupt the CRC byte to force a checksum error
+        buf.put(LegacyRecord.CRC_OFFSET, (byte) 0);
+
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+                TimestampType.CREATE_TIME, 0L);
+        builder.appendUncheckedWithOffset(0, record);
+
+        MemoryRecords memoryRecords = builder.build();
+        LogValidator logValidator = new LogValidator(memoryRecords,
+                topicPartition,
+                time,
+                type,
+                compression,
+                false,
+                magic,
+                TimestampType.CREATE_TIME,
+                1000L,
+                1000L,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH,
+                AppendOrigin.CLIENT,
+                MetadataVersion.latestTesting()
+        );
+
+
+        assertThrows(CorruptRecordException.class, () -> logValidator.validateMessagesAndAssignOffsets(
+                PrimitiveRef.ofLong(0),
+                metricsRecorder,
+                RequestLocal.withThreadConfinedCaching().bufferSupplier()
+        ));
+
+        assertTrue(metricsRecorder.recordInvalidChecksumsCount > 0);
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testInvalidSequenceV0(CompressionType type) {
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.zstd().build(), type);
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testInvalidSequenceV1(CompressionType type) {
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.lz4().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.zstd().build(), type);
+    }
+
+    @ParameterizedTest
+    @EnumSource(CompressionType.class)
+    public void testInvalidSequenceV2(CompressionType type) {
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.lz4().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.snappy().build(), type);
+        checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.zstd().build(), type);
+    }
+
+
+    private void checkInvalidSequence(byte magic, Compression compression, CompressionType type) {

Review Comment:
   Could you use `MethodSource`? For example:
   ```java
       private static Stream<Arguments> testInvalidSequence() {
           return Stream.of(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
                   .flatMap(code -> Arrays.stream(CompressionType.values()).flatMap(source ->
                           Arrays.stream(CompressionType.values()).map(target -> Arguments.of(code, source, target))));
       }

       @ParameterizedTest
       @MethodSource("testInvalidSequence")
       void testInvalidSequence(byte magic, CompressionType source, CompressionType target) {
   ```
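   The body could then delegate to the existing checks. A minimal sketch, assuming the current `checkInvalidSequence(byte, Compression, CompressionType)` helper is kept as-is and that `Compression.of(CompressionType)` is used to build the source codec (this would also need imports for `Stream`, `Arguments`, and `MethodSource`):
   ```java
       @ParameterizedTest
       @MethodSource("testInvalidSequence")
       void testInvalidSequence(byte magic, CompressionType source, CompressionType target) {
           // sketch only: build a Compression from the source type and reuse the existing helper
           checkInvalidSequence(magic, Compression.of(source).build(), target);
       }
   ```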



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

