FrankYang0529 commented on code in PR #21763:
URL: https://github.com/apache/kafka/pull/21763#discussion_r2945677851
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -92,676 +91,6 @@ class UnifiedLogTest {
}
}
- @Test
- def testRetentionIdempotency(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
-
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds(), "b".getBytes))), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new
SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), 0)
-
- mockTime.sleep(901)
-
- log.updateHighWatermark(log.logEndOffset)
- log.maybeIncrementLogStartOffset(1L,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- assertEquals(2, log.deleteOldSegments(),
- "Expecting two segment deletions as log start offset retention should
unblock time based retention")
- assertEquals(0, log.deleteOldSegments())
- }
-
-
- @Test
- def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
retentionBytes = -1, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val epoch = 0.toShort
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 1), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)),
producerId = pid1,
- producerEpoch = epoch, sequence = 2), 0)
- log.updateHighWatermark(log.logEndOffset)
- assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
- // Increment the log start offset to exclude the first two segments.
- log.maybeIncrementLogStartOffset(log.logEndOffset - 1,
LogStartOffsetIncrementReason.ClientRecordDeletion)
- assertTrue(log.deleteOldSegments > 0, "At least one segment should be
deleted")
- // Sleep to breach the file delete delay and run scheduled file deletion
tasks
- mockTime.sleep(1)
- assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
- "expect a single producer state snapshot remaining")
- }
-
- @Test
- def testCompactionDeletesProducerStateSnapshots(): Unit = {
- val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5,
cleanupPolicy = TopicConfig.CLEANUP_POLICY_COMPACT, fileDeleteDelayMs = 0)
- val log = createLog(logDir, logConfig)
- val pid1 = 1L
- val epoch = 0.toShort
- val cleaner = new Cleaner(0,
- new FakeOffsetMap(Int.MaxValue),
- 64 * 1024,
- 64 * 1024,
- 0.75,
- new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries",
mockTime),
- mockTime,
- tp => {})
-
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"a".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 0), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"b".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 1), 0)
- log.roll()
- log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes,
"c".getBytes())), producerId = pid1,
- producerEpoch = epoch, sequence = 2), 0)
- log.updateHighWatermark(log.logEndOffset)
-
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1),
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
- "expected a snapshot file per segment base offset, except the first
segment")
- assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
- // Clean segments, this should delete everything except the active segment
since there only
- // exists the key "a".
- cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
- // There is no other key so we don't delete anything
- assertEquals(0, log.deleteOldSegments())
- // Sleep to breach the file delete delay and run scheduled file deletion
tasks
- mockTime.sleep(1)
-
assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1),
ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
- "expected a snapshot file per segment base offset, excluding the first")
- }
-
- /**
- * After loading the log, producer state is truncated such that there are no
producer state snapshot files which
- * exceed the log end offset. This test verifies that these are removed.
- */
Review Comment:
Could you also move this to the new file?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]