Repository: kafka Updated Branches: refs/heads/trunk 61281f5c5 -> 579d473ce
KAFKA-3330; Truncate log cleaner offset checkpoint if the log is truncated becketqin Can you take a look? Author: Dong Lin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #1009 from lindong28/KAFKA-3330 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/579d473c Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/579d473c Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/579d473c Branch: refs/heads/trunk Commit: 579d473ce9c5ef1a442af734e362dd545e5ab988 Parents: 61281f5 Author: Dong Lin <[email protected]> Authored: Thu Mar 17 15:21:20 2016 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Mar 17 15:21:20 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 7 +++++++ core/src/main/scala/kafka/log/LogCleanerManager.scala | 12 ++++++++++++ core/src/main/scala/kafka/log/LogManager.scala | 8 ++++++-- 3 files changed, 25 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index a2e1913..e23234b 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -134,6 +134,13 @@ class LogCleaner(val config: CleanerConfig, } /** + * Truncate cleaner offset checkpoint for the given partition if its checkpointed offset is larger than the given offset + */ + def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) { + cleanerManager.maybeTruncateCheckpoint(dataDir, topicAndPartition, offset) + } + + /** * Abort the cleaning of a particular partition if it's in progress, and pause any future cleaning of this partition. * This call blocks until the cleaning of the partition is aborted and paused. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index f6795d3..f92db4e 100755 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -210,6 +210,18 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To } } + def maybeTruncateCheckpoint(dataDir: File, topicAndPartition: TopicAndPartition, offset: Long) { + inLock(lock) { + if (logs.get(topicAndPartition).config.compact) { + val checkpoint = checkpoints(dataDir) + val existing = checkpoint.read() + + if (existing.getOrElse(topicAndPartition, 0L) > offset) + checkpoint.write(existing + (topicAndPartition -> offset)) + } + } + } + /** * Save out the endOffset and remove the given log from the in-progress set, if not aborted. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/579d473c/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index b64fac6..749c622 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -286,8 +286,10 @@ class LogManager(val logDirs: Array[File], if (needToStopCleaner && cleaner != null) cleaner.abortAndPauseCleaning(topicAndPartition) log.truncateTo(truncateOffset) - if (needToStopCleaner && cleaner != null) + if (needToStopCleaner && cleaner != null) { + cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset) cleaner.resumeCleaning(topicAndPartition) + } } } checkpointRecoveryPointOffsets() @@ -305,8 +307,10 @@ class LogManager(val logDirs: Array[File], if (cleaner != null) cleaner.abortAndPauseCleaning(topicAndPartition) log.truncateFullyAndStartAt(newOffset) - if (cleaner != null) + if (cleaner != null) { + cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicAndPartition, log.activeSegment.baseOffset) cleaner.resumeCleaning(topicAndPartition) + } } checkpointRecoveryPointOffsets() }
