[
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631
]
Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 3:13 AM:
---------------------------------------------------------------
Proposed Approach:
1. Since the compression function ByteBufferMessageSet.create is only called
over a set of messages that either share the same key hash value or have a
null key, we can write the destination partition id as the key of the
compressed wrapper message (currently the wrapper key is always written as
null).
2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise
call currentDataChunk.messages.iterator.
3. Also in ConsumerIterator, if isShallow is true, construct
MessageAndMetadata with the value assigned directly as message: Message
instead of fromBytes(Utils.readBytes(item.message.payload)).
4. In MirrorMaker, set isShallow to true and, upon reading each
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message]
instead of KeyedMessage[Array[Byte], Array[Byte]] (see the sketch after this
list).
5. Also in MirrorMaker, set the compression codec to NoCompressionCodec to
avoid a second compression of the already-compressed messages.
6. Ordering in MirrorMaker is preserved automatically, since the MirrorMaker
producer's event handler uses the message key to decide the outgoing
partition, so compressed messages with the same key go to the same partition.
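To make the shallow path concrete, here is a minimal sketch of how steps 2-5
could fit together on the MirrorMaker side. This is an illustration, not the
actual patch: the isShallow-enabled, Message-valued KafkaStream and the helper
names (ShallowMirrorSketch, makeProducer, mirror) are assumptions of this
sketch; KeyedMessage, Producer, ProducerConfig, Message and the
metadata.broker.list / compression.codec producer properties are existing 0.8
APIs.

{code:scala}
// Sketch only (not the actual KAFKA-1011 patch). Assumes the 0.8 Scala
// consumer/producer APIs; the Message-valued stream (obtained with
// isShallow = true, step 2) is an assumption of this proposal.
import java.util.Properties

import kafka.consumer.KafkaStream
import kafka.message.Message
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object ShallowMirrorSketch {

  // Step 5: the producer must not compress again -- the payload it receives
  // is already a compressed wrapper message.
  def makeProducer(brokerList: String): Producer[Array[Byte], Message] = {
    val props = new Properties()
    props.put("metadata.broker.list", brokerList)
    props.put("compression.codec", "none")
    new Producer[Array[Byte], Message](new ProducerConfig(props))
  }

  // Steps 3-4: the shallow iterator hands back the compressed wrapper Message
  // as-is, so MirrorMaker forwards it without decompressing the payload.
  def mirror(stream: KafkaStream[Array[Byte], Message],
             producer: Producer[Array[Byte], Message]): Unit = {
    for (msgAndMetadata <- stream) {
      producer.send(new KeyedMessage[Array[Byte], Message](
        msgAndMetadata.topic,    // keep the source topic
        msgAndMetadata.key,      // step 1/6: the wrapper key drives partitioning
        msgAndMetadata.message)) // the still-compressed wrapper message
    }
  }
}
{code}

Note that the 0.8 producer serializes values through an Encoder, so sending
KeyedMessage[Array[Byte], Message] would additionally need either a Message
pass-through encoder or a matching change in the producer's event handler;
the sketch glosses over that detail.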
> Decompression and re-compression on MirrorMaker could result in messages
> being dropped in the pipeline
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-1011
> URL: https://issues.apache.org/jira/browse/KAFKA-1011
> Project: Kafka
> Issue Type: Bug
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Fix For: 0.8.1
>
>
> The way MirrorMaker works today is that its consumers use a deep iterator
> to decompress the messages received from the source brokers, and its
> producers re-compress the messages while sending them to the target
> brokers. Since MirrorMaker uses a centralized data channel for its
> consumers to pipe messages to its producers, and since the producers
> compress the messages of the same topic within a batch as a single produce
> request, messages accepted at the front end of the pipeline can be dropped
> at the target brokers with a MessageSizeTooLargeException if one batch in
> MirrorMaker's producer happens to contain too many messages of the same
> topic. If we can use a shallow iterator on the MirrorMaker's consumer side
> to pipe the compressed messages through directly, this issue can be fixed
> (a small sketch of deep vs. shallow iteration follows below).
> Also as Swapnil pointed out, currently if the MirrorMaker lags and there are
> large messages in the MirrorMaker queue (large after decompression), it can
> run into an OutOfMemoryException. Shallow iteration will be very helpful in
> avoiding this exception.
> The proposed solution of this issue is also related to KAFKA-527.
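For reference, a rough sketch of the deep vs. shallow iteration difference
described above, assuming the 0.8 ByteBufferMessageSet API (iterator for
deep, decompressing iteration; shallowIterator, as referenced in the
proposal, for wrapper-level iteration). The GZIP codec and the message
counts here are only for illustration.

{code:scala}
// Illustration only: contrast deep and shallow iteration over a compressed
// message set, assuming the 0.8 ByteBufferMessageSet API.
import kafka.message.{ByteBufferMessageSet, GZIPCompressionCodec, Message}

object IterationSketch {
  def main(args: Array[String]) {
    val messages = (1 to 10).map(i => new Message(("msg-" + i).getBytes))
    val compressedSet = new ByteBufferMessageSet(GZIPCompressionCodec, messages: _*)

    // Deep iteration decompresses the wrapper and yields every inner message;
    // this is what MirrorMaker's consumer does today.
    val deepCount = compressedSet.iterator.size            // 10

    // Shallow iteration yields only the compressed wrapper message(s) and
    // leaves the payload compressed; this is what the proposal would pipe.
    val shallowCount = compressedSet.shallowIterator.size  // 1

    println("deep = " + deepCount + ", shallow = " + shallowCount)
  }
}
{code}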