[ https://issues.apache.org/jira/browse/KAFKA-765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13585652#comment-13585652 ]

Sriram Subramanian commented on KAFKA-765:
------------------------------------------

The change seems to introduce a resource leak. An IOException during decompression 
no longer shuts down the broker (ByteBufferMessageSet.scala), but if the read below 
throws, close() is never called on the input stream. You would need to wrap the 
code in try/finally.

val compressed = CompressionFactory(message.compressionCodec, inputStream)
Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
  outputStream.write(intermediateBuffer, 0, dataRead)
}
compressed.close()
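
A sketch of the suggested fix, using the same surrounding variables as above, so 
that close() runs even when read() throws:

val compressed = CompressionFactory(message.compressionCodec, inputStream)
try {
  Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
    outputStream.write(intermediateBuffer, 0, dataRead)
  }
} finally {
  compressed.close()  // always runs, so the stream is not leaked on a decompression error
}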
                
> Corrupted messages in produce request could shutdown the broker
> ---------------------------------------------------------------
>
>                 Key: KAFKA-765
>                 URL: https://issues.apache.org/jira/browse/KAFKA-765
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: p1
>         Attachments: KAFKA-765.patch
>
>
> In kafka.log.append(), we convert all IOExceptions to KafkaStorageException. 
> This will cause the caller to shut down the broker. However, if there is a 
> corrupted compressed message, validMessages.assignOffsets() in append() could 
> also throw an IOException when decompressing the message. In this case, we 
> shouldn't shut down the broker and should just fail this particular produce 
> request.
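
For reference, a rough sketch of the behavior the description asks for (the 
exception type and argument names are assumptions, not the actual change in 
KAFKA-765.patch): catch the decompression failure around assignOffsets() so that 
only this produce request fails, instead of letting the IOException be wrapped 
into the KafkaStorageException that shuts down the broker:

try {
  validMessages.assignOffsets(offsetCounter, codec)  // may decompress; argument names hypothetical
} catch {
  case e: java.io.IOException =>
    // corrupt compressed message: reject this produce request only
    throw new InvalidMessageException("Corrupt compressed message: " + e.getMessage)
}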
