FrankYang0529 commented on code in PR #21763:
URL: https://github.com/apache/kafka/pull/21763#discussion_r2939776930


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1632,6 +1637,705 @@ public void 
testRetentionDeletesProducerStateSnapshots(boolean createEmptyActive
         assertEquals(3, log.logStartOffset());
     }
 
+    @Test
+    public void testRetentionIdempotency() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .retentionMs(900)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+        mockTime.sleep(901);
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(1L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertEquals(
+                2, 
+                log.deleteOldSegments(),
+                "Expecting two segment deletions as log start offset retention 
should unblock time based retention"
+        );
+        assertEquals(0, log.deleteOldSegments());
+    }
+
+    @Test
+    public void testLogStartOffsetMovementDeletesSnapshots() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid1 = 1L;
+        short epoch = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes())), pid1, epoch, 0, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("b".getBytes())), pid1, epoch, 1, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("c".getBytes())), pid1, epoch, 2, 0L), 0);
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+        // Increment the log start offset to exclude the first two segments.
+        log.maybeIncrementLogStartOffset(log.logEndOffset() - 1, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        // Sleep to breach the file delete delay and run scheduled file 
deletion tasks
+        mockTime.sleep(1);
+        assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expect a single producer state snapshot remaining");
+    }
+
+    @Test
+    public void testCompactionDeletesProducerStateSnapshots() throws 
IOException, java.security.DigestException {

Review Comment:
   How about importing `java.security.DigestException` at the top of the file, instead of using the fully qualified name in the `throws` clause? That would keep the method signature concise and consistent with the other imports.



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1632,6 +1637,705 @@ public void 
testRetentionDeletesProducerStateSnapshots(boolean createEmptyActive
         assertEquals(3, log.logStartOffset());
     }
 
+    @Test
+    public void testRetentionIdempotency() throws IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .retentionMs(900)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+        mockTime.sleep(901);
+
+        log.updateHighWatermark(log.logEndOffset());
+        log.maybeIncrementLogStartOffset(1L, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertEquals(
+                2, 
+                log.deleteOldSegments(),
+                "Expecting two segment deletions as log start offset retention 
should unblock time based retention"
+        );
+        assertEquals(0, log.deleteOldSegments());
+    }
+
+    @Test
+    public void testLogStartOffsetMovementDeletesSnapshots() throws 
IOException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid1 = 1L;
+        short epoch = 0;
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes())), pid1, epoch, 0, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("b".getBytes())), pid1, epoch, 1, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("c".getBytes())), pid1, epoch, 2, 0L), 0);
+        log.updateHighWatermark(log.logEndOffset());
+        assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+        // Increment the log start offset to exclude the first two segments.
+        log.maybeIncrementLogStartOffset(log.logEndOffset() - 1, 
LogStartOffsetIncrementReason.ClientRecordDeletion);
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+        // Sleep to breach the file delete delay and run scheduled file 
deletion tasks
+        mockTime.sleep(1);
+        assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expect a single producer state snapshot remaining");
+    }
+
+    @Test
+    public void testCompactionDeletesProducerStateSnapshots() throws 
IOException, java.security.DigestException {
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+                .fileDeleteDelayMs(0)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        long pid1 = 1L;
+        short epoch = 0;
+
+        OffsetMap fakeOffsetMap = new LogTestUtils.FakeOffsetMap();
+
+        Cleaner cleaner = new Cleaner(0, fakeOffsetMap, 64 * 1024, 64 * 1024, 
0.75,
+                new Throttler(Double.MAX_VALUE, Long.MAX_VALUE, "throttler", 
"entries", mockTime),
+                mockTime, tp -> { });
+
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes(), "a".getBytes())), pid1, epoch, 0, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes(), "b".getBytes())), pid1, epoch, 1, 0L), 0);
+        log.roll();
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("a".getBytes(), "c".getBytes())), pid1, epoch, 2, 0L), 0);
+        log.updateHighWatermark(log.logEndOffset());
+
+        List<Long> expectedSnapshotOffsets = log.logSegments()
+                .stream()
+                .map(LogSegment::baseOffset)
+                .sorted()
+                .skip(1)
+                .collect(Collectors.toList());
+        List<Long> snapshotOffsets = 
ProducerStateManager.listSnapshotFiles(logDir)
+                .stream()
+                .map(f -> f.offset)
+                .sorted()
+                .collect(Collectors.toList());
+        assertEquals(
+                expectedSnapshotOffsets, 
+                snapshotOffsets,
+                "expected a snapshot file per segment base offset, except the 
first segment"
+        );
+        assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+        // Clean segments, this should delete everything except the active 
segment since there only
+        // exists the key "a".
+        cleaner.clean(new LogToClean(log, 0, log.logEndOffset(), false));
+        // There is no other key so we don't delete anything
+        assertEquals(0, log.deleteOldSegments());
+        // Sleep to breach the file delete delay and run scheduled file 
deletion tasks
+        mockTime.sleep(1);
+
+        List<Long> expectedSnapshotOffsets2 = log.logSegments().stream()
+                
.map(LogSegment::baseOffset).sorted().skip(1).collect(Collectors.toList());
+        List<Long> snapshotOffsets2 = 
ProducerStateManager.listSnapshotFiles(logDir).stream()
+                .map(f -> f.offset).sorted().collect(Collectors.toList());
+        assertEquals(expectedSnapshotOffsets2, snapshotOffsets2,
+                "expected a snapshot file per segment base offset, excluding 
the first");
+    }
+
+    @Test
+    public void testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset() 
throws IOException {
+        Files.createFile(LogFileUtils.producerSnapshotFile(logDir, 
42).toPath());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(TEN_KB)
+                .retentionBytes(-1)
+                .fileDeleteDelayMs(0)
+                .build();
+        createLog(logDir, logConfig);
+        assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size(),
+                "expected producer state snapshots greater than the log end 
offset to be cleaned up");
+    }
+
+    @Test
+    public void testProducerIdMapTruncateFullyAndStartAt() throws IOException {
+        MemoryRecords records = singletonRecords("foo".getBytes());
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.sizeInBytes())
+                .retentionBytes((long) records.sizeInBytes() * 2)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(records, 0);
+        log.takeProducerSnapshot();
+
+        log.appendAsLeader(singletonRecords("bar".getBytes()), 0);
+        log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+        log.takeProducerSnapshot();
+
+        assertEquals(3, log.logSegments().size());
+        assertEquals(3, log.latestProducerStateEndOffset());
+        assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset());
+
+        log.truncateFullyAndStartAt(29, Optional.empty());
+        assertEquals(1, log.logSegments().size());
+        assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+        assertEquals(29, log.latestProducerStateEndOffset());
+    }
+
+    @Test
+    public void testProducerIdExpirationOnSegmentDeletion() throws IOException 
{
+        long pid1 = 1L;
+        short epoch = 0;
+        MemoryRecords records = LogTestUtils.records(List.of(new 
SimpleRecord("foo".getBytes())), pid1, epoch, 0, 0L);
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(records.sizeInBytes())
+                .retentionBytes((long) records.sizeInBytes() * 2)
+                .build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(records, 0);
+        log.takeProducerSnapshot();
+
+        long pid2 = 2L;
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("bar".getBytes())), pid2, epoch, 0, 0L), 0);
+        log.appendAsLeader(LogTestUtils.records(List.of(new 
SimpleRecord("baz".getBytes())), pid2, epoch, 1, 0L), 0);
+        log.takeProducerSnapshot();
+
+        assertEquals(3, log.logSegments().size());
+        assertEquals(Set.of(pid1, pid2), 
log.activeProducersWithLastSequence().keySet());
+
+        log.updateHighWatermark(log.logEndOffset());
+        assertTrue(log.deleteOldSegments() > 0, "At least one segment should 
be deleted");
+
+        // Producer state should not be removed when deleting log segment
+        assertEquals(2, log.logSegments().size());
+        assertEquals(Set.of(pid1, pid2), 
log.activeProducersWithLastSequence().keySet());
+    }
+
+    @Test
+    public void 
testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() throws 
IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+        log.appendAsLeader(singletonRecords("a".getBytes()), 0);
+        log.roll(Optional.of(1L));
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        log.appendAsLeader(singletonRecords("b".getBytes()), 0);
+        log.roll(Optional.of(2L));
+        assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset());
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        log.appendAsLeader(singletonRecords("c".getBytes()), 0);
+        log.roll(Optional.of(3L));
+        assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+
+        // roll triggers a flush at the starting offset of the new segment, we 
should retain all snapshots
+        assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+        // even if we flush within the active segment, the snapshot should 
remain
+        log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+        log.flushUptoOffsetExclusive(4L);
+        assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+    }
+
+    @Test
+    public void testProducerSnapshotAfterSegmentRollOnAppend() throws 
IOException {
+        long producerId = 1L;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), new 
byte[512])),
+                producerId, (short) 0, 0, 0L), 0);
+
+        // The next append should overflow the segment and cause it to roll
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), new 
byte[512])),
+                producerId, (short) 0, 1, 0L), 0);
+
+        assertEquals(2, log.logSegments().size());
+        assertEquals(1L, log.activeSegment().baseOffset());
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+        // Force a reload from the snapshot to check its consistency
+        log.truncateTo(1L);
+
+        assertEquals(2, log.logSegments().size());
+        assertEquals(1L, log.activeSegment().baseOffset());
+        assertFalse(log.activeSegment().log().batches().iterator().hasNext());
+        assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+        ProducerStateEntry lastEntry = 
log.producerStateManager().lastEntry(producerId).orElse(null);
+        assertNotNull(lastEntry);
+        assertEquals(0L, lastEntry.firstDataOffset());
+        assertEquals(0L, lastEntry.lastDataOffset());
+    }
+
+    @Test
+    public void testRebuildTransactionalState() throws IOException {
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+        UnifiedLog log = createLog(logDir, logConfig);
+
+        long pid = 137L;
+        short epoch = 5;
+        int seq = 0;
+
+        // add some transactional records
+        MemoryRecords txnRecords = 
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+                new SimpleRecord("foo".getBytes()),
+                new SimpleRecord("bar".getBytes()),
+                new SimpleRecord("baz".getBytes()));
+        log.appendAsLeader(txnRecords, 0);
+        LogAppendInfo abortAppendInfo = 
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+                ControlRecordType.ABORT, mockTime.milliseconds(), 0, 0,
+                TransactionVersion.TV_0.featureLevel());
+        log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+        // now there should be no first unstable offset
+        assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+        log.close();
+
+        UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L, 
brokerTopicStats,
+                mockTime.scheduler, mockTime, producerStateManagerConfig, 
false,
+                Optional.empty(), false);
+        reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+        assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+    }
+
+    @Test
+    public void testPeriodicProducerIdExpiration() throws IOException {
+        ProducerStateManagerConfig customConfig = new 
ProducerStateManagerConfig(200, false);
+        int producerIdExpirationCheckIntervalMs = 100;
+
+        long pid = 23L;
+        LogConfig logConfig = new 
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+        UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats,
+                mockTime.scheduler, mockTime, customConfig, true, 
Optional.empty(), false,
+                producerIdExpirationCheckIntervalMs);
+        log.appendAsLeader(LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"foo".getBytes())),
+                pid, (short) 0, 0, 0L), 0);
+
+        assertEquals(Set.of(pid), 
log.activeProducersWithLastSequence().keySet());
+
+        mockTime.sleep(producerIdExpirationCheckIntervalMs);
+        assertEquals(Set.of(pid), 
log.activeProducersWithLastSequence().keySet());
+
+        mockTime.sleep(producerIdExpirationCheckIntervalMs);
+        assertEquals(Set.of(), log.activeProducersWithLastSequence().keySet());
+    }
+
+    @Test
+    public void testDuplicateAppends() throws IOException {
+        UnifiedLog log = createLog(logDir, new 
LogTestUtils.LogConfigBuilder().build());
+        long pid = 1L;
+        short epoch = 0;
+
+        int[] seq = {0};
+        // Pad the beginning of the log.
+        for (int i = 0; i <= 5; i++) {
+            MemoryRecords record = LogTestUtils.records(
+                    List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
+                    pid, epoch, seq[0], 0L);
+            log.appendAsLeader(record, 0);
+            seq[0]++;
+        }
+        // Append an entry with multiple log records.
+        Supplier<MemoryRecords> createRecords = () -> 
LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+        ), pid, epoch, seq[0], 0L);
+        LogAppendInfo multiEntryAppendInfo = 
log.appendAsLeader(createRecords.get(), 0);
+        assertEquals(3, multiEntryAppendInfo.lastOffset() - 
multiEntryAppendInfo.firstOffset() + 1,
+                "should have appended 3 entries");
+
+        // Append a Duplicate of the tail, when the entry at the tail has 
multiple records.
+        LogAppendInfo dupMultiEntryAppendInfo = 
log.appendAsLeader(createRecords.get(), 0);
+        assertEquals(multiEntryAppendInfo.firstOffset(), 
dupMultiEntryAppendInfo.firstOffset(),
+                "Somehow appended a duplicate entry with multiple log records 
to the tail");
+        assertEquals(multiEntryAppendInfo.lastOffset(), 
dupMultiEntryAppendInfo.lastOffset(),
+                "Somehow appended a duplicate entry with multiple log records 
to the tail");
+
+        seq[0] += 3;
+
+        // Append a partial duplicate of the tail. This is not allowed.
+        MemoryRecords partialDup = LogTestUtils.records(List.of(
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+                new SimpleRecord(mockTime.milliseconds(), ("key-" + 
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+        ), pid, epoch, seq[0] - 2, 0L);
+        assertThrows(OutOfOrderSequenceException.class, () -> 
log.appendAsLeader(partialDup, 0),
+                () -> "Should have received an OutOfOrderSequenceException 
since we attempted to append a duplicate of a records in the middle of the 
log.");
+
+        // Append a duplicate of the batch which is 4th from the tail. This 
should succeed without error since we
+        // retain the batch metadata of the last 5 batches.
+        MemoryRecords duplicateOfFourth = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
+                pid, epoch, 2, 0L);
+        log.appendAsLeader(duplicateOfFourth, 0);
+
+        // Duplicates at older entries are reported as OutOfOrderSequence 
errors
+        MemoryRecords oldDup = LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"key-1".getBytes(), "value-1".getBytes())),
+                pid, epoch, 1, 0L);
+        assertThrows(OutOfOrderSequenceException.class, () -> 
log.appendAsLeader(oldDup, 0),
+                () -> "Should have received an OutOfOrderSequenceException 
since we attempted to append a duplicate of a batch which is older than the 
last 5 appended batches.");
+
+        // Append a duplicate entry with a single records at the tail of the 
log. This should return the appendInfo of the original entry.
+        Supplier<MemoryRecords> createRecordsWithDuplicate = () -> 
LogTestUtils.records(
+                List.of(new SimpleRecord(mockTime.milliseconds(), 
"key".getBytes(), "value".getBytes())),
+                pid, epoch, seq[0], 0L);
+        LogAppendInfo origAppendInfo = 
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+        LogAppendInfo newAppendInfo = 
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+        assertEquals(origAppendInfo.firstOffset(), 
newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
+        assertEquals(origAppendInfo.lastOffset(), newAppendInfo.lastOffset(), 
"Inserted a duplicate records into the log");
+    }
+
+    @Test
+    public void testMultipleProducerIdsPerMemoryRecord() throws IOException {
+        UnifiedLog log = createLog(logDir, new 
LogTestUtils.LogConfigBuilder().build());
+
+        short producerEpoch = 0;
+        int partitionLeaderEpoch = 0;
+        ByteBuffer buffer = ByteBuffer.allocate(512);
+
+        MemoryRecordsBuilder builder = MemoryRecords.builder(
+                buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(), 
1L, producerEpoch, 0, false,
+                partitionLeaderEpoch);
+        builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+        builder.close();
+
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(), 
2L, producerEpoch, 0, false,
+                partitionLeaderEpoch);
+        builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+        builder.close();
+
+        builder = MemoryRecords.builder(
+                buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(), 
3L, producerEpoch, 0, false,
+                partitionLeaderEpoch);
+        builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+        builder.close();
+
+        builder = MemoryRecords.builder(
+                buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+                TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(), 
4L, producerEpoch, 0, false,
+                partitionLeaderEpoch);
+        builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+        builder.close();
+
+        buffer.flip();
+        MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+        log.appendAsFollower(memoryRecords, partitionLeaderEpoch);
+        log.flush(false);
+
+        FetchDataInfo fetchedData = log.read(0, Integer.MAX_VALUE, 
FetchIsolation.LOG_END, true);
+
+        java.util.Iterator<? extends RecordBatch> origIterator = 
memoryRecords.batches().iterator();

Review Comment:
   How about importing `java.util.Iterator` at the top of the file, instead of using the fully qualified name in the local variable declaration? That would keep this line shorter and consistent with the rest of the test class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to