chia7712 commented on a change in pull request #9906: URL: https://github.com/apache/kafka/pull/9906#discussion_r567562943
########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ########## @@ -124,207 +144,199 @@ public void testWriteTransactionalRecordSet(Args args) { @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteTransactionalNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteControlBatchNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteTransactionalWithInvalidPID(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { Review comment: Is this check necessary? ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ########## @@ -995,17 +988,20 @@ public void testWithRecords(Args args) { byte magic = args.magic; Supplier<MemoryRecords> recordsSupplier = () -> MemoryRecords.withRecords(magic, compression, Review comment: This supplier is not necessary as we don't assert the exception anymore. ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ########## @@ -124,207 +144,199 @@ public void testWriteTransactionalRecordSet(Args args) { @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteTransactionalNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteControlBatchNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteTransactionalWithInvalidPID(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = RecordBatch.NO_PRODUCER_ID; - short epoch = 15; - int sequence = 2342; + long pid = RecordBatch.NO_PRODUCER_ID; + short epoch = 15; + int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteIdempotentWithInvalidEpoch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = RecordBatch.NO_PRODUCER_EPOCH; - int sequence = 2342; + long pid = 9809; + short epoch = RecordBatch.NO_PRODUCER_EPOCH; + int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteIdempotentWithInvalidBaseSequence(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = RecordBatch.NO_SEQUENCE; + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteEndTxnMarkerNonTransactionalBatch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = RecordBatch.NO_SEQUENCE; + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, - new EndTransactionMarker(ControlRecordType.ABORT, 0))); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, + new EndTransactionMarker(ControlRecordType.ABORT, 0))); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteEndTxnMarkerNonControlBatch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = RecordBatch.NO_SEQUENCE; + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, - new EndTransactionMarker(ControlRecordType.ABORT, 0))); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, + new EndTransactionMarker(ControlRecordType.ABORT, 0))); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteLeaderChangeControlBatchWithoutLeaderEpoch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - final int leaderId = 1; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, () -> builder.appendLeaderChangeMessage(RecordBatch.NO_TIMESTAMP, - new LeaderChangeMessage() - .setLeaderId(leaderId) - .setVoters(Collections.emptyList()))); + if (args.magic >= MAGIC_VALUE_V2) { Review comment: ditto ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ########## @@ -124,207 +144,199 @@ public void testWriteTransactionalRecordSet(Args args) { @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteTransactionalNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); - - long pid = 9809; - short epoch = 15; - int sequence = 2342; - - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); - } - - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV0(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteTransactionalNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testWriteControlBatchNotAllowedMagicV1(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + public void testWriteControlBatchNotAllowedMagic(Args args) { + if (args.magic < MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = 2342; + long pid = 9809; + short epoch = 15; + int sequence = 2342; - assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, - args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, - false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + assertThrows(IllegalArgumentException.class, () -> new MemoryRecordsBuilder(buffer, args.magic, + args.compressionType, TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, + false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity())); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteTransactionalWithInvalidPID(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = RecordBatch.NO_PRODUCER_ID; - short epoch = 15; - int sequence = 2342; + long pid = RecordBatch.NO_PRODUCER_ID; + short epoch = 15; + int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteIdempotentWithInvalidEpoch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = RecordBatch.NO_PRODUCER_EPOCH; - int sequence = 2342; + long pid = 9809; + short epoch = RecordBatch.NO_PRODUCER_EPOCH; + int sequence = 2342; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteIdempotentWithInvalidBaseSequence(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = RecordBatch.NO_SEQUENCE; + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, builder::close); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, builder::close); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteEndTxnMarkerNonTransactionalBatch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { + ByteBuffer buffer = allocateBuffer(128, args); - long pid = 9809; - short epoch = 15; - int sequence = RecordBatch.NO_SEQUENCE; + long pid = 9809; + short epoch = 15; + int sequence = RecordBatch.NO_SEQUENCE; - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, args.compressionType, TimestampType.CREATE_TIME, - 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, - new EndTransactionMarker(ControlRecordType.ABORT, 0))); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, args.magic, args.compressionType, TimestampType.CREATE_TIME, + 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + assertThrows(IllegalArgumentException.class, () -> builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, + new EndTransactionMarker(ControlRecordType.ABORT, 0))); + } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteEndTxnMarkerNonControlBatch(Args args) { - ByteBuffer buffer = allocateBuffer(128, args); + if (args.magic >= MAGIC_VALUE_V2) { Review comment: How about replacing it by ```assertThrow```? ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ########## @@ -71,35 +80,46 @@ public String toString() { public Stream<? extends Arguments> provideArguments(ExtensionContext context) { List<Arguments> values = new ArrayList<>(); for (int bufferOffset : Arrays.asList(0, 15)) - for (CompressionType compressionType : CompressionType.values()) - values.add(Arguments.of(new Args(bufferOffset, compressionType))); + for (CompressionType type: CompressionType.values()) { + List<Byte> magics = type == CompressionType.ZSTD + ? Collections.singletonList(RecordBatch.MAGIC_VALUE_V2) + : asList(RecordBatch.MAGIC_VALUE_V0, MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2); + for (byte magic : magics) + values.add(Arguments.of(new Args(bufferOffset, type, magic))); + } return values.stream(); } } private final Time time = Time.SYSTEM; + @Test + public void testUnsupportedCompress() { + BiFunction<Byte, CompressionType, MemoryRecordsBuilder> builderBiFunction = (magic, compressionType) -> + new MemoryRecordsBuilder(ByteBuffer.allocate(128), magic, compressionType, TimestampType.CREATE_TIME, 0L, 0L, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, 128); + + Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1).forEach(magic -> { + Exception e = assertThrows(IllegalArgumentException.class, () -> builderBiFunction.apply(magic, CompressionType.ZSTD)); + assertEquals(e.getMessage(), "ZStandard compression is not supported for magic " + magic); + }); + } + @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void testWriteEmptyRecordSet(Args args) { - byte magic = RecordBatch.MAGIC_VALUE_V0; - assumeAtLeastV2OrNotZstd(magic, args.compressionType); - + byte magic = args.magic; ByteBuffer buffer = allocateBuffer(128, args); Supplier<MemoryRecordsBuilder> builderSupplier = () -> new MemoryRecordsBuilder(buffer, magic, Review comment: unnecessary supplier. ########## File path: clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ########## @@ -352,124 +364,87 @@ public void testEstimatedSizeInBytes(Args args) { assertEquals(records.sizeInBytes(), bytesWrittenBeforeClose); } - @ParameterizedTest - @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) - public void testCompressionRateV1(Args args) { - byte magic = RecordBatch.MAGIC_VALUE_V1; - assumeAtLeastV2OrNotZstd(magic, args.compressionType); - - ByteBuffer buffer = allocateBuffer(1024, args); - - LegacyRecord[] records = new LegacyRecord[] { - LegacyRecord.create(magic, 0L, "a".getBytes(), "1".getBytes()), - LegacyRecord.create(magic, 1L, "b".getBytes(), "2".getBytes()), - LegacyRecord.create(magic, 2L, "c".getBytes(), "3".getBytes()), - }; - - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, args.compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - - int uncompressedSize = 0; - for (LegacyRecord record : records) { - uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD; - builder.append(record); - } - - MemoryRecords built = builder.build(); - if (args.compressionType == CompressionType.NONE) { - assertEquals(1.0, builder.compressionRatio(), 0.00001); - } else { - int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1; - double computedCompressionRate = (double) compressedSize / uncompressedSize; - assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001); - } - } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void buildUsingLogAppendTime(Args args) { - byte magic = RecordBatch.MAGIC_VALUE_V1; - assumeAtLeastV2OrNotZstd(magic, args.compressionType); - - ByteBuffer buffer = allocateBuffer(1024, args); - - long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, args.compressionType, - TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - builder.append(0L, "a".getBytes(), "1".getBytes()); - builder.append(0L, "b".getBytes(), "2".getBytes()); - builder.append(0L, "c".getBytes(), "3".getBytes()); - MemoryRecords records = builder.build(); - - MemoryRecordsBuilder.RecordsInfo info = builder.info(); - assertEquals(logAppendTime, info.maxTimestamp); + if (args.magic >= MAGIC_VALUE_V1) { + byte magic = args.magic; - if (args.compressionType != CompressionType.NONE) - assertEquals(2L, info.shallowOffsetOfMaxTimestamp); - else - assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + ByteBuffer buffer = allocateBuffer(1024, args); - for (RecordBatch batch : records.batches()) { - assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType()); - for (Record record : batch) - assertEquals(logAppendTime, record.timestamp()); + long logAppendTime = System.currentTimeMillis(); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, args.compressionType, + TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.append(0L, "a".getBytes(), "1".getBytes()); + builder.append(0L, "b".getBytes(), "2".getBytes()); + builder.append(0L, "c".getBytes(), "3".getBytes()); + MemoryRecords records = builder.build(); + + MemoryRecordsBuilder.RecordsInfo info = builder.info(); + assertEquals(logAppendTime, info.maxTimestamp); + + if (args.compressionType == CompressionType.NONE && magic == MAGIC_VALUE_V1) + assertEquals(0L, info.shallowOffsetOfMaxTimestamp); + else + assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + + for (RecordBatch batch : records.batches()) { + assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType()); + for (Record record : batch) + assertEquals(logAppendTime, record.timestamp()); + } } } @ParameterizedTest @ArgumentsSource(MemoryRecordsBuilderArgumentsProvider.class) public void buildUsingCreateTime(Args args) { - byte magic = RecordBatch.MAGIC_VALUE_V1; - assumeAtLeastV2OrNotZstd(magic, args.compressionType); - - ByteBuffer buffer = allocateBuffer(1024, args); - - long logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, args.compressionType, - TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); - builder.append(0L, "a".getBytes(), "1".getBytes()); - builder.append(2L, "b".getBytes(), "2".getBytes()); - builder.append(1L, "c".getBytes(), "3".getBytes()); - MemoryRecords records = builder.build(); - - MemoryRecordsBuilder.RecordsInfo info = builder.info(); - assertEquals(2L, info.maxTimestamp); - - if (args.compressionType == CompressionType.NONE) - assertEquals(1L, info.shallowOffsetOfMaxTimestamp); - else - assertEquals(2L, info.shallowOffsetOfMaxTimestamp); + if (args.magic >= MAGIC_VALUE_V1) { Review comment: Could we apply same pattern for this test case? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org