chia7712 commented on code in PR #16167:
URL: https://github.com/apache/kafka/pull/16167#discussion_r1680405766
########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static 
org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. 
+ if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { + Compression compress = Compression.gzip().build(); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress); + } + + @Test + void testUncompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE); + } + + @Test + void testRecompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()); + } + + private void checkOnlyOneBatch(Byte magic, Compression sourceCompression, + Compression targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression), + magic, sourceCompression.type(), targetCompression) + ); + } + + @Test + public void testLogAppendTimeNonCompressedV0() { Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
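As a hedged illustration of the `CsvSource` suggestion that this "ditto" points to elsewhere in this review, the three `testLogAppendTimeNonCompressedV0/V1/V2` methods could collapse into one parameterized test driving the existing `checkLogAppendTimeNonCompressed` helper; the magic bytes 0, 1, 2 stand in for `RecordBatch.MAGIC_VALUE_V0/V1/V2`, and an import of `org.junit.jupiter.params.provider.CsvSource` is assumed.

```java
// Sketch only: one parameterized test in place of testLogAppendTimeNonCompressedV0/V1/V2.
@ParameterizedTest
@CsvSource({"0", "1", "2"})
public void testLogAppendTimeNonCompressed(byte magic) {
    checkLogAppendTimeNonCompressed(magic);
}
```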
+ */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void 
recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void 
recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. 
+ if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { + Compression compress = Compression.gzip().build(); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress); + } + + @Test + void testUncompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE); + } + + @Test + void testRecompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()); + } + + private void checkOnlyOneBatch(Byte magic, Compression sourceCompression, + Compression targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression), + magic, sourceCompression.type(), targetCompression) + ); + } + + @Test + public void testLogAppendTimeNonCompressedV0() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0); + } + + @Test + public void testLogAppendTimeNonCompressedV1() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + public void testLogAppendTimeNonCompressedV2() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + + private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) { + long offset = 1234567; + long producerId = 1324L; + short producerEpoch = 10; + int baseSequence = 984; + boolean isTransactional = true; + int partitionLeaderEpoch = 40; + + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, + baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), + isTransactional, false); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + assertThrows(InvalidRecordException.class, () -> new LogValidator(records, + topicPartition, + time, + sourceCompression, + targetCompression, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) { + assertThrows(RecordValidationException.class, + () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression)); + + assertTrue(metricsRecorder.recordInvalidMagicCount > 0); + } + + @Test + void testNonCompressedV2() { Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static 
org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { Review Comment: Could you please use `CsvSource` to rewrite them? ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
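For illustration only, a minimal sketch of what the requested `CsvSource` rewrite of `testOnlyOneBatch` could look like, reusing the `checkOnlyOneBatch` helper from the diff above; the `compressionOf` mapping helper is a hypothetical addition, the magic bytes 0/1/2 stand in for `RecordBatch.MAGIC_VALUE_V0/V1/V2`, and an import of `org.junit.jupiter.params.provider.CsvSource` is assumed.

```java
// Sketch only: the eight checkOnlyOneBatch(...) calls expressed as CSV rows.
@ParameterizedTest
@CsvSource({
    "0, gzip, gzip",
    "1, gzip, gzip",
    "2, gzip, gzip",
    "0, gzip, none",
    "1, gzip, none",
    "2, gzip, none",
    "2, none, none",
    "2, none, gzip"
})
public void testOnlyOneBatch(byte magic, String source, String target) {
    checkOnlyOneBatch(magic, compressionOf(source), compressionOf(target));
}

// Hypothetical helper mapping the CSV tokens onto the Compression instances already used in this class.
private static Compression compressionOf(String name) {
    return "gzip".equals(name) ? Compression.gzip().build() : Compression.NONE;
}
```

The same pattern would apply to the other tests the "ditto" comments in this review point at, e.g. `testAllowMultiBatch`.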
+ */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void 
recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. + if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
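Along the same lines, a possible `CsvSource` shape for `testMisMatchMagic`, with the batch/record magic pairs taken from the two existing `checkMismatchMagic` calls:

```java
// Sketch only: the two mismatched batch/record magic combinations as CSV rows.
@ParameterizedTest
@CsvSource({"0, 1", "1, 0"})
public void testMisMatchMagic(byte batchMagic, byte recordMagic) {
    checkMismatchMagic(batchMagic, recordMagic, Compression.gzip().build());
}
```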
+ */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void 
recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. 
+ if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { + Compression compress = Compression.gzip().build(); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress); + } + + @Test + void testUncompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE); + } + + @Test + void testRecompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()); + } + + private void checkOnlyOneBatch(Byte magic, Compression sourceCompression, + Compression targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression), + magic, sourceCompression.type(), targetCompression) + ); + } + + @Test + public void testLogAppendTimeNonCompressedV0() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0); + } + + @Test + public void testLogAppendTimeNonCompressedV1() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + public void testLogAppendTimeNonCompressedV2() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + + private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) { + long offset = 1234567; + long producerId = 1324L; + short producerEpoch = 10; + int baseSequence = 984; + boolean isTransactional = true; + int partitionLeaderEpoch = 40; + + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, + baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), + isTransactional, false); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + assertThrows(InvalidRecordException.class, () -> new LogValidator(records, + topicPartition, + time, + sourceCompression, + targetCompression, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) { + assertThrows(RecordValidationException.class, + () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression)); + + assertTrue(metricsRecorder.recordInvalidMagicCount > 0); + } + + @Test + void testNonCompressedV2() { + checkNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testRecompressionV1() { + checkRecompression(RecordBatch.MAGIC_VALUE_V1); + 
} + + @Test + void testRecompressionV2() { + checkRecompression(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testCreateTimeUpConversionV0ToV1() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testCreateTimeUpConversionV0ToV2() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + public void testCreateTimeUpConversionV1ToV2() { + long timestamp = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(timestamp, batch); + assertEquals(timestamp, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + + assertEquals(timestamp, validatedResults.maxTimestampMs); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2."); + assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, + records, + true + ); + } + + private void checkCreateTimeUpConversionFromV0(byte toMagic) { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + LogValidator logValidator = new LogValidator(records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + toMagic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch); + assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs, + "Max timestamp should be " + RecordBatch.NO_TIMESTAMP); + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp); + 
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true); + } + + private void checkRecompression(byte magic) { + long now = System.currentTimeMillis(); + // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.NONE, + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + // V2 has single batch, and other versions have many single-record batches + assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator())); + + LogValidator.ValidationResult validatingResults = new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatingResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatingResults.maxTimestampMs, + "Max timestamp should be " + (now + 1)); + + // Both V2 and V1 have single batch in the validated records when compression is enabled, and hence their shallow + // OffsetOfMaxTimestamp is the last offset of the single batch + assertEquals(1, iteratorSize(validatedRecords.batches().iterator())); + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp); + assertTrue(validatingResults.messageSizeMaybeChanged, + "Message size should have been changed"); + + verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true); + } + + private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) { + List<LegacyRecord> records = new ArrayList<>(); + + for (int id = 0; id < 20; id++) { + 
records.add(LegacyRecord.create( + recordMagicValue, + RecordBatch.NO_TIMESTAMP, + Integer.toString(id).getBytes(), + Integer.toString(id).getBytes() + )); + } + + ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max( + records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec, + TimestampType.CREATE_TIME, 0L); + + AtomicLong offset = new AtomicLong(1234567); + records.forEach(record -> { + builder.appendUncheckedWithOffset(offset.get(), record); + offset.incrementAndGet(); + }); + + return builder.build(); + } + + private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) { + List<SimpleRecord> records = IntStream.range(0, numRecords) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .collect(Collectors.toList()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L); + + records.forEach(record -> + assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record)) + ); + + return builder.build(); + } + + private void checkAllowMultiBatch(Byte magic, Compression sourceCompression, Compression targetCompression) { + validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.type(), targetCompression); + } + + + private ValidationResult validateMessages(MemoryRecords records, + Byte magic, + CompressionType sourceCompressionType, + Compression targetCompressionType) { + MockTime mockTime = new MockTime(0L, 0L); + return new LogValidator(records, + topicPartition, + mockTime, + sourceCompressionType, + targetCompressionType, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PRODUCER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.IBP_2_3_IV1 + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()); + } + + private MemoryRecords createTwoBatchedRecords(Byte magicValue, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + builder.append(10L, "1".getBytes(), "a".getBytes()); + builder.close(); + builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + + buf.flip(); + return MemoryRecords.readableRecords(buf.slice()); + } + + private MemoryRecords createRecords(byte magicValue, + long timestamp, + Compression codec) { + List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes()); + return createRecords(records, magicValue, timestamp, codec); + } + + @Test + void testCompressedV1() { + checkCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + private void checkCompressed(byte magic) { + long now = System.currentTimeMillis(); + // set the timestamp of seq(1) (i.e. 
offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + List<SimpleRecord> recordList = Arrays.asList( + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.gzip().build(), + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + recordList.toArray(new SimpleRecord[0]) + ); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(batch.timestampType(), TimestampType.CREATE_TIME); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1)); + + int expectedShallowOffsetOfMaxTimestamp = 2; + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2"); + assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true); + } + + private MemoryRecords createRecords(List<byte[]> records, + byte magicValue, + long timestamp, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + + AtomicInteger offset = new AtomicInteger(0); + records.forEach(item -> + builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item)); + return builder.build(); + } + + @Test + void testLogAppendTimeWithRecompressionV1() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testLogAppendTimeWithRecompressionV2() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2); + } 
+ + @Test + void testLogAppendTimeWithoutRecompressionV1() { + checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testCompressedV2() { + checkCompressed(RecordBatch.MAGIC_VALUE_V2); + } + @Test + void testInvalidOffsetRangeAndRecordCount() { + // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2 + validateRecordBatchWithCountOverrides(2, 3); + + // Count and offset range are inconsistent or invalid + assertInvalidBatchCountOverrides(0, 3); + assertInvalidBatchCountOverrides(15, 3); + assertInvalidBatchCountOverrides(-3, 3); + assertInvalidBatchCountOverrides(2, -3); + assertInvalidBatchCountOverrides(2, 6); + assertInvalidBatchCountOverrides(2, 0); + assertInvalidBatchCountOverrides(-3, -2); + + // Count and offset range are consistent, but do not match the actual number of records + assertInvalidBatchCountOverrides(5, 6); + assertInvalidBatchCountOverrides(1, 2); + } + + @Test + void testLogAppendTimeWithoutRecompressionV2() { + checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testInvalidCreateTimeNonCompressedV1() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now - 1001L, + Compression.NONE); + assertThrows(RecordValidationException.class, () -> new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @Test + public void testInvalidCreateTimeCompressedV1() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V1, + now - 1001L, + compression + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidCreateTimeNonCompressedV2() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V2, + now - 1001L, + Compression.NONE + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidChecksum() { Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java: ########## @@ -0,0 +1,2250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final 
Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. 
+ if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { + Compression compress = Compression.gzip().build(); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress); + } + + @Test + void testUncompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE); + } + + @Test + void testRecompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()); + } + + private void checkOnlyOneBatch(Byte magic, Compression sourceCompression, + Compression targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression), + magic, sourceCompression.type(), targetCompression) + ); + } + + @Test + public void testLogAppendTimeNonCompressedV0() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0); + } + + @Test + public void testLogAppendTimeNonCompressedV1() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + public void testLogAppendTimeNonCompressedV2() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + + private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) { + long offset = 1234567; + long producerId = 1324L; + short producerEpoch = 10; + int baseSequence = 984; + boolean isTransactional = true; + int partitionLeaderEpoch = 40; + + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, + baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), + isTransactional, false); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + assertThrows(InvalidRecordException.class, () -> new LogValidator(records, + topicPartition, + time, + sourceCompression, + targetCompression, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) { + assertThrows(RecordValidationException.class, + () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression)); + + assertTrue(metricsRecorder.recordInvalidMagicCount > 0); + } + + @Test + void testNonCompressedV2() { + checkNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testRecompressionV1() { + checkRecompression(RecordBatch.MAGIC_VALUE_V1); + 
} + + @Test + void testRecompressionV2() { + checkRecompression(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testCreateTimeUpConversionV0ToV1() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testCreateTimeUpConversionV0ToV2() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + public void testCreateTimeUpConversionV1ToV2() { + long timestamp = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(timestamp, batch); + assertEquals(timestamp, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + + assertEquals(timestamp, validatedResults.maxTimestampMs); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2."); + assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, + records, + true + ); + } + + private void checkCreateTimeUpConversionFromV0(byte toMagic) { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + LogValidator logValidator = new LogValidator(records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + toMagic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch); + assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs, + "Max timestamp should be " + RecordBatch.NO_TIMESTAMP); + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp); + 
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true); + } + + private void checkRecompression(byte magic) { + long now = System.currentTimeMillis(); + // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.NONE, + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + // V2 has single batch, and other versions have many single-record batches + assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator())); + + LogValidator.ValidationResult validatingResults = new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatingResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatingResults.maxTimestampMs, + "Max timestamp should be " + (now + 1)); + + // Both V2 and V1 have single batch in the validated records when compression is enabled, and hence their shallow + // OffsetOfMaxTimestamp is the last offset of the single batch + assertEquals(1, iteratorSize(validatedRecords.batches().iterator())); + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp); + assertTrue(validatingResults.messageSizeMaybeChanged, + "Message size should have been changed"); + + verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true); + } + + private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) { + List<LegacyRecord> records = new ArrayList<>(); + + for (int id = 0; id < 20; id++) { + 
records.add(LegacyRecord.create( + recordMagicValue, + RecordBatch.NO_TIMESTAMP, + Integer.toString(id).getBytes(), + Integer.toString(id).getBytes() + )); + } + + ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max( + records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec, + TimestampType.CREATE_TIME, 0L); + + AtomicLong offset = new AtomicLong(1234567); + records.forEach(record -> { + builder.appendUncheckedWithOffset(offset.get(), record); + offset.incrementAndGet(); + }); + + return builder.build(); + } + + private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) { + List<SimpleRecord> records = IntStream.range(0, numRecords) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .collect(Collectors.toList()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L); + + records.forEach(record -> + assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record)) + ); + + return builder.build(); + } + + private void checkAllowMultiBatch(Byte magic, Compression sourceCompression, Compression targetCompression) { + validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.type(), targetCompression); + } + + + private ValidationResult validateMessages(MemoryRecords records, + Byte magic, + CompressionType sourceCompressionType, + Compression targetCompressionType) { + MockTime mockTime = new MockTime(0L, 0L); + return new LogValidator(records, + topicPartition, + mockTime, + sourceCompressionType, + targetCompressionType, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PRODUCER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.IBP_2_3_IV1 + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()); + } + + private MemoryRecords createTwoBatchedRecords(Byte magicValue, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + builder.append(10L, "1".getBytes(), "a".getBytes()); + builder.close(); + builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + + buf.flip(); + return MemoryRecords.readableRecords(buf.slice()); + } + + private MemoryRecords createRecords(byte magicValue, + long timestamp, + Compression codec) { + List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes()); + return createRecords(records, magicValue, timestamp, codec); + } + + @Test + void testCompressedV1() { + checkCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + private void checkCompressed(byte magic) { + long now = System.currentTimeMillis(); + // set the timestamp of seq(1) (i.e. 
offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + List<SimpleRecord> recordList = Arrays.asList( + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.gzip().build(), + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + recordList.toArray(new SimpleRecord[0]) + ); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(batch.timestampType(), TimestampType.CREATE_TIME); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1)); + + int expectedShallowOffsetOfMaxTimestamp = 2; + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2"); + assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true); + } + + private MemoryRecords createRecords(List<byte[]> records, + byte magicValue, + long timestamp, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + + AtomicInteger offset = new AtomicInteger(0); + records.forEach(item -> + builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item)); + return builder.build(); + } + + @Test + void testLogAppendTimeWithRecompressionV1() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testLogAppendTimeWithRecompressionV2() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2); + } 
+
+    @Test
+    void testLogAppendTimeWithoutRecompressionV1() {
+        checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1);
+    }
+
+    @Test
+    void testCompressedV2() {
+        checkCompressed(RecordBatch.MAGIC_VALUE_V2);
+    }
+    @Test
+    void testInvalidOffsetRangeAndRecordCount() {

Review Comment:
   Could you please separate the different cases? for example:
   ```java
   @Test
   void testRecordBatchWithCountOverrides() {
       // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2
       validateRecordBatchWithCountOverrides(2, 3);
   }

   @ParameterizedTest
   @CsvSource({"0,3", "15,3", "-3,3"})
   void testInconsistentCountAndOffset(int lastOffsetDelta, int count) {
       // Count and offset range are inconsistent or invalid
       assertInvalidBatchCountOverrides(lastOffsetDelta, count);
   }

   @ParameterizedTest
   @CsvSource({"5,6", "1,2"})
   void testUnmatchedNumberOfRecords(int lastOffsetDelta, int count) {
       // Count and offset range are consistent, but do not match the actual number of records
       assertInvalidBatchCountOverrides(lastOffsetDelta, count);
   }
   ```

##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogValidatorTest.java:
##########
@@ -0,0 +1,2250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.storage.internals.log; + +import kafka.server.RequestLocal; + +import org.apache.kafka.common.InvalidRecordException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.compress.GzipCompression; +import org.apache.kafka.common.errors.CorruptRecordException; +import org.apache.kafka.common.errors.InvalidTimestampException; +import org.apache.kafka.common.errors.UnsupportedCompressionTypeException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.LegacyRecord; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordValidationStats; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.utils.PrimitiveRef; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.internals.log.LogValidator.ValidationResult; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogValidatorTest { + private final Time time = Time.SYSTEM; + private final TopicPartition topicPartition = new TopicPartition("topic", 0); + private final MetricsRecorder metricsRecorder = new MetricsRecorder(); + + static class MetricsRecorder implements LogValidator.MetricsRecorder { + public int recordInvalidMagicCount = 0; + public int recordInvalidOffsetCount = 0; + public int recordInvalidChecksumsCount = 0; + public int recordInvalidSequenceCount = 0; + public int recordNoKeyCompactedTopicCount = 0; + + public void recordInvalidMagic() { + recordInvalidMagicCount += 1; + } + + public void recordInvalidOffset() { + recordInvalidOffsetCount += 1; + } + + public void recordInvalidSequence() { + recordInvalidSequenceCount += 1; + } + + public void 
recordInvalidChecksums() { + recordInvalidChecksumsCount += 1; + } + + public void recordNoKeyCompactedTopic() { + recordNoKeyCompactedTopicCount += 1; + } + } + + @Test + public void testOnlyOneBatch() { + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.gzip().build()); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.NONE); + checkOnlyOneBatch(RecordBatch.MAGIC_VALUE_V2, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testAllowMultiBatch() { + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.NONE); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V0, Compression.NONE, Compression.gzip().build()); + checkAllowMultiBatch(RecordBatch.MAGIC_VALUE_V1, Compression.NONE, Compression.gzip().build()); + } + + @Test + public void testValidationOfBatchesWithNonSequentialInnerOffsets() { + Arrays.stream(RecordVersion.values()).forEach(version -> { + int numRecords = 20; + Compression compression = Compression.gzip().build(); + MemoryRecords invalidRecords = recordsWithNonSequentialInnerOffsets(version.value, compression, numRecords); + + // Validation for v2 and above is strict for this case. For older formats, we fix invalid + // internal offsets by rewriting the batch. 
+ if (version.value >= RecordBatch.MAGIC_VALUE_V2) { + assertThrows(InvalidRecordException.class, () -> + validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression) + ); + } else { + ValidationResult result = validateMessages(invalidRecords, version.value, CompressionType.GZIP, compression); + List<Long> recordsResult = new ArrayList<>(); + result.validatedRecords.records().forEach(s -> recordsResult.add(s.offset())); + assertEquals(LongStream.range(0, numRecords).boxed().collect(Collectors.toList()), recordsResult); + } + }); + } + + @Test + public void testMisMatchMagic() { + Compression compress = Compression.gzip().build(); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, compress); + checkMismatchMagic(RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V0, compress); + } + + @Test + void testUncompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.NONE); + } + + @Test + void testRecompressedBatchWithoutRecordsNotAllowed() { + testBatchWithoutRecordsNotAllowed(CompressionType.NONE, Compression.gzip().build()); + } + + private void checkOnlyOneBatch(Byte magic, Compression sourceCompression, + Compression targetCompression) { + assertThrows(InvalidRecordException.class, + () -> validateMessages(createTwoBatchedRecords(magic, sourceCompression), + magic, sourceCompression.type(), targetCompression) + ); + } + + @Test + public void testLogAppendTimeNonCompressedV0() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V0); + } + + @Test + public void testLogAppendTimeNonCompressedV1() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + public void testLogAppendTimeNonCompressedV2() { + checkLogAppendTimeNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + + private void testBatchWithoutRecordsNotAllowed(CompressionType sourceCompression, Compression targetCompression) { + long offset = 1234567; + long producerId = 1324L; + short producerEpoch = 10; + int baseSequence = 984; + boolean isTransactional = true; + int partitionLeaderEpoch = 40; + + ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD); + DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.CURRENT_MAGIC_VALUE, producerId, producerEpoch, + baseSequence, 0L, 5L, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), + isTransactional, false); + buffer.flip(); + + MemoryRecords records = MemoryRecords.readableRecords(buffer); + + assertThrows(InvalidRecordException.class, () -> new LogValidator(records, + topicPartition, + time, + sourceCompression, + targetCompression, + false, + RecordBatch.CURRENT_MAGIC_VALUE, + TimestampType.CREATE_TIME, + 5000L, + 5000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(offset), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + private void checkMismatchMagic(byte batchMagic, byte recordMagic, Compression compression) { + assertThrows(RecordValidationException.class, + () -> validateMessages(recordsWithInvalidInnerMagic(batchMagic, recordMagic, compression), batchMagic, compression.type(), compression)); + + assertTrue(metricsRecorder.recordInvalidMagicCount > 0); + } + + @Test + void testNonCompressedV2() { + checkNonCompressed(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testRecompressionV1() { + checkRecompression(RecordBatch.MAGIC_VALUE_V1); + 
} + + @Test + void testRecompressionV2() { + checkRecompression(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testCreateTimeUpConversionV0ToV1() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testCreateTimeUpConversionV0ToV2() { + checkCreateTimeUpConversionFromV0(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + public void testCreateTimeUpConversionV1ToV2() { + long timestamp = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, timestamp, compression); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(timestamp, batch); + assertEquals(timestamp, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + + assertEquals(timestamp, validatedResults.maxTimestampMs); + assertEquals(2, validatedResults.shallowOffsetOfMaxTimestamp, "Offset of max timestamp should be the last offset 2."); + assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats( + validatedResults.recordValidationStats, + 3, + records, + true + ); + } + + private void checkCreateTimeUpConversionFromV0(byte toMagic) { + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V0, RecordBatch.NO_TIMESTAMP, compression); + LogValidator logValidator = new LogValidator(records, + topicPartition, + time, + CompressionType.GZIP, + compression, + false, + toMagic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + LogValidator.ValidationResult validatedResults = logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + maybeCheckBaseTimestamp(RecordBatch.NO_TIMESTAMP, batch); + assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch()); + assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId()); + assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence()); + } + assertEquals(RecordBatch.NO_TIMESTAMP, validatedResults.maxTimestampMs, + "Max timestamp should be " + RecordBatch.NO_TIMESTAMP); + assertEquals(-1, validatedResults.shallowOffsetOfMaxTimestamp); + 
assertTrue(validatedResults.messageSizeMaybeChanged, "Message size should have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 3, records, true); + } + + private void checkRecompression(byte magic) { + long now = System.currentTimeMillis(); + // Set the timestamp of seq(1) (i.e. offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.NONE, + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + // V2 has single batch, and other versions have many single-record batches + assertEquals(magic >= RecordBatch.MAGIC_VALUE_V2 ? 1 : 3, iteratorSize(records.batches().iterator())); + + LogValidator.ValidationResult validatingResults = new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatingResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(TimestampType.CREATE_TIME, batch.timestampType()); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatingResults.maxTimestampMs, + "Max timestamp should be " + (now + 1)); + + // Both V2 and V1 have single batch in the validated records when compression is enabled, and hence their shallow + // OffsetOfMaxTimestamp is the last offset of the single batch + assertEquals(1, iteratorSize(validatedRecords.batches().iterator())); + assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp); + assertTrue(validatingResults.messageSizeMaybeChanged, + "Message size should have been changed"); + + verifyRecordValidationStats(validatingResults.recordValidationStats, 3, records, true); + } + + private MemoryRecords recordsWithInvalidInnerMagic(byte batchMagicValue, byte recordMagicValue, Compression codec) { + List<LegacyRecord> records = new ArrayList<>(); + + for (int id = 0; id < 20; id++) { + 
records.add(LegacyRecord.create( + recordMagicValue, + RecordBatch.NO_TIMESTAMP, + Integer.toString(id).getBytes(), + Integer.toString(id).getBytes() + )); + } + + ByteBuffer buffer = ByteBuffer.allocate(Math.min(Math.max( + records.stream().mapToInt(LegacyRecord::sizeInBytes).sum() / 2, 1024), 1 << 16)); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, batchMagicValue, codec, + TimestampType.CREATE_TIME, 0L); + + AtomicLong offset = new AtomicLong(1234567); + records.forEach(record -> { + builder.appendUncheckedWithOffset(offset.get(), record); + offset.incrementAndGet(); + }); + + return builder.build(); + } + + private MemoryRecords recordsWithNonSequentialInnerOffsets(Byte magicValue, Compression compression, int numRecords) { + List<SimpleRecord> records = IntStream.range(0, numRecords) + .mapToObj(id -> new SimpleRecord(String.valueOf(id).getBytes())) + .collect(Collectors.toList()); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magicValue, compression, TimestampType.CREATE_TIME, 0L); + + records.forEach(record -> + assertDoesNotThrow(() -> builder.appendUncheckedWithOffset(0, record)) + ); + + return builder.build(); + } + + private void checkAllowMultiBatch(Byte magic, Compression sourceCompression, Compression targetCompression) { + validateMessages(createTwoBatchedRecords(magic, sourceCompression), magic, sourceCompression.type(), targetCompression); + } + + + private ValidationResult validateMessages(MemoryRecords records, + Byte magic, + CompressionType sourceCompressionType, + Compression targetCompressionType) { + MockTime mockTime = new MockTime(0L, 0L); + return new LogValidator(records, + topicPartition, + mockTime, + sourceCompressionType, + targetCompressionType, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PRODUCER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.IBP_2_3_IV1 + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0L), metricsRecorder, RequestLocal.withThreadConfinedCaching().bufferSupplier()); + } + + private MemoryRecords createTwoBatchedRecords(Byte magicValue, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(2048); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + builder.append(10L, "1".getBytes(), "a".getBytes()); + builder.close(); + builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 1L); + builder.append(11L, "2".getBytes(), "b".getBytes()); + builder.append(12L, "3".getBytes(), "c".getBytes()); + builder.close(); + + buf.flip(); + return MemoryRecords.readableRecords(buf.slice()); + } + + private MemoryRecords createRecords(byte magicValue, + long timestamp, + Compression codec) { + List<byte[]> records = Arrays.asList("hello".getBytes(), "there".getBytes(), "beautiful".getBytes()); + return createRecords(records, magicValue, timestamp, codec); + } + + @Test + void testCompressedV1() { + checkCompressed(RecordBatch.MAGIC_VALUE_V1); + } + + private void checkCompressed(byte magic) { + long now = System.currentTimeMillis(); + // set the timestamp of seq(1) (i.e. 
offset 1) as the max timestamp + List<Long> timestampSeq = Arrays.asList(now - 1, now + 1, now); + + long producerId; + short producerEpoch; + int baseSequence; + boolean isTransactional; + int partitionLeaderEpoch; + + if (magic >= RecordBatch.MAGIC_VALUE_V2) { + producerId = 1324L; + producerEpoch = 10; + baseSequence = 984; + isTransactional = true; + partitionLeaderEpoch = 40; + } else { + producerId = RecordBatch.NO_PRODUCER_ID; + producerEpoch = RecordBatch.NO_PRODUCER_EPOCH; + baseSequence = RecordBatch.NO_SEQUENCE; + isTransactional = false; + partitionLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH; + } + + List<SimpleRecord> recordList = Arrays.asList( + new SimpleRecord(timestampSeq.get(0), "hello".getBytes()), + new SimpleRecord(timestampSeq.get(1), "there".getBytes()), + new SimpleRecord(timestampSeq.get(2), "beautiful".getBytes()) + ); + + MemoryRecords records = MemoryRecords.withRecords( + magic, + 0L, + Compression.gzip().build(), + TimestampType.CREATE_TIME, + producerId, + producerEpoch, + baseSequence, + partitionLeaderEpoch, + isTransactional, + recordList.toArray(new SimpleRecord[0]) + ); + + LogValidator validator = new LogValidator( + records, + topicPartition, + time, + CompressionType.GZIP, + Compression.gzip().build(), + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + partitionLeaderEpoch, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + LogValidator.ValidationResult validatedResults = validator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ); + + MemoryRecords validatedRecords = validatedResults.validatedRecords; + + int i = 0; + for (RecordBatch batch : validatedRecords.batches()) { + assertTrue(batch.isValid()); + assertEquals(batch.timestampType(), TimestampType.CREATE_TIME); + maybeCheckBaseTimestamp(timestampSeq.get(0), batch); + assertEquals(batch.maxTimestamp(), batch.maxTimestamp()); + assertEquals(producerEpoch, batch.producerEpoch()); + assertEquals(producerId, batch.producerId()); + assertEquals(baseSequence, batch.baseSequence()); + assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch()); + + for (Record record : batch) { + record.ensureValid(); + assertEquals((long) timestampSeq.get(i), record.timestamp()); + i++; + } + } + + assertEquals(now + 1, validatedResults.maxTimestampMs, "Max timestamp should be " + (now + 1)); + + int expectedShallowOffsetOfMaxTimestamp = 2; + assertEquals(expectedShallowOffsetOfMaxTimestamp, validatedResults.shallowOffsetOfMaxTimestamp, "Shallow offset of max timestamp should be 2"); + assertFalse(validatedResults.messageSizeMaybeChanged, "Message size should not have been changed"); + + verifyRecordValidationStats(validatedResults.recordValidationStats, 0, records, true); + } + + private MemoryRecords createRecords(List<byte[]> records, + byte magicValue, + long timestamp, + Compression codec) { + ByteBuffer buf = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, 0L); + + AtomicInteger offset = new AtomicInteger(0); + records.forEach(item -> + builder.appendWithOffset(offset.getAndIncrement(), timestamp, null, item)); + return builder.build(); + } + + @Test + void testLogAppendTimeWithRecompressionV1() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testLogAppendTimeWithRecompressionV2() { + checkLogAppendTimeWithRecompression(RecordBatch.MAGIC_VALUE_V2); + } 
+ + @Test + void testLogAppendTimeWithoutRecompressionV1() { + checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V1); + } + + @Test + void testCompressedV2() { + checkCompressed(RecordBatch.MAGIC_VALUE_V2); + } + @Test + void testInvalidOffsetRangeAndRecordCount() { + // The batch to be written contains 3 records, so the correct lastOffsetDelta is 2 + validateRecordBatchWithCountOverrides(2, 3); + + // Count and offset range are inconsistent or invalid + assertInvalidBatchCountOverrides(0, 3); + assertInvalidBatchCountOverrides(15, 3); + assertInvalidBatchCountOverrides(-3, 3); + assertInvalidBatchCountOverrides(2, -3); + assertInvalidBatchCountOverrides(2, 6); + assertInvalidBatchCountOverrides(2, 0); + assertInvalidBatchCountOverrides(-3, -2); + + // Count and offset range are consistent, but do not match the actual number of records + assertInvalidBatchCountOverrides(5, 6); + assertInvalidBatchCountOverrides(1, 2); + } + + @Test + void testLogAppendTimeWithoutRecompressionV2() { + checkLogAppendTimeWithoutRecompression(RecordBatch.MAGIC_VALUE_V2); + } + + @Test + void testInvalidCreateTimeNonCompressedV1() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords(RecordBatch.MAGIC_VALUE_V1, now - 1001L, + Compression.NONE); + assertThrows(RecordValidationException.class, () -> new LogValidator( + records, + topicPartition, + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + } + + @Test + public void testInvalidCreateTimeCompressedV1() { + long now = System.currentTimeMillis(); + Compression compression = Compression.gzip().build(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V1, + now - 1001L, + compression + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.GZIP, + compression, + false, + RecordBatch.MAGIC_VALUE_V1, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidCreateTimeNonCompressedV2() { + long now = System.currentTimeMillis(); + MemoryRecords records = createRecords( + RecordBatch.MAGIC_VALUE_V2, + now - 1001L, + Compression.NONE + ); + + assertThrows(RecordValidationException.class, () -> + new LogValidator( + records, + new TopicPartition("topic", 0), + time, + CompressionType.NONE, + Compression.NONE, + false, + RecordBatch.MAGIC_VALUE_V2, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ).validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + ) + ); + } + + @Test + public void testInvalidChecksum() { + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), CompressionType.GZIP); + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), CompressionType.GZIP); + 
checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), CompressionType.LZ4); + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.lz4().build(), CompressionType.LZ4); + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), CompressionType.SNAPPY); + checkInvalidChecksum(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), CompressionType.SNAPPY); + } + + private void checkInvalidChecksum(byte magic, Compression compression, CompressionType type) { + LegacyRecord record = LegacyRecord.create(magic, 0L, null, "hello".getBytes()); + ByteBuffer buf = record.buffer(); + + // enforce modify crc to make checksum error + buf.put(LegacyRecord.CRC_OFFSET, (byte) 0); + + ByteBuffer buffer = ByteBuffer.allocate(1024); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, + TimestampType.CREATE_TIME, 0L); + builder.appendUncheckedWithOffset(0, record); + + MemoryRecords memoryRecords = builder.build(); + LogValidator logValidator = new LogValidator(memoryRecords, + topicPartition, + time, + type, + compression, + false, + magic, + TimestampType.CREATE_TIME, + 1000L, + 1000L, + RecordBatch.NO_PARTITION_LEADER_EPOCH, + AppendOrigin.CLIENT, + MetadataVersion.latestTesting() + ); + + + assertThrows(CorruptRecordException.class, () -> logValidator.validateMessagesAndAssignOffsets( + PrimitiveRef.ofLong(0), + metricsRecorder, + RequestLocal.withThreadConfinedCaching().bufferSupplier() + )); + + assertTrue(metricsRecorder.recordInvalidChecksumsCount > 0); + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + public void testInvalidSequenceV0(CompressionType type) { + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.gzip().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.lz4().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.snappy().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V0, Compression.zstd().build(), type); + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + public void testInvalidSequenceV1(CompressionType type) { + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.gzip().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.lz4().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.snappy().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V1, Compression.zstd().build(), type); + } + + @ParameterizedTest + @EnumSource(CompressionType.class) + public void testInvalidSequenceV2(CompressionType type) { + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.gzip().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.lz4().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.snappy().build(), type); + checkInvalidSequence(RecordBatch.MAGIC_VALUE_V2, Compression.zstd().build(), type); + } + + + private void checkInvalidSequence(byte magic, Compression compression, CompressionType type) { Review Comment: Could you use `MethodSource`? 
   for example:
   ```java
   private static Stream<Arguments> testInvalidSequence() {
       return Stream.of(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)
           .flatMap(code -> Arrays.stream(CompressionType.values())
               .flatMap(source -> Arrays.stream(CompressionType.values())
                   .map(target -> Arguments.of(code, source, target))));
   }

   @ParameterizedTest
   @MethodSource("testInvalidSequence")
   void testInvalidSequence(byte magic, CompressionType source, CompressionType target) {
   ```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
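For readers less familiar with JUnit 5 parameterization, here is a minimal, self-contained sketch of the cross-product `@MethodSource` pattern the reviewer is suggesting. It is not part of the PR: it uses a stand-in `Codec` enum and literal magic values instead of Kafka's `CompressionType` and `RecordBatch` constants, and the assertion is a placeholder where the real test would delegate to the existing `checkInvalidSequence` helper.

```java
import java.util.Arrays;
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class MethodSourceCrossProductExample {

    // Stand-in for CompressionType in this illustration.
    private enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    // Factory method named after the test, as in the reviewer's snippet: it
    // emits one Arguments instance per (magic, source, target) combination.
    private static Stream<Arguments> testInvalidSequence() {
        return Stream.of((byte) 0, (byte) 1, (byte) 2)
            .flatMap(magic -> Arrays.stream(Codec.values())
                .flatMap(source -> Arrays.stream(Codec.values())
                    .map(target -> Arguments.of(magic, source, target))));
    }

    @ParameterizedTest
    @MethodSource("testInvalidSequence")
    void testInvalidSequence(byte magic, Codec source, Codec target) {
        // In the actual PR this body would call checkInvalidSequence(magic, ...);
        // here we only check the generated combination is well formed.
        assertTrue(magic >= 0 && magic <= 2);
    }
}
```

Because the provider enumerates every combination, a single parameterized method replaces the three per-magic-version tests in the diff, and each failing combination is reported as its own test invocation.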