TaiJuWu commented on code in PR #20635:
URL: https://github.com/apache/kafka/pull/20635#discussion_r2638731871
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3015,526 +3016,8 @@ class UnifiedLogTest {
assertFalse(LogTestUtils.hasOffsetOverflow(log))
}
- @Test
- def testDeleteOldSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 100)
- log.appendAsLeader(createRecords, 0)
-
- log.assignEpochStartOffset(0, 40)
- log.assignEpochStartOffset(1, 90)
-
- // segments are not eligible for deletion if no high watermark has been set
- val numSegments = log.numberOfSegments
- log.deleteOldSegments()
- assertEquals(numSegments, log.numberOfSegments)
- assertEquals(0L, log.logStartOffset)
-
- // only segments with offset before the current high watermark are eligible for deletion
- for (hw <- 25 to 30) {
- log.updateHighWatermark(hw)
- log.deleteOldSegments()
- assertTrue(log.logStartOffset <= hw)
- log.logSegments.forEach { segment =>
- val segmentFetchInfo = segment.read(segment.baseOffset, Int.MaxValue)
- val segmentLastOffsetOpt = segmentFetchInfo.records.records.asScala.lastOption.map(_.offset)
- segmentLastOffsetOpt.foreach { lastOffset =>
- assertTrue(lastOffset >= hw)
- }
- }
- }
-
- // expire all segments
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.")
- assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
- assertEquals(new EpochEntry(1, 100), epochCache(log).epochEntries.get(0), "Epoch entry should be the latest epoch and the leo.")
-
- // append some messages to create some segments
- for (_ <- 0 until 100)
- log.appendAsLeader(createRecords, 0)
-
- log.delete()
- assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
- assertEquals(0, log.deleteOldSegments(), "The number of deleted segments should be zero.")
- assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
- }
-
- @Test
- def testLogDeletionAfterClose(): Unit = {
- def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- log.appendAsLeader(createRecords, 0)
-
- assertEquals(1, log.numberOfSegments, "The deleted segments should be gone.")
- assertEquals(1, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
-
- log.close()
- log.delete()
- assertEquals(0, log.numberOfSegments, "The number of segments should be 0")
- assertEquals(0, epochCache(log).epochEntries.size, "Epoch entries should have gone.")
- }
-
- @Test
- def testLogDeletionAfterDeleteRecords(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)
- val log = createLog(logDir, logConfig)
-
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
- assertEquals(3, log.numberOfSegments, "should have 3 segments")
- assertEquals(log.logStartOffset, 0)
- log.updateHighWatermark(log.logEndOffset)
-
- log.maybeIncrementLogStartOffset(1, LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(3, log.numberOfSegments, "should have 3 segments")
- assertEquals(log.logStartOffset, 1)
-
- log.maybeIncrementLogStartOffset(6, LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(2, log.numberOfSegments, "should have 2 segments")
- assertEquals(log.logStartOffset, 6)
-
- log.maybeIncrementLogStartOffset(15, LogStartOffsetIncrementReason.ClientRecordDeletion)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "should have 1 segments")
- assertEquals(log.logStartOffset, 15)
- }
-
def epochCache(log: UnifiedLog): LeaderEpochFileCache = log.leaderEpochCache
- @Test
- def shouldDeleteSizeBasedSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(2,log.numberOfSegments, "should have 2 segments")
- }
-
- @Test
- def shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 15)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(3,log.numberOfSegments, "should have 3 segments")
- }
-
- @Test
- def shouldDeleteTimeBasedSegmentsReadyToBeDeleted(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = 10)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining")
- }
-
- @Test
- def shouldNotDeleteTimeBasedSegmentsWhenNoneReadyToBeDeleted(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, timestamp = mockTime.milliseconds)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000000)
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(3, log.numberOfSegments, "There should be 3 segments remaining")
- }
-
- @Test
- def shouldNotDeleteSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact")
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- // mark the oldest segment as older the retention.ms
- log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)
-
- val segments = log.numberOfSegments
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining")
- }
-
- @Test
- def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
- val recordSize = createRecords.sizeInBytes
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = recordSize * 2,
- localRetentionBytes = recordSize / 2,
- cleanupPolicy = "",
- remoteLogStorageEnable = true
- )
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- for (_ <- 0 until 10)
- log.appendAsLeader(createRecords, 0)
-
- val segmentsBefore = log.numberOfSegments
- log.updateHighWatermark(log.logEndOffset)
- log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
- val deleteOldSegments = log.deleteOldSegments()
-
- assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention")
- assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
- }
-
- @Test
- def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
- val oldTimestamp = mockTime.milliseconds - 20000
- def oldRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = oldTimestamp)
- val recordSize = oldRecords.sizeInBytes
- val logConfig = LogTestUtils.createLogConfig(
- segmentBytes = recordSize * 2,
- localRetentionMs = 5000,
- cleanupPolicy = "",
- remoteLogStorageEnable = true
- )
- val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
-
- for (_ <- 0 until 10)
- log.appendAsLeader(oldRecords, 0)
-
- def newRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = mockTime.milliseconds)
- for (_ <- 0 until 5)
- log.appendAsLeader(newRecords, 0)
-
- val segmentsBefore = log.numberOfSegments
-
- log.updateHighWatermark(log.logEndOffset)
- log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
- val deleteOldSegments = log.deleteOldSegments()
-
- assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to time retention")
- assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
- }
-
- @Test
- def shouldDeleteSegmentsReadyToBeDeletedWhenCleanupPolicyIsCompactAndDelete(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionMs = 10000, cleanupPolicy = "compact,delete")
- val log = createLog(logDir, logConfig)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(1, log.numberOfSegments, "There should be 1 segment remaining")
- }
-
- @Test
- def shouldDeleteStartOffsetBreachedSegmentsWhenPolicyDoesNotIncludeDelete(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes, timestamp = 10L)
- val recordsPerSegment = 5
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * recordsPerSegment, retentionMs = 10000, cleanupPolicy = "compact")
- val log = createLog(logDir, logConfig, brokerTopicStats)
-
- // append some messages to create some segments
- for (_ <- 0 until 15)
- log.appendAsLeader(createRecords, 0)
-
- // Three segments should be created
- assertEquals(3, log.logSegments.asScala.count(_ => true))
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(recordsPerSegment, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- // The first segment, which is entirely before the log start offset, should be deleted
- // Of the remaining the segments, the first can overlap the log start offset and the rest must have a base offset
- // greater than the start offset
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
- assertEquals(2, log.numberOfSegments, "There should be 2 segments remaining")
- assertTrue(log.logSegments.asScala.head.baseOffset <= log.logStartOffset)
- assertTrue(log.logSegments.asScala.tail.forall(s => s.baseOffset > log.logStartOffset))
- }
-
- @Test
- def shouldApplyEpochToMessageOnAppendIfLeader(): Unit = {
- val records = (0 until 50).toArray.map(id => new SimpleRecord(id.toString.getBytes))
-
- //Given this partition is on leader epoch 72
- val epoch = 72
- val log = createLog(logDir, new LogConfig(new Properties))
- log.assignEpochStartOffset(epoch, records.length)
-
- //When appending messages as a leader (i.e. assignOffsets = true)
- for (record <- records)
- log.appendAsLeader(
- MemoryRecords.withRecords(Compression.NONE, record),
- epoch
- )
-
- //Then leader epoch should be set on messages
- for (i <- records.indices) {
- val read = LogTestUtils.readLog(log, i, 1).records.batches.iterator.next()
- assertEquals(72, read.partitionLeaderEpoch, "Should have set leader epoch")
- }
- }
-
- @Test
- def followerShouldSaveEpochInformationFromReplicatedMessagesToTheEpochCache(): Unit = {
- val messageIds = (0 until 50).toArray
- val records = messageIds.map(id => new SimpleRecord(id.toString.getBytes))
-
- //Given each message has an offset & epoch, as msgs from leader would
- def recordsForEpoch(i: Int): MemoryRecords = {
- val recs = MemoryRecords.withRecords(messageIds(i), Compression.NONE, records(i))
- recs.batches.forEach{record =>
- record.setPartitionLeaderEpoch(42)
- record.setLastOffset(i)
- }
- recs
- }
-
- val log = createLog(logDir, new LogConfig(new Properties))
-
- //When appending as follower (assignOffsets = false)
- for (i <- records.indices)
- log.appendAsFollower(recordsForEpoch(i), i)
-
- assertEquals(Optional.of(42), log.latestEpoch)
- }
-
- @Test
- def shouldTruncateLeaderEpochsWhenDeletingSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- // Given three segments of 5 messages each
- for (_ <- 0 until 15) {
- log.appendAsLeader(createRecords, 0)
- }
-
- //Given epochs
- cache.assign(0, 0)
- cache.assign(1, 5)
- cache.assign(2, 10)
-
- //When first segment is removed
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
-
- //The oldest epoch entry should have been removed
- assertEquals(util.List.of(new EpochEntry(1, 5), new EpochEntry(2, 10)), cache.epochEntries)
- }
-
- @Test
- def shouldUpdateOffsetForLeaderEpochsWhenDeletingSegments(): Unit = {
- def createRecords = TestUtils.singletonRecords("test".getBytes)
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, retentionBytes = createRecords.sizeInBytes * 10)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- // Given three segments of 5 messages each
- for (_ <- 0 until 15) {
- log.appendAsLeader(createRecords, 0)
- }
-
- //Given epochs
- cache.assign(0, 0)
- cache.assign(1, 7)
- cache.assign(2, 10)
-
- //When first segment removed (up to offset 5)
- log.updateHighWatermark(log.logEndOffset)
- log.deleteOldSegments()
-
- //The first entry should have gone from (0,0) => (0,5)
- assertEquals(util.List.of(new EpochEntry(0, 5), new EpochEntry(1, 7), new EpochEntry(2, 10)), cache.epochEntries)
- }
-
- @Test
- def shouldTruncateLeaderEpochCheckpointFileWhenTruncatingLog(): Unit = {
- def createRecords(startOffset: Long, epoch: Int): MemoryRecords = {
- TestUtils.records(Seq(new SimpleRecord("value".getBytes)), baseOffset = startOffset, partitionLeaderEpoch = epoch)
- }
-
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes)
- val log = createLog(logDir, logConfig)
- val cache = epochCache(log)
-
- def append(epoch: Int, startOffset: Long, count: Int): Unit = {
- for (i <- 0 until count)
- log.appendAsFollower(createRecords(startOffset + i, epoch), epoch)
- }
-
- //Given 2 segments, 10 messages per segment
- append(epoch = 0, startOffset = 0, count = 10)
- append(epoch = 1, startOffset = 10, count = 6)
- append(epoch = 2, startOffset = 16, count = 4)
-
- assertEquals(2, log.numberOfSegments)
- assertEquals(20, log.logEndOffset)
-
- //When truncate to LEO (no op)
- log.truncateTo(log.logEndOffset)
-
- //Then no change
- assertEquals(3, cache.epochEntries.size)
-
- //When truncate
- log.truncateTo(11)
-
- //Then no change
- assertEquals(2, cache.epochEntries.size)
-
- //When truncate
- log.truncateTo(10)
-
- //Then
- assertEquals(1, cache.epochEntries.size)
-
- //When truncate all
- log.truncateTo(0)
-
- //Then
- assertEquals(0, cache.epochEntries.size)
- }
-
- @Test
- def testFirstUnstableOffsetNoTransactionalData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig)
-
- val records = MemoryRecords.withRecords(Compression.NONE,
- new SimpleRecord("foo".getBytes),
- new SimpleRecord("bar".getBytes),
- new SimpleRecord("baz".getBytes))
-
- log.appendAsLeader(records, 0)
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
- @Test
- def testFirstUnstableOffsetWithTransactionalData(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024 * 5)
- val log = createLog(logDir, logConfig)
-
- val pid = 137L
- val epoch = 5.toShort
- var seq = 0
-
- // add some transactional records
- val records = MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
- new SimpleRecord("foo".getBytes),
- new SimpleRecord("bar".getBytes),
- new SimpleRecord("baz".getBytes))
-
- val firstAppendInfo = log.appendAsLeader(records, 0)
- assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset)
-
- // add more transactional records
- seq += 3
-
- log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, pid, epoch, seq,
- new SimpleRecord("blah".getBytes)), 0)
-
- // LSO should not have changed
- assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset)
-
- // now transaction is committed
- val commitAppendInfo = LogTestUtils.appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.COMMIT,
- mockTime.milliseconds(), transactionVersion = TransactionVersion.TV_0.featureLevel())
-
- // first unstable offset is not updated until the high watermark is advanced
- assertEquals(Optional.of(firstAppendInfo.firstOffset), log.firstUnstableOffset)
- log.updateHighWatermark(commitAppendInfo.lastOffset + 1)
-
- // now there should be no first unstable offset
- assertEquals(Optional.empty, log.firstUnstableOffset)
- }
-
- @Test
- def testReadCommittedWithConcurrentHighWatermarkUpdates(): Unit = {
Review Comment:
Please restore this test; it is not part of this migration plan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]