FrankYang0529 commented on code in PR #21763:
URL: https://github.com/apache/kafka/pull/21763#discussion_r2939776930
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1632,6 +1637,705 @@ public void
testRetentionDeletesProducerStateSnapshots(boolean createEmptyActive
assertEquals(3, log.logStartOffset());
}
+ @Test
+ public void testRetentionIdempotency() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .retentionMs(900)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+ mockTime.sleep(901);
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertEquals(
+ 2,
+ log.deleteOldSegments(),
+ "Expecting two segment deletions as log start offset retention
should unblock time based retention"
+ );
+ assertEquals(0, log.deleteOldSegments());
+ }
+
+ @Test
+ public void testLogStartOffsetMovementDeletesSnapshots() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("b".getBytes())), pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("c".getBytes())), pid1, epoch, 2, 0L), 0);
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+ // Increment the log start offset to exclude the first two segments.
+ log.maybeIncrementLogStartOffset(log.logEndOffset() - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+ assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expect a single producer state snapshot remaining");
+ }
+
+ @Test
+ public void testCompactionDeletesProducerStateSnapshots() throws
IOException, java.security.DigestException {
Review Comment:
Could we add an import for `java.security.DigestException` instead of using the
fully qualified name in the `throws` clause?
##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -1632,6 +1637,705 @@ public void
testRetentionDeletesProducerStateSnapshots(boolean createEmptyActive
assertEquals(3, log.logStartOffset());
}
+ @Test
+ public void testRetentionIdempotency() throws IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .retentionMs(900)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes()))), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes()))), 0);
+
+ mockTime.sleep(901);
+
+ log.updateHighWatermark(log.logEndOffset());
+ log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertEquals(
+ 2,
+ log.deleteOldSegments(),
+ "Expecting two segment deletions as log start offset retention
should unblock time based retention"
+ );
+ assertEquals(0, log.deleteOldSegments());
+ }
+
+ @Test
+ public void testLogStartOffsetMovementDeletesSnapshots() throws
IOException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("b".getBytes())), pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("c".getBytes())), pid1, epoch, 2, 0L), 0);
+ log.updateHighWatermark(log.logEndOffset());
+ assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+ // Increment the log start offset to exclude the first two segments.
+ log.maybeIncrementLogStartOffset(log.logEndOffset() - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+ assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expect a single producer state snapshot remaining");
+ }
+
+ @Test
+ public void testCompactionDeletesProducerStateSnapshots() throws
IOException, java.security.DigestException {
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .cleanupPolicy(TopicConfig.CLEANUP_POLICY_COMPACT)
+ .fileDeleteDelayMs(0)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ long pid1 = 1L;
+ short epoch = 0;
+
+ OffsetMap fakeOffsetMap = new LogTestUtils.FakeOffsetMap();
+
+ Cleaner cleaner = new Cleaner(0, fakeOffsetMap, 64 * 1024, 64 * 1024,
0.75,
+ new Throttler(Double.MAX_VALUE, Long.MAX_VALUE, "throttler",
"entries", mockTime),
+ mockTime, tp -> { });
+
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "a".getBytes())), pid1, epoch, 0, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "b".getBytes())), pid1, epoch, 1, 0L), 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("a".getBytes(), "c".getBytes())), pid1, epoch, 2, 0L), 0);
+ log.updateHighWatermark(log.logEndOffset());
+
+ List<Long> expectedSnapshotOffsets = log.logSegments()
+ .stream()
+ .map(LogSegment::baseOffset)
+ .sorted()
+ .skip(1)
+ .collect(Collectors.toList());
+ List<Long> snapshotOffsets =
ProducerStateManager.listSnapshotFiles(logDir)
+ .stream()
+ .map(f -> f.offset)
+ .sorted()
+ .collect(Collectors.toList());
+ assertEquals(
+ expectedSnapshotOffsets,
+ snapshotOffsets,
+ "expected a snapshot file per segment base offset, except the
first segment"
+ );
+ assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size());
+
+ // Clean segments, this should delete everything except the active
segment since there only
+ // exists the key "a".
+ cleaner.clean(new LogToClean(log, 0, log.logEndOffset(), false));
+ // There is no other key so we don't delete anything
+ assertEquals(0, log.deleteOldSegments());
+ // Sleep to breach the file delete delay and run scheduled file
deletion tasks
+ mockTime.sleep(1);
+
+ List<Long> expectedSnapshotOffsets2 = log.logSegments().stream()
+
.map(LogSegment::baseOffset).sorted().skip(1).collect(Collectors.toList());
+ List<Long> snapshotOffsets2 =
ProducerStateManager.listSnapshotFiles(logDir).stream()
+ .map(f -> f.offset).sorted().collect(Collectors.toList());
+ assertEquals(expectedSnapshotOffsets2, snapshotOffsets2,
+ "expected a snapshot file per segment base offset, excluding
the first");
+ }
+
+ @Test
+ public void testLoadingLogDeletesProducerStateSnapshotsPastLogEndOffset()
throws IOException {
+ Files.createFile(LogFileUtils.producerSnapshotFile(logDir,
42).toPath());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(TEN_KB)
+ .retentionBytes(-1)
+ .fileDeleteDelayMs(0)
+ .build();
+ createLog(logDir, logConfig);
+ assertEquals(0, ProducerStateManager.listSnapshotFiles(logDir).size(),
+ "expected producer state snapshots greater than the log end
offset to be cleaned up");
+ }
+
+ @Test
+ public void testProducerIdMapTruncateFullyAndStartAt() throws IOException {
+ MemoryRecords records = singletonRecords("foo".getBytes());
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.sizeInBytes())
+ .retentionBytes((long) records.sizeInBytes() * 2)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(records, 0);
+ log.takeProducerSnapshot();
+
+ log.appendAsLeader(singletonRecords("bar".getBytes()), 0);
+ log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+ log.takeProducerSnapshot();
+
+ assertEquals(3, log.logSegments().size());
+ assertEquals(3, log.latestProducerStateEndOffset());
+ assertEquals(OptionalLong.of(3), log.latestProducerSnapshotOffset());
+
+ log.truncateFullyAndStartAt(29, Optional.empty());
+ assertEquals(1, log.logSegments().size());
+ assertEquals(OptionalLong.empty(), log.latestProducerSnapshotOffset());
+ assertEquals(29, log.latestProducerStateEndOffset());
+ }
+
+ @Test
+ public void testProducerIdExpirationOnSegmentDeletion() throws IOException
{
+ long pid1 = 1L;
+ short epoch = 0;
+ MemoryRecords records = LogTestUtils.records(List.of(new
SimpleRecord("foo".getBytes())), pid1, epoch, 0, 0L);
+ LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+ .segmentBytes(records.sizeInBytes())
+ .retentionBytes((long) records.sizeInBytes() * 2)
+ .build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(records, 0);
+ log.takeProducerSnapshot();
+
+ long pid2 = 2L;
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("bar".getBytes())), pid2, epoch, 0, 0L), 0);
+ log.appendAsLeader(LogTestUtils.records(List.of(new
SimpleRecord("baz".getBytes())), pid2, epoch, 1, 0L), 0);
+ log.takeProducerSnapshot();
+
+ assertEquals(3, log.logSegments().size());
+ assertEquals(Set.of(pid1, pid2),
log.activeProducersWithLastSequence().keySet());
+
+ log.updateHighWatermark(log.logEndOffset());
+ assertTrue(log.deleteOldSegments() > 0, "At least one segment should
be deleted");
+
+ // Producer state should not be removed when deleting log segment
+ assertEquals(2, log.logSegments().size());
+ assertEquals(Set.of(pid1, pid2),
log.activeProducersWithLastSequence().keySet());
+ }
+
+ @Test
+ public void
testTakeSnapshotOnRollAndDeleteSnapshotOnRecoveryPointCheckpoint() throws
IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+ log.appendAsLeader(singletonRecords("a".getBytes()), 0);
+ log.roll(Optional.of(1L));
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ log.appendAsLeader(singletonRecords("b".getBytes()), 0);
+ log.roll(Optional.of(2L));
+ assertEquals(OptionalLong.of(2L), log.latestProducerSnapshotOffset());
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ log.appendAsLeader(singletonRecords("c".getBytes()), 0);
+ log.roll(Optional.of(3L));
+ assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+
+ // roll triggers a flush at the starting offset of the new segment, we
should retain all snapshots
+ assertEquals(OptionalLong.of(1L), log.oldestProducerSnapshotOffset());
+
+ // even if we flush within the active segment, the snapshot should
remain
+ log.appendAsLeader(singletonRecords("baz".getBytes()), 0);
+ log.flushUptoOffsetExclusive(4L);
+ assertEquals(OptionalLong.of(3L), log.latestProducerSnapshotOffset());
+ }
+
+ @Test
+ public void testProducerSnapshotAfterSegmentRollOnAppend() throws
IOException {
+ long producerId = 1L;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(), new
byte[512])),
+ producerId, (short) 0, 0, 0L), 0);
+
+ // The next append should overflow the segment and cause it to roll
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(), new
byte[512])),
+ producerId, (short) 0, 1, 0L), 0);
+
+ assertEquals(2, log.logSegments().size());
+ assertEquals(1L, log.activeSegment().baseOffset());
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+ // Force a reload from the snapshot to check its consistency
+ log.truncateTo(1L);
+
+ assertEquals(2, log.logSegments().size());
+ assertEquals(1L, log.activeSegment().baseOffset());
+ assertFalse(log.activeSegment().log().batches().iterator().hasNext());
+ assertEquals(OptionalLong.of(1L), log.latestProducerSnapshotOffset());
+
+ ProducerStateEntry lastEntry =
log.producerStateManager().lastEntry(producerId).orElse(null);
+ assertNotNull(lastEntry);
+ assertEquals(0L, lastEntry.firstDataOffset());
+ assertEquals(0L, lastEntry.lastDataOffset());
+ }
+
+ @Test
+ public void testRebuildTransactionalState() throws IOException {
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(1024 * 1024 * 5).build();
+ UnifiedLog log = createLog(logDir, logConfig);
+
+ long pid = 137L;
+ short epoch = 5;
+ int seq = 0;
+
+ // add some transactional records
+ MemoryRecords txnRecords =
MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
+ new SimpleRecord("foo".getBytes()),
+ new SimpleRecord("bar".getBytes()),
+ new SimpleRecord("baz".getBytes()));
+ log.appendAsLeader(txnRecords, 0);
+ LogAppendInfo abortAppendInfo =
LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch,
+ ControlRecordType.ABORT, mockTime.milliseconds(), 0, 0,
+ TransactionVersion.TV_0.featureLevel());
+ log.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+
+ // now there should be no first unstable offset
+ assertEquals(Optional.empty(), log.firstUnstableOffset());
+
+ log.close();
+
+ UnifiedLog reopenedLog = createLog(logDir, logConfig, 0L, 0L,
brokerTopicStats,
+ mockTime.scheduler, mockTime, producerStateManagerConfig,
false,
+ Optional.empty(), false);
+ reopenedLog.updateHighWatermark(abortAppendInfo.lastOffset() + 1);
+ assertEquals(Optional.empty(), reopenedLog.firstUnstableOffset());
+ }
+
+ @Test
+ public void testPeriodicProducerIdExpiration() throws IOException {
+ ProducerStateManagerConfig customConfig = new
ProducerStateManagerConfig(200, false);
+ int producerIdExpirationCheckIntervalMs = 100;
+
+ long pid = 23L;
+ LogConfig logConfig = new
LogTestUtils.LogConfigBuilder().segmentBytes(TEN_KB).build();
+ UnifiedLog log = createLog(logDir, logConfig, 0L, 0L, brokerTopicStats,
+ mockTime.scheduler, mockTime, customConfig, true,
Optional.empty(), false,
+ producerIdExpirationCheckIntervalMs);
+ log.appendAsLeader(LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"foo".getBytes())),
+ pid, (short) 0, 0, 0L), 0);
+
+ assertEquals(Set.of(pid),
log.activeProducersWithLastSequence().keySet());
+
+ mockTime.sleep(producerIdExpirationCheckIntervalMs);
+ assertEquals(Set.of(pid),
log.activeProducersWithLastSequence().keySet());
+
+ mockTime.sleep(producerIdExpirationCheckIntervalMs);
+ assertEquals(Set.of(), log.activeProducersWithLastSequence().keySet());
+ }
+
+ @Test
+ public void testDuplicateAppends() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+ long pid = 1L;
+ short epoch = 0;
+
+ int[] seq = {0};
+ // Pad the beginning of the log.
+ for (int i = 0; i <= 5; i++) {
+ MemoryRecords record = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, seq[0], 0L);
+ log.appendAsLeader(record, 0);
+ seq[0]++;
+ }
+ // Append an entry with multiple log records.
+ Supplier<MemoryRecords> createRecords = () ->
LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+ ), pid, epoch, seq[0], 0L);
+ LogAppendInfo multiEntryAppendInfo =
log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(3, multiEntryAppendInfo.lastOffset() -
multiEntryAppendInfo.firstOffset() + 1,
+ "should have appended 3 entries");
+
+ // Append a Duplicate of the tail, when the entry at the tail has
multiple records.
+ LogAppendInfo dupMultiEntryAppendInfo =
log.appendAsLeader(createRecords.get(), 0);
+ assertEquals(multiEntryAppendInfo.firstOffset(),
dupMultiEntryAppendInfo.firstOffset(),
+ "Somehow appended a duplicate entry with multiple log records
to the tail");
+ assertEquals(multiEntryAppendInfo.lastOffset(),
dupMultiEntryAppendInfo.lastOffset(),
+ "Somehow appended a duplicate entry with multiple log records
to the tail");
+
+ seq[0] += 3;
+
+ // Append a partial duplicate of the tail. This is not allowed.
+ MemoryRecords partialDup = LogTestUtils.records(List.of(
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes()),
+ new SimpleRecord(mockTime.milliseconds(), ("key-" +
seq[0]).getBytes(), ("value-" + seq[0]).getBytes())
+ ), pid, epoch, seq[0] - 2, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(partialDup, 0),
+ () -> "Should have received an OutOfOrderSequenceException
since we attempted to append a duplicate of a records in the middle of the
log.");
+
+ // Append a duplicate of the batch which is 4th from the tail. This
should succeed without error since we
+ // retain the batch metadata of the last 5 batches.
+ MemoryRecords duplicateOfFourth = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, 2, 0L);
+ log.appendAsLeader(duplicateOfFourth, 0);
+
+ // Duplicates at older entries are reported as OutOfOrderSequence
errors
+ MemoryRecords oldDup = LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key-1".getBytes(), "value-1".getBytes())),
+ pid, epoch, 1, 0L);
+ assertThrows(OutOfOrderSequenceException.class, () ->
log.appendAsLeader(oldDup, 0),
+ () -> "Should have received an OutOfOrderSequenceException
since we attempted to append a duplicate of a batch which is older than the
last 5 appended batches.");
+
+ // Append a duplicate entry with a single records at the tail of the
log. This should return the appendInfo of the original entry.
+ Supplier<MemoryRecords> createRecordsWithDuplicate = () ->
LogTestUtils.records(
+ List.of(new SimpleRecord(mockTime.milliseconds(),
"key".getBytes(), "value".getBytes())),
+ pid, epoch, seq[0], 0L);
+ LogAppendInfo origAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+ LogAppendInfo newAppendInfo =
log.appendAsLeader(createRecordsWithDuplicate.get(), 0);
+ assertEquals(origAppendInfo.firstOffset(),
newAppendInfo.firstOffset(), "Inserted a duplicate records into the log");
+ assertEquals(origAppendInfo.lastOffset(), newAppendInfo.lastOffset(),
"Inserted a duplicate records into the log");
+ }
+
+ @Test
+ public void testMultipleProducerIdsPerMemoryRecord() throws IOException {
+ UnifiedLog log = createLog(logDir, new
LogTestUtils.LogConfigBuilder().build());
+
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+
+ MemoryRecordsBuilder builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 0L, mockTime.milliseconds(),
1L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 1L, mockTime.milliseconds(),
2L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 2L, mockTime.milliseconds(),
3L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ builder = MemoryRecords.builder(
+ buffer, RecordBatch.MAGIC_VALUE_V2, Compression.NONE,
+ TimestampType.LOG_APPEND_TIME, 3L, mockTime.milliseconds(),
4L, producerEpoch, 0, false,
+ partitionLeaderEpoch);
+ builder.append(new SimpleRecord("key".getBytes(), "value".getBytes()));
+ builder.close();
+
+ buffer.flip();
+ MemoryRecords memoryRecords = MemoryRecords.readableRecords(buffer);
+
+ log.appendAsFollower(memoryRecords, partitionLeaderEpoch);
+ log.flush(false);
+
+ FetchDataInfo fetchedData = log.read(0, Integer.MAX_VALUE,
FetchIsolation.LOG_END, true);
+
+ java.util.Iterator<? extends RecordBatch> origIterator =
memoryRecords.batches().iterator();
Review Comment:
Could we add an import for `java.util.Iterator` instead of using the fully
qualified name in this declaration?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]