Repository: kafka
Updated Branches:
  refs/heads/0.10.0 2a1b3b93b -> 9181ecf24


KAFKA-3587; LogCleaner fails due to incorrect offset map computation

Removed the over pessimistic require and instead attempt to fill the dedup 
buffer. Use the (only) map until full;
this may allow to process all dirty segment (optimism) or may happen in the 
middle of a dirt segment.
In either case, do compaction using the map loaded that way.

This patch was developed with edoardocomar

Author: Mickael Maison <mickael.mai...@gmail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Guozhang Wang <wangg...@gmail.com>

Closes #1332 from mimaison/KAFKA-3587

(cherry picked from commit 2caf872c2e51d689c6ac20240c4a306e36d98b15)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9181ecf2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9181ecf2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9181ecf2

Branch: refs/heads/0.10.0
Commit: 9181ecf247297ea7f7cdd2b69a578a487c74fdf5
Parents: 2a1b3b9
Author: Mickael Maison <mickael.mai...@gmail.com>
Authored: Mon May 9 18:10:40 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon May 9 18:11:08 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala  | 27 ++++++++++------
 .../test/scala/unit/kafka/log/CleanerTest.scala | 33 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9181ecf2/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0f742f9..c6636be 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -615,17 +615,19 @@ private[log] class Cleaner(val id: Int,
     // but we may be able to fit more (if there is lots of duplication in the 
dirty section of the log)
     var offset = dirty.head.baseOffset
     require(offset == start, "Last clean offset is %d but segment base offset 
is %d for log %s.".format(start, offset, log.name))
-    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     var full = false
     for (segment <- dirty if !full) {
       checkDone(log.topicAndPartition)
-      val segmentSize = segment.nextOffset() - segment.baseOffset
 
-      require(segmentSize <= maxDesiredMapSize, "%d messages in segment %s/%s 
but offset map can fit only %d. You can increase log.cleaner.dedupe.buffer.size 
or decrease log.cleaner.threads".format(segmentSize,  log.name, 
segment.log.file.getName, maxDesiredMapSize))
-      if (map.size + segmentSize <= maxDesiredMapSize)
-        offset = buildOffsetMapForSegment(log.topicAndPartition, segment, map)
-      else
+      val newOffset = buildOffsetMapForSegment(log.topicAndPartition, segment, 
map)
+      if (newOffset > -1L)
+        offset = newOffset
+      else {
+        // If not even one segment can fit in the map, compaction cannot happen
+        require(offset > start, "Unable to build the offset map for segment 
%s/%s. You can increase log.cleaner.dedupe.buffer.size or decrease 
log.cleaner.threads".format(log.name, segment.log.file.getName))
+        debug("Offset map is full, %d segments fully mapped, segment with base 
offset %d is partially mapped".format(dirty.indexOf(segment), 
segment.baseOffset))
         full = true
+      }
     }
     info("Offset map for log %s complete.".format(log.name))
     offset
@@ -637,11 +639,12 @@ private[log] class Cleaner(val id: Int,
    * @param segment The segment to index
    * @param map The map in which to store the key=>offset mapping
    *
-   * @return The final offset covered by the map
+   * @return The final offset covered by the map or -1 if the map is full
    */
   private def buildOffsetMapForSegment(topicAndPartition: TopicAndPartition, 
segment: LogSegment, map: OffsetMap): Long = {
     var position = 0
     var offset = segment.baseOffset
+    val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
     while (position < segment.log.sizeInBytes) {
       checkDone(topicAndPartition)
       readBuffer.clear()
@@ -650,8 +653,14 @@ private[log] class Cleaner(val id: Int,
       val startPosition = position
       for (entry <- messages) {
         val message = entry.message
-        if (message.hasKey)
-          map.put(message.key, entry.offset)
+        if (message.hasKey) {
+          if (map.size < maxDesiredMapSize)
+            map.put(message.key, entry.offset)
+          else {
+            // The map is full, stop looping and return
+            return -1L
+          }
+        }
         offset = entry.offset
         stats.indexMessagesRead(1)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9181ecf2/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala 
b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index b6849f0..752a260 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -423,6 +423,39 @@ class CleanerTest extends JUnitSuite {
     recoverAndCheck(config, cleanedKeys)
     
   }
+
+  @Test
+  def testBuildOffsetMapFakeLarge() {
+    val map = new FakeOffsetMap(1000)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, 72: java.lang.Integer)
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    val logConfig = LogConfig(logProps)
+    val log = makeLog(config = logConfig)
+    val cleaner = makeCleaner(Int.MaxValue)
+    val start = 0
+    val end = 2
+    val offsetSeq = Seq(0L, 7206178L)
+    val offsets = writeToLog(log, (start until end) zip (start until end), 
offsetSeq)
+    val endOffset = cleaner.buildOffsetMap(log, start, end, map)
+    assertEquals("Last offset should be the end offset.", 7206178L, endOffset)
+    assertEquals("Should have the expected number of messages in the map.", 
end - start, map.size)
+    assertEquals("Map should contain first value", 0L, map.get(key(0)))
+    assertEquals("Map should contain second value", 7206178L, map.get(key(1)))
+  }
+
+  private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], 
offsetSeq: Iterable[Long]): Iterable[Long] = {
+    for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
+      yield log.append(messageWithOffset(key, value, offset), assignOffsets = 
false).firstOffset
+  }
+
+  private def messageWithOffset(key: Int, value: Int, offset: Long) =
+    new ByteBufferMessageSet(NoCompressionCodec, Seq(offset),
+                             new Message(key = key.toString.getBytes,
+                                         bytes = value.toString.getBytes,
+                                         timestamp = Message.NoTimestamp,
+                                         magicValue = Message.MagicValue_V1))
   
   
   def makeLog(dir: File = dir, config: LogConfig = logConfig) =

Reply via email to