[ https://issues.apache.org/jira/browse/KAFKA-765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13585652#comment-13585652 ]
Sriram Subramanian commented on KAFKA-765:
------------------------------------------

The change seems to cause a leak. An IOException during decompression no longer shuts down the broker (ByteBufferMessageSet.scala), so the process keeps running. If the read below throws, close() is never called on the input stream. You would need to wrap the code in try/finally.

    val compressed = CompressionFactory(message.compressionCodec, inputStream)
    Stream.continually(compressed.read(intermediateBuffer)).takeWhile(_ > 0).foreach { dataRead =>
      outputStream.write(intermediateBuffer, 0, dataRead)
    }
    compressed.close()

> Corrupted messages in produce request could shutdown the broker
> ---------------------------------------------------------------
>
>                 Key: KAFKA-765
>                 URL: https://issues.apache.org/jira/browse/KAFKA-765
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8
>            Reporter: Jun Rao
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: p1
>         Attachments: KAFKA-765.patch
>
>
> In kafka.log.append(), we convert all IOException to KafkaStorageException.
> This will cause the caller to shut down the broker. However, if there is a
> corrupted compressed message, validMessages.assignOffsets() in append() could
> also throw an IOException when decompressing the message. In this case, we
> shouldn't shut down the broker and should just fail this particular produce
> request.
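
The try/finally wrapping suggested in the comment could look roughly like the sketch below. It is not the actual patch: `DecompressSketch`, `copyAndClose`, and `FailingStream` are hypothetical names introduced only for illustration, any `java.io.InputStream` stands in for the stream returned by `CompressionFactory`, and `Iterator.continually` is swapped in for `Stream.continually` as a non-memoizing equivalent.

```scala
import java.io.{IOException, InputStream, OutputStream}

object DecompressSketch {
  // Hypothetical helper, not Kafka's code: drains `compressed` into `out`
  // and closes `compressed` even when read() or write() throws.
  def copyAndClose(compressed: InputStream, out: OutputStream, bufferSize: Int = 1024): Unit = {
    val intermediateBuffer = new Array[Byte](bufferSize)
    try {
      Iterator.continually(compressed.read(intermediateBuffer))
        .takeWhile(_ > 0)
        .foreach { dataRead => out.write(intermediateBuffer, 0, dataRead) }
    } finally {
      // Runs on both the normal and the exceptional path, so the stream
      // is released even if the compressed payload is corrupted.
      compressed.close()
    }
  }

  // Illustrative test double: every read fails, and close() is recorded.
  class FailingStream extends InputStream {
    var closed = false
    override def read(): Int = throw new IOException("corrupt data")
    override def close(): Unit = { closed = true }
  }
}
```

With this shape the IOException still propagates to the caller, which can then fail the single produce request, while the stream is closed regardless of the outcome.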