FrankYang0529 commented on code in PR #21763:
URL: https://github.com/apache/kafka/pull/21763#discussion_r2945677851


##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -92,676 +91,6 @@ class UnifiedLogTest {
     }
   }
 
-  @Test
-  def testRetentionIdempotency(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, retentionMs = 900, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "a".getBytes))), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds(), "b".getBytes))), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord(mockTime.milliseconds() + 100, "c".getBytes))), 0)
-
-    mockTime.sleep(901)
-
-    log.updateHighWatermark(log.logEndOffset)
-    log.maybeIncrementLogStartOffset(1L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-    assertEquals(2, log.deleteOldSegments(),
-      "Expecting two segment deletions as log start offset retention should 
unblock time based retention")
-    assertEquals(0, log.deleteOldSegments())
-  }
-
-
-  @Test
-  def testLogStartOffsetMovementDeletesSnapshots(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, retentionBytes = -1, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val epoch = 0.toShort
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("b".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 1), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("c".getBytes)), 
producerId = pid1,
-      producerEpoch = epoch, sequence = 2), 0)
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
-    // Increment the log start offset to exclude the first two segments.
-    log.maybeIncrementLogStartOffset(log.logEndOffset - 1, LogStartOffsetIncrementReason.ClientRecordDeletion)
-    assertTrue(log.deleteOldSegments > 0, "At least one segment should be deleted")
-    // Sleep to breach the file delete delay and run scheduled file deletion tasks
-    mockTime.sleep(1)
-    assertEquals(1, ProducerStateManager.listSnapshotFiles(logDir).size,
-      "expect a single producer state snapshot remaining")
-  }
-
-  @Test
-  def testCompactionDeletesProducerStateSnapshots(): Unit = {
-    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5, cleanupPolicy = TopicConfig.CLEANUP_POLICY_COMPACT, fileDeleteDelayMs = 0)
-    val log = createLog(logDir, logConfig)
-    val pid1 = 1L
-    val epoch = 0.toShort
-    val cleaner = new Cleaner(0,
-      new FakeOffsetMap(Int.MaxValue),
-      64 * 1024,
-      64 * 1024,
-      0.75,
-      new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", mockTime),
-      mockTime,
-      tp => {})
-
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"a".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 0), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"b".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 1), 0)
-    log.roll()
-    log.appendAsLeader(TestUtils.records(List(new SimpleRecord("a".getBytes, 
"c".getBytes())), producerId = pid1,
-      producerEpoch = epoch, sequence = 2), 0)
-    log.updateHighWatermark(log.logEndOffset)
-    assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
-      "expected a snapshot file per segment base offset, except the first segment")
-    assertEquals(2, ProducerStateManager.listSnapshotFiles(logDir).size)
-
-    // Clean segments, this should delete everything except the active segment since there only
-    // exists the key "a".
-    cleaner.clean(new LogToClean(log, 0, log.logEndOffset, false))
-    // There is no other key so we don't delete anything
-    assertEquals(0, log.deleteOldSegments())
-    // Sleep to breach the file delete delay and run scheduled file deletion tasks
-    mockTime.sleep(1)
-    assertEquals(log.logSegments.asScala.map(_.baseOffset).toSeq.sorted.drop(1), ProducerStateManager.listSnapshotFiles(logDir).asScala.map(_.offset).sorted,
-      "expected a snapshot file per segment base offset, excluding the first")
-  }
-
-  /**
-   * After loading the log, producer state is truncated such that there are no producer state snapshot files which
-   * exceed the log end offset. This test verifies that these are removed.
-   */

Review Comment:
   Could you also move this to a new file?
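   For instance, something along these lines could host the removed tests (just a sketch to illustrate the idea; the file name `UnifiedLogRetentionTest.scala`, the imports, and the fixture wiring below are my assumptions, not something this PR defines):

   ```scala
   // Hypothetical new file: core/src/test/scala/unit/kafka/log/UnifiedLogRetentionTest.scala
   // (file name and fixture setup are assumptions for illustration only)
   package kafka.log

   import java.io.File

   import kafka.utils.TestUtils
   import org.apache.kafka.common.utils.Utils
   import org.apache.kafka.server.util.MockTime
   import org.junit.jupiter.api.AfterEach

   class UnifiedLogRetentionTest {
     // Mirror the UnifiedLogTest fixture (temp log dir + mock clock) so the removed tests
     // (testRetentionIdempotency, testLogStartOffsetMovementDeletesSnapshots,
     //  testCompactionDeletesProducerStateSnapshots, ...) can move over unchanged.
     val logDir: File = TestUtils.tempDir()
     val mockTime = new MockTime()

     @AfterEach
     def tearDown(): Unit = {
       // Clean up the temporary log directory after each test.
       Utils.delete(logDir)
     }

     // The private createLog(...) helper from UnifiedLogTest would have to come along too,
     // either copied here or hoisted into LogTestUtils so both classes can share it.
   }
   ```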


