Repository: kafka Updated Branches: refs/heads/trunk 5b42b538e -> d2f50fc38
KAFKA-527; Use in-place decompression enabled inner iterator to replace old decompress function; reviewed by Joel Koshy and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d2f50fc3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d2f50fc3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d2f50fc3 Branch: refs/heads/trunk Commit: d2f50fc3886896bc569fea7fb308036008b89f94 Parents: 5b42b53 Author: Guozhang Wang <[email protected]> Authored: Thu Mar 26 15:43:18 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 26 15:43:18 2015 -0700 ---------------------------------------------------------------------- .../scala/kafka/consumer/ConsumerIterator.scala | 2 +- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- .../kafka/message/ByteBufferMessageSet.scala | 62 ++++++++++++++------ .../scala/kafka/tools/DumpLogSegments.scala | 2 +- .../src/test/scala/unit/kafka/log/LogTest.scala | 10 ++-- .../unit/kafka/producer/SyncProducerTest.scala | 2 +- 6 files changed, 52 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/consumer/ConsumerIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 78fbf75..b00a4dc 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -37,7 +37,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk val clientId: String) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { - private var current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) + private val current: AtomicReference[Iterator[MessageAndOffset]] = new AtomicReference(null) private var currentTopicInfo: PartitionTopicInfo = null private var consumedOffset: Long = -1L private val consumerTopicStats = ConsumerTopicStatsRegistry.getConsumerTopicStat(clientId) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/log/LogSegment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index ac96434..0256764 100644 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -182,7 +182,7 @@ class LogSegment(val log: FileMessageSet, case NoCompressionCodec => entry.offset case _ => - ByteBufferMessageSet.decompress(entry.message).head.offset + ByteBufferMessageSet.deepIterator(entry.message).next().offset } index.append(startOffset, validBytes) lastIndexEntry = validBytes http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala index 2d6cfc0..9dfe914 100644 --- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala +++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala @@ -17,12 +17,13 @@ package kafka.message -import kafka.utils.Logging +import kafka.utils.{IteratorTemplate, Logging} +import kafka.common.KafkaException + import java.nio.ByteBuffer import java.nio.channels._ -import java.io.{InputStream, DataOutputStream} +import java.io._ import java.util.concurrent.atomic.AtomicLong -import kafka.utils.IteratorTemplate object ByteBufferMessageSet { @@ -58,19 +59,42 @@ object ByteBufferMessageSet { } } - def decompress(message: Message): ByteBufferMessageSet = { - val outputStream = new BufferingOutputStream(math.min(math.max(message.size, 1024), 1 << 16)) - val inputStream: InputStream = new ByteBufferBackedInputStream(message.payload) - val compressed = CompressionFactory(message.compressionCodec, inputStream) - try { - outputStream.write(compressed) - } finally { - compressed.close() + /** Deep iterator that decompresses the message sets in-place. */ + def deepIterator(wrapperMessage: Message): Iterator[MessageAndOffset] = { + new IteratorTemplate[MessageAndOffset] { + + val inputStream: InputStream = new ByteBufferBackedInputStream(wrapperMessage.payload) + val compressed: DataInputStream = new DataInputStream(CompressionFactory(wrapperMessage.compressionCodec, inputStream)) + + override def makeNext(): MessageAndOffset = { + try { + // read the offset + val offset = compressed.readLong() + // read record size + val size = compressed.readInt() + + if (size < Message.MinHeaderSize) + throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator") + + // read the record into an intermediate record buffer + // and hence has to do extra copy + val bufferArray = new Array[Byte](size) + compressed.readFully(bufferArray, 0, size) + val buffer = ByteBuffer.wrap(bufferArray) + + val newMessage = new Message(buffer) + + // the decompressed message should not be a wrapper message since we do not allow nested compression + new MessageAndOffset(newMessage, offset) + } catch { + case eofe: EOFException => + compressed.close() + allDone() + case ioe: IOException => + throw new KafkaException(ioe) + } + } } - val outputBuffer = ByteBuffer.allocate(outputStream.size) - outputStream.writeTo(outputBuffer) - outputBuffer.rewind - new ByteBufferMessageSet(outputBuffer) } private[kafka] def writeMessage(buffer: ByteBuffer, message: Message, offset: Long) { @@ -150,7 +174,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi var topIter = buffer.slice() var innerIter: Iterator[MessageAndOffset] = null - def innerDone():Boolean = (innerIter == null || !innerIter.hasNext) + def innerDone(): Boolean = (innerIter == null || !innerIter.hasNext) def makeNextOuter: MessageAndOffset = { // if there isn't at least an offset and size, we are done @@ -159,7 +183,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi val offset = topIter.getLong() val size = topIter.getInt() if(size < Message.MinHeaderSize) - throw new InvalidMessageException("Message found with corrupt size (" + size + ")") + throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator") // we have an incomplete message if(topIter.remaining < size) @@ -179,7 +203,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi innerIter = null new MessageAndOffset(newMessage, offset) case _ => - innerIter = ByteBufferMessageSet.decompress(newMessage).internalIterator() + innerIter = ByteBufferMessageSet.deepIterator(newMessage) if(!innerIter.hasNext) innerIter = null makeNext() @@ -194,7 +218,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi if(innerDone()) makeNextOuter else - innerIter.next + innerIter.next() } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fe2cc11..b7a3630 100644 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -180,7 +180,7 @@ object DumpLogSegments { case NoCompressionCodec => getSingleMessageIterator(messageAndOffset) case _ => - ByteBufferMessageSet.decompress(message).iterator + ByteBufferMessageSet.deepIterator(message) } } else getSingleMessageIterator(messageAndOffset) http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/log/LogTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 8cd5f2f..3c0599c 100644 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -269,13 +269,13 @@ class LogTest extends JUnitSuite { log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes))) log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes))) - def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message) + def read(offset: Int) = ByteBufferMessageSet.deepIterator(log.read(offset, 4096).messageSet.head.message) /* we should always get the first message in the compressed set when reading any offset in the set */ - assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset) - assertEquals("Read at offset 1 should produce 0", 0, read(1).head.offset) - assertEquals("Read at offset 2 should produce 2", 2, read(2).head.offset) - assertEquals("Read at offset 3 should produce 2", 2, read(3).head.offset) + assertEquals("Read at offset 0 should produce 0", 0, read(0).next().offset) + assertEquals("Read at offset 1 should produce 0", 0, read(1).next().offset) + assertEquals("Read at offset 2 should produce 2", 2, read(2).next().offset) + assertEquals("Read at offset 3 should produce 2", 2, read(3).next().offset) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d2f50fc3/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala index b5208a5..812df59 100644 --- a/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala +++ b/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala @@ -31,7 +31,7 @@ import kafka.api.ProducerResponseStatus import kafka.common.{TopicAndPartition, ErrorMapping} class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness { - private var messageBytes = new Array[Byte](2); + private val messageBytes = new Array[Byte](2) // turning off controlled shutdown since testProducerCanTimeout() explicitly shuts down request handler pool. val configs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfigs(1, false).head)) val zookeeperConnect = TestZKUtils.zookeeperConnect
