[ https://issues.apache.org/jira/browse/KAFKA-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16028760#comment-16028760 ]

ASF GitHub Bot commented on KAFKA-5316:
---------------------------------------

GitHub user hachikuji opened a pull request:

    https://github.com/apache/kafka/pull/3165

    KAFKA-5316: LogCleaner should account for larger record sets after cleaning

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hachikuji/kafka KAFKA-5316-0.10.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3165.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3165
    
----
commit 98539df716ca866f06a708d7556ad39ca4e1caae
Author: Jason Gustafson <ja...@confluent.io>
Date:   2017-05-30T05:49:33Z

    KAFKA-5316: LogCleaner should account for larger record sets after cleaning

----


> Log cleaning can increase message size and cause cleaner to crash with buffer overflow
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5316
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5316
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>             Fix For: 0.11.0.0
>
>
> We have observed in practice that it is possible for a compressed record set 
> to grow after cleaning. Since the cleaner's input and output buffers are 
> allocated with identical sizes, this can lead to overflow of the output buffer:
> {code}
> [2017-05-23 15:05:15,480] ERROR [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>         at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>         at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:104)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:163)
>         at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:114)
>         at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
>         at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
>         at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
>         at scala.collection.immutable.List.foreach(List.scala:318)
>         at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>         at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>         at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-05-23 15:05:15,481] INFO [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {code}
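> For illustration only (the buffer sizes and batch bytes below are made up, 
> not taken from the cleaner's code), the following sketch reproduces the same 
> relative {{put}} failure once the recompressed output is larger than an 
> output buffer sized to match the input:
> {code}
> import java.nio.ByteBuffer;
>
> public class CleanerOverflowSketch {
>     public static void main(String[] args) {
>         // The cleaner reads part of a segment into an input buffer and writes
>         // the retained records into an output buffer of equal capacity.
>         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
>         ByteBuffer writeBuffer = ByteBuffer.allocate(readBuffer.capacity());
>
>         // If recompressing the surviving records yields a larger batch than the
>         // original, this relative put() is the call that throws above.
>         byte[] recompressedBatch = new byte[1100]; // exceeds the 1024-byte capacity
>         writeBuffer.put(recompressedBatch);        // java.nio.BufferOverflowException
>     }
> }
> {code}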
> It is also then possible for a compressed message set to grow beyond the max 
> message size. Because KIP-74 changed the fetch semantics so that a fetch always 
> returns at least one message even if it exceeds the fetch size limit, the 
> suggestion for this case is to allow the recompressed message set to exceed 
> the max message size. This should be rare in practice and won't prevent 
> consumers from making progress.
> To handle the overflow issue, one option is to allocate a temporary buffer 
> when filtering in {{MemoryRecords.filterTo}} and return it in the result. As 
> an optimization, we can resort to this only when there is a single 
> recompressed message set which is larger than the entire write buffer. 
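> A rough sketch of that fallback, assuming a hypothetical helper that already 
> receives the recompressed batch as bytes (this is not the actual 
> {{MemoryRecords.filterTo}} signature):
> {code}
> import java.nio.ByteBuffer;
>
> public class FilterToFallbackSketch {
>     /**
>      * Write the recompressed batch into the destination buffer if it fits;
>      * otherwise allocate a temporary buffer of exactly the required size and
>      * return it, so the caller can flush from whichever buffer was written.
>      */
>     static ByteBuffer writeBatch(ByteBuffer destination, byte[] recompressedBatch) {
>         if (recompressedBatch.length <= destination.remaining()) {
>             destination.put(recompressedBatch);
>             return destination;
>         }
>         // Fallback: a single recompressed batch is larger than the entire write
>         // buffer, so copy it into a right-sized temporary buffer instead.
>         ByteBuffer temporary = ByteBuffer.allocate(recompressedBatch.length);
>         temporary.put(recompressedBatch);
>         temporary.flip();
>         return temporary;
>     }
> }
> {code}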



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
