[jira] [Updated] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-1011:
---------------------------------
    Assignee: Guozhang Wang

Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
-------------------------------------------------------------------------------------------------------
         Key: KAFKA-1011
         URL: https://issues.apache.org/jira/browse/KAFKA-1011
     Project: Kafka
  Issue Type: Bug
    Reporter: Guozhang Wang
    Assignee: Guozhang Wang
     Fix For: 0.8.1

The way MirrorMaker works today is that its consumers use a deep iterator to decompress messages received from the source brokers, and its producers re-compress the messages while sending them to the target brokers. Since MirrorMaker uses a centralized data channel through which its consumers pipe messages to its producers, and since the producers compress all messages of the same topic within a batch into a single produce request, messages accepted at the front end of the pipeline can be dropped at the target brokers with a MessageSizeTooLargeException when one batch happens to contain too many messages of the same topic. If the MirrorMaker consumer instead uses a shallow iterator to pipe the compressed messages through directly, this issue can be fixed.

Also, as Swapnil pointed out, if the MirrorMaker lags and there are large messages in the MirrorMaker queue (large after decompression), it can currently run into an OutOfMemoryError. Shallow iteration will be very helpful in avoiding this as well.

The proposed solution of this issue is also related to KAFKA-527.
[jira] [Commented] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ]

Guozhang Wang commented on KAFKA-1011:
--------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key or have a null key, we can write the key to the compressed wrapper message according to those keys (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if isShallow is true, call currentDataChunk.messages.shallowIterator, otherwise call currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if isShallow is true, construct MessageAndMetadata with the value directly assigned as message: Message instead of fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set isShallow to true, and upon reading each msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker producer's event handler uses the message key to decide the outgoing partition; hence compressed messages with the same key go to the same partition.
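A minimal sketch of steps 2-3 above, with simplified stand-ins for the 0.8 consumer classes (the real ByteBufferMessageSet, ConsumerIterator, and MessageAndMetadata carry far more state; everything below is illustrative, not the eventual patch):

{code:scala}
// Illustrative stand-ins only; not the real Kafka 0.8 classes.
case class Message(payload: Array[Byte], isWrapper: Boolean)

class MessageSet(entries: Seq[Message]) {
  // Shallow iteration hands back compressed wrapper messages untouched.
  def shallowIterator: Iterator[Message] = entries.iterator
  // Deep iteration would decompress each wrapper into its inner messages.
  def iterator: Iterator[Message] =
    entries.iterator.flatMap(m => if (m.isWrapper) decompress(m) else Iterator(m))
  private def decompress(m: Message): Iterator[Message] =
    Iterator(m.copy(isWrapper = false)) // placeholder for real codec handling
}

case class MessageAndMetadata[V](topic: String, value: V)

// Step 2: pick the iterator according to isShallow.
// Step 3: when shallow, assign the Message itself as the value instead of
// deserializing its payload, so the compressed bytes flow through unchanged.
def streamIterator(topic: String, chunk: MessageSet,
                   isShallow: Boolean): Iterator[MessageAndMetadata[Message]] = {
  val underlying = if (isShallow) chunk.shallowIterator else chunk.iterator
  underlying.map(m => MessageAndMetadata(topic, m))
}
{code}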
[jira] [Created] (KAFKA-1019) kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist
Guozhang Wang created KAFKA-1019:
---------------------------------
     Summary: kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist
         Key: KAFKA-1019
         URL: https://issues.apache.org/jira/browse/KAFKA-1019
     Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
    Reporter: Guozhang Wang
     Fix For: 0.8.1

From Libo Yu: I tried to run kafka-preferred-replica-election.sh on our Kafka cluster, but I got this exception:

Failed to start preferred replica election
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/uattoqaaa.default/partitions

I checked ZooKeeper and there is no /brokers/topics/uattoqaaa.default/partitions. All I found is /brokers/topics/uattoqaaa.default.
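One way the tool could fail fast with a clearer message is to check for the partitions znode up front. A minimal sketch using the zkclient API the stack trace already shows; the guard function and error text are illustrative assumptions, not the actual fix:

{code:scala}
import org.I0Itec.zkclient.ZkClient

// Hypothetical guard: verify the partitions znode exists before starting the
// election, so the tool reports an actionable error instead of surfacing a
// raw ZkNoNodeException from deep inside the election logic.
def verifyPartitionsPath(zkClient: ZkClient, topic: String) {
  val path = "/brokers/topics/%s/partitions".format(topic)
  if (!zkClient.exists(path))
    throw new IllegalStateException(
      "Topic %s has no partition registration at %s; the topic may not have been fully created yet."
        .format(topic, path))
}
{code}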
[jira] [Resolved] (KAFKA-1017) High number of open file handles in 0.8 producer
[ https://issues.apache.org/jira/browse/KAFKA-1017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-1017.
----------------------------
    Resolution: Fixed
 Fix Version/s: 0.8

Good patch. +1. Committed to 0.8.

High number of open file handles in 0.8 producer
------------------------------------------------
         Key: KAFKA-1017
         URL: https://issues.apache.org/jira/browse/KAFKA-1017
     Project: Kafka
  Issue Type: Bug
  Components: producer
Affects Versions: 0.8
    Reporter: Swapnil Ghike
    Assignee: Swapnil Ghike
     Fix For: 0.8
 Attachments: kafka-1017.patch

Reported by Jun Rao: For over-partitioned topics, each broker could be the leader for at least one partition. In the producer, we randomly select a partition to send the data to. Pretty soon, each producer will establish a connection to each of the n brokers. Effectively, we have increased the number of socket connections by a factor of n compared to 0.7. The increased number of socket connections increases the number of open file handles, which could come pretty close to the OS limit.
[jira] [Commented] (KAFKA-1017) High number of open file handles in 0.8 producer
[ https://issues.apache.org/jira/browse/KAFKA-1017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747663#comment-13747663 ]

Guozhang Wang commented on KAFKA-1017:
--------------------------------------

Good catch, thanks Swapnil!
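For context on the problem described above: one way to cap the connection count is sticky random partitioning, where a producer reuses a single randomly chosen partition between metadata refreshes instead of spraying every send across all partitions. The sketch below illustrates that idea only; the class, parameter names, and refresh plumbing are assumptions, not the committed patch:

{code:scala}
import scala.util.Random

// Illustrative sticky chooser (not thread-safe; for illustration only):
// reuse one random partition until the refresh interval elapses, so each
// producer talks to a few leaders rather than all n brokers at once.
class StickyPartitionChooser(refreshIntervalMs: Long) {
  private var cachedPartition: Option[Int] = None
  private var lastChosenMs: Long = 0L

  def choosePartition(numPartitions: Int): Int = {
    val now = System.currentTimeMillis()
    if (cachedPartition.isEmpty || now - lastChosenMs > refreshIntervalMs) {
      cachedPartition = Some(Random.nextInt(numPartitions))
      lastChosenMs = now
    }
    cachedPartition.get
  }
}
{code}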
[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost
[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747692#comment-13747692 ]

Jun Rao commented on KAFKA-955:
-------------------------------

Magnus, thanks for your comment. What you suggested is interesting and could be a more effective way of communicating between the producer and the broker. It does require that the producer be able to receive requests initiated by the broker. We do plan to make the producer-side processing selector based for efficiency reasons. However, this will be a post-0.8 item. We could consider your suggestion then.

Regarding your concern about dropped messages, my take is the following. If a client chooses not to receive an ack, it probably means that losing a few batches of messages is not that important. If a client does care about data loss, it can choose ack=1 or ack=-1. The throughput will be lower; however, there are other ways to improve throughput (e.g., using a larger batch size and/or more producer instances).

Guozhang, patch v3 looks good to me overall. A few more comments:

30. SyncProducerTest.testMessagesizeTooLargeWithAckZero(): You hardcoded the sleep to 500ms. Could you change it to the waitUntil style of waiting, so that the test can finish early if the conditions have been met?

31. KafkaApi.handleProducerRequest(): The logging should probably be at debug level, since this doesn't indicate an error at the broker; it's really an error for the client.

After a leader change, messages sent with ack=0 are lost
--------------------------------------------------------
         Key: KAFKA-955
         URL: https://issues.apache.org/jira/browse/KAFKA-955
     Project: Kafka
  Issue Type: Bug
    Reporter: Jason Rosenberg
    Assignee: Guozhang Wang
 Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, KAFKA-955.v2.patch, KAFKA-955.v3.patch

If the leader changes for a partition, and a producer is sending messages with ack=0, then messages will be lost, since the producer has no active way of knowing that the leader has changed until its next metadata refresh. The broker receiving the message, which is no longer the leader, logs a message like this:

Produce request with correlation id 7136261 from client on partition [mytopic,0] failed due to Leader not local for partition [mytopic,0] on broker 508818741

This is exacerbated by the controlled shutdown mechanism, which forces an immediate leader change. A possible solution would be for a broker that receives a message for a topic it is no longer the leader for (and if the ack level is 0) to silently forward the message to the current leader.
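The waitUntil-style polling Jun asks for in comment 30 could look roughly like the helper below. This is a minimal sketch; Kafka's own test utilities have a similar waitUntilTrue, but this exact signature and the usage shown are assumptions:

{code:scala}
// Poll a condition until it holds or a timeout elapses, instead of a fixed
// Thread.sleep(500): the test finishes early when the condition is met.
def waitUntilTrue(condition: () => Boolean, waitTimeMs: Long,
                  pauseMs: Long = 100L): Boolean = {
  val deadline = System.currentTimeMillis() + waitTimeMs
  while (System.currentTimeMillis() < deadline) {
    if (condition())
      return true    // condition met: proceed immediately
    Thread.sleep(pauseMs)
  }
  condition()        // one last check at the deadline
}

// Hypothetical usage in the test (producerSocketIsClosed is illustrative):
// assertTrue(waitUntilTrue(() => producerSocketIsClosed(), waitTimeMs = 5000))
{code}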
[jira] [Updated] (KAFKA-955) After a leader change, messages sent with ack=0 are lost
[ https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-955:
--------------------------------
    Attachment: KAFKA-955.v4.patch

Thanks for the comments, Jun.

30. Done.

31. On second thought, I realized that we do not need to sleep at all, since the second message is large enough to cause the socket buffer to flush immediately, and by then the socket close should already have been triggered by the server. This has been verified in the unit test.

Also made some minor changes to comments and rebased on 0.8.
[jira] Subscription: outstanding kafka patches
Issue Subscription
Filter: outstanding kafka patches (69 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key         Summary
KAFKA-1012  Implement an Offset Manager and hook offset requests to it
            https://issues.apache.org/jira/browse/KAFKA-1012
KAFKA-1008  Unmap before resizing
            https://issues.apache.org/jira/browse/KAFKA-1008
KAFKA-1005  kafka.perf.ConsumerPerformance not shutting down consumer
            https://issues.apache.org/jira/browse/KAFKA-1005
KAFKA-1004  Handle topic event for trivial whitelist topic filters
            https://issues.apache.org/jira/browse/KAFKA-1004
KAFKA-998   Producer should not retry on non-recoverable error codes
            https://issues.apache.org/jira/browse/KAFKA-998
KAFKA-997   Provide a strict verification mode when reading configuration properties
            https://issues.apache.org/jira/browse/KAFKA-997
KAFKA-996   Capitalize first letter for log entries
            https://issues.apache.org/jira/browse/KAFKA-996
KAFKA-995   Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
            https://issues.apache.org/jira/browse/KAFKA-995
KAFKA-990   Fix ReassignPartitionCommand and improve usability
            https://issues.apache.org/jira/browse/KAFKA-990
KAFKA-984   Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
            https://issues.apache.org/jira/browse/KAFKA-984
KAFKA-982   Logo for Kafka
            https://issues.apache.org/jira/browse/KAFKA-982
KAFKA-981   Unable to pull Kafka binaries with Ivy
            https://issues.apache.org/jira/browse/KAFKA-981
KAFKA-976   Order-Preserving Mirror Maker Testcase
            https://issues.apache.org/jira/browse/KAFKA-976
KAFKA-967   Use key range in ProducerPerformance
            https://issues.apache.org/jira/browse/KAFKA-967
KAFKA-956   High-level consumer fails to check topic metadata response for errors
            https://issues.apache.org/jira/browse/KAFKA-956
KAFKA-955   After a leader change, messages sent with ack=0 are lost
            https://issues.apache.org/jira/browse/KAFKA-955
KAFKA-946   Kafka Hadoop Consumer fails when verifying message checksum
            https://issues.apache.org/jira/browse/KAFKA-946
KAFKA-923   Improve controller failover latency
            https://issues.apache.org/jira/browse/KAFKA-923
KAFKA-917   Expose zk.session.timeout.ms in console consumer
            https://issues.apache.org/jira/browse/KAFKA-917
KAFKA-885   sbt package builds two kafka jars
            https://issues.apache.org/jira/browse/KAFKA-885
KAFKA-881   Kafka broker not respecting log.roll.hours
            https://issues.apache.org/jira/browse/KAFKA-881
KAFKA-873   Consider replacing zkclient with curator (with zkclient-bridge)
            https://issues.apache.org/jira/browse/KAFKA-873
KAFKA-868   System Test - add test case for rolling controlled shutdown
            https://issues.apache.org/jira/browse/KAFKA-868
KAFKA-863   System Test - update 0.7 version of kafka-run-class.sh for Migration Tool test cases
            https://issues.apache.org/jira/browse/KAFKA-863
KAFKA-859   support basic auth protection of mx4j console
            https://issues.apache.org/jira/browse/KAFKA-859
KAFKA-855   Ant+Ivy build for Kafka
            https://issues.apache.org/jira/browse/KAFKA-855
KAFKA-854   Upgrade dependencies for 0.8
            https://issues.apache.org/jira/browse/KAFKA-854
KAFKA-815   Improve SimpleConsumerShell to take in a max messages config option
            https://issues.apache.org/jira/browse/KAFKA-815
KAFKA-745   Remove getShutdownReceive() and other kafka specific code from the RequestChannel
            https://issues.apache.org/jira/browse/KAFKA-745
KAFKA-735   Add looping and JSON output for ConsumerOffsetChecker
            https://issues.apache.org/jira/browse/KAFKA-735
KAFKA-717   scala 2.10 build support
            https://issues.apache.org/jira/browse/KAFKA-717
KAFKA-686   0.8 Kafka broker should give a better error message when running against 0.7 zookeeper
            https://issues.apache.org/jira/browse/KAFKA-686
KAFKA-677   Retention process gives exception if an empty segment is chosen for collection
            https://issues.apache.org/jira/browse/KAFKA-677
KAFKA-674   Clean Shutdown Testing - Log segments checksums mismatch
            https://issues.apache.org/jira/browse/KAFKA-674
KAFKA-652   Create testcases for clean shut-down
            https://issues.apache.org/jira/browse/KAFKA-652
KAFKA-649   Cleanup log4j logging
            https://issues.apache.org/jira/browse/KAFKA-649
KAFKA-645   Create a shell script to run System Test with DEBUG details and tee console output to a file
            https://issues.apache.org/jira/browse/KAFKA-645
KAFKA-598   decouple fetch size from max message size
            https://issues.apache.org/jira/browse/KAFKA-598
KAFKA-583   SimpleConsumerShell may
[jira] [Created] (KAFKA-1020) Remove getAllReplicasOnBroker from KafkaController
Guozhang Wang created KAFKA-1020:
---------------------------------
     Summary: Remove getAllReplicasOnBroker from KafkaController
         Key: KAFKA-1020
         URL: https://issues.apache.org/jira/browse/KAFKA-1020
     Project: Kafka
  Issue Type: Bug
    Reporter: Guozhang Wang
    Assignee: Guozhang Wang

Today KafkaController calls getAllReplicasOnBroker on broker failure and on new broker startup to get all the replicas that the broker is holding (or supposed to hold). This function issues a read on each topic's partition znodes. With a large number of topics/partitions, this can seriously increase the latency of handling broker failure and new broker startup. On the other hand, ControllerContext maintains a partitionReplicaAssignment cache, which is designed to hold the most up-to-date partition replica assignment according to ZK. So instead of reading from ZK, we could just read from the local cache, given that partitionReplicaAssignment is guaranteed to be up-to-date.
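A hedged sketch of the cache-based lookup the ticket proposes. The type names below mirror the 0.8 controller code, but the fields are simplified stand-ins and the function itself is illustrative:

{code:scala}
// Simplified stand-ins for the controller types (names follow 0.8):
case class TopicAndPartition(topic: String, partition: Int)
case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

// Derive the replicas a broker holds from the partitionReplicaAssignment
// cache instead of reading every partition znode from ZooKeeper.
def replicasOnBroker(assignment: Map[TopicAndPartition, Seq[Int]],
                     brokerId: Int): Set[PartitionAndReplica] =
  assignment.collect {
    case (tp, replicas) if replicas.contains(brokerId) =>
      PartitionAndReplica(tp.topic, tp.partition, brokerId)
  }.toSet
{code}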
[jira] [Commented] (KAFKA-949) Integrate kafka into YARN
[ https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13748147#comment-13748147 ]

Kam Kasravi commented on KAFKA-949:
-----------------------------------

This work is available at https://github.com/kkasravi/kafka-yarn. I can provide a patch which adds the project to https://github.com/apache/kafka/tree/0.8/contrib/yarn. Kafka-yarn has a dependency on BIGTOP-989, which installs Kafka 0.8 beta1 as a service on Linux (deb, rpm). Please advise.

Kam

Integrate kafka into YARN
-------------------------
         Key: KAFKA-949
         URL: https://issues.apache.org/jira/browse/KAFKA-949
     Project: Kafka
  Issue Type: New Feature
  Components: contrib
Affects Versions: 0.8
 Environment: hadoop 2-0.X
    Reporter: Kam Kasravi

Kafka is being added to Bigtop (BIGTOP-989). Having Kafka services available under YARN will enable a number of cluster operations for Kafka that YARN handles.
[jira] [Commented] (KAFKA-1008) Unmap before resizing
[ https://issues.apache.org/jira/browse/KAFKA-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13748206#comment-13748206 ]

David Lao commented on KAFKA-1008:
----------------------------------

Hi Jay,
The patch does not seem to apply cleanly on the 0.8 branch (see below). Can you look into generating a new patch for 0.8?

git apply --check KAFKA-1008-v6.patch
error: patch failed: core/src/main/scala/kafka/log/OffsetIndex.scala:52
error: core/src/main/scala/kafka/log/OffsetIndex.scala: patch does not apply
error: patch failed: core/src/main/scala/kafka/utils/Utils.scala:21
error: core/src/main/scala/kafka/utils/Utils.scala: patch does not apply
error: patch failed: core/src/test/scala/unit/kafka/utils/UtilsTest.scala:18
error: core/src/test/scala/unit/kafka/utils/UtilsTest.scala: patch does not apply

Unmap before resizing
---------------------
         Key: KAFKA-1008
         URL: https://issues.apache.org/jira/browse/KAFKA-1008
     Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 0.8
 Environment: Windows, Linux, Mac OS
    Reporter: Elizabeth Wei
    Assignee: Jay Kreps
      Labels: patch
     Fix For: 0.8
 Attachments: KAFKA-1008-v6.patch, unmap-v5.patch

   Original Estimate: 1h
  Remaining Estimate: 1h

While I was studying how MappedByteBuffer works, I saw a sharing runtime exception on Windows. I applied what I learned to generate a patch which uses an internal open JDK API to solve this problem. Following Jay's advice, I made a helper method called tryUnmap().
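For context, the kind of helper the description mentions (tryUnmap) could look roughly like this on pre-Java-9 JVMs, which attach an internal cleaner to direct buffers. This is a sketch under that assumption, not the attached patch:

{code:scala}
import java.nio.MappedByteBuffer

// Best-effort unmap via the internal sun.misc.Cleaner that pre-Java-9 JVMs
// attach to direct buffers. Reflection avoids a compile-time dependency on
// the sun.* classes; details may differ from the actual KAFKA-1008 patch.
def tryUnmap(buffer: MappedByteBuffer) {
  try {
    val cleanerMethod = buffer.getClass.getMethod("cleaner")
    cleanerMethod.setAccessible(true) // the buffer class itself is package-private
    val cleaner = cleanerMethod.invoke(buffer)
    if (cleaner != null)
      cleaner.getClass.getMethod("clean").invoke(cleaner)
  } catch {
    case _: Throwable => () // unmapping is an optimization; ignore failures
  }
}
{code}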
[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ]

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 2:30 AM:
---------------------------------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key hash value or have a null key, we can write the key to the compressed wrapper message according to partition id (currently it is always written as null).

Items 2-6 are unchanged from the original comment above.

was (Author: guozhang):

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key or have a null key, we can write the key to the compressed wrapper message according to those keys (currently it is always written as null).
[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ]

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 2:32 AM:
---------------------------------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key hash value or have a null key, we can write the key to the compressed wrapper message according to their destination broker id (currently it is always written as null).

Items 2-6 are unchanged from the original comment above.

was (Author: guozhang):

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key hash value or have a null key, we can write the key to the compressed wrapper message according to partition id (currently it is always written as null).
[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline
[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631 ]

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 3:13 AM:
---------------------------------------------------------------

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key hash value or have a null key, we can write the key to the compressed wrapper message according to their destination partition id (currently it is always written as null).

Items 2-6 are unchanged from the original comment above.

was (Author: guozhang):

1. Since the compression function ByteBufferMessageSet.create will only be called over a set of messages that either share the same key hash value or have a null key, we can write the key to the compressed wrapper message according to their destination broker id (currently it is always written as null).