Repository: kafka Updated Branches: refs/heads/0.10.0 6b1a6d955 -> c47c3b0b5
KAFKA-3933; Always fully read deepIterator Avoids leaking native memory and hence crashing brokers on bootup due to running out of memory. Seeing as `messageFormat > 0` always reads the full compressed message set and is the default going forwards, we can use that behaviour to always close the compressor when calling `deepIterator` Author: Tom Crayford <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1660 from tcrayford/dont_leak_native_memory_round_2 (cherry picked from commit 8a417c89d2f0b7861b2dec26f02e4e302b64b604) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c47c3b0b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c47c3b0b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c47c3b0b Branch: refs/heads/0.10.0 Commit: c47c3b0b583a849fdf3ed0a06835427a2801950a Parents: 6b1a6d9 Author: Tom Crayford <[email protected]> Authored: Tue Jul 26 02:31:37 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Jul 26 02:59:06 2016 +0100 ---------------------------------------------------------------------- .../kafka/message/ByteBufferMessageSet.scala | 69 ++++++++++---------- 1 file changed, 34 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c47c3b0b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index a116d4b..98f6131 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -17,7 +17,7 @@ package kafka.message -import kafka.utils.{IteratorTemplate, Logging} +import kafka.utils.{CoreUtils, 
IteratorTemplate, Logging} import kafka.common.{KafkaException, LongRef} import java.nio.ByteBuffer import java.nio.channels._ @@ -85,36 +85,45 @@ object ByteBufferMessageSet { new IteratorTemplate[MessageAndOffset] { val MessageAndOffset(wrapperMessage, wrapperMessageOffset) = wrapperMessageAndOffset + + if (wrapperMessage.payload == null) + throw new KafkaException(s"Message payload is null: $wrapperMessage") + val wrapperMessageTimestampOpt: Option[Long] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestamp) else None val wrapperMessageTimestampTypeOpt: Option[TimestampType] = if (wrapperMessage.magic > MagicValue_V0) Some(wrapperMessage.timestampType) else None - if (wrapperMessage.payload == null) - throw new KafkaException(s"Message payload is null: $wrapperMessage") - val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) - val compressed = try { - new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream)) - } catch { - case ioe: IOException => - throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe) - } + var lastInnerOffset = -1L - val messageAndOffsets = if (wrapperMessageAndOffset.message.magic > MagicValue_V0) { + val messageAndOffsets = { + val inputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) + val compressed = try { + new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, wrapperMessage.magic, inputStream)) + } catch { + case ioe: IOException => + throw new InvalidMessageException(s"Failed to instantiate input stream compressed with ${wrapperMessage.compressionCodec}", ioe) + } + val innerMessageAndOffsets = new ArrayDeque[MessageAndOffset]() try { while (true) - innerMessageAndOffsets.add(readMessageFromStream()) + innerMessageAndOffsets.add(readMessageFromStream(compressed)) } catch { case eofe: EOFException => - compressed.close() + // we don't do 
anything at all here, because the finally + // will close the compressed input stream, and we simply + // want to return the innerMessageAndOffsets case ioe: IOException => throw new InvalidMessageException(s"Error while reading message from stream compressed with ${wrapperMessage.compressionCodec}", ioe) + } finally { + CoreUtils.swallow(compressed.close()) } - Some(innerMessageAndOffsets) - } else None - private def readMessageFromStream(): MessageAndOffset = { + innerMessageAndOffsets + } + + private def readMessageFromStream(compressed: DataInputStream): MessageAndOffset = { val innerOffset = compressed.readLong() val recordSize = compressed.readInt() @@ -138,25 +147,15 @@ object ByteBufferMessageSet { } override def makeNext(): MessageAndOffset = { - messageAndOffsets match { - // Using inner offset and timestamps - case Some(innerMessageAndOffsets) => - innerMessageAndOffsets.pollFirst() match { - case null => allDone() - case MessageAndOffset(message, offset) => - val relativeOffset = offset - lastInnerOffset - val absoluteOffset = wrapperMessageOffset + relativeOffset - new MessageAndOffset(message, absoluteOffset) - } - // Not using inner offset and timestamps - case None => - try readMessageFromStream() - catch { - case eofe: EOFException => - compressed.close() - allDone() - case ioe: IOException => - throw new KafkaException(ioe) + messageAndOffsets.pollFirst() match { + case null => allDone() + case nextMessage@ MessageAndOffset(message, offset) => + if (wrapperMessage.magic > MagicValue_V0) { + val relativeOffset = offset - lastInnerOffset + val absoluteOffset = wrapperMessageOffset + relativeOffset + new MessageAndOffset(message, absoluteOffset) + } else { + nextMessage } } }
