Repository: kafka
Updated Branches:
  refs/heads/trunk 2ac70c0b7 -> 302a246c7


KAFKA-4296; Fix LogCleaner statistics rolling

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #2016 from hachikuji/KAFKA-4296
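The removed lines below show why the stats never rolled: the old Cleaner kept a
pair of CleanerStats instances and "rolled" them with cleaner.statsUnderlying.swap,
but Scala's Tuple2.swap returns a new reversed tuple rather than mutating the pair,
so the discarded result made the call a no-op and stats (always statsUnderlying._1)
kept accumulating across cycles. A minimal sketch of the pitfall (illustrative
names, not the Kafka code):

    object SwapIsNotInPlace extends App {
      val statsUnderlying = (new StringBuilder("current"), new StringBuilder("last"))
      statsUnderlying.swap         // returns the reversed tuple, but the result is discarded
      println(statsUnderlying._1)  // still prints "current": the tuple itself is unchanged
    }

The fix removes the shared pair entirely: clean() allocates a fresh CleanerStats
per cleaning round and returns it alongside the next dirty offset.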


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/302a246c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/302a246c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/302a246c

Branch: refs/heads/trunk
Commit: 302a246c72677b1045696f4d6dda51da8c6a1da0
Parents: 2ac70c0
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Oct 21 14:41:04 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Oct 21 14:41:04 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 102 ++-
 .../test/scala/unit/kafka/log/CleanerTest.scala | 801 ------------------
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 839 +++++++++++++++++++
 3 files changed, 896 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 219957f..bb8a89a 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -236,8 +236,9 @@ class LogCleaner(val config: CleanerConfig,
           // there's a log, clean it
           var endOffset = cleanable.firstDirtyOffset
           try {
-            endOffset = cleaner.clean(cleanable)
-            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleaner.stats)
+            val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
+            recordStats(cleaner.id, cleanable.log.name, cleanable.firstDirtyOffset, endOffset, cleanerStats)
+            endOffset = nextDirtyOffset
           } catch {
             case pe: LogCleaningAbortedException => // task can be aborted, let it go.
           } finally {
@@ -263,7 +264,6 @@ class LogCleaner(val config: CleanerConfig,
      */
     def recordStats(id: Int, name: String, from: Long, to: Long, stats: CleanerStats) {
       this.lastStats = stats
-      cleaner.statsUnderlying.swap
       def mb(bytes: Double) = bytes / (1024*1024)
       val message = 
         "%n\tLog cleaner thread %d cleaned log %s (dirty section = [%d, 
%d])%n".format(id, name, from, to) + 
@@ -314,10 +314,6 @@ private[log] class Cleaner(val id: Int,
   override val loggerName = classOf[LogCleaner].getName
 
   this.logIdent = "Cleaner " + id + ": "
-  
-  /* cleaning stats - one instance for the current (or next) cleaning cycle and one for the last completed cycle */
-  val statsUnderlying = (new CleanerStats(time), new CleanerStats(time))
-  def stats = statsUnderlying._1
 
   /* buffer used for read i/o */
   private var readBuffer = ByteBuffer.allocate(ioBufferSize)
@@ -332,17 +328,18 @@ private[log] class Cleaner(val id: Int,
    *
    * @param cleanable The log to be cleaned
    *
-   * @return The first offset not cleaned
+   * @return The first offset not cleaned and the statistics for this round of cleaning
    */
-  private[log] def clean(cleanable: LogToClean): Long = {
-    stats.clear()
+  private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
+    val stats = new CleanerStats()
+
     info("Beginning cleaning of log %s.".format(cleanable.log.name))
     val log = cleanable.log
 
     // build the offset map
     info("Building offset map for %s...".format(cleanable.log.name))
     val upperBoundOffset = cleanable.firstUncleanableOffset
-    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap)
+    buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
     val endOffset = offsetMap.latestOffset + 1
     stats.indexDone()
     
@@ -361,14 +358,14 @@ private[log] class Cleaner(val id: Int,
     // group the segments and clean the groups
     info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior 
to %s)...".format(log.name, new Date(cleanableHorizionMs), new 
Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), 
log.config.segmentSize, log.config.maxIndexSize))
-      cleanSegments(log, group, offsetMap, deleteHorizonMs)
+      cleanSegments(log, group, offsetMap, deleteHorizonMs, stats)
 
     // record buffer utilization
     stats.bufferUtilization = offsetMap.utilization
     
     stats.allDone()
 
-    endOffset
+    (endOffset, stats)
   }
 
   /**
@@ -378,11 +375,13 @@ private[log] class Cleaner(val id: Int,
    * @param segments The group of segments being cleaned
    * @param map The offset map to use for cleaning segments
    * @param deleteHorizonMs The time to retain delete tombstones
+   * @param stats Collector for cleaning statistics
    */
   private[log] def cleanSegments(log: Log,
                                  segments: Seq[LogSegment], 
                                  map: OffsetMap, 
-                                 deleteHorizonMs: Long) {
+                                 deleteHorizonMs: Long,
+                                 stats: CleanerStats) {
     // create a new segment with the suffix .cleaned appended to both the log and index name
     val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
     logFile.delete()
@@ -401,7 +400,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.largestTimestamp > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s 
deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), 
cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, 
log.config.maxMessageSize)
+        cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, 
log.config.maxMessageSize, stats)
       }
 
       // trim excess index
@@ -440,13 +439,15 @@ private[log] class Cleaner(val id: Int,
    * @param map The key=>offset mapping
    * @param retainDeletes Should delete tombstones be retained while cleaning this segment
    * @param maxLogMessageSize The maximum message size of the corresponding topic
+   * @param stats Collector for cleaning statistics
    */
   private[log] def cleanInto(topicAndPartition: TopicAndPartition,
                              source: LogSegment,
                              dest: LogSegment,
                              map: OffsetMap,
                              retainDeletes: Boolean,
-                             maxLogMessageSize: Int) {
+                             maxLogMessageSize: Int,
+                             stats: CleanerStats) {
     var position = 0
     while (position < source.log.sizeInBytes) {
       checkDone(topicAndPartition)
@@ -466,7 +467,7 @@ private[log] class Cleaner(val id: Int,
 
         stats.readMessage(size)
         if (shallowMessage.compressionCodec == NoCompressionCodec) {
-          if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset)) {
+          if (shouldRetainMessage(source, map, retainDeletes, shallowMessageAndOffset, stats)) {
             ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
             stats.recopyMessage(size)
             if (shallowMessage.timestamp > maxTimestamp) {
@@ -487,7 +488,7 @@ private[log] class Cleaner(val id: Int,
 
           for (deepMessageAndOffset <- ByteBufferMessageSet.deepIterator(shallowMessageAndOffset)) {
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset)) {
+            if (shouldRetainMessage(source, map, retainDeletes, deepMessageAndOffset, stats)) {
               // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
               // the corrupted entry with correct data.
               if (shallowMagic != deepMessageAndOffset.message.magic)
@@ -505,8 +506,10 @@ private[log] class Cleaner(val id: Int,
           // There are no messages compacted out and no message format conversion, write the original message set back
           if (writeOriginalMessageSet)
             ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
-          else
-            compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
+          else if (retainedMessages.nonEmpty) {
+            val retainedSize = compressMessages(writeBuffer, shallowMessage.compressionCodec, retainedMessages)
+            stats.recopyMessage(retainedSize)
+          }
         }
       }
 
@@ -529,9 +532,12 @@ private[log] class Cleaner(val id: Int,
 
   private def compressMessages(buffer: ByteBuffer,
                                compressionCodec: CompressionCodec,
-                               messageAndOffsets: Seq[MessageAndOffset]) {
+                               messageAndOffsets: Seq[MessageAndOffset]): Int = {
     require(compressionCodec != NoCompressionCodec, s"compressionCodec must not be $NoCompressionCodec")
-    if (messageAndOffsets.nonEmpty) {
+
+    if (messageAndOffsets.isEmpty) {
+      0
+    } else {
       val messages = messageAndOffsets.map(_.message)
       val magicAndTimestamp = MessageSet.magicAndLargestTimestamp(messages)
 
@@ -565,14 +571,15 @@ private[log] class Cleaner(val id: Int,
         }
       }
       ByteBufferMessageSet.writeMessage(buffer, messageWriter, offset)
-      stats.recopyMessage(messageWriter.size + MessageSet.LogOverhead)
+      messageWriter.size + MessageSet.LogOverhead
     }
   }
 
   private def shouldRetainMessage(source: kafka.log.LogSegment,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
-                                  entry: kafka.message.MessageAndOffset): Boolean = {
+                                  entry: kafka.message.MessageAndOffset,
+                                  stats: CleanerStats): Boolean = {
     val pastLatestOffset = entry.offset > map.latestOffset
     if (pastLatestOffset)
       return true
@@ -658,8 +665,13 @@ private[log] class Cleaner(val id: Int,
    * @param start The offset at which dirty messages begin
    * @param end The ending offset for the map that is being built
    * @param map The map in which to store the mappings
+   * @param stats Collector for cleaning statistics
    */
-  private[log] def buildOffsetMap(log: Log, start: Long, end: Long, map: OffsetMap) {
+  private[log] def buildOffsetMap(log: Log,
+                                  start: Long,
+                                  end: Long,
+                                  map: OffsetMap,
+                                  stats: CleanerStats) {
     map.clear()
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, 
%d).".format(log.name, dirty.size, start, end))
@@ -670,7 +682,7 @@ private[log] class Cleaner(val id: Int,
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
 
-      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize)
+      full = buildOffsetMapForSegment(log.topicAndPartition, segment, map, start, log.config.maxMessageSize, stats)
       if (full)
         debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
     }
@@ -682,10 +694,16 @@ private[log] class Cleaner(val id: Int,
    *
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
+   * @param stats Collector for cleaning statistics
    *
    * @return If the map was filled whilst loading from this segment
    */
-  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, segment: LogSegment, map: OffsetMap, start: Long, maxLogMessageSize: Int): Boolean = {
+  private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition,
+                                       segment: LogSegment,
+                                       map: OffsetMap,
+                                       start: Long,
+                                       maxLogMessageSize: Int,
+                                       stats: CleanerStats): Boolean = {
     var position = segment.index.lookup(start).position
     val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
@@ -719,12 +737,19 @@ private[log] class Cleaner(val id: Int,
 /**
  * A simple struct for collecting stats about log cleaning
  */
-private case class CleanerStats(time: Time = SystemTime) {
-  var startTime, mapCompleteTime, endTime, bytesRead, bytesWritten, mapBytesRead, mapMessagesRead, messagesRead,
-      messagesWritten, invalidMessagesRead = 0L
+private class CleanerStats(time: Time = SystemTime) {
+  val startTime = time.milliseconds
+  var mapCompleteTime = -1L
+  var endTime = -1L
+  var bytesRead = 0L
+  var bytesWritten = 0L
+  var mapBytesRead = 0L
+  var mapMessagesRead = 0L
+  var messagesRead = 0L
+  var invalidMessagesRead = 0L
+  var messagesWritten = 0L
   var bufferUtilization = 0.0d
-  clear()
-  
+
   def readMessage(size: Int) {
     messagesRead += 1
     bytesRead += size
@@ -759,19 +784,6 @@ private case class CleanerStats(time: Time = SystemTime) {
   
   def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
   
-  def clear() {
-    startTime = time.milliseconds
-    mapCompleteTime = -1L
-    endTime = -1L
-    bytesRead = 0L
-    bytesWritten = 0L
-    mapBytesRead = 0L
-    mapMessagesRead = 0L
-    messagesRead = 0L
-    invalidMessagesRead = 0L
-    messagesWritten = 0L
-    bufferUtilization = 0.0d
-  }
 }
 
 /**

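The net effect of the LogCleaner.scala changes: CleanerStats is now a plain class
whose startTime is fixed at construction, and every helper takes the collector as
an explicit parameter instead of reaching for shared mutable state. A
self-contained sketch of the same threading pattern, with simplified stand-in
types (RoundStats and cleanRound are illustrative, not the Kafka API):

    object StatsThreadingSketch extends App {
      class RoundStats {
        val startTime: Long = System.currentTimeMillis  // fixed at construction, like CleanerStats
        var messagesRead = 0L
        var bytesRead = 0L
        def readMessage(size: Int): Unit = { messagesRead += 1; bytesRead += size }
      }

      // Mirrors the new clean(): allocate a fresh collector per round and
      // return it with the next dirty offset rather than mutating shared state.
      def cleanRound(firstDirtyOffset: Long, messageSizes: Seq[Int]): (Long, RoundStats) = {
        val stats = new RoundStats
        messageSizes.foreach(stats.readMessage)
        (firstDirtyOffset + messageSizes.size, stats)
      }

      val (endOffset, stats) = cleanRound(0L, Seq(10, 20, 30))
      println(s"endOffset=$endOffset, read=${stats.messagesRead} messages, ${stats.bytesRead} bytes")
    }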
http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
deleted file mode 100755
index 536f10d..0000000
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ /dev/null
@@ -1,801 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.{DataOutputStream, File}
-import java.nio._
-import java.nio.file.Paths
-import java.util.Properties
-
-import kafka.common._
-import kafka.message._
-import kafka.utils._
-import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
-import org.apache.kafka.common.utils.Utils
-import org.junit.Assert._
-import org.junit.{After, Test}
-import org.scalatest.junit.JUnitSuite
-
-import scala.collection._
-
-/**
- * Unit tests for the log cleaning logic
- */
-class CleanerTest extends JUnitSuite {
-  
-  val tmpdir = TestUtils.tempDir()
-  val dir = TestUtils.randomPartitionLogDir(tmpdir)
-  val logProps = new Properties()
-  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-  logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
-  logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-  val logConfig = LogConfig(logProps)
-  val time = new MockTime()
-  val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
-  
-  @After
-  def teardown(): Unit = {
-    Utils.delete(tmpdir)
-  }
-  
-  /**
-   * Test simple log cleaning
-   */
-  @Test
-  def testCleanSegments(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // append messages to the log until we have four segments
-    while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-    val keysFound = keysInLog(log)
-    assertEquals(0L until log.logEndOffset, keysFound)
-
-    // pretend we have the following keys
-    val keys = immutable.ListSet(1, 3, 5, 7, 9)
-    val map = new FakeOffsetMap(Int.MaxValue)
-    keys.foreach(k => map.put(key(k), Long.MaxValue))
-
-    // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
-    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
-    assertEquals(shouldRemain, keysInLog(log))
-  }
-
-  /**
-   * Test log cleaning with logs containing messages larger than default message size
-   */
-  @Test
-  def testLargeMessage() {
-    val largeMessageSize = 1024 * 1024
-    // Create cleaner with very small default max message size
-    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
-    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    while(log.numberOfSegments < 2)
-      log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
-    val keysFound = keysInLog(log)
-    assertEquals(0L until log.logEndOffset, keysFound)
-
-    // pretend we have the following keys
-    val keys = immutable.ListSet(1, 3, 5, 7, 9)
-    val map = new FakeOffsetMap(Int.MaxValue)
-    keys.foreach(k => map.put(key(k), Long.MaxValue))
-
-    // clean the log
-    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L)
-    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
-    assertEquals(shouldRemain, keysInLog(log))
-  }
-
-  @Test
-  def testCleaningWithDeletes(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    
-    // append messages with the keys 0 through N
-    while(log.numberOfSegments < 2)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      
-    // delete all even keys between 0 and N
-    val leo = log.logEndOffset
-    for(key <- 0 until leo.toInt by 2)
-      log.append(deleteMessage(key))
-      
-    // append some new unique keys to pad out to a new active segment
-    while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
-    val keys = keysInLog(log).toSet
-    assertTrue("None of the keys we deleted should still exist.", 
-               (0 until leo.toInt by 2).forall(!keys.contains(_)))
-  }
-
-  @Test
-  def testPartialSegmentClean(): Unit = {
-    // because loadFactor is 0.75, this means we can fit 2 messages in the map
-    val cleaner = makeCleaner(2)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    log.append(message(0,0)) // offset 0
-    log.append(message(1,1)) // offset 1
-    log.append(message(0,0)) // offset 2
-    log.append(message(1,1)) // offset 3
-    log.append(message(0,0)) // offset 4
-    // roll the segment, so we can clean the messages already appended
-    log.roll()
-
-    // clean the log with only one message removed
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
-    assertEquals(immutable.List(1,0,1,0), keysInLog(log))
-    assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
-
-    // continue to make progress, even though we can only clean one message at a time
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
-    assertEquals(immutable.List(0,1,0), keysInLog(log))
-    assertEquals(immutable.List(2,3,4), offsetsInLog(log))
-
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
-    assertEquals(immutable.List(1,0), keysInLog(log))
-    assertEquals(immutable.List(3,4), offsetsInLog(log))
-  }
-
-  @Test
-  def testCleaningWithUncleanableSection(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
-    val N = 10
-    val numCleanableSegments = 2
-    val numTotalSegments = 7
-
-    // append messages with the keys 0 through N-1, values equal offset
-    while(log.numberOfSegments <= numCleanableSegments)
-      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
-
-    // at this point one message past the cleanable segments has been added
-    // the entire segment containing the first uncleanable offset should not be cleaned.
-    val firstUncleanableOffset = log.logEndOffset + 1  // +1  so it is past the baseOffset
-
-    while(log.numberOfSegments < numTotalSegments - 1)
-      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
-
-    // the last (active) segment has just one message
-
-    def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
-
-    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
-    assertTrue("Test is not effective unless each segment contains duplicates. 
Increase segment size or decrease number of keys.",
-      distinctValuesBySegment.reverse.tail.forall(_ > N))
-
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
-
-    val distinctValuesBySegmentAfterClean = distinctValuesBySegment
-
-    assertTrue("The cleanable segments should have fewer number of values 
after cleaning",
-      
disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall
 { case (before, after) => after < before })
-    assertTrue("The uncleanable segments should have the same number of values 
after cleaning", 
disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
-      .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == 
x._2 })
-  }
-
-  @Test
-  def testLogToClean(): Unit = {
-    // create a log with small segment size
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
-      log.append(messageSet, assignOffsets = true)
-
-    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
-
-    assertEquals("Total bytes of LogToClean should equal size of all segments 
excluding the active segment",
-      logToClean.totalBytes, log.size - log.activeSegment.size)
-  }
-
-  @Test
-  def testLogToCleanWithUncleanableSection(): Unit = {
-    // create a log with small segment size
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // create 6 segments with only one message in each segment
-    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
-    for (i <- 0 until 6)
-      log.append(messageSet, assignOffsets = true)
-
-    // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
-    val segs = log.logSegments.toSeq
-    val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
-
-    val expectedCleanSize = segs.take(2).map(_.size).sum
-    val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
-    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
-
-    assertEquals("Uncleanable bytes of LogToClean should equal size of all 
segments prior the one containing first dirty",
-      logToClean.cleanBytes, expectedCleanSize)
-    assertEquals("Cleanable bytes of LogToClean should equal size of all 
segments from the one containing first dirty offset" +
-      " to the segment prior to the one with the first uncleanable offset",
-      logToClean.cleanableBytes, expectedCleanableSize)
-    assertEquals("Total bytes should be the sum of the clean and cleanable 
segments", logToClean.totalBytes, expectedCleanSize + expectedCleanableSize)
-    assertEquals("Total cleanable ratio should be the ratio of cleanable size 
to clean plus cleanable", logToClean.cleanableRatio,
-      expectedCleanableSize / (expectedCleanSize + 
expectedCleanableSize).toDouble, 1.0e-6d)
-  }
-
-  @Test
-  def testCleaningWithUnkeyedMessages(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-
-    // create a log with compaction turned off so we can append unkeyed messages
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // append unkeyed messages
-    while(log.numberOfSegments < 2)
-      log.append(unkeyedMessage(log.logEndOffset.toInt))
-    val numInvalidMessages = unkeyedMessageCountInLog(log)
-
-    val sizeWithUnkeyedMessages = log.size
-
-    // append keyed messages
-    while(log.numberOfSegments < 3)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-
-    val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
-
-    assertEquals("Log should only contain keyed messages after cleaning.", 0, 
unkeyedMessageCountInLog(log))
-    assertEquals("Log should only contain keyed messages after cleaning.", 
expectedSizeAfterCleaning, log.size)
-    assertEquals("Cleaner should have seen %d invalid messages.", 
numInvalidMessages, cleaner.stats.invalidMessagesRead)
-  }
-  
-  /* extract all the keys from a log */
-  def keysInLog(log: Log): Iterable[Int] =
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
-
-  /* extract all the offsets from a log */
-  def offsetsInLog(log: Log): Iterable[Long] =
-    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
-
-  def unkeyedMessageCountInLog(log: Log) =
-    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
-
-  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
-    throw new LogCleaningAbortedException()
-  }
-
-  /**
-   * Test that abortion during cleaning throws a LogCleaningAbortedException
-   */
-  @Test
-  def testCleanSegmentsWithAbort(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-
-    // append messages to the log until we have four segments
-    while(log.numberOfSegments < 4)
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-
-    val keys = keysInLog(log)
-    val map = new FakeOffsetMap(Int.MaxValue)
-    keys.foreach(k => map.put(key(k), Long.MaxValue))
-    intercept[LogCleaningAbortedException] {
-      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L)
-    }
-  }
-
-  /**
-   * Validate the logic for grouping log segments together for cleaning
-   */
-  @Test
-  def testSegmentGrouping(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-    
-    // append some messages to the log
-    var i = 0
-    while(log.numberOfSegments < 10) {
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-      i += 1
-    }
-    
-    // grouping by very large values should result in a single group with all the segments in it
-    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
-    assertEquals(1, groups.size)
-    assertEquals(log.numberOfSegments, groups.head.size)
-    checkSegmentOrder(groups)
-    
-    // grouping by very small values should result in all groups having one entry
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
-    assertEquals(log.numberOfSegments, groups.size)
-    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
-    checkSegmentOrder(groups)
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
-    assertEquals(log.numberOfSegments, groups.size)
-    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
-    checkSegmentOrder(groups)
-
-    val groupSize = 3
-    
-    // check grouping by log size
-    val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
-    checkSegmentOrder(groups)
-    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
-    
-    // check grouping by index size
-    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
-    checkSegmentOrder(groups)
-    assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
-  }
-  
-  /**
-   * Validate the logic for grouping log segments together for cleaning when only a small number of
-   * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
-   * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
-   * stored in 4 bytes.
-   */
-  @Test
-  def testSegmentGroupingWithSparseOffsets(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-
-    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
-       
-    // fill up first segment
-    while (log.numberOfSegments == 1)
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-    
-    // forward offset and append message to next segment at offset Int.MaxValue
-    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
-      new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
-    log.append(messageSet, assignOffsets = false)
-    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
-    
-    // grouping should result in a single group with maximum relative offset of Int.MaxValue
-    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
-    assertEquals(1, groups.size)
-    
-    // append another message, making last offset of second segment > Int.MaxValue
-    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-    
-    // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
-    assertEquals(2, groups.size)
-    checkSegmentOrder(groups)
-    
-    // append more messages, creating new segments, further grouping should still occur
-    while (log.numberOfSegments < 4)
-      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
-
-    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
-    assertEquals(log.numberOfSegments - 1, groups.size)
-    for (group <- groups)
-      assertTrue("Relative offset greater than Int.MaxValue", group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
-    checkSegmentOrder(groups)
-    
-  }
-  
-  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
-    val offsets = groups.flatMap(_.map(_.baseOffset))
-    assertEquals("Offsets should be in increasing order.", offsets.sorted, 
offsets)
-  }
-  
-  /**
-   * Test building an offset map off the log
-   */
-  @Test
-  def testBuildOffsetMap(): Unit = {
-    val map = new FakeOffsetMap(1000)
-    val log = makeLog()
-    val cleaner = makeCleaner(Int.MaxValue)
-    val start = 0
-    val end = 500
-    val offsets = writeToLog(log, (start until end) zip (start until end))
-    def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
-      cleaner.buildOffsetMap(log, start, end, map)
-      val endOffset = map.latestOffset + 1
-      assertEquals("Last offset should be the end offset.", end, endOffset)
-      assertEquals("Should have the expected number of messages in the map.", 
end-start, map.size)
-      for(i <- start until end)
-        assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
-      assertEquals("Should not find a value too small", -1L, map.get(key(start 
- 1)))
-      assertEquals("Should not find a value too large", -1L, map.get(key(end)))
-    }
-    val segments = log.logSegments.toSeq
-    checkRange(map, 0, segments(1).baseOffset.toInt)
-    checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
-    checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
-  }
-  
-  
-  /**
-   * Tests recovery if broker crashes at the following stages during the cleaning sequence
-   * <ol>
-   *   <li> Cleaner has created .cleaned log containing multiple segments, swap sequence not yet started
-   *   <li> .cleaned log renamed to .swap, old segment files not yet renamed to .deleted
-   *   <li> .cleaned log renamed to .swap, old segment files renamed to .deleted, but not yet deleted
-   *   <li> .swap suffix removed, completing the swap, but async delete of .deleted files not yet complete
-   * </ol>
-   */
-  @Test
-  def testRecoveryAfterCrash(): Unit = {
-    val cleaner = makeCleaner(Int.MaxValue)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
-    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
-    logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
-
-    val config = LogConfig.fromProps(logConfig.originals, logProps)
-      
-    def recoverAndCheck(config: LogConfig, expectedKeys : Iterable[Int]) : Log = {
-      // Recover log file and check that after recovery, keys are as expected
-      // and all temporary files have been deleted
-      val recoveredLog = makeLog(config = config)
-      time.sleep(config.fileDeleteDelayMs + 1)
-      for (file <- dir.listFiles) {
-        assertFalse("Unexpected .deleted file after recovery", 
file.getName.endsWith(Log.DeletedFileSuffix))
-        assertFalse("Unexpected .cleaned file after recovery", 
file.getName.endsWith(Log.CleanedFileSuffix))
-        assertFalse("Unexpected .swap file after recovery", 
file.getName.endsWith(Log.SwapFileSuffix))
-      }
-      assertEquals(expectedKeys, keysInLog(recoveredLog))
-      recoveredLog
-    }
-    
-    // create a log and append some messages
-    var log = makeLog(config = config)
-    var messageCount = 0
-    while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      messageCount += 1
-    }
-    val allKeys = keysInLog(log)
-    
-    // pretend we have odd-numbered keys
-    val offsetMap = new FakeOffsetMap(Int.MaxValue)
-    for (k <- 1 until messageCount by 2)
-      offsetMap.put(key(k), Long.MaxValue)
-     
-    // clean the log
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
-    var cleanedKeys = keysInLog(log)
-    
-    // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
-    //    On recovery, clean operation is aborted. All messages should be present in the log
-    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
-    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
-    }
-    log = recoverAndCheck(config, allKeys)
-    
-    // clean again
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
-    cleanedKeys = keysInLog(log)
-    
-    // 2) Simulate recovery just after swap file is created, before old segment files are
-    //    renamed to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
-    for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
-      Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
-    }   
-    log = recoverAndCheck(config, cleanedKeys)
-
-    // add some more messages and clean the log again
-    while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      messageCount += 1
-    }
-    for (k <- 1 until messageCount by 2)
-      offsetMap.put(key(k), Long.MaxValue)    
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
-    cleanedKeys = keysInLog(log)
-    
-    // 3) Simulate recovery after swap file is created and old segments files are renamed
-    //    to .deleted. Clean operation is resumed during recovery.
-    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
-    log = recoverAndCheck(config, cleanedKeys)
-    
-    // add some more messages and clean the log again
-    while(log.numberOfSegments < 10) {
-      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
-      messageCount += 1
-    }
-    for (k <- 1 until messageCount by 2)
-      offsetMap.put(key(k), Long.MaxValue)    
-    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L)
-    cleanedKeys = keysInLog(log)
-    
-    // 4) Simulate recovery after swap is complete, but async deletion
-    //    is not yet complete. Clean operation is resumed during recovery.
-    recoverAndCheck(config, cleanedKeys)
-    
-  }
-
-  @Test
-  def testBuildOffsetMapFakeLarge(): Unit = {
-    val map = new FakeOffsetMap(1000)
-    val logProps = new Properties()
-    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
-    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
-    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-    val logConfig = LogConfig(logProps)
-    val log = makeLog(config = logConfig)
-    val cleaner = makeCleaner(Int.MaxValue)
-    val start = 0
-    val end = 2
-    val offsetSeq = Seq(0L, 7206178L)
-    val offsets = writeToLog(log, (start until end) zip (start until end), offsetSeq)
-    cleaner.buildOffsetMap(log, start, end, map)
-    val endOffset = map.latestOffset
-    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
-    assertEquals("Should have the expected number of messages in the map.", 
end - start, map.size)
-    assertEquals("Map should contain first value", 0L, map.get(key(0)))
-    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
-  }
-
-  /**
-   * Test building a partial offset map of part of a log segment
-   */
-  @Test
-  def testBuildPartialOffsetMap(): Unit = {
-    // because loadFactor is 0.75, this means we can fit 2 messages in the map
-    val map = new FakeOffsetMap(3)
-    val log = makeLog()
-    val cleaner = makeCleaner(2)
-
-    log.append(message(0,0))
-    log.append(message(1,1))
-    log.append(message(2,2))
-    log.append(message(3,3))
-    log.append(message(4,4))
-    log.roll()
-
-    cleaner.buildOffsetMap(log, 2, Int.MaxValue, map)
-    assertEquals(2, map.size)
-    assertEquals(-1, map.get(key(0)))
-    assertEquals(2, map.get(key(2)))
-    assertEquals(3, map.get(key(3)))
-    assertEquals(-1, map.get(key(4)))
-  }
-
-  /**
-   * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner
-   */
-  @Test
-  def testCleanCorruptMessageSet() {
-    val codec = SnappyCompressionCodec
-
-    val logProps = new Properties()
-    logProps.put(LogConfig.CompressionTypeProp, codec.name)
-    val logConfig = LogConfig(logProps)
-
-    val log = makeLog(config = logConfig)
-    val cleaner = makeCleaner(10)
-
-    // messages are constructed so that the payload matches the expecting offset to
-    // make offset validation easier after cleaning
-
-    // one compressed log entry with duplicates
-    val dupSetKeys = (0 until 2) ++ (0 until 2)
-    val dupSetOffset = 25
-    val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size)
-
-    // and one without (should still be fixed by the cleaner)
-    val noDupSetKeys = 3 until 5
-    val noDupSetOffset = 50
-    val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size)
-
-    log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false)
-    log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false)
-
-    log.roll()
-
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
-
-    for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) {
-      assertEquals(shallowMessage.message.magic, deepMessage.message.magic)
-      val value = TestUtils.readString(deepMessage.message.payload).toLong
-      assertEquals(deepMessage.offset, value)
-    }
-  }
-
-  /**
-   * Verify that the client can handle corrupted messages. Located here for now since the client
-   * does not support writing messages with the old magic.
-   */
-  @Test
-  def testClientHandlingOfCorruptMessageSet(): Unit = {
-    import JavaConverters._
-
-    val keys = 1 until 10
-    val offset = 50
-    val set = keys zip (offset until offset + keys.size)
-
-    val corruptedMessage = invalidCleanedMessage(offset, set)
-    val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
-
-    for (logEntry <- records.iterator.asScala) {
-      val offset = logEntry.offset
-      val value = TestUtils.readString(logEntry.record.value).toLong
-      assertEquals(offset, value)
-    }
-  }
-
-  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
-    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
-      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
-  }
-
-  private def invalidCleanedMessage(initialOffset: Long,
-                                    keysAndValues: Iterable[(Int, Int)],
-                                    codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = {
-    // this function replicates the old versions of the cleaner which under some circumstances
-    // would write invalid compressed message sets with the outer magic set to 1 and the inner
-    // magic set to 0
-
-    val messages = keysAndValues.map(kv =>
-      new Message(key = kv._1.toString.getBytes,
-        bytes = kv._2.toString.getBytes,
-        timestamp = Message.NoTimestamp,
-        magicValue = Message.MagicValue_V0))
-
-    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
-    var lastOffset = initialOffset
-
-    messageWriter.write(
-      codec = codec,
-      timestamp = Message.NoTimestamp,
-      timestampType = TimestampType.CREATE_TIME,
-      magicValue = Message.MagicValue_V1) { outputStream =>
-
-      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
-      try {
-        for (message <- messages) {
-          val innerOffset = lastOffset - initialOffset
-          output.writeLong(innerOffset)
-          output.writeInt(message.size)
-          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
-          lastOffset += 1
-        }
-      } finally {
-        output.close()
-      }
-    }
-    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
-    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
-    buffer.rewind()
-
-    new ByteBufferMessageSet(buffer)
-  }
-
-  private def messageWithOffset(key: Int, value: Int, offset: Long) =
-    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
-                             new Message(key = key.toString.getBytes,
-                                         bytes = value.toString.getBytes,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-  
-  
-  def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
-
-  def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */  }
-
-  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
-    new Cleaner(id = 0, 
-                offsetMap = new FakeOffsetMap(capacity), 
-                ioBufferSize = maxMessageSize,
-                maxIoBufferSize = maxMessageSize,
-                dupBufferLoadFactor = 0.75,
-                throttler = throttler, 
-                time = time,
-                checkDone = checkDone )
-  
-  def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
-    for((key, value) <- seq)
-      yield log.append(message(key, value)).firstOffset
-  }
-  
-  def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
-  
-  def message(key: Int, value: Int): ByteBufferMessageSet =
-    message(key, value.toString.getBytes)
-
-  def message(key: Int, value: Array[Byte]) =
-    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-                                         bytes = value,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-
-  def unkeyedMessage(value: Int) =
-    new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes))
-
-  def deleteMessage(key: Int) =
-    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
-                                         bytes = null,
-                                         timestamp = Message.NoTimestamp,
-                                         magicValue = Message.MagicValue_V1))
-  
-}
-
-class FakeOffsetMap(val slots: Int) extends OffsetMap {
-  val map = new java.util.HashMap[String, Long]()
-  var lastOffset = -1L
-
-  private def keyFor(key: ByteBuffer) =
-    new String(Utils.readBytes(key.duplicate), "UTF-8")
-
-  def put(key: ByteBuffer, offset: Long): Unit = {
-    lastOffset = offset
-    map.put(keyFor(key), offset)
-  }
-  
-  def get(key: ByteBuffer): Long = {
-    val k = keyFor(key)
-    if(map.containsKey(k))
-      map.get(k)
-    else
-      -1L
-  }
-  
-  def clear(): Unit = map.clear()
-  
-  def size: Int = map.size
-
-  def latestOffset: Long = lastOffset
-
-  override def toString: String = map.toString()
-}

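The deleted file above is re-added below as LogCleanerTest.scala, matching the
class under test; FakeOffsetMap moves with it. The fake backs the OffsetMap
contract with a plain HashMap keyed on the UTF-8 decoding of the key buffer, so
tests can preload exactly which keys the cleaner should treat as duplicates. A
standalone sketch of the same test-double idea (trait and names simplified, not
the Kafka interfaces):

    import java.nio.ByteBuffer
    import java.nio.charset.StandardCharsets

    trait KeyOffsetMap {
      def put(key: ByteBuffer, offset: Long): Unit
      def get(key: ByteBuffer): Long
    }

    class InMemoryKeyOffsetMap extends KeyOffsetMap {
      private val map = new java.util.HashMap[String, Long]()

      // Decode a duplicate of the buffer so the caller's position is untouched.
      private def keyFor(key: ByteBuffer): String = {
        val dup = key.duplicate
        val bytes = new Array[Byte](dup.remaining)
        dup.get(bytes)
        new String(bytes, StandardCharsets.UTF_8)
      }

      def put(key: ByteBuffer, offset: Long): Unit = map.put(keyFor(key), offset)

      def get(key: ByteBuffer): Long = {
        val k = keyFor(key)
        if (map.containsKey(k)) map.get(k) else -1L  // -1 signals "not present", as in the tests
      }
    }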
http://git-wip-us.apache.org/repos/asf/kafka/blob/302a246c/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
new file mode 100755
index 0000000..24a6366
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -0,0 +1,839 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{DataOutputStream, File}
+import java.nio._
+import java.nio.file.Paths
+import java.util.Properties
+
+import kafka.common._
+import kafka.message._
+import kafka.utils._
+import org.apache.kafka.common.record.{MemoryRecords, TimestampType}
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit.{After, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection._
+
+/**
+ * Unit tests for the log cleaning logic
+ */
+class LogCleanerTest extends JUnitSuite {
+  
+  val tmpdir = TestUtils.tempDir()
+  val dir = TestUtils.randomPartitionLogDir(tmpdir)
+  val logProps = new Properties()
+  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  val logConfig = LogConfig(logProps)
+  val time = new MockTime()
+  val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+  
+  @After
+  def teardown(): Unit = {
+    Utils.delete(tmpdir)
+  }
+  
+  /**
+   * Test simple log cleaning
+   */
+  @Test
+  def testCleanSegments(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages to the log until we have four segments
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val segments = log.logSegments.take(3).toSeq
+    val stats = new CleanerStats()
+    val expectedBytesRead = segments.map(_.size).sum
+    cleaner.cleanSegments(log, segments, map, 0L, stats)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+    assertEquals(expectedBytesRead, stats.bytesRead)
+  }
+
+  /**
+   * Test log cleaning with logs containing messages larger than default message size
+   */
+  @Test
+  def testLargeMessage() {
+    val largeMessageSize = 1024 * 1024
+    // Create cleaner with very small default max message size
+    val cleaner = makeCleaner(Int.MaxValue, maxMessageSize=1024)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, largeMessageSize * 16: java.lang.Integer)
+    logProps.put(LogConfig.MaxMessageBytesProp, largeMessageSize * 2: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, Array.fill(largeMessageSize)(0: Byte)))
+    val keysFound = keysInLog(log)
+    assertEquals(0L until log.logEndOffset, keysFound)
+
+    // pretend we have the following keys
+    val keys = immutable.ListSet(1, 3, 5, 7, 9)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+
+    // clean the log
+    val stats = new CleanerStats()
+    cleaner.cleanSegments(log, Seq(log.logSegments.head), map, 0L, stats)
+    val shouldRemain = keysInLog(log).filter(!keys.contains(_))
+    assertEquals(shouldRemain, keysInLog(log))
+  }
+
+  @Test
+  def testCleaningWithDeletes(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    
+    // append messages with the keys 0 through N
+    while(log.numberOfSegments < 2)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      
+    // delete all even keys between 0 and N
+    val leo = log.logEndOffset
+    for(key <- 0 until leo.toInt by 2)
+      log.append(deleteMessage(key))
+      
+    // append some new unique keys to pad out to a new active segment
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+    val keys = keysInLog(log).toSet
+    assertTrue("None of the keys we deleted should still exist.", 
+               (0 until leo.toInt by 2).forall(!keys.contains(_)))
+  }
+
+  @Test
+  def testLogCleanerStats(): Unit = {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    val cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    val initialLogSize = log.size
+
+    val (endOffset, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(5, endOffset)
+    assertEquals(5, stats.messagesRead)
+    assertEquals(initialLogSize, stats.bytesRead)
+    assertEquals(2, stats.messagesWritten)
+    assertEquals(log.size, stats.bytesWritten)
+    assertEquals(0, stats.invalidMessagesRead)
+    assertTrue(stats.endTime >= stats.startTime)
+  }
+
+  @Test
+  def testPartialSegmentClean(): Unit = {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    val cleaner = makeCleaner(2)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    log.append(message(0,0)) // offset 0
+    log.append(message(1,1)) // offset 1
+    log.append(message(0,0)) // offset 2
+    log.append(message(1,1)) // offset 3
+    log.append(message(0,0)) // offset 4
+    // roll the segment, so we can clean the messages already appended
+    log.roll()
+
+    // clean the log with only one message removed
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(1,0,1,0), keysInLog(log))
+    assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
+
+    // continue to make progress, even though we can only clean one message at a time
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(0,1,0), keysInLog(log))
+    assertEquals(immutable.List(2,3,4), offsetsInLog(log))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
+    assertEquals(immutable.List(1,0), keysInLog(log))
+    assertEquals(immutable.List(3,4), offsetsInLog(log))
+  }
+
+  @Test
+  def testCleaningWithUncleanableSection(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
+    val N = 10
+    val numCleanableSegments = 2
+    val numTotalSegments = 7
+
+    // append messages with the keys 0 through N-1, values equal offset
+    while(log.numberOfSegments <= numCleanableSegments)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // at this point one message past the cleanable segments has been added
+    // the entire segment containing the first uncleanable offset should not be cleaned.
+    val firstUncleanableOffset = log.logEndOffset + 1 // +1 so it is past the baseOffset
+
+    while(log.numberOfSegments < numTotalSegments - 1)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // the last (active) segment has just one message
+
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
+
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
+      distinctValuesBySegment.reverse.tail.forall(_ > N))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
+
+    val distinctValuesBySegmentAfterClean = distinctValuesBySegment
+
+    assertTrue("The cleanable segments should have fewer distinct values after cleaning",
+      distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall { case (before, after) => after < before })
+    assertTrue("The uncleanable segments should have the same number of distinct values after cleaning",
+      distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+        .slice(numCleanableSegments, numTotalSegments).forall { case (before, after) => before == after })
+  }
+
+  @Test
+  def testLogToClean(): Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (i <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
+
+    assertEquals("Total bytes of LogToClean should equal size of all segments 
excluding the active segment",
+      logToClean.totalBytes, log.size - log.activeSegment.size)
+  }
+
+  @Test
+  def testLogToCleanWithUncleanableSection(): Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (i <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    // segments [0,1] are clean; segments [2,3] are cleanable; segments [4,5] are uncleanable
+    val segs = log.logSegments.toSeq
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
+
+    val expectedCleanSize = segs.take(2).map(_.size).sum
+    val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
+    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
+
+    assertEquals("Uncleanable bytes of LogToClean should equal size of all 
segments prior the one containing first dirty",
+      logToClean.cleanBytes, expectedCleanSize)
+    assertEquals("Cleanable bytes of LogToClean should equal size of all 
segments from the one containing first dirty offset" +
+      " to the segment prior to the one with the first uncleanable offset",
+      logToClean.cleanableBytes, expectedCleanableSize)
+    assertEquals("Total bytes should be the sum of the clean and cleanable 
segments", logToClean.totalBytes, expectedCleanSize + expectedCleanableSize)
+    assertEquals("Total cleanable ratio should be the ratio of cleanable size 
to clean plus cleanable", logToClean.cleanableRatio,
+      expectedCleanableSize / (expectedCleanSize + 
expectedCleanableSize).toDouble, 1.0e-6d)
+  }
+
+  @Test
+  def testCleaningWithUnkeyedMessages(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    // create a log with compaction turned off so we can append unkeyed messages
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Delete)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append unkeyed messages
+    while(log.numberOfSegments < 2)
+      log.append(unkeyedMessage(log.logEndOffset.toInt))
+    val numInvalidMessages = unkeyedMessageCountInLog(log)
+
+    val sizeWithUnkeyedMessages = log.size
+
+    // append keyed messages
+    while(log.numberOfSegments < 3)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
+    val (_, stats) = cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+
+    assertEquals("Log should only contain keyed messages after cleaning.", 0, 
unkeyedMessageCountInLog(log))
+    assertEquals("Log should only contain keyed messages after cleaning.", 
expectedSizeAfterCleaning, log.size)
+    assertEquals("Cleaner should have seen %d invalid messages.", 
numInvalidMessages, stats.invalidMessagesRead)
+  }
+  
+  /* extract all the keys from a log */
+  def keysInLog(log: Log): Iterable[Int] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => TestUtils.readString(m.message.key).toInt))
+
+  /* extract all the offsets from a log */
+  def offsetsInLog(log: Log): Iterable[Long] =
+    log.logSegments.flatMap(s => s.log.filter(!_.message.isNull).filter(_.message.hasKey).map(m => m.offset))
+
+  def unkeyedMessageCountInLog(log: Log) =
+    log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
+
+  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
+    throw new LogCleaningAbortedException()
+  }
+
+  /**
+   * Test that an abort during cleaning throws a LogCleaningAbortedException
+   */
+  @Test
+  def testCleanSegmentsWithAbort(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // append messages to the log until we have four segments
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+
+    val keys = keysInLog(log)
+    val map = new FakeOffsetMap(Int.MaxValue)
+    keys.foreach(k => map.put(key(k), Long.MaxValue))
+    intercept[LogCleaningAbortedException] {
+      cleaner.cleanSegments(log, log.logSegments.take(3).toSeq, map, 0L, new CleanerStats())
+    }
+  }
+
+  /**
+   * Validate the logic for grouping log segments together for cleaning
+   */
+  @Test
+  def testSegmentGrouping(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    
+    // append some messages to the log
+    var i = 0
+    while(log.numberOfSegments < 10) {
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+      i += 1
+    }
+    
+    // grouping by very large values should result in a single group with all the segments in it
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+    assertEquals(log.numberOfSegments, groups.head.size)
+    checkSegmentOrder(groups)
+    
+    // grouping by very small values should result in all groups having one entry
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = 1, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = 1)
+    assertEquals(log.numberOfSegments, groups.size)
+    assertTrue("All groups should be singletons.", groups.forall(_.size == 1))
+    checkSegmentOrder(groups)
+
+    val groupSize = 3
+    
+    // check grouping by log size
+    val logSize = log.logSegments.take(groupSize).map(_.size).sum.toInt + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = logSize, maxIndexSize = Int.MaxValue)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", 
groups.dropRight(1).forall(_.size == groupSize))
+    
+    // check grouping by index size
+    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
+    checkSegmentOrder(groups)
+    assertTrue("All but the last group should be the target size.", 
groups.dropRight(1).forall(_.size == groupSize))
+  }
+  
+  /**
+   * Validate the logic for grouping log segments together for cleaning when only a small number of
+   * messages are retained, but the range of offsets is greater than Int.MaxValue. A group should not
+   * contain a range of offsets greater than Int.MaxValue to ensure that relative offsets can be
+   * stored in 4 bytes.
+   */
+  @Test
+  def testSegmentGroupingWithSparseOffsets(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // fill up first segment
+    while (log.numberOfSegments == 1)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    
+    // forward offset and append message to next segment at offset Int.MaxValue
+    val messageSet = new ByteBufferMessageSet(NoCompressionCodec, new LongRef(Int.MaxValue - 1),
+      new Message("hello".getBytes, "hello".getBytes, Message.NoTimestamp, Message.MagicValue_V1))
+    log.append(messageSet, assignOffsets = false)
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    assertEquals(Int.MaxValue, log.activeSegment.index.lastOffset)
+    
+    // grouping should result in a single group with maximum relative offset of Int.MaxValue
+    var groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(1, groups.size)
+    
+    // append another message, making last offset of second segment > Int.MaxValue
+    log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+    
+    // grouping should not group the two segments to ensure that maximum relative offset in each group <= Int.MaxValue
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(2, groups.size)
+    checkSegmentOrder(groups)
+    
+    // append more messages, creating new segments; further grouping should still occur
+    while (log.numberOfSegments < 4)
+      log.append(TestUtils.singleMessageSet(payload = "hello".getBytes, key = "hello".getBytes))
+
+    groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = Int.MaxValue)
+    assertEquals(log.numberOfSegments - 1, groups.size)
+    for (group <- groups)
+      assertTrue("Relative offset greater than Int.MaxValue", 
group.last.index.lastOffset - group.head.index.baseOffset <= Int.MaxValue)
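+      // with relative offsets stored in 4 bytes, the largest representable delta is
+      // Int.MaxValue = 2^31 - 1, which is exactly the per-group bound asserted above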
+    checkSegmentOrder(groups)
+    
+  }
+  
+  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
+    val offsets = groups.flatMap(_.map(_.baseOffset))
+    assertEquals("Offsets should be in increasing order.", offsets.sorted, 
offsets)
+  }
+  
+  /**
+   * Test building an offset map off the log
+   */
+  @Test
+  def testBuildOffsetMap(): Unit = {
+    val map = new FakeOffsetMap(1000)
+    val log = makeLog()
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 500
+    writeToLog(log, (start until end) zip (start until end))
+
+    def checkRange(map: FakeOffsetMap, start: Int, end: Int) {
+      val stats = new CleanerStats()
+      cleaner.buildOffsetMap(log, start, end, map, stats)
+      val endOffset = map.latestOffset + 1
+      assertEquals("Last offset should be the end offset.", end, endOffset)
+      assertEquals("Should have the expected number of messages in the map.", 
end-start, map.size)
+      for(i <- start until end)
+        assertEquals("Should find all the keys", i.toLong, map.get(key(i)))
+      assertEquals("Should not find a value too small", -1L, map.get(key(start 
- 1)))
+      assertEquals("Should not find a value too large", -1L, map.get(key(end)))
+      assertEquals(end - start, stats.mapMessagesRead)
+    }
+
+    val segments = log.logSegments.toSeq
+    checkRange(map, 0, segments(1).baseOffset.toInt)
+    checkRange(map, segments(1).baseOffset.toInt, segments(3).baseOffset.toInt)
+    checkRange(map, segments(3).baseOffset.toInt, log.logEndOffset.toInt)
+  }
+  
+  
+  /**
+   * Tests recovery if broker crashes at the following stages during the cleaning sequence
+   * <ol>
+   *   <li> Cleaner has created .cleaned log containing multiple segments, 
swap sequence not yet started
+   *   <li> .cleaned log renamed to .swap, old segment files not yet renamed 
to .deleted
+   *   <li> .cleaned log renamed to .swap, old segment files renamed to 
.deleted, but not yet deleted
+   *   <li> .swap suffix removed, completing the swap, but async delete of 
.deleted files not yet complete
+   * </ol>
+   */
+  @Test
+  def testRecoveryAfterCrash(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, 10: java.lang.Integer)
+
+    val config = LogConfig.fromProps(logConfig.originals, logProps)
+      
+    def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Int]): Log = {
+      // Recover log file and check that after recovery, keys are as expected
+      // and all temporary files have been deleted
+      val recoveredLog = makeLog(config = config)
+      time.sleep(config.fileDeleteDelayMs + 1)
+      for (file <- dir.listFiles) {
+        assertFalse("Unexpected .deleted file after recovery", 
file.getName.endsWith(Log.DeletedFileSuffix))
+        assertFalse("Unexpected .cleaned file after recovery", 
file.getName.endsWith(Log.CleanedFileSuffix))
+        assertFalse("Unexpected .swap file after recovery", 
file.getName.endsWith(Log.SwapFileSuffix))
+      }
+      assertEquals(expectedKeys, keysInLog(recoveredLog))
+      recoveredLog
+    }
+    
+    // create a log and append some messages
+    var log = makeLog(config = config)
+    var messageCount = 0
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    val allKeys = keysInLog(log)
+    
+    // pretend we have odd-numbered keys
+    val offsetMap = new FakeOffsetMap(Int.MaxValue)
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)
+     
+    // clean the log
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    var cleanedKeys = keysInLog(log)
+    
+    // 1) Simulate recovery just after .cleaned file is created, before rename to .swap
+    //    On recovery, clean operation is aborted. All messages should be present in the log
+    log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
+    for (file <- dir.listFiles if 
file.getName.endsWith(Log.DeletedFileSuffix)) {
+      Utils.atomicMoveWithFallback(file.toPath, 
Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }
+    log = recoverAndCheck(config, allKeys)
+    
+    // clean again
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleanedKeys = keysInLog(log)
+    
+    // 2) Simulate recovery just after swap file is created, before old segment files are
+    //    renamed to .deleted. Clean operation is resumed during recovery. 
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    for (file <- dir.listFiles if 
file.getName.endsWith(Log.DeletedFileSuffix)) {
+      Utils.atomicMoveWithFallback(file.toPath, 
Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
+    }   
+    log = recoverAndCheck(config, cleanedKeys)
+
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)    
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleanedKeys = keysInLog(log)
+    
+    // 3) Simulate recovery after swap file is created and old segment files are renamed
+    //    to .deleted. Clean operation is resumed during recovery.
+    log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
+    log = recoverAndCheck(config, cleanedKeys)
+    
+    // add some more messages and clean the log again
+    while(log.numberOfSegments < 10) {
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
+      messageCount += 1
+    }
+    for (k <- 1 until messageCount by 2)
+      offsetMap.put(key(k), Long.MaxValue)    
+    cleaner.cleanSegments(log, log.logSegments.take(9).toSeq, offsetMap, 0L, new CleanerStats())
+    cleanedKeys = keysInLog(log)
+    
+    // 4) Simulate recovery after swap is complete, but async deletion
+    //    is not yet complete. Clean operation is resumed during recovery.
+    recoverAndCheck(config, cleanedKeys)
+  }
+
+  @Test
+  def testBuildOffsetMapFakeLarge(): Unit = {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
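+    // the two offsets (0 and 7206178) are deliberately far apart so the map must handle
+    // sparse offsets, as left behind by earlier rounds of compaction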
+    writeToLog(log, (start until end) zip (start until end), offsetSeq)
+    cleaner.buildOffsetMap(log, start, end, map, new CleanerStats())
+    val endOffset = map.latestOffset
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", 
end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  /**
+   * Test building a partial offset map of part of a log segment
+   */
+  @Test
+  def testBuildPartialOffsetMap(): Unit = {
+    // because loadFactor is 0.75, this means we can fit 2 messages in the map
+    val map = new FakeOffsetMap(3)
+    val log = makeLog()
+    val cleaner = makeCleaner(2)
+
+    log.append(message(0,0))
+    log.append(message(1,1))
+    log.append(message(2,2))
+    log.append(message(3,3))
+    log.append(message(4,4))
+    log.roll()
+
+    val stats = new CleanerStats()
+    cleaner.buildOffsetMap(log, 2, Int.MaxValue, map, stats)
+    assertEquals(2, map.size)
+    assertEquals(-1, map.get(key(0)))
+    assertEquals(2, map.get(key(2)))
+    assertEquals(3, map.get(key(3)))
+    assertEquals(-1, map.get(key(4)))
+    assertEquals(4, stats.mapMessagesRead)
+  }
+
+  /**
+   * This test verifies that messages corrupted by KAFKA-4298 are fixed by the cleaner
+   */
+  @Test
+  def testCleanCorruptMessageSet() {
+    val codec = SnappyCompressionCodec
+
+    val logProps = new Properties()
+    logProps.put(LogConfig.CompressionTypeProp, codec.name)
+    val logConfig = LogConfig(logProps)
+
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(10)
+
+    // messages are constructed so that the payload matches the expected offset to
+    // make offset validation easier after cleaning
+
+    // one compressed log entry with duplicates
+    val dupSetKeys = (0 until 2) ++ (0 until 2)
+    val dupSetOffset = 25
+    val dupSet = dupSetKeys zip (dupSetOffset until dupSetOffset + dupSetKeys.size)
+
+    // and one without (should still be fixed by the cleaner)
+    val noDupSetKeys = 3 until 5
+    val noDupSetOffset = 50
+    val noDupSet = noDupSetKeys zip (noDupSetOffset until noDupSetOffset + noDupSetKeys.size)
+
+    log.append(invalidCleanedMessage(dupSetOffset, dupSet, codec), assignOffsets = false)
+    log.append(invalidCleanedMessage(noDupSetOffset, noDupSet, codec), assignOffsets = false)
+
+    log.roll()
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
+
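+    // after cleaning, every deep (inner) message should carry the same magic value as its
+    // shallow wrapper, and each payload should equal its absolute offset, i.e. the
+    // v1-wrapper/v0-inner corruption from KAFKA-4298 has been repaired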
+    for (segment <- log.logSegments; shallowMessage <- segment.log.iterator; 
deepMessage <- ByteBufferMessageSet.deepIterator(shallowMessage)) {
+      assertEquals(shallowMessage.message.magic, deepMessage.message.magic)
+      val value = TestUtils.readString(deepMessage.message.payload).toLong
+      assertEquals(deepMessage.offset, value)
+    }
+  }
+
+  /**
+   * Verify that the client can handle corrupted messages. Located here for now since the client
+   * does not support writing messages with the old magic.
+   */
+  @Test
+  def testClientHandlingOfCorruptMessageSet(): Unit = {
+    import JavaConverters._
+
+    val keys = 1 until 10
+    val offset = 50
+    val set = keys zip (offset until offset + keys.size)
+
+    val corruptedMessage = invalidCleanedMessage(offset, set)
+    val records = MemoryRecords.readableRecords(corruptedMessage.buffer)
+
+    for (logEntry <- records.iterator.asScala) {
+      val offset = logEntry.offset
+      val value = TestUtils.readString(logEntry.record.value).toLong
+      assertEquals(offset, value)
+    }
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = false).firstOffset
+  }
+
+  private def invalidCleanedMessage(initialOffset: Long,
+                                    keysAndValues: Iterable[(Int, Int)],
+                                    codec: CompressionCodec = SnappyCompressionCodec): ByteBufferMessageSet = {
+    // this function replicates the old versions of the cleaner which under some circumstances
+    // would write invalid compressed message sets with the outer magic set to 1 and the inner
+    // magic set to 0
+
+    val messages = keysAndValues.map(kv =>
+      new Message(key = kv._1.toString.getBytes,
+        bytes = kv._2.toString.getBytes,
+        timestamp = Message.NoTimestamp,
+        magicValue = Message.MagicValue_V0))
+
+    val messageWriter = new MessageWriter(math.min(math.max(MessageSet.messageSetSize(messages) / 2, 1024), 1 << 16))
+    var lastOffset = initialOffset
+
+    messageWriter.write(
+      codec = codec,
+      timestamp = Message.NoTimestamp,
+      timestampType = TimestampType.CREATE_TIME,
+      magicValue = Message.MagicValue_V1) { outputStream =>
+
+      val output = new DataOutputStream(CompressionFactory(codec, Message.MagicValue_V1, outputStream))
+      try {
+        for (message <- messages) {
+          val innerOffset = lastOffset - initialOffset
+          output.writeLong(innerOffset)
+          output.writeInt(message.size)
+          output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit)
+          lastOffset += 1
+        }
+      } finally {
+        output.close()
+      }
+    }
+    val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)
+    ByteBufferMessageSet.writeMessage(buffer, messageWriter, lastOffset - 1)
+    buffer.rewind()
+
+    new ByteBufferMessageSet(buffer)
+  }
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
+                             new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
+  
+  
+  def makeLog(dir: File = dir, config: LogConfig = logConfig) =
+    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+  def noOpCheckDone(topicAndPartition: TopicAndPartition) { /* do nothing */  }
+
+  def makeCleaner(capacity: Int, checkDone: (TopicAndPartition) => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
+    new Cleaner(id = 0,
+                offsetMap = new FakeOffsetMap(capacity),
+                ioBufferSize = maxMessageSize,
+                maxIoBufferSize = maxMessageSize,
+                dupBufferLoadFactor = 0.75,
+                throttler = throttler,
+                time = time,
+                checkDone = checkDone)
+  
+  def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
+    for((key, value) <- seq)
+      yield log.append(message(key, value)).firstOffset
+  }
+  
+  def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
+  
+  def message(key: Int, value: Int): ByteBufferMessageSet =
+    message(key, value.toString.getBytes)
+
+  def message(key: Int, value: Array[Byte]) =
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+                                         bytes = value,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
+
+  def unkeyedMessage(value: Int) =
+    new ByteBufferMessageSet(new Message(bytes = value.toString.getBytes))
+
+  def deleteMessage(key: Int) =
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+                                         bytes = null,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
+  
+}
+
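+/**
+ * A simple OffsetMap for tests: keys are compared by their UTF-8 string form and stored in an
+ * in-memory HashMap; `slots` is the advertised capacity only and is not enforced here.
+ */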
+class FakeOffsetMap(val slots: Int) extends OffsetMap {
+  val map = new java.util.HashMap[String, Long]()
+  var lastOffset = -1L
+
+  private def keyFor(key: ByteBuffer) =
+    new String(Utils.readBytes(key.duplicate), "UTF-8")
+
+  def put(key: ByteBuffer, offset: Long): Unit = {
+    lastOffset = offset
+    map.put(keyFor(key), offset)
+  }
+  
+  def get(key: ByteBuffer): Long = {
+    val k = keyFor(key)
+    if(map.containsKey(k))
+      map.get(k)
+    else
+      -1L
+  }
+  
+  def clear(): Unit = map.clear()
+  
+  def size: Int = map.size
+
+  def latestOffset: Long = lastOffset
+
+  override def toString: String = map.toString()
+}
