[ 
https://issues.apache.org/jira/browse/KAFKA-3587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274483#comment-15274483
 ] 

Manas Alekar commented on KAFKA-3587:
-------------------------------------

The issue with just building the map till it is full as proposed in pull 
request 1332 is that the offset boundary reported by *buildOffsetMap* is now in 
the middle of a segment which has not been fully de-deuplicated. I doubt clean, 
the way it is written works at this point.

{code}
private[log] def clean(cleanable: LogToClean): Long = {
...
    val endOffset = buildOffsetMap(log, cleanable.firstDirtyOffset, 
upperBoundOffset, offsetMap) + 1
...
    for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), 
log.config.segmentSize, log.config.maxIndexSize))
      cleanSegments(log, group, offsetMap, deleteHorizonMs)
{code}

This is easily fixed by only updating the offset returned by buildOffsetMap if 
the map merged to segment boundary, otherwise report the last complete segment 
boundary. However, this leads to an interesting situation.

Consider,

{quote}

<segment0> (Ka, Oa) ... (Kx, O1) ... </segment0> ... </segment4> ... (Kx, O2) 
... (Kb, Ob) ... </segment4>

--------------------------------------------------------------------------------------------------------^
 offsetMap fills here
{quote}

If we continue to greedily de duplicate segments in buildOffsetMap, and we will 
offsetMap between Kx and Kb, we now have (k2,O2) reported in the compacted 
segment for segments 1 to 3 as well as the offsetMap for the next iteration of 
buildOffsetMap called at segment4 baseOffset.

I am under the impression that this is unacceptable. [~junrao]'s comment seems 
to indicate that this might be acceptable. If it is, this has the advantage 
over my pull request that *SkimpyOfffsetMap.putAll* is likely to suffer a lot 
if collision probability in the map is high. Of course, this can be fixed by 
switching hash algorithms, but avoiding the problem altogether would be 
preferable.

> LogCleaner fails due to incorrect offset map computation on a replica
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-3587
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3587
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.1
>         Environment: Linux
>            Reporter: Kiran Pillarisetty
>            Assignee: Edoardo Comar
>         Attachments: 0001-POC-improving-deduping-segments.patch
>
>
> Log Cleaner fails to compact a segment even when the number of messages in it 
> is less than the offset map.
> In version 0.9.0.1, (LogCleaner.scala -> buildOffsetMap()), LogCleaner 
> computes segment size by subtracting segment's base offset from the latest 
> offset ("segmentSize = segment.nextOffset() - segment.baseOffset").  This 
> works fine until you create another replica. When you create a replica, it's 
> segment could contain data which is already compacted on other brokers. 
> Depending up on the type of data, offset difference could be too big, larger 
> than the offset map (maxDesiredMapSize), and that causes LogCleaner to fail 
> on that segment.
> Scenario:
> - Kafka 0.9.0.1
> - Cluster has two brokers.
> - Server.properties:
> log.cleaner.enable=true
> log.cleaner.dedupe.buffer.size=10485760 #10MB
> log.roll.ms=300000
> delete.topic.enable=true
> log.cleanup.policy=compact
> Steps to reproduce:
> 1. Create a topic with replication-factor of 1.
> ./kafka-topics.sh --zookeeper=localhost:2181 --create --topic 
> test.log.compact.1M --partitions 1 --replication-factor 1 --config 
> cleanup.policy=compact --config segment.ms=300000
> 2. Use kafka-console-producer.sh to produce a single message with the 
> following key:
> LC1,{"test": "xyz"}
> 3. Use  kafka-console-producer.sh to produce a large number of messages with 
> the following key:
> LC2,{"test": "abc"}
> 4. Let log cleaner run. Make sure log is compacted.  Verify with:
>  ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files 
> 00000000000000000000.log  --print-data-log
> Dumping 00000000000000000000.log
> Starting offset: 0
> offset: 0 position: 0 isvalid: true payloadsize: 11 magic: 0 compresscodec: 
> NoCompressionCodec crc: 3067045277 keysize: 11 key: LC1 payload: {"test": 
> "xyz"}
> offset: 7869818 position: 48 isvalid: true payloadsize: 11 magic: 0 
> compresscodec: NoCompressionCodec crc: 2668089711 keysize: 11 key: LC2 
> payload: {"test": "abc"}
> 5.  Increase Replication Factor to 2.  Followed these steps: 
> http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
> 6. Notice that log cleaner fails to compact the newly created replica with 
> the following error.
> [2016-04-18 14:49:45,599] ERROR [kafka-log-cleaner-thread-0], Error due to  
> (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: 7206179 messages in 
> segment test.log.compact.1M-0/00000000000000000000.log but offset map can fit 
> only 393215. You can increase log.cleaner.dedupe.buffer.size or decrease 
> log.cleaner.threads
>         at scala.Predef$.require(Predef.scala:219)
>         at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:584)
>         at 
> kafka.log.Cleaner$$anonfun$buildOffsetMap$4.apply(LogCleaner.scala:580)
>         at 
> scala.collection.immutable.Stream$StreamWithFilter.foreach(Stream.scala:570)
>         at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:580)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:322)
>         at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:230)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:208)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-04-18 14:49:45,601] INFO [kafka-log-cleaner-thread-0], Stopped  
> (kafka.log.LogCleaner)
> 7. Examine the entries in the replica segment:
> ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 
> 00000000000000000000.log  --print-data-log
> There are only 218418 messages in that segment.
> However, Log Cleaner seems to think that there are 7206179 messages in that 
> segment (as per the above error)
> Error stems from this line in LogCleaner.scala:
> """val segmentSize = segment.nextOffset() - segment.baseOffset"""
> In Replica's log segment file ( 00000000000000000000.log), ending offset is 
> 7206178. Beginning offset is 0.  That makes Log Cleaner think that there are 
> 7206179 messages in that segment although there are only 218418 messages in 
> it.
> IMO,  to address this kind of scenario, LogCleaner.scala should check for the 
> number of messages in the segment, instead of subtracting beginning offset 
> from the ending offset.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to