chia7712 commented on code in PR #21644:
URL: https://github.com/apache/kafka/pull/21644#discussion_r2891390374
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -146,16 +154,103 @@ public static MemoryRecords records(List<SimpleRecord>
records,
short producerEpoch,
int sequence,
long baseOffset,
- int partitionLeaderEpoch) {
+ int partitionLeaderEpoch,
+ long timestamp) {
ByteBuffer buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue,
codec, TimestampType.CREATE_TIME, baseOffset,
- System.currentTimeMillis(), producerId, producerEpoch, sequence,
false, partitionLeaderEpoch);
+ timestamp, producerId, producerEpoch, sequence, false,
partitionLeaderEpoch);
records.forEach(builder::append);
return builder.build();
}
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ return records(records, magicValue, codec, producerId, producerEpoch,
sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, producerId, producerEpoch, sequence, baseOffset,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long
timestamp) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long
baseOffset, int partitionLeaderEpoch) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+ }
+
+ public static void deleteProducerSnapshotFiles(File logDir) throws
IOException {
+ Set<File> files = Stream.of(logDir.listFiles()).filter(f -> f.isFile()
&&
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)).collect(Collectors.toSet());
+ for (File file : files) {
+ Utils.delete(file);
+ }
+ }
+
+ public static List<Long> listProducerSnapshotOffsets(File logDir) throws
IOException {
+ return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f
-> f.offset).sorted().toList();
+ }
+
+ public static void appendNonTransactionalAsLeader(UnifiedLog log, int
numRecords) throws IOException {
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ simpleRecords.add(new SimpleRecord(String.valueOf(i).getBytes()));
+ }
+ MemoryRecords records = MemoryRecords.withRecords(Compression.NONE,
simpleRecords.toArray(new SimpleRecord[0]));
+ log.appendAsLeader(records, 0);
+ }
+
+ public static Consumer<Integer> appendTransactionalAsLeader(UnifiedLog log,
+ long
producerId,
+ short
producerEpoch,
+ Time time) {
+ return appendIdempotentAsLeader(log, producerId, producerEpoch, time,
true);
+ }
+
+ public static Consumer<Integer> appendIdempotentAsLeader(UnifiedLog log,
+ long producerId,
+ short
producerEpoch,
+ Time time,
+ boolean
isTransactional) {
+ final AtomicInteger sequence = new AtomicInteger(0);
+ return numRecords -> {
+ int baseSequence = sequence.get();
+ List<SimpleRecord> simpleRecords = new ArrayList<>();
+ for (int i = baseSequence; i < baseSequence + numRecords; i++) {
+ simpleRecords.add(new SimpleRecord(time.milliseconds(),
String.valueOf(i).getBytes()));
+ }
+
+ MemoryRecords records = isTransactional
+ ? MemoryRecords.withTransactionalRecords(Compression.NONE,
producerId,
+ producerEpoch, baseSequence, simpleRecords.toArray(new
SimpleRecord[0]))
+ : MemoryRecords.withIdempotentRecords(Compression.NONE,
producerId,
+ producerEpoch, baseSequence, simpleRecords.toArray(new
SimpleRecord[0]));
+
+ try {
Review Comment:
```java
Assertions.assertDoesNotThrow(() -> log.appendAsLeader(records, 0));
```
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -146,16 +154,103 @@ public static MemoryRecords records(List<SimpleRecord>
records,
short producerEpoch,
int sequence,
long baseOffset,
- int partitionLeaderEpoch) {
+ int partitionLeaderEpoch,
+ long timestamp) {
ByteBuffer buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue,
codec, TimestampType.CREATE_TIME, baseOffset,
- System.currentTimeMillis(), producerId, producerEpoch, sequence,
false, partitionLeaderEpoch);
+ timestamp, producerId, producerEpoch, sequence, false,
partitionLeaderEpoch);
records.forEach(builder::append);
return builder.build();
}
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ return records(records, magicValue, codec, producerId, producerEpoch,
sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, producerId, producerEpoch, sequence, baseOffset,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long
timestamp) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records, long
baseOffset, int partitionLeaderEpoch) {
+ return records(records, RecordBatch.CURRENT_MAGIC_VALUE,
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+ }
+
+ public static void deleteProducerSnapshotFiles(File logDir) throws
IOException {
+ Set<File> files = Stream.of(logDir.listFiles()).filter(f -> f.isFile()
&&
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)).collect(Collectors.toSet());
Review Comment:
```java
    public static void deleteProducerSnapshotFiles(File logDir) {
        Stream.of(Objects.requireNonNullElse(logDir.listFiles(), new File[0]))
            .filter(f -> f.isFile() && f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
            .forEach(f -> Assertions.assertDoesNotThrow(() -> Utils.delete(f)));
    }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]