[ https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15276207#comment-15276207 ]
Mickael Maison commented on KAFKA-3587:
---------------------------------------

I've updated the PR with all your feedback: https://github.com/apache/kafka/pull/1332

> LogCleaner fails due to incorrect offset map computation on a replica
> ---------------------------------------------------------------------
>
>              Key: KAFKA-3587
>              URL: https://issues.apache.org/jira/browse/KAFKA-3587
>          Project: Kafka
>       Issue Type: Bug
> Affects Versions: 0.9.0.1
>      Environment: Linux
>         Reporter: Kiran Pillarisetty
>         Assignee: Edoardo Comar
>          Fix For: 0.10.0.0
>
>      Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> The log cleaner fails to compact a segment even when the number of messages in it is smaller than the offset map capacity.
> In version 0.9.0.1 (LogCleaner.scala -> buildOffsetMap()), the LogCleaner computes the segment size by subtracting the segment's base offset from its latest offset ("segmentSize = segment.nextOffset() - segment.baseOffset"). This works fine until you create another replica. When you create a replica, its segment can contain data that has already been compacted on the other brokers. Depending on the type of data, the offset difference can be too big, larger than the offset map (maxDesiredMapSize), and that causes the LogCleaner to fail on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - server.properties:
>   log.cleaner.enable=true
>   log.cleaner.dedupe.buffer.size=10485760 # 10MB
>   log.roll.ms=300000
>   delete.topic.enable=true
>   log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with a replication factor of 1:
>    ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic test.log.compact.1M --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config segment.ms=300000
> 2. Use kafka-console-producer.sh to produce a single message with the following key:
>    LC1,{"test": "xyz"}
> 3. Use kafka-console-producer.sh to produce a large number of messages with the following key:
>    LC2,{"test": "abc"}
> 4. Let the log cleaner run and make sure the log is compacted. Verify with:
>    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
>    Dumping 00000000000000000000.log
>    Starting offset: 0
>    offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": "xyz"}
>    offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 payload: {"test": "abc"}
> 5. Increase the replication factor to 2, following these steps: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that the log cleaner fails to compact the newly created replica with the following error:
>    [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
>    java.lang.IllegalArgumentException: requirement failed: 7206179 messages in segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease log.cleaner.threads
>        at scala.Predef$.require(Predef.scala:219)
>        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
>        at kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
>        at scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
>        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
>        at kafka.log.Cleaner.clean(LogCleaner.scala:322)
>        at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
>        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
>        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>    [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
>    ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.log --print-data-log
>    There are only 218418 messages in that segment, yet the log cleaner thinks there are 7206179 messages in it (as per the error above).
> The error stems from this line in LogCleaner.scala:
>    val segmentSize = segment.nextOffset() - segment.baseOffset
> In the replica's log segment file (00000000000000000000.log), the ending offset is 7206178 and the beginning offset is 0. That makes the log cleaner think there are 7206179 messages in that segment, although it only contains 218418 messages.
> IMO, to address this kind of scenario, LogCleaner.scala should check the number of messages actually present in the segment, instead of subtracting the beginning offset from the ending offset.
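A note on where the "can fit only 393215" figure comes from: with log.cleaner.dedupe.buffer.size=10485760 and a single cleaner thread, the cleaner's offset map holds roughly bufferSize * loadFactor / 24 entries, assuming the default MD5 key hashing (16 bytes per key hash plus an 8-byte offset per slot) and the default 0.9 load factor, i.e. about 393216 keys. The back-of-the-envelope sketch below reproduces that arithmetic and the failing pre-check from the stack trace; the object and value names are illustrative, not Kafka's actual identifiers.

    // Rough model of the 0.9.0.1 buildOffsetMap pre-check that fails in step 6.
    // Assumptions: 16-byte MD5 key hash + 8-byte offset per map slot, default 0.9
    // load factor, one cleaner thread. Names here are hypothetical, not Kafka's.
    object OffsetMapCapacityCheck extends App {
      val dedupeBufferSize = 10485760L   // log.cleaner.dedupe.buffer.size (10 MB)
      val cleanerThreads   = 1           // log.cleaner.threads
      val loadFactor       = 0.9         // log.cleaner.io.buffer.load.factor (default)
      val bytesPerEntry    = 16 + 8      // MD5 key hash + offset

      // Approximate number of keys one cleaner thread's offset map can hold (~393216).
      val maxDesiredMapSize =
        (dedupeBufferSize / cleanerThreads * loadFactor / bytesPerEntry).toInt

      // The replica segment spans offsets 0..7206178, so the offset delta is 7206179,
      // even though DumpLogSegments shows only 218418 messages after compaction.
      val baseOffset  = 0L
      val nextOffset  = 7206179L
      val segmentSize = nextOffset - baseOffset   // what 0.9.0.1 treats as the message count

      println(s"offset delta (treated as message count) = $segmentSize")
      println(s"actual messages in the replica segment  = 218418")
      println(s"approximate offset map capacity         = $maxDesiredMapSize")

      // The pre-check compares the offset delta, not the real entry count, against the
      // map capacity, so it rejects this segment with a message like the one above.
      if (segmentSize > maxDesiredMapSize)
        println(s"requirement failed: $segmentSize messages in segment but offset map can fit only $maxDesiredMapSize")
    }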
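To illustrate the remedy the reporter suggests (counting the messages actually present rather than subtracting offsets), here is a minimal, self-contained sketch on a toy model of a compacted replica segment. SegmentEntry and the surrounding names are hypothetical stand-ins for Kafka's LogSegment; the attached POC patch and PR 1332 remain the authoritative change.

    // Toy model contrasting the two size estimates for a compacted replica segment.
    // After compaction on the leader, the segment keeps its original offset range but
    // holds only the surviving messages, e.g. offsets 0 (LC1) and 7869818 (LC2).
    final case class SegmentEntry(offset: Long, key: String)

    object SegmentSizeEstimates extends App {
      val segment: Vector[SegmentEntry] =
        Vector(SegmentEntry(0L, "LC1"), SegmentEntry(7869818L, "LC2"))

      val baseOffset = segment.head.offset
      val nextOffset = segment.last.offset + 1

      // 0.9.0.1 estimate: the offset delta, which also counts the "holes" compaction left.
      val byOffsetDelta = nextOffset - baseOffset      // 7869819

      // Reporter's suggestion: count the entries that are actually in the segment.
      val byCounting = segment.size.toLong             // 2

      println(s"estimate from offset delta = $byOffsetDelta")
      println(s"actual entry count         = $byCounting")
    }

On this kind of data the offset delta overestimates the entry count by several orders of magnitude, which is exactly why the replica's first segment trips the dedupe-buffer check even though its real message count would fit comfortably.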