Repository: kafka Updated Branches: refs/heads/0.8.1 1e9e107ee -> 874620d96
KAFKA-1327; Log cleaner metrics follow-up patch to reset dirtiest log cleanable ratio; reviewed by Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/874620d9 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/874620d9 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/874620d9 Branch: refs/heads/0.8.1 Commit: 874620d965c066519686315c591e09aa379304d6 Parents: 1e9e107 Author: Joel Koshy <jjko...@gmail.com> Authored: Mon Apr 21 11:41:03 2014 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Mon Apr 21 17:06:39 2014 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogCleaner.scala | 10 ++++++---- .../scala/kafka/log/LogCleanerManager.scala | 20 +++++++++++--------- 2 files changed, 17 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/874620d9/core/src/main/scala/kafka/log/LogCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index b9ffe00..2faa196 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -19,7 +19,6 @@ package kafka.log import scala.collection._ import scala.math -import java.util.concurrent.TimeUnit import java.nio._ import java.util.Date import java.io.File @@ -215,6 +214,7 @@ class LogCleaner(val config: CleanerConfig, */ def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) { this.lastStats = stats + cleaner.statsUnderlying.swap def mb(bytes: Double) = bytes / (1024*1024) val message = "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, %d])%n".format(id, name, from, to) + @@ -260,9 +260,10 @@ private[log] class Cleaner(val id: Int, this.logIdent = "Cleaner " + id + ": " - /* stats on this cleaning */ - val stats = new CleanerStats(time) - + /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */ + val statsUnderlying = (new CleanerStats(time), new CleanerStats(time)) + def stats = statsUnderlying._1 + /* buffer used for read i/o */ private var readBuffer = ByteBuffer.allocate(ioBufferSize) @@ -304,6 +305,7 @@ private[log] class Cleaner(val id: Int, stats.bufferUtilization = offsetMap.utilization stats.allDone() + endOffset } http://git-wip-us.apache.org/repos/asf/kafka/blob/874620d9/core/src/main/scala/kafka/log/LogCleanerManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala index 683d722..e8ced6a 100644 --- a/core/src/main/scala/kafka/log/LogCleanerManager.scala +++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala @@ -58,7 +58,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To private val pausedCleaningCond = lock.newCondition() /* a gauge for tracking the cleanable ratio of the dirtiest log */ - private var dirtiestLogCleanableRatio = 0.0 + @volatile private var dirtiestLogCleanableRatio = 0.0 newGauge("max-dirty-percent", new Gauge[Int] { def value = (100 * dirtiestLogCleanableRatio).toInt }) /** @@ -79,9 +79,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To .filterNot(l => inProgress.contains(l._1)) // skip any logs already in-progress .map(l => LogToClean(l._1, l._2, // create a LogToClean instance for each lastClean.getOrElse(l._1, l._2.logSegments.head.baseOffset))) - .filter(l => l.totalBytes > 0) // skip any empty logs - if(!dirtyLogs.isEmpty) - this.dirtiestLogCleanableRatio = dirtyLogs.max.cleanableRatio + .filter(l => l.totalBytes > 0) // skip any empty logs + this.dirtiestLogCleanableRatio = if (!dirtyLogs.isEmpty) dirtyLogs.max.cleanableRatio else 0 val cleanableLogs = dirtyLogs.filter(l => l.cleanableRatio > l.log.config.minCleanableRatio) // and must meet the minimum threshold for dirty byte ratio if(cleanableLogs.isEmpty) { None @@ -126,7 +125,8 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To case LogCleaningInProgress => inProgress.put(topicAndPartition, LogCleaningAborted) case s => - throw new IllegalStateException(("Partiiton %s can't be aborted and pasued since it's in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state." + .format(topicAndPartition, s)) } } while (!isCleaningInState(topicAndPartition, LogCleaningPaused)) @@ -142,17 +142,19 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inLock(lock) { inProgress.get(topicAndPartition) match { case None => - throw new IllegalStateException(("Partiiton %s can't be resumed since it's never paused").format(topicAndPartition)) + throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused." + .format(topicAndPartition)) case Some(state) => state match { case LogCleaningPaused => inProgress.remove(topicAndPartition) case s => - throw new IllegalStateException(("Partiiton %s can't be resumed since it's in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state." + .format(topicAndPartition, s)) } } } - info("The cleaning for partition %s is resumed".format(topicAndPartition)) + info("Compaction for partition %s is resumed".format(topicAndPartition)) } /** @@ -194,7 +196,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To inProgress.put(topicAndPartition, LogCleaningPaused) pausedCleaningCond.signalAll() case s => - throw new IllegalStateException(("In-progress partiiton %s can't be in %s state").format(topicAndPartition, s)) + throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s)) } } }