[ https://issues.apache.org/jira/browse/KAFKA-3915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15353396#comment-15353396 ]

Jun Rao commented on KAFKA-3915:
--------------------------------

[~twbecker], thanks for reporting this. 

Keeping the old message format during cleaning would be an easy fix. However, 
converting messages to the latest format allows us to potentially deprecate 
the old format in the future, even though conversion doesn't give us any 
additional timestamp information. Another point is that in the future, when 
copying messages out to the new segment, we may want to use the compression 
type specified at the topic level instead of the original compression type. 
If we do that, the new message size could also be different. So, it's probably 
worth thinking through how to make the cleaner handle the case of write 
overflow.
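A minimal sketch of one way to handle that write overflow, assuming the cleaner simply grows its output buffer and retries when a converted message no longer fits. The class and method names here are hypothetical illustrations, not Kafka's actual cleaner code:

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

public class GrowableWriteBuffer {
    // Hypothetical helper: on overflow, allocate a larger buffer, copy the
    // bytes written so far, and retry. ByteBuffer.put(byte[]) checks the
    // remaining space before writing, so a failed put leaves no partial data.
    public static ByteBuffer writeWithGrowth(ByteBuffer out, byte[] payload) {
        while (true) {
            try {
                out.put(payload);
                return out;
            } catch (BufferOverflowException e) {
                int needed = out.position() + payload.length;
                ByteBuffer bigger = ByteBuffer.allocate(Math.max(out.capacity() * 2, needed));
                out.flip();        // expose the bytes already written
                bigger.put(out);   // carry them into the larger buffer
                out = bigger;
            }
        }
    }

    public static void main(String[] args) {
        ByteBuffer small = ByteBuffer.allocate(4);
        ByteBuffer result = writeWithGrowth(small, new byte[]{1, 2, 3, 4, 5, 6});
        System.out.println(result.capacity()); // 8 (doubled from 4)
        System.out.println(result.position()); // 6
    }
}
```

Doubling keeps the amortized copying cost low; a real cleaner would also need an upper bound so a pathological segment can't grow the buffer without limit.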

cc [~becket_qin].

> LogCleaner IO buffers do not account for potential size difference due to 
> message format change
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3915
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3915
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>    Affects Versions: 0.10.0.0
>            Reporter: Tommy Becker
>            Priority: Critical
>             Fix For: 0.10.0.1
>
>
> We are upgrading from Kafka 0.8.1 to 0.10.0.0 and discovered an issue after 
> getting the following exception from the log cleaner:
> {code}
> [2016-06-28 10:02:18,759] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
> java.nio.BufferOverflowException
>       at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>       at kafka.message.ByteBufferMessageSet$.writeMessage(ByteBufferMessageSet.scala:169)
>       at kafka.log.Cleaner$$anonfun$cleanInto$1.apply(LogCleaner.scala:435)
>       at kafka.log.Cleaner$$anonfun$cleanInto$1.apply(LogCleaner.scala:429)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>       at kafka.log.Cleaner.cleanInto(LogCleaner.scala:429)
>       at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:380)
>       at kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:376)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:376)
>       at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:343)
>       at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:342)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at kafka.log.Cleaner.clean(LogCleaner.scala:342)
>       at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:237)
>       at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:215)
>       at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> {code}
> At first this seems impossible because the input and output buffers are 
> identically sized. But in the case where the source messages are of an older 
> format, additional space may be required to write them out in the new one. 
> Since the message header is 8 bytes larger in 0.10.0, this failure can 
> happen. 
> We're planning to work around this by setting 
> {code}log.message.format.version=0.8.1{code}, but this definitely needs a fix.
> We could simply preserve the existing message format (since in this case we 
> can't retroactively add a timestamp anyway). Otherwise, the log cleaner would 
> have to be smarter about ensuring there is sufficient "slack space" in the 
> output buffer, accounting for the per-message size difference multiplied by 
> the number of messages in the input buffer. 
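To make the slack-space idea above concrete, here is a back-of-the-envelope sizing helper, assuming the 8-byte header growth from message format v0 (0.8.x, no timestamp) to v1 (0.10.0, 8-byte timestamp) described in the report. The names are illustrative, not Kafka's:

```java
public class CleanerBufferSizing {
    // Assumed per-message header growth when up-converting v0 -> v1
    // (the 8-byte timestamp added in 0.10.0, per the issue description).
    static final int V0_TO_V1_HEADER_GROWTH = 8;

    // Worst-case output capacity: every message in the input buffer may grow
    // by the header delta, so reserve (size difference * message count) of slack.
    static int requiredOutputCapacity(int inputCapacity, int maxMessagesInInput) {
        return inputCapacity + maxMessagesInInput * V0_TO_V1_HEADER_GROWTH;
    }

    public static void main(String[] args) {
        // e.g. a 1 MiB read buffer that could hold at most 2048 minimum-size
        // messages: 1048576 + 2048 * 8
        System.out.println(requiredOutputCapacity(1 << 20, 2048)); // 1064960
    }
}
```

The bound is pessimistic (not every message grows), but it guarantees the `ByteBufferMessageSet.writeMessage` call can never overflow for this conversion.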



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
