This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 24994ef231f KAFKA-20084 Move LogCleanerManagerTest to storage module (#21343)
24994ef231f is described below
commit 24994ef231fe12cd18604125e5e7c27dd34847ff
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Jan 29 13:24:57 2026 +0500
KAFKA-20084 Move LogCleanerManagerTest to storage module (#21343)
LogCleanerManagerTest has been moved to the storage module and rewritten
in Java.
Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../unit/kafka/log/LogCleanerManagerTest.scala | 901 --------------------
.../storage/internals/log/LogCleanerManager.java | 17 +-
.../internals/log/LogCleanerManagerTest.java | 928 +++++++++++++++++++++
.../kafka/storage/internals/log/LogTestUtils.java | 41 +
4 files changed, 977 insertions(+), 910 deletions(-)
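Note on the rewrite: in the Scala version the mock manager read cleaner checkpoints out of a shared mutable map on the test class, while the Java version encapsulates that state inside LogCleanerManagerMock behind addCheckpoint/checkpointOffset helpers (see the new file below). A minimal sketch of how a test now seeds checkpoint state, assuming the constructor and helpers shown in the diff; this is an illustrative condensation, not code from the commit:

    // Seed a fake cleaner checkpoint through the mock's helper instead of
    // mutating a shared map, as the rewritten Java tests do.
    ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
    LogCleanerManagerMock cleanerManager =
        new LogCleanerManagerMock(List.of(logDir), logs, null);
    cleanerManager.addCheckpoint(new TopicPartition("log", 0), 20L);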
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
deleted file mode 100644
index 0045551d109..00000000000
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ /dev/null
@@ -1,901 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.File
-import java.nio.file.Files
-import java.util.{Optional, Properties}
-import kafka.utils._
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.coordinator.transaction.TransactionLogConfig
-import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.LogCleaningState.{LOG_CLEANING_ABORTED, LOG_CLEANING_IN_PROGRESS}
-import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LocalLog, LogCleanerManager, LogCleaningException, LogCleaningState, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, LogToClean, PreCleanStats, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
-import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
-
-import java.lang.{Long => JLong}
-import java.util
-import java.util.concurrent.ConcurrentHashMap
-import java.util.stream.Collectors
-import scala.jdk.OptionConverters.RichOptional
-
-/**
- * Unit tests for the log cleaning logic
- */
-class LogCleanerManagerTest extends Logging {
-
- val tmpDir: File = TestUtils.tempDir()
- val tmpDir2: File = TestUtils.tempDir()
- val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
- val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir2)
- val topicPartition = new TopicPartition("log", 0)
- val topicPartition2 = new TopicPartition("log2", 0)
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
- logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024: java.lang.Integer)
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
- val logConfig: LogConfig = new LogConfig(logProps)
- val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
- val offset = 999
- val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
-
- val cleanerCheckpoints: util.HashMap[TopicPartition, JLong] = new util.HashMap[TopicPartition, JLong]()
-
- class LogCleanerManagerMock(logDirs: util.List[File],
- logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
- logDirFailureChannel: LogDirFailureChannel) extends LogCleanerManager(logDirs, logs, logDirFailureChannel) {
- override def allCleanerCheckpoints: util.Map[TopicPartition, JLong] = {
- cleanerCheckpoints
- }
-
- override def updateCheckpoints(dataDir: File, partitionToUpdateOrAdd: Optional[util.Map.Entry[TopicPartition, JLong]],
- partitionToRemove: Optional[TopicPartition]): Unit = {
- assert(partitionToRemove.isEmpty, "partitionToRemove argument with value not yet handled")
- val entry = partitionToUpdateOrAdd.orElseThrow(() =>
- new IllegalArgumentException("partitionToUpdateOrAdd==None argument not yet handled"))
- cleanerCheckpoints.put(entry.getKey, entry.getValue)
- }
- }
-
- @AfterEach
- def tearDown(): Unit = {
- Utils.delete(tmpDir)
- }
-
- private def setupIncreasinglyFilthyLogs(partitions: Seq[TopicPartition],
- startNumBatches: Int,
- batchIncrement: Int): util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog] = {
- val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
- var numBatches = startNumBatches
-
- for (tp <- partitions) {
- val log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, topicPartition = tp)
- logs.put(tp, log)
-
- writeRecords(log, numBatches = numBatches, recordsPerBatch = 1, batchesPerSegment = 5)
- numBatches += batchIncrement
- }
- logs
- }
-
- @Test
- def testGrabFilthiestCompactedLogThrowsException(): Unit = {
- val tp = new TopicPartition("A", 1)
- val logSegmentSize = TestUtils.singletonRecords("test".getBytes).sizeInBytes * 10
- val logSegmentsCount = 2
- val tpDir = new File(logDir, "A-1")
- Files.createDirectories(tpDir.toPath)
- val logDirFailureChannel = new LogDirFailureChannel(10)
- val config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT)
- val maxTransactionTimeoutMs = 5 * 60 * 1000
- val producerIdExpirationCheckIntervalMs = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT
- val segments = new LogSegments(tp)
- val leaderEpochCache = UnifiedLog.createLeaderEpochCache(
- tpDir, topicPartition, logDirFailureChannel, Optional.empty, time.scheduler)
- val producerStateManager = new ProducerStateManager(topicPartition, tpDir, maxTransactionTimeoutMs, producerStateManagerConfig, time)
- val offsets = new LogLoader(
- tpDir,
- tp,
- config,
- time.scheduler,
- time,
- logDirFailureChannel,
- true,
- segments,
- 0L,
- 0L,
- leaderEpochCache,
- producerStateManager,
- new ConcurrentHashMap[String, Integer],
- false
- ).load()
- val localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint,
- offsets.nextOffsetMetadata, time.scheduler, time, tp, logDirFailureChannel)
- // the exception should be caught and the partition that caused it marked as uncleanable
- class LogMock extends UnifiedLog(offsets.logStartOffset, localLog, new BrokerTopicStats,
- producerIdExpirationCheckIntervalMs, leaderEpochCache,
- producerStateManager, Optional.empty, false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER) {
- // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
- override def getFirstBatchTimestampForSegments(segments: util.Collection[LogSegment]): util.Collection[java.lang.Long] =
- throw new IllegalStateException("Error!")
- }
- }
-
- val log: UnifiedLog = new LogMock()
- writeRecords(log = log,
- numBatches = logSegmentsCount * 2,
- recordsPerBatch = 10,
- batchesPerSegment = 2
- )
-
- val logsPool = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
- logsPool.put(tp, log)
- val cleanerManager = createCleanerManagerMock(logsPool)
- cleanerCheckpoints.put(tp, 1)
-
- val thrownException = assertThrows(classOf[LogCleaningException], () => cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get)
- assertEquals(log, thrownException.log)
- assertTrue(thrownException.getCause.isInstanceOf[IllegalStateException])
- }
-
- @Test
- def testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio(): Unit = {
- val tp0 = new TopicPartition("wishing-well", 0)
- val tp1 = new TopicPartition("wishing-well", 1)
- val tp2 = new TopicPartition("wishing-well", 2)
- val partitions = Seq(tp0, tp1, tp2)
-
- // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
- val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
- val cleanerManager = createCleanerManagerMock(logs)
- partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
- val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(tp2, filthiestLog.topicPartition)
- assertEquals(tp2, filthiestLog.log.topicPartition)
- }
-
- @Test
- def testGrabFilthiestCompactedLogIgnoresUncleanablePartitions(): Unit = {
- val tp0 = new TopicPartition("wishing-well", 0)
- val tp1 = new TopicPartition("wishing-well", 1)
- val tp2 = new TopicPartition("wishing-well", 2)
- val partitions = Seq(tp0, tp1, tp2)
-
- // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
- val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
- val cleanerManager = createCleanerManagerMock(logs)
- partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
- cleanerManager.markPartitionUncleanable(logs.get(tp2).dir.getParent, tp2)
-
- val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(tp1, filthiestLog.topicPartition)
- assertEquals(tp1, filthiestLog.log.topicPartition)
- }
-
- @Test
- def testGrabFilthiestCompactedLogIgnoresInProgressPartitions(): Unit = {
- val tp0 = new TopicPartition("wishing-well", 0)
- val tp1 = new TopicPartition("wishing-well", 1)
- val tp2 = new TopicPartition("wishing-well", 2)
- val partitions = Seq(tp0, tp1, tp2)
-
- // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
- val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
- val cleanerManager = createCleanerManagerMock(logs)
- partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
- cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
-
- val filthiestLog: LogToClean = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(tp1, filthiestLog.topicPartition)
- assertEquals(tp1, filthiestLog.log.topicPartition)
- }
-
- @Test
- def testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions(): Unit = {
- val tp0 = new TopicPartition("wishing-well", 0)
- val tp1 = new TopicPartition("wishing-well", 1)
- val tp2 = new TopicPartition("wishing-well", 2)
- val partitions = Seq(tp0, tp1, tp2)
-
- // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
- val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
- val cleanerManager = createCleanerManagerMock(logs)
- partitions.foreach(partition => cleanerCheckpoints.put(partition, 20))
-
- cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS)
- cleanerManager.markPartitionUncleanable(logs.get(tp1).dir.getParent, tp1)
-
- val filthiestLog: Optional[LogToClean] = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
- assertEquals(Optional.empty(), filthiestLog)
- }
-
- @Test
- def testDirtyOffsetResetIfLargerThanEndOffset(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp, 200)
-
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(0L, filthiestLog.firstDirtyOffset)
- }
-
- @Test
- def testDirtyOffsetResetIfSmallerThanStartOffset(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-
- logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp, 0L)
-
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(10L, filthiestLog.firstDirtyOffset)
- }
-
- @Test
- def testLogStartOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val log = createLog(segmentSize = 2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp)
-
- val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
- logs.put(tp, log)
-
- appendRecords(log, numRecords = 3)
- appendRecords(log, numRecords = 3)
- appendRecords(log, numRecords = 3)
-
- assertEquals(1, log.logSegments.size)
-
- log.maybeIncrementLogStartOffset(2L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp, 0L)
-
- // The active segment is uncleanable and hence not filthy from the POV of the CleanerManager.
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
- assertEquals(Optional.empty(), filthiestLog)
- }
-
- @Test
- def testDirtyOffsetLargerThanActiveSegmentBaseOffset(): Unit = {
- // It is possible in the case of an unclean leader election for the checkpoint
- // dirty offset to get ahead of the active segment base offset, but still be
- // within the range of the log.
-
- val tp = new TopicPartition("foo", 0)
-
- val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
- val log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, topicPartition = tp)
- logs.put(tp, log)
-
- appendRecords(log, numRecords = 3)
- appendRecords(log, numRecords = 3)
-
- assertEquals(1, log.logSegments.size)
- assertEquals(0L, log.activeSegment.baseOffset)
-
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp, 3L)
-
- // These segments are uncleanable and hence not filthy
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
- assertEquals(Optional.empty(), filthiestLog)
- }
-
- /**
- * When checking for logs with segments ready for deletion
- * we shouldn't consider logs where cleanup.policy=delete
- * as they are handled by the LogManager
- */
- @Test
- def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- val readyToDelete = cleanerManager.deletableLogs().size
- assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted")
- }
-
- /**
- * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete
- */
- @Test
- def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- val readyToDelete = cleanerManager.deletableLogs().size
- assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted")
- }
-
- /**
- * When looking for logs with segments ready to be deleted we should consider
- * logs with cleanup.policy=compact because they may have segments from before the log start offset
- */
- @Test
- def testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- val readyToDelete = cleanerManager.deletableLogs().size
- assertEquals(1, readyToDelete, "should have 1 logs ready to be deleted")
- }
-
- /**
- * log under cleanup should be ineligible for compaction
- */
- @Test
- def testLogsUnderCleanupIneligibleForCompaction(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- log.appendAsLeader(records, 0)
- log.roll()
- log.appendAsLeader(TestUtils.singletonRecords("test2".getBytes, key="test2".getBytes), 0)
- log.updateHighWatermark(2L)
-
- // simulate cleanup thread working on the log partition
- val deletableLog = cleanerManager.pauseCleaningForNonCompactedPartitions()
- assertEquals(1, deletableLog.size, "should have 1 logs ready to be deleted")
-
- // change cleanup policy from delete to compact
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, log.config.segmentSize(): Integer)
- logProps.put(TopicConfig.RETENTION_MS_CONFIG, log.config.retentionMs: java.lang.Long)
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
- logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0: Integer)
- val config = new LogConfig(logProps)
- log.updateConfig(config)
-
- // log cleanup in progress, the log is not available for compaction
- val cleanable = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
- assertEquals(0, cleanable.size, "should have 0 logs ready to be compacted")
-
- // log cleanup finished, and log can be picked up for compaction
- cleanerManager.resumeCleaning(
- deletableLog.stream()
- .map[TopicPartition](entry => entry.getKey)
- .collect(Collectors.toSet[TopicPartition]())
- )
- val cleanable2 = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).toScala
- assertEquals(1, cleanable2.size, "should have 1 logs ready to be compacted")
-
- // update cleanup policy to delete
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE)
- val config2 = new LogConfig(logProps)
- log.updateConfig(config2)
-
- // compaction in progress, should have 0 log eligible for log cleanup
- val deletableLog2 = cleanerManager.pauseCleaningForNonCompactedPartitions()
- assertEquals(0, deletableLog2.size, "should have 0 logs ready to be deleted")
-
- // compaction done, should have 1 log eligible for log cleanup
- cleanerManager.doneDeleting(util.List.of(cleanable2.get.topicPartition))
- val deletableLog3 = cleanerManager.pauseCleaningForNonCompactedPartitions()
- assertEquals(1, deletableLog3.size, "should have 1 logs ready to be deleted")
- }
-
- @Test
- def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
- assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrDefault(topicPartition, 0))
-
- cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
- // expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
- }
-
- @Test
- def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // write some data into the cleaner-offset-checkpoint file
- cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
- // updateCheckpoints should remove the topicPartition data in the logDir
- cleanerManager.updateCheckpoints(logDir, Optional.empty(), Optional.of(topicPartition))
- assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
- }
-
- @Test
- def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
- cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
- cleanerManager.updateCheckpoints(logDir2, Optional.of(util.Map.entry(topicPartition2, offset)), Optional.empty())
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
-
- cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
- // verify the partition data in logDir is gone, and data in logDir2 is still there
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2))
- assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
- }
-
- @Test
- def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
- val lowerOffset = 1L
- val higherOffset = 1000L
-
- // write some data into the cleaner-offset-checkpoint file in logDir
- cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
- // we should not truncate the checkpoint data for checkpointed offset <= the given offset (higherOffset)
- cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, higherOffset)
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
- // we should truncate the checkpoint data for checkpointed offset > the given offset (lowerOffset)
- cleanerManager.maybeTruncateCheckpoint(logDir, topicPartition, lowerOffset)
- assertEquals(lowerOffset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
- }
-
- @Test
- def testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // write some data into the cleaner-offset-checkpoint file in logDir
- cleanerManager.updateCheckpoints(logDir, Optional.of(util.Map.entry(topicPartition, offset)), Optional.empty())
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
- cleanerManager.alterCheckpointDir(topicPartition, logDir, logDir2)
- // verify we still can get the partition offset after alterCheckpointDir
- // This data should locate in logDir2, not logDir
- assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition))
-
- // force delete the logDir2 from checkpoints, so that the partition data should also be deleted
- cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
- assertFalse(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
- }
-
- /**
- * log under cleanup should still be eligible for log truncation
- */
- @Test
- def testConcurrentLogCleanupAndLogTruncation(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // log cleanup starts
- val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
- // Log truncation happens due to unclean leader election
- cleanerManager.abortAndPauseCleaning(log.topicPartition)
- cleanerManager.resumeCleaning(util.Set.of(log.topicPartition))
- // log cleanup finishes and pausedPartitions are resumed
- cleanerManager.resumeCleaning(
- pausedPartitions.stream()
- .map[TopicPartition](entry => entry.getKey)
- .collect(Collectors.toSet[TopicPartition]())
- )
-
- assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
- }
-
- /**
- * log under cleanup should still be eligible for topic deletion
- */
- @Test
- def testConcurrentLogCleanupAndTopicDeletion(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- // log cleanup starts
- val pausedPartitions = cleanerManager.pauseCleaningForNonCompactedPartitions()
- // Broker processes StopReplicaRequest with delete=true
- cleanerManager.abortCleaning(log.topicPartition)
- // log cleanup finishes and pausedPartitions are resumed
- cleanerManager.resumeCleaning(
- pausedPartitions.stream()
- .map[TopicPartition](entry => entry.getKey)
- .collect(Collectors.toSet[TopicPartition]())
- )
-
- assertEquals(Optional.empty(), cleanerManager.cleaningState(log.topicPartition))
- }
-
- /**
- * When looking for logs with segments ready to be deleted we shouldn't consider
- * logs that have had their partition marked as uncleanable.
- */
- @Test
- def testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
- cleanerManager.markPartitionUncleanable(log.dir.getParent, topicPartition)
-
- val readyToDelete = cleanerManager.deletableLogs().size
- assertEquals(0, readyToDelete, "should have 0 logs ready to be deleted")
- }
-
- /**
- * Test computation of cleanable range with no minimum compaction lag settings active where bounded by LSO
- */
- @Test
- def testCleanableOffsetsForNone(): Unit = {
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
- while (log.numberOfSegments < 8)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
- log.updateHighWatermark(50)
-
- val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
- assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
- assertEquals(log.highWatermark, log.lastStableOffset, "The high watermark equals the last stable offset as no transactions are in progress")
- assertEquals(log.lastStableOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset is bounded by the last stable offset.")
- }
-
- /**
- * Test computation of cleanable range with no minimum compaction lag settings active where bounded by active segment
- */
- @Test
- def testCleanableOffsetsActiveSegment(): Unit = {
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
-
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
- while (log.numberOfSegments < 8)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
- log.updateHighWatermark(log.logEndOffset)
-
- val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
- assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
- assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the active segment.")
- }
-
- /**
- * Test computation of cleanable range with a minimum compaction lag time
- */
- @Test
- def testCleanableOffsetsForTime(): Unit = {
- val compactionLag = 60 * 60 * 1000
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
- logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
- val t0 = time.milliseconds
- while (log.numberOfSegments < 4)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), 0)
-
- val activeSegAtT0 = log.activeSegment
-
- time.sleep(compactionLag + 1)
- val t1 = time.milliseconds
-
- while (log.numberOfSegments < 8)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t1), 0)
-
- log.updateHighWatermark(log.logEndOffset)
-
- val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
- assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
- assertEquals(activeSegAtT0.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with the second block of log entries.")
-
- /**
- * Test computation of cleanable range with a minimum compaction lag time that is small enough that
- * the active segment contains it.
- */
- @Test
- def testCleanableOffsetsForShortTime(): Unit = {
- val compactionLag = 60 * 60 * 1000
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
- logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
- val t0 = time.milliseconds
- while (log.numberOfSegments < 8)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, t0), 0)
-
- log.updateHighWatermark(log.logEndOffset)
-
- time.sleep(compactionLag + 1)
-
- val lastCleanOffset = Optional.of(0L.asInstanceOf[JLong])
- val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, lastCleanOffset, time.milliseconds)
- assertEquals(0L, cleanableOffsets.firstDirtyOffset, "The first cleanable offset starts at the beginning of the log.")
- assertEquals(log.activeSegment.baseOffset, cleanableOffsets.firstUncleanableDirtyOffset, "The first uncleanable offset begins with active segment.")
- }
-
- @Test
- def testCleanableOffsetsNeedsCheckpointReset(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
- logs.get(tp).maybeIncrementLogStartOffset(10L, LogStartOffsetIncrementReason.ClientRecordDeletion)
-
- var lastCleanOffset = Optional.of(15L.asInstanceOf[JLong])
- var cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
- assertFalse(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset should not be reset if valid")
-
- logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
- cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
- assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if less than log start offset")
-
- lastCleanOffset = Optional.of(25L)
- cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset, time.milliseconds)
- assertTrue(cleanableOffsets.forceUpdateCheckpoint, "Checkpoint offset needs to be reset if greater than log end offset")
- }
-
- @Test
- def testUndecidedTransactionalDataNotCleanable(): Unit = {
- val compactionLag = 60 * 60 * 1000
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
- logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag: java.lang.Integer)
-
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
- val producerId = 15L
- val producerEpoch = 0.toShort
- val sequence = 0
- log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
- new SimpleRecord(time.milliseconds(), "1".getBytes, "a".getBytes),
- new SimpleRecord(time.milliseconds(), "2".getBytes, "b".getBytes)), 0)
- log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 2,
- new SimpleRecord(time.milliseconds(), "3".getBytes, "c".getBytes)), 0)
- log.roll()
- log.updateHighWatermark(3L)
-
- time.sleep(compactionLag + 1)
- // although the compaction lag has been exceeded, the undecided data should not be cleaned
- var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
- assertEquals(0L, cleanableOffsets.firstDirtyOffset)
- assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset)
-
- log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
- new EndTransactionMarker(ControlRecordType.ABORT, 15)), 0,
- AppendOrigin.COORDINATOR, RequestLocal.noCaching(), VerificationGuard.SENTINEL, TransactionVersion.TV_1.featureLevel())
- log.roll()
- log.updateHighWatermark(4L)
-
- // the first segment should now become cleanable immediately
- cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
- assertEquals(0L, cleanableOffsets.firstDirtyOffset)
- assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset)
-
- time.sleep(compactionLag + 1)
-
- // the second segment becomes cleanable after the compaction lag
- cleanableOffsets = LogCleanerManager.cleanableOffsets(log, Optional.of(0L), time.milliseconds())
- assertEquals(0L, cleanableOffsets.firstDirtyOffset)
- assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset)
- }
-
- @Test
- def testDoneCleaning(): Unit = {
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)
- val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
- while (log.numberOfSegments < 8)
- log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), 0)
-
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
-
- assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
-
- cleanerManager.setCleaningState(topicPartition, LogCleaningState.logCleaningPaused(1))
- assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
-
- cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_IN_PROGRESS)
- val endOffset = 1L
- cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
- assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
- assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
- assertEquals(Some(endOffset), Option(cleanerManager.allCleanerCheckpoints.get(topicPartition)))
-
- cleanerManager.setCleaningState(topicPartition, LOG_CLEANING_ABORTED)
- cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
- assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
- assertTrue(cleanerManager.allCleanerCheckpoints.containsKey(topicPartition))
- }
-
- @Test
- def testDoneDeleting(): Unit = {
- val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
- val log: UnifiedLog = createLog(records.sizeInBytes * 5, TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE)
- val cleanerManager: LogCleanerManager = createCleanerManager(log)
- val tp = new TopicPartition("log", 0)
-
- assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))
-
- cleanerManager.setCleaningState(tp, LogCleaningState.logCleaningPaused(1))
- assertThrows(classOf[IllegalStateException], () => cleanerManager.doneDeleting(util.List.of(tp)))
-
- cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS)
- cleanerManager.doneDeleting(util.List.of(tp))
- assertTrue(cleanerManager.cleaningState(tp).isEmpty)
-
- cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED)
- cleanerManager.doneDeleting(util.List.of(tp))
- assertEquals(LogCleaningState.logCleaningPaused(1), cleanerManager.cleaningState(tp).get)
- }
-
- /**
- * Logs with invalid checkpoint offsets should update their checkpoint offset even if the log doesn't need cleaning
- */
- @Test
- def testCheckpointUpdatedForInvalidOffsetNoCleaning(): Unit = {
- val tp = new TopicPartition("foo", 0)
- val logs = setupIncreasinglyFilthyLogs(Seq(tp), startNumBatches = 20, batchIncrement = 5)
-
- logs.get(tp).maybeIncrementLogStartOffset(20L, LogStartOffsetIncrementReason.ClientRecordDeletion)
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp, 15L)
-
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats())
- assertEquals(Optional.empty(), filthiestLog, "Log should not be selected for cleaning")
- assertEquals(20L, cleanerCheckpoints.get(tp), "Unselected log should have checkpoint offset updated")
- }
-
- /**
- * Logs with invalid checkpoint offsets should update their checkpoint offset even if they aren't selected
- * for immediate cleaning
- */
- @Test
- def testCheckpointUpdatedForInvalidOffsetNotSelected(): Unit = {
- val tp0 = new TopicPartition("foo", 0)
- val tp1 = new TopicPartition("foo", 1)
- val partitions = Seq(tp0, tp1)
-
- // create two logs, one with an invalid offset, and one that is dirtier than the log with an invalid offset
- val logs = setupIncreasinglyFilthyLogs(partitions, startNumBatches = 20, batchIncrement = 5)
- logs.get(tp0).maybeIncrementLogStartOffset(15L, LogStartOffsetIncrementReason.ClientRecordDeletion)
- val cleanerManager = createCleanerManagerMock(logs)
- cleanerCheckpoints.put(tp0, 10L)
- cleanerCheckpoints.put(tp1, 5L)
-
- val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time, new PreCleanStats()).get
- assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
- assertEquals(15L, cleanerCheckpoints.get(tp0), "Unselected log should have checkpoint offset updated")
- }
-
- private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {
- val logs = new util.concurrent.ConcurrentHashMap[TopicPartition, UnifiedLog]()
- logs.put(topicPartition, log)
- new LogCleanerManager(util.List.of(logDir, logDir2), logs, null)
- }
-
- private def createCleanerManagerMock(pool: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog]): LogCleanerManagerMock = {
- new LogCleanerManagerMock(util.List.of(logDir), pool, null)
- }
-
- private def createLog(segmentSize: Int,
- cleanupPolicy: String,
- topicPartition: TopicPartition = new TopicPartition("log", 0)): UnifiedLog = {
- val config = createLowRetentionLogConfig(segmentSize, cleanupPolicy)
- val partitionDir = new File(logDir, UnifiedLog.logDirName(topicPartition))
-
- UnifiedLog.create(
- partitionDir,
- config,
- 0L,
- 0L,
- time.scheduler,
- new BrokerTopicStats,
- time,
- 5 * 60 * 1000,
- producerStateManagerConfig,
- TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
- new LogDirFailureChannel(10),
- true,
- Optional.empty)
- }
-
- private def createLowRetentionLogConfig(segmentSize: Int, cleanupPolicy: String): LogConfig = {
- val logProps = new Properties()
- logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize: Integer)
- logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1: Integer)
- logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
- logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05: java.lang.Double) // small for easier and clearer tests
-
- new LogConfig(logProps)
- }
-
- private def writeRecords(log: UnifiedLog,
- numBatches: Int,
- recordsPerBatch: Int,
- batchesPerSegment: Int): Unit = {
- for (i <- 0 until numBatches) {
- appendRecords(log, recordsPerBatch)
- if (i % batchesPerSegment == 0)
- log.roll()
- }
- log.roll()
- }
-
- private def appendRecords(log: UnifiedLog, numRecords: Int): Unit = {
- val startOffset = log.logEndOffset
- val endOffset = startOffset + numRecords
- var lastTimestamp = 0L
- val records = (startOffset until endOffset).map { offset =>
- val currentTimestamp = time.milliseconds()
- if (offset == endOffset - 1)
- lastTimestamp = currentTimestamp
- new SimpleRecord(currentTimestamp, s"key-$offset".getBytes, s"value-$offset".getBytes)
- }
-
- log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE, records:_*), 1)
- log.maybeIncrementHighWatermark(log.logEndOffsetMetadata)
- }
-
- private def makeLog(dir: File = logDir, config: LogConfig) = {
- UnifiedLog.create(
- dir,
- config,
- 0L,
- 0L,
- time.scheduler,
- new BrokerTopicStats,
- time,
- 5 * 60 * 1000,
- producerStateManagerConfig,
- TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
- new LogDirFailureChannel(10),
- true,
- Optional.empty
- )
- }
-
- private def records(key: Int, value: Int, timestamp: Long) =
- MemoryRecords.withRecords(Compression.NONE, new SimpleRecord(timestamp, key.toString.getBytes, value.toString.getBytes))
-
-}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
index 0b8c2b398d0..d7781a30254 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java
@@ -219,16 +219,16 @@ public class LogCleanerManager {
}
/**
- * Public for unit test. Get the cleaning state of the partition.
+ * For testing only. Get the cleaning state of the partition.
*/
- public Optional<LogCleaningState> cleaningState(TopicPartition tp) {
+ Optional<LogCleaningState> cleaningState(TopicPartition tp) {
return inLock(lock, () -> Optional.ofNullable(inProgress.get(tp)));
}
/**
- * Public for unit test. Set the cleaning state of the partition.
+ * For testing only. Set the cleaning state of the partition.
*/
- public void setCleaningState(TopicPartition tp, LogCleaningState state) {
+ void setCleaningState(TopicPartition tp, LogCleaningState state) {
inLock(lock, () -> inProgress.put(tp, state));
}
@@ -612,7 +612,7 @@ public class LogCleanerManager {
}
/**
- * Returns an immutable set of the uncleanable partitions for a given log directory.
+ * For testing only. Returns an immutable set of the uncleanable partitions for a given log directory.
* Only used for testing.
*/
public Set<TopicPartition> uncleanablePartitions(String logDir) {
@@ -689,7 +689,8 @@ public class LogCleanerManager {
* @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
* @throws IOException if an I/O error occurs
*/
- public static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
+ // Visible for testing
+ static OffsetsToClean cleanableOffsets(UnifiedLog log, Optional<Long> lastCleanOffset, long now) throws IOException {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
// reset to the log starting offset and log the error
@@ -791,8 +792,6 @@ public class LogCleanerManager {
* @param forceUpdateCheckpoint whether to update the checkpoint associated with this log. if true, checkpoint should be
* reset to firstDirtyOffset
*/
- public record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset,
- boolean forceUpdateCheckpoint) {
-
+ record OffsetsToClean(long firstDirtyOffset, long firstUncleanableDirtyOffset, boolean forceUpdateCheckpoint) {
}
}
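One consequence of the hunks above worth noting: cleaningState, setCleaningState, cleanableOffsets, and the OffsetsToClean record drop from public to package-private. That compiles only because the rewritten test (next file) lives in the same org.apache.kafka.storage.internals.log package. A hypothetical fragment, not part of the commit, sketching the access pattern this relies on:

    package org.apache.kafka.storage.internals.log; // same package as LogCleanerManager

    import org.apache.kafka.common.TopicPartition;

    // Package-private test hooks stay callable from here; from any other
    // package these calls would no longer compile.
    class VisibilitySketch {
        void example(LogCleanerManager cleanerManager, TopicPartition tp) {
            cleanerManager.setCleaningState(tp, LogCleaningState.LOG_CLEANING_IN_PROGRESS);
            cleanerManager.cleaningState(tp).ifPresent(System.out::println);
        }
    }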
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java
new file mode 100644
index 00000000000..2918d4157a2
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogCleanerManagerTest.java
@@ -0,0 +1,928 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.common.RequestLocal;
+import org.apache.kafka.server.common.TransactionVersion;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.LogCleanerManager.OffsetsToClean;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT;
+import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_ABORTED;
+import static org.apache.kafka.storage.internals.log.LogCleaningState.LOG_CLEANING_IN_PROGRESS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the log cleaning logic.
+ */
+class LogCleanerManagerTest {
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition("log", 0);
+ private static final TopicPartition TOPIC_PARTITION_2 = new TopicPartition("log2", 0);
+ private static final LogConfig LOG_CONFIG = createLogConfig();
+ private static final MockTime TIME = new MockTime(1400000000000L, 1000L); // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
+ private static final long OFFSET = 999;
+ private static final ProducerStateManagerConfig PRODUCER_STATE_MANAGER_CONFIG =
+ new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false);
+
+ private File tmpDir;
+ private File tmpDir2;
+ private File logDir;
+ private File logDir2;
+
+ static class LogCleanerManagerMock extends LogCleanerManager {
+ private final Map<TopicPartition, Long> cleanerCheckpoints = new HashMap<>();
+
+ LogCleanerManagerMock(
+ List<File> logDirs,
+ ConcurrentMap<TopicPartition, UnifiedLog> logs,
+ LogDirFailureChannel logDirFailureChannel
+ ) {
+ super(logDirs, logs, logDirFailureChannel);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> allCleanerCheckpoints() {
+ return cleanerCheckpoints;
+ }
+
+ @Override
+ public void updateCheckpoints(
+ File dataDir,
+ Optional<Map.Entry<TopicPartition, Long>> partitionToUpdateOrAdd,
+ Optional<TopicPartition> partitionToRemove
+ ) {
+ assert partitionToRemove.isEmpty() : "partitionToRemove argument with value not yet handled";
+
+ Map.Entry<TopicPartition, Long> entry = partitionToUpdateOrAdd.orElseThrow(() ->
+ new IllegalArgumentException("Empty 'partitionToUpdateOrAdd' argument not yet handled"));
+
+ addCheckpoint(entry.getKey(), entry.getValue());
+ }
+
+ void addCheckpoint(TopicPartition partition, long offset) {
+ cleanerCheckpoints.put(partition, offset);
+ }
+
+ long checkpointOffset(TopicPartition partition) {
+ return cleanerCheckpoints.get(partition);
+ }
+ }
+
+ // the exception should be caught and the partition that caused it marked as uncleanable
+ static class LogMock extends UnifiedLog {
+
+ LogMock(
+ long logStartOffset,
+ LocalLog localLog,
+ BrokerTopicStats brokerTopicStats,
+ int producerIdExpirationCheckIntervalMs,
+ LeaderEpochFileCache leaderEpochCache,
+ ProducerStateManager producerStateManager,
+ Optional<Uuid> topicId,
+ boolean remoteStorageSystemEnable,
+ LogOffsetsListener logOffsetsListener
+ ) throws IOException {
+ super(logStartOffset, localLog, brokerTopicStats, producerIdExpirationCheckIntervalMs, leaderEpochCache,
+ producerStateManager, topicId, remoteStorageSystemEnable, logOffsetsListener);
+ }
+
+ // Throw an error in getFirstBatchTimestampForSegments since it is called in grabFilthiestLog()
+ @Override
+ public Collection<Long> getFirstBatchTimestampForSegments(Collection<LogSegment> segments) {
+ throw new IllegalStateException("Error!");
+ }
+ }
+
+ @BeforeEach
+ public void setup() {
+ tmpDir = TestUtils.tempDirectory();
+ tmpDir2 = TestUtils.tempDirectory();
+ logDir = TestUtils.randomPartitionLogDir(tmpDir);
+ logDir2 = TestUtils.randomPartitionLogDir(tmpDir2);
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ Utils.delete(tmpDir);
+ Utils.delete(tmpDir2);
+ }
+
+ private ConcurrentMap<TopicPartition, UnifiedLog> setupIncreasinglyFilthyLogs(List<TopicPartition> partitions) throws IOException {
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
+ int numBatches = 20;
+
+ for (TopicPartition tp : partitions) {
+ UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT, tp);
+ logs.put(tp, log);
+
+ writeRecords(log, numBatches, 1, 5);
+ numBatches += 5;
+ }
+
+ return logs;
+ }
+
+ @Test
+ public void testGrabFilthiestCompactedLogThrowsException() throws IOException {
+ TopicPartition tp = new TopicPartition("A", 1);
+ int logSegmentSize = LogTestUtils.singletonRecords("test".getBytes(), null).sizeInBytes() * 10;
+ int logSegmentsCount = 2;
+ File tpDir = new File(logDir, "A-1");
+ Files.createDirectories(tpDir.toPath());
+
+ LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
+ LogConfig config = createLowRetentionLogConfig(logSegmentSize, TopicConfig.CLEANUP_POLICY_COMPACT);
+ LogSegments segments = new LogSegments(tp);
+ LeaderEpochFileCache leaderEpochCache = UnifiedLog.createLeaderEpochCache(tpDir, TOPIC_PARTITION, logDirFailureChannel,
+ Optional.empty(), TIME.scheduler);
+ ProducerStateManager producerStateManager = new ProducerStateManager(TOPIC_PARTITION, tpDir, 5 * 60 * 1000,
+ PRODUCER_STATE_MANAGER_CONFIG, TIME);
+ LoadedLogOffsets offsets = new LogLoader(tpDir, tp, config, TIME.scheduler, TIME, logDirFailureChannel, true,
+ segments, 0L, 0L, leaderEpochCache, producerStateManager, new ConcurrentHashMap<>(), false).load();
+ LocalLog localLog = new LocalLog(tpDir, config, segments, offsets.recoveryPoint(), offsets.nextOffsetMetadata(),
+ TIME.scheduler, TIME, tp, logDirFailureChannel);
+ UnifiedLog log = new LogMock(offsets.logStartOffset(), localLog, new BrokerTopicStats(), PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+ leaderEpochCache, producerStateManager, Optional.empty(), false, LogOffsetsListener.NO_OP_OFFSETS_LISTENER);
+
+ writeRecords(log, logSegmentsCount * 2, 10, 2);
+
+ ConcurrentMap<TopicPartition, UnifiedLog> logsPool = new ConcurrentHashMap<>();
+ logsPool.put(tp, log);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logsPool);
+ cleanerManager.addCheckpoint(tp, 1L);
+
+ LogCleaningException thrownException = assertThrows(LogCleaningException.class,
+ () -> cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get());
+
+ assertEquals(log, thrownException.log);
+ assertInstanceOf(IllegalStateException.class, thrownException.getCause());
+ }
+
+ @Test
+ public void testGrabFilthiestCompactedLogReturnsLogWithDirtiestRatio() throws IOException {
+ TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+ TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+ TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+ List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+ // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+ LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(tp2, filthiestLog.topicPartition());
+ assertEquals(tp2, filthiestLog.log().topicPartition());
+ }
+
+ @Test
+ public void testGrabFilthiestCompactedLogIgnoresUncleanablePartitions() throws IOException {
+ TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+ TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+ TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+ List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+ // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+ cleanerManager.markPartitionUncleanable(logs.get(tp2).dir().getParent(), tp2);
+
+ LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(tp1, filthiestLog.topicPartition());
+ assertEquals(tp1, filthiestLog.log().topicPartition());
+ }
+
+ @Test
+ public void testGrabFilthiestCompactedLogIgnoresInProgressPartitions() throws IOException {
+ TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+ TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+ TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+ List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+ // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = setupIncreasinglyFilthyLogs(partitions);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ partitions.forEach(partition -> cleanerManager.addCheckpoint(partition, 20L));
+
+ cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS);
+
+ LogToClean filthiestLog = cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(tp1, filthiestLog.topicPartition());
+ assertEquals(tp1, filthiestLog.log().topicPartition());
+ }
+
+ @Test
+ public void
testGrabFilthiestCompactedLogIgnoresBothInProgressPartitionsAndUncleanablePartitions()
throws IOException {
+ TopicPartition tp0 = new TopicPartition("wishing-well", 0);
+ TopicPartition tp1 = new TopicPartition("wishing-well", 1);
+ TopicPartition tp2 = new TopicPartition("wishing-well", 2);
+ List<TopicPartition> partitions = List.of(tp0, tp1, tp2);
+
+ // setup logs with cleanable range: [20, 20], [20, 25], [20, 30]
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(partitions);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ partitions.forEach(partition ->
cleanerManager.addCheckpoint(partition, 20L));
+
+ cleanerManager.setCleaningState(tp2, LOG_CLEANING_IN_PROGRESS);
+ cleanerManager.markPartitionUncleanable(logs.get(tp1).dir().getParent(), tp1);
+
+ Optional<LogToClean> filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertEquals(Optional.empty(), filthiestLog);
+ }
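+
+ // Illustrative sketch (not part of the committed test): the eligibility filter the
+ // tests above exercise, assuming the manager skips partitions that are mid-clean or
+ // marked uncleanable, per the setCleaningState/markPartitionUncleanable calls used here.
+ private static boolean exampleEligibleForCompaction(TopicPartition tp, Set<TopicPartition> inProgress, Set<TopicPartition> uncleanable) {
+ return !inProgress.contains(tp) && !uncleanable.contains(tp);
+ }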
+
+ @Test
+ public void testDirtyOffsetResetIfLargerThanEndOffset() throws IOException
{
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(List.of(tp));
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp, 200L);
+
+ LogToClean filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(0L, filthiestLog.firstDirtyOffset());
+ }
+
+ @Test
+ public void testDirtyOffsetResetIfSmallerThanStartOffset() throws
IOException {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(List.of(tp));
+
+ logs.get(tp).maybeIncrementLogStartOffset(10L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp, 0L);
+
+ LogToClean filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(10L, filthiestLog.firstDirtyOffset());
+ }
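+
+ // Illustrative sketch (not part of the committed test): the reset rule the two tests
+ // above exercise, assuming cleanableOffsets clamps an out-of-range checkpoint back to
+ // the log start offset.
+ private static long exampleResolveFirstDirtyOffset(long checkpoint, long logStart, long logEnd) {
+ // a checkpoint below the log start offset or beyond the log end offset is stale;
+ // cleaning restarts from the log start offset
+ return (checkpoint < logStart || checkpoint > logEnd) ? logStart : checkpoint;
+ }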
+
+ @Test
+ public void testLogStartOffsetLargerThanActiveSegmentBaseOffset() throws
IOException {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT,
tp);
+
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = new
ConcurrentHashMap<>();
+ logs.put(tp, log);
+
+ appendRecords(log, 3);
+ appendRecords(log, 3);
+ appendRecords(log, 3);
+
+ assertEquals(1, log.logSegments().size());
+
+ log.maybeIncrementLogStartOffset(2L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp, 0L);
+
+ // The active segment is uncleanable and hence not filthy from the POV
of the CleanerManager.
+ Optional<LogToClean> filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertEquals(Optional.empty(), filthiestLog);
+ }
+
+ @Test
+ public void testDirtyOffsetLargerThanActiveSegmentBaseOffset() throws
IOException {
+ // It is possible in the case of an unclean leader election for the
checkpoint
+ // dirty offset to get ahead of the active segment base offset, but
still be
+ // within the range of the log.
+
+ TopicPartition tp = new TopicPartition("foo", 0);
+
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = new
ConcurrentHashMap<>();
+ UnifiedLog log = createLog(2048, TopicConfig.CLEANUP_POLICY_COMPACT,
tp);
+ logs.put(tp, log);
+
+ appendRecords(log, 3);
+ appendRecords(log, 3);
+
+ assertEquals(1, log.logSegments().size());
+ assertEquals(0L, log.activeSegment().baseOffset());
+
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp, 3L);
+
+ // These segments are uncleanable and hence not filthy
+ Optional<LogToClean> filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertEquals(Optional.empty(), filthiestLog);
+ }
+
+ /**
+ * When checking for logs with segments ready for deletion,
+ * we shouldn't consider logs with cleanup.policy=delete,
+ * as those are handled by the LogManager.
+ */
+ @Test
+ public void
testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), null);
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ int readyToDelete = cleanerManager.deletableLogs().size();
+ assertEquals(0, readyToDelete, "should have 0 logs ready to be
deleted");
+ }
+
+ /**
+ * We should find logs with segments ready to be deleted when cleanup.policy=compact,delete.
+ */
+ @Test
+ public void
testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactDeleteLogs()
throws IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT + "," +
+ TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ int readyToDelete = cleanerManager.deletableLogs().size();
+ assertEquals(1, readyToDelete, "should have 1 log ready to be deleted");
+ }
+
+ /**
+ * When looking for logs with segments ready to be deleted, we should consider
+ * logs with cleanup.policy=compact because they may have segments from before the log start offset.
+ */
+ @Test
+ public void
testLogsWithSegmentsToDeleteShouldConsiderCleanupPolicyCompactLogs() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ int readyToDelete = cleanerManager.deletableLogs().size();
+ assertEquals(1, readyToDelete, "should have 1 log ready to be deleted");
+ }
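+
+ // Illustrative sketch (not part of the committed test): the policy filter the three
+ // tests above pin down, assuming deletableLogs() only picks up logs whose cleanup
+ // policy includes "compact" (pure delete-policy logs are retained and deleted by the
+ // LogManager instead).
+ private static boolean exampleHandledByCleaner(String cleanupPolicy) {
+ return cleanupPolicy.contains(TopicConfig.CLEANUP_POLICY_COMPACT);
+ }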
+
+ /**
+ * A log under cleanup should be ineligible for compaction.
+ */
+ @Test
+ public void testLogsUnderCleanupIneligibleForCompaction() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ log.appendAsLeader(records, 0);
+ log.roll();
+ log.appendAsLeader(LogTestUtils.singletonRecords("test2".getBytes(),
"test2".getBytes()), 0);
+ log.updateHighWatermark(2L);
+
+ // simulate cleanup thread working on the log partition
+ List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog =
cleanerManager.pauseCleaningForNonCompactedPartitions();
+ assertEquals(1, deletableLog.size(), "should have 1 log ready to be deleted");
+
+ // change cleanup policy from delete to compact
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG,
log.config().segmentSize());
+ logProps.put(TopicConfig.RETENTION_MS_CONFIG,
log.config().retentionMs);
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
+ logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0);
+ LogConfig config = new LogConfig(logProps);
+ log.updateConfig(config);
+
+ // log cleanup in progress, the log is not available for compaction
+ Optional<LogToClean> cleanable =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertTrue(cleanable.isEmpty(), "should have 0 logs ready to be
compacted");
+
+ // log cleanup finished, and log can be picked up for compaction
+ cleanerManager.resumeCleaning(deletableLog.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+ Optional<LogToClean> cleanable2 =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertTrue(cleanable2.isPresent(), "should have 1 log ready to be compacted");
+
+ // update cleanup policy to delete
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE);
+ LogConfig config2 = new LogConfig(logProps);
+ log.updateConfig(config2);
+
+ // compaction in progress, should have 0 log eligible for log cleanup
+ List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog2 =
cleanerManager.pauseCleaningForNonCompactedPartitions();
+ assertEquals(0, deletableLog2.size(), "should have 0 logs ready to be
deleted");
+
+ // compaction done, should have 1 log eligible for log cleanup
+ cleanerManager.doneDeleting(List.of(cleanable2.get().topicPartition()));
+ List<Map.Entry<TopicPartition, UnifiedLog>> deletableLog3 =
cleanerManager.pauseCleaningForNonCompactedPartitions();
+ assertEquals(1, deletableLog3.size(), "should have 1 log ready to be deleted");
+ }
+
+ @Test
+ public void testUpdateCheckpointsShouldAddOffsetToPartition() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // the checkpoint offset should not yet equal OFFSET before updateCheckpoints is called
+ assertNotEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().getOrDefault(TOPIC_PARTITION, 0L));
+
+ cleanerManager.updateCheckpoints(logDir,
Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+ // after updateCheckpoints, the checkpoint offset should equal OFFSET
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+ }
+
+ @Test
+ public void testUpdateCheckpointsShouldRemovePartitionData() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // write some data into the cleaner-offset-checkpoint file
+ cleanerManager.updateCheckpoints(logDir,
Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ // updateCheckpoints should remove the partition's data from logDir
+ cleanerManager.updateCheckpoints(logDir, Optional.empty(),
Optional.of(TOPIC_PARTITION));
+ assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+ }
+
+ @Test
+ public void testHandleLogDirFailureShouldRemoveDirAndData() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // write some data into the cleaner-offset-checkpoint file in logDir
and logDir2
+ cleanerManager.updateCheckpoints(logDir,
Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+ cleanerManager.updateCheckpoints(logDir2,
Optional.of(Map.entry(TOPIC_PARTITION_2, OFFSET)), Optional.empty());
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2));
+
+ cleanerManager.handleLogDirFailure(logDir.getAbsolutePath());
+ // verify the partition data in logDir is gone, and data in logDir2 is
still there
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION_2));
+ assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+ }
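+
+ // Illustrative sketch (not part of the committed test): each log dir keeps its own
+ // cleaner-offset-checkpoint file, conceptually a TopicPartition -> last-cleaned-offset
+ // map, and allCleanerCheckpoints() merges the per-dir maps; that is why failing logDir
+ // drops only its own entries while logDir2's survive.
+ private static Map<TopicPartition, Long> exampleMergeCheckpoints(List<Map<TopicPartition, Long>> perDirCheckpoints) {
+ Map<TopicPartition, Long> merged = new java.util.HashMap<>();
+ perDirCheckpoints.forEach(merged::putAll);
+ return merged;
+ }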
+
+ @Test
+ public void testMaybeTruncateCheckpointShouldTruncateData() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+ long lowerOffset = 1L;
+ long higherOffset = 1000L;
+
+ // write some data into the cleaner-offset-checkpoint file in logDir
+ cleanerManager.updateCheckpoints(logDir,
Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ // the checkpoint should not be truncated when the checkpointed offset is <= the given offset (higherOffset)
+ cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION,
higherOffset);
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ // the checkpoint should be truncated when the checkpointed offset is > the given offset (lowerOffset)
+ cleanerManager.maybeTruncateCheckpoint(logDir, TOPIC_PARTITION,
lowerOffset);
+ assertEquals(lowerOffset,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+ }
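+
+ // Illustrative sketch (not part of the committed test): maybeTruncateCheckpoint only
+ // lowers a checkpoint, never raises it, which is the semantics the two assertions
+ // above rely on.
+ private static long exampleTruncatedCheckpoint(long checkpointedOffset, long truncateToOffset) {
+ return Math.min(checkpointedOffset, truncateToOffset);
+ }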
+
+ @Test
+ public void testAlterCheckpointDirShouldRemoveDataInSrcDirAndAddInNewDir()
throws IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // write some data into the cleaner-offset-checkpoint file in logDir
+ cleanerManager.updateCheckpoints(logDir,
Optional.of(Map.entry(TOPIC_PARTITION, OFFSET)), Optional.empty());
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ cleanerManager.alterCheckpointDir(TOPIC_PARTITION, logDir, logDir2);
+ // verify we can still get the partition offset after alterCheckpointDir;
+ // the data should now reside in logDir2, not logDir
+ assertEquals(OFFSET,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ // force-remove logDir2 from the checkpoints, so its partition data is deleted as well
+ cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath());
+ assertFalse(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+ }
+
+ /**
+ * Log under cleanup should still be eligible for log truncation.
+ */
+ @Test
+ public void testConcurrentLogCleanupAndLogTruncation() throws IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // log cleanup starts
+ List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions =
cleanerManager.pauseCleaningForNonCompactedPartitions();
+ // Log truncation happens due to unclean leader election
+ cleanerManager.abortAndPauseCleaning(log.topicPartition());
+ cleanerManager.resumeCleaning(Set.of(log.topicPartition()));
+ // log cleanup finishes and pausedPartitions are resumed
+ cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+
+ assertEquals(Optional.empty(),
cleanerManager.cleaningState(log.topicPartition()));
+ }
+
+ /**
+ * Log under cleanup should still be eligible for topic deletion.
+ */
+ @Test
+ public void testConcurrentLogCleanupAndTopicDeletion() throws IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+
+ // log cleanup starts
+ List<Map.Entry<TopicPartition, UnifiedLog>> pausedPartitions =
cleanerManager.pauseCleaningForNonCompactedPartitions();
+ // Broker processes StopReplicaRequest with delete=true
+ cleanerManager.abortCleaning(log.topicPartition());
+ // log cleanup finishes and pausedPartitions are resumed
+ cleanerManager.resumeCleaning(pausedPartitions.stream().map(Map.Entry::getKey).collect(Collectors.toSet()));
+
+ assertEquals(Optional.empty(),
cleanerManager.cleaningState(log.topicPartition()));
+ }
+
+ /**
+ * When looking for logs with segments ready to be deleted, we shouldn't consider
+ * logs that have had their partition marked as uncleanable.
+ */
+ @Test
+ public void
testLogsWithSegmentsToDeleteShouldNotConsiderUncleanablePartitions() throws
IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT, new TopicPartition("log", 0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+ cleanerManager.markPartitionUncleanable(log.dir().getParent(),
TOPIC_PARTITION);
+
+ int readyToDelete = cleanerManager.deletableLogs().size();
+ assertEquals(0, readyToDelete, "should have 0 logs ready to be
deleted");
+ }
+
+ /**
+ * Test computation of the cleanable range with no minimum compaction lag settings active, where the range is bounded by the LSO (last stable offset).
+ */
+ @Test
+ public void testCleanableOffsetsForNone() throws IOException {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ while (log.numberOfSegments() < 8)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), TIME.milliseconds()), 0);
+
+ log.updateHighWatermark(50);
+
+ Optional<Long> lastCleanOffset = Optional.of(0L);
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first
cleanable offset starts at the beginning of the log.");
+ assertEquals(log.highWatermark(), log.lastStableOffset(),
+ "The high watermark equals the last stable offset as no
transactions are in progress");
+ assertEquals(log.lastStableOffset(),
cleanableOffsets.firstUncleanableDirtyOffset(),
+ "The first uncleanable offset is bounded by the last stable
offset.");
+ }
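+
+ // Illustrative sketch (not part of the committed test): with no compaction lag
+ // configured, the first uncleanable dirty offset is in effect the lower of the last
+ // stable offset and the active segment's base offset; here the high watermark was
+ // parked at 50, so the LSO is the binding constraint.
+ private static long exampleFirstUncleanableWithoutLag(long lastStableOffset, long activeSegmentBaseOffset) {
+ return Math.min(lastStableOffset, activeSegmentBaseOffset);
+ }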
+
+ /**
+ * Test computation of the cleanable range with no minimum compaction lag settings active, where the range is bounded by the active segment.
+ */
+ @Test
+ public void testCleanableOffsetsActiveSegment() throws IOException {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ while (log.numberOfSegments() < 8)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), TIME.milliseconds()), 0);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ Optional<Long> lastCleanOffset = Optional.of(0L);
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first
cleanable offset starts at the beginning of the log.");
+ assertEquals(log.activeSegment().baseOffset(),
cleanableOffsets.firstUncleanableDirtyOffset(),
+ "The first uncleanable offset begins with the active segment.");
+ }
+
+ /**
+ * Test computation of the cleanable range with a minimum compaction lag time.
+ */
+ @Test
+ public void testCleanableOffsetsForTime() throws IOException {
+ int compactionLag = 60 * 60 * 1000;
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ long t0 = TIME.milliseconds();
+ while (log.numberOfSegments() < 4)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), t0), 0);
+
+ LogSegment activeSegAtT0 = log.activeSegment();
+
+ TIME.sleep(compactionLag + 1);
+ long t1 = TIME.milliseconds();
+
+ while (log.numberOfSegments() < 8)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), t1), 0);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ Optional<Long> lastCleanOffset = Optional.of(0L);
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first
cleanable offset starts at the beginning of the log.");
+ assertEquals(activeSegAtT0.baseOffset(),
cleanableOffsets.firstUncleanableDirtyOffset(),
+ "The first uncleanable offset begins with the second block of log
entries.");
+ }
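+
+ // Illustrative sketch (not part of the committed test): a segment is still inside the
+ // compaction lag when its largest timestamp is newer than now - minCompactionLagMs,
+ // which is why cleaning stops at the segment that was active at t0.
+ private static boolean exampleWithinCompactionLag(long segmentLargestTimestamp, long nowMs, long minCompactionLagMs) {
+ return segmentLargestTimestamp > nowMs - minCompactionLagMs;
+ }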
+
+ /**
+ * Test computation of the cleanable range with a minimum compaction lag time
+ * that is small enough that the active segment contains it.
+ */
+ @Test
+ public void testCleanableOffsetsForShortTime() throws IOException {
+ int compactionLag = 60 * 60 * 1000;
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ long t0 = TIME.milliseconds();
+ while (log.numberOfSegments() < 8)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), t0), 0);
+
+ log.updateHighWatermark(log.logEndOffset());
+
+ TIME.sleep(compactionLag + 1);
+
+ Optional<Long> lastCleanOffset = Optional.of(0L);
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(log, lastCleanOffset, TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset(), "The first
cleanable offset starts at the beginning of the log.");
+ assertEquals(log.activeSegment().baseOffset(),
cleanableOffsets.firstUncleanableDirtyOffset(),
+ "The first uncleanable offset begins with active segment.");
+ }
+
+ @Test
+ public void testCleanableOffsetsNeedsCheckpointReset() throws IOException {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(List.of(tp));
+ logs.get(tp).maybeIncrementLogStartOffset(10L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+
+ Optional<Long> lastCleanOffset = Optional.of(15L);
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(logs.get(tp), lastCleanOffset,
TIME.milliseconds());
+ assertFalse(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint
offset should not be reset if valid");
+
+ logs.get(tp).maybeIncrementLogStartOffset(20L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp),
lastCleanOffset, TIME.milliseconds());
+ assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint
offset needs to be reset if less than log start offset");
+
+ lastCleanOffset = Optional.of(25L);
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(logs.get(tp),
lastCleanOffset, TIME.milliseconds());
+ assertTrue(cleanableOffsets.forceUpdateCheckpoint(), "Checkpoint
offset needs to be reset if greater than log end offset");
+ }
+
+ @Test
+ public void testUndecidedTransactionalDataNotCleanable() throws
IOException {
+ int compactionLag = 60 * 60 * 1000;
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, compactionLag);
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ long producerId = 15L;
+ short producerEpoch = 0;
+ int sequence = 0;
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
+ new SimpleRecord(TIME.milliseconds(), "1".getBytes(),
"a".getBytes()),
+ new SimpleRecord(TIME.milliseconds(), "2".getBytes(),
"b".getBytes())), 0);
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence + 2,
+ new SimpleRecord(TIME.milliseconds(), "3".getBytes(),
"c".getBytes())), 0);
+ log.roll();
+ log.updateHighWatermark(3L);
+
+ TIME.sleep(compactionLag + 1);
+ // although the compaction lag has been exceeded, the undecided data
should not be cleaned
+ OffsetsToClean cleanableOffsets =
LogCleanerManager.cleanableOffsets(log, Optional.of(0L), TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+ assertEquals(0L, cleanableOffsets.firstUncleanableDirtyOffset());
+
+ log.appendAsLeader(MemoryRecords.withEndTransactionMarker(TIME.milliseconds(), producerId, producerEpoch,
+ new EndTransactionMarker(ControlRecordType.ABORT, 15)), 0,
AppendOrigin.COORDINATOR, RequestLocal.noCaching(),
+ VerificationGuard.SENTINEL,
TransactionVersion.TV_1.featureLevel());
+ log.roll();
+ log.updateHighWatermark(4L);
+
+ // the first segment should now become cleanable immediately
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log,
Optional.of(0L), TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+ assertEquals(3L, cleanableOffsets.firstUncleanableDirtyOffset());
+
+ TIME.sleep(compactionLag + 1);
+
+ // the second segment becomes cleanable after the compaction lag
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log,
Optional.of(0L), TIME.milliseconds());
+ assertEquals(0L, cleanableOffsets.firstDirtyOffset());
+ assertEquals(4L, cleanableOffsets.firstUncleanableDirtyOffset());
+ }
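+
+ // Illustrative sketch (not part of the committed test): while a transaction is open,
+ // the last stable offset is pinned at the first offset of the earliest open
+ // transaction, so the cleanable range cannot advance past it no matter how much
+ // compaction lag has elapsed.
+ private static long exampleLastStableOffset(long highWatermark, Optional<Long> firstOffsetOfEarliestOpenTxn) {
+ return firstOffsetOfEarliestOpenTxn.map(first -> Math.min(first, highWatermark)).orElse(highWatermark);
+ }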
+
+ @Test
+ public void testDoneCleaning() throws IOException {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ UnifiedLog log = makeLog(LogConfig.fromProps(LOG_CONFIG.originals(),
logProps));
+
+ while (log.numberOfSegments() < 8)
+ log.appendAsLeader(records((int) log.logEndOffset(), (int)
log.logEndOffset(), TIME.milliseconds()), 0);
+
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+ assertThrows(IllegalStateException.class, () ->
cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1));
+
+ cleanerManager.setCleaningState(TOPIC_PARTITION,
LogCleaningState.logCleaningPaused(1));
+ assertThrows(IllegalStateException.class, () ->
cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), 1));
+
+ cleanerManager.setCleaningState(TOPIC_PARTITION,
LOG_CLEANING_IN_PROGRESS);
+ long endOffset = 1L;
+ cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), endOffset);
+
+ assertTrue(cleanerManager.cleaningState(TOPIC_PARTITION).isEmpty());
+ assertTrue(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+ assertEquals(endOffset,
cleanerManager.allCleanerCheckpoints().get(TOPIC_PARTITION));
+
+ cleanerManager.setCleaningState(TOPIC_PARTITION, LOG_CLEANING_ABORTED);
+ cleanerManager.doneCleaning(TOPIC_PARTITION, log.dir(), endOffset);
+
+ assertEquals(LogCleaningState.logCleaningPaused(1),
cleanerManager.cleaningState(TOPIC_PARTITION).get());
+ assertTrue(cleanerManager.allCleanerCheckpoints().containsKey(TOPIC_PARTITION));
+ }
+
+ @Test
+ public void testDoneDeleting() throws IOException {
+ MemoryRecords records =
LogTestUtils.singletonRecords("test".getBytes(), "test".getBytes());
+ UnifiedLog log = createLog(records.sizeInBytes() * 5,
TopicConfig.CLEANUP_POLICY_COMPACT +
+ "," + TopicConfig.CLEANUP_POLICY_DELETE, new TopicPartition("log",
0));
+ LogCleanerManager cleanerManager = createCleanerManager(log);
+ TopicPartition tp = new TopicPartition("log", 0);
+
+ assertThrows(IllegalStateException.class, () ->
cleanerManager.doneDeleting(List.of(tp)));
+
+ cleanerManager.setCleaningState(tp,
LogCleaningState.logCleaningPaused(1));
+ assertThrows(IllegalStateException.class, () ->
cleanerManager.doneDeleting(List.of(tp)));
+
+ cleanerManager.setCleaningState(tp, LOG_CLEANING_IN_PROGRESS);
+ cleanerManager.doneDeleting(List.of(tp));
+ assertTrue(cleanerManager.cleaningState(tp).isEmpty());
+
+ cleanerManager.setCleaningState(tp, LOG_CLEANING_ABORTED);
+ cleanerManager.doneDeleting(List.of(tp));
+ assertEquals(LogCleaningState.logCleaningPaused(1),
cleanerManager.cleaningState(tp).get());
+ }
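+
+ // Illustrative sketch (not part of the committed test): the state transition both
+ // doneCleaning and doneDeleting enforce, as pinned down by the assertions above.
+ private static Optional<LogCleaningState> exampleDoneTransition(LogCleaningState state) {
+ if (LOG_CLEANING_IN_PROGRESS.equals(state))
+ return Optional.empty(); // slot freed; the partition can be picked up again
+ if (LOG_CLEANING_ABORTED.equals(state))
+ return Optional.of(LogCleaningState.logCleaningPaused(1)); // an abort leaves the partition paused
+ throw new IllegalStateException("In-progress partition in unexpected state: " + state);
+ }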
+
+ /**
+ * Logs with invalid checkpoint offsets should update their checkpoint offset even if they don't need cleaning.
+ */
+ @Test
+ public void testCheckpointUpdatedForInvalidOffsetNoCleaning() throws
IOException {
+ TopicPartition tp = new TopicPartition("foo", 0);
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(List.of(tp));
+
+ logs.get(tp).maybeIncrementLogStartOffset(20L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp, 15L);
+
+ Optional<LogToClean> filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats());
+ assertEquals(Optional.empty(), filthiestLog, "Log should not be
selected for cleaning");
+ assertEquals(20L, cleanerManager.checkpointOffset(tp), "Unselected log
should have checkpoint offset updated");
+ }
+
+ /**
+ * Logs with invalid checkpoint offsets should update their checkpoint
offset even if they aren't selected
+ * for immediate cleaning.
+ */
+ @Test
+ public void testCheckpointUpdatedForInvalidOffsetNotSelected() throws
IOException {
+ TopicPartition tp0 = new TopicPartition("foo", 0);
+ TopicPartition tp1 = new TopicPartition("foo", 1);
+ List<TopicPartition> partitions = List.of(tp0, tp1);
+
+ // create two logs, one with an invalid offset, and one that is
dirtier than the log with an invalid offset
+ ConcurrentMap<TopicPartition, UnifiedLog> logs =
setupIncreasinglyFilthyLogs(partitions);
+ logs.get(tp0).maybeIncrementLogStartOffset(15L,
LogStartOffsetIncrementReason.ClientRecordDeletion);
+ LogCleanerManagerMock cleanerManager = createCleanerManagerMock(logs);
+ cleanerManager.addCheckpoint(tp0, 10L);
+ cleanerManager.addCheckpoint(tp1, 5L);
+
+ LogToClean filthiestLog =
cleanerManager.grabFilthiestCompactedLog(TIME, new PreCleanStats()).get();
+ assertEquals(tp1, filthiestLog.topicPartition(), "Dirtier log should
be selected");
+ assertEquals(15L, cleanerManager.checkpointOffset(tp0), "Unselected
log should have checkpoint offset updated");
+ }
+
+ private LogCleanerManager createCleanerManager(UnifiedLog log) {
+ ConcurrentMap<TopicPartition, UnifiedLog> logs = new
ConcurrentHashMap<>();
+ logs.put(TOPIC_PARTITION, log);
+
+ return new LogCleanerManager(List.of(logDir, logDir2), logs, null);
+ }
+
+ private LogCleanerManagerMock
createCleanerManagerMock(ConcurrentMap<TopicPartition, UnifiedLog> pool) {
+ return new LogCleanerManagerMock(List.of(logDir), pool, null);
+ }
+
+ private UnifiedLog createLog(int segmentSize, String cleanupPolicy,
TopicPartition topicPartition) throws IOException {
+ LogConfig config = createLowRetentionLogConfig(segmentSize,
cleanupPolicy);
+ File partitionDir = new File(logDir,
UnifiedLog.logDirName(topicPartition));
+
+ return UnifiedLog.create(partitionDir, config, 0L, 0L, TIME.scheduler,
new BrokerTopicStats(), TIME, 5 * 60 * 1000,
+ PRODUCER_STATE_MANAGER_CONFIG,
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, new LogDirFailureChannel(10),
+ true, Optional.empty());
+ }
+
+ private LogConfig createLowRetentionLogConfig(int segmentSize, String
cleanupPolicy) {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, segmentSize);
+ logProps.put(TopicConfig.RETENTION_MS_CONFIG, 1);
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy);
+ logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, 0.05); //
small for easier and clearer tests
+
+ return new LogConfig(logProps);
+ }
+
+ private void writeRecords(UnifiedLog log, int numBatches, int
recordsPerBatch, int batchesPerSegment) throws IOException {
+ for (int i = 0; i < numBatches; i++) {
+ appendRecords(log, recordsPerBatch);
+ if (i % batchesPerSegment == 0)
+ log.roll();
+ }
+ log.roll();
+ }
+
+ private void appendRecords(UnifiedLog log, int numRecords) throws
IOException {
+ long startOffset = log.logEndOffset();
+ long endOffset = startOffset + numRecords;
+
+ SimpleRecord[] records = IntStream.range((int) startOffset, (int)
endOffset)
+ .mapToObj(offset -> new SimpleRecord(TIME.milliseconds(),
String.format("key-%d", offset).getBytes(),
+ String.format("value-%d", offset).getBytes()))
+ .toArray(SimpleRecord[]::new);
+
+ log.appendAsLeader(MemoryRecords.withRecords(Compression.NONE,
records), 1);
+ log.maybeIncrementHighWatermark(log.logEndOffsetMetadata());
+ }
+
+ private UnifiedLog makeLog(LogConfig config) throws IOException {
+ return UnifiedLog.create(logDir, config, 0L, 0L, TIME.scheduler, new
BrokerTopicStats(), TIME, 5 * 60 * 1000,
+ PRODUCER_STATE_MANAGER_CONFIG,
PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, new LogDirFailureChannel(10),
+ true, Optional.empty());
+ }
+
+ private MemoryRecords records(int key, int value, long timestamp) {
+ return MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord(timestamp, Integer.toString(key).getBytes(),
+ Integer.toString(value).getBytes()));
+ }
+
+ private static LogConfig createLogConfig() {
+ Properties logProps = new Properties();
+ logProps.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 1024);
+ logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024);
+ logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_COMPACT);
+
+ return new LogConfig(logProps);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
index 6a89b82043d..21643c9186f 100644
--- a/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java
@@ -16,17 +16,25 @@
*/
package org.apache.kafka.storage.internals.log;
+import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.common.RequestLocal;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class LogTestUtils {
@@ -73,6 +81,39 @@ public class LogTestUtils {
return MemoryRecords.withEndTransactionMarker(offset, timestamp,
partitionLeaderEpoch, producerId, epoch, marker);
}
+ /**
+ * Wrap a single record in a MemoryRecords buffer. Note the parameter order: value first, then key.
+ */
+ public static MemoryRecords singletonRecords(byte[] value, byte[] key) {
+ return records(
+ List.of(new SimpleRecord(RecordBatch.NO_TIMESTAMP, key, value)),
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ RecordBatch.NO_PRODUCER_ID,
+ RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE,
+ 0L,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH
+ );
+ }
+
+ public static MemoryRecords records(List<SimpleRecord> records,
+ byte magicValue,
+ Compression codec,
+ long producerId,
+ short producerEpoch,
+ int sequence,
+ long baseOffset,
+ int partitionLeaderEpoch) {
+ ByteBuffer buf =
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue,
codec, TimestampType.CREATE_TIME, baseOffset,
+ System.currentTimeMillis(), producerId, producerEpoch, sequence,
false, partitionLeaderEpoch);
+
+ records.forEach(builder::append);
+
+ return builder.build();
+ }
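+
+ // Example usage (illustrative): build a single-batch buffer of two records at base
+ // offset 0 with no producer metadata.
+ // MemoryRecords batch = records(
+ // List.of(new SimpleRecord("k1".getBytes(), "v1".getBytes()),
+ // new SimpleRecord("k2".getBytes(), "v2".getBytes())),
+ // RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE,
+ // RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+ // RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);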
+
public static class LogConfigBuilder {
private final Map<String, Object> configs = new HashMap<>();