[jira] [Updated] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

2013-08-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-1011:
-

Assignee: Guozhang Wang

 Decompression and re-compression on MirrorMaker could result in messages 
 being dropped in the pipeline
 --

 Key: KAFKA-1011
 URL: https://issues.apache.org/jira/browse/KAFKA-1011
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.1


 The way MirrorMaker works today, its consumers use a deep iterator to 
 decompress messages received from the source brokers and its producers 
 re-compress those messages while sending them to the target brokers. 
 Since MirrorMaker uses a centralized data channel for its consumers to pipe 
 messages to its producers, and since the producers compress all messages of 
 the same topic within a batch into a single produce request, messages accepted 
 at the front end of the pipeline can be dropped at the MirrorMaker's target 
 brokers with a MessageSizeTooLargeException if one producer batch happens to 
 contain too many messages of the same topic. If we use a shallow iterator on 
 the MirrorMaker's consumer side to pipe the compressed messages through 
 directly, this issue can be fixed. 
 Also, as Swapnil pointed out, if the MirrorMaker lags and there are large 
 messages in the MirrorMaker queue (large after decompression), it can run 
 into an OutOfMemoryException. Shallow iteration will be very helpful in 
 avoiding this exception.
 The proposed solution to this issue is also related to KAFKA-527.
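
To make the size arithmetic concrete, a toy illustration in plain Scala (all numbers are hypothetical; none of this is Kafka code): each compressed source batch fits under the broker limit, but once the messages are deep-iterated and regrouped by topic into one producer batch, the re-compressed wrapper can exceed the target broker's message.max.bytes and be rejected.

object BatchSizeExample extends App {
  val maxMessageBytes  = 1000000               // assumed broker-side message.max.bytes
  val sourceBatchBytes = Seq.fill(8)(300000)   // eight ~300 KB compressed batches from the source
  val regroupedBytes   = sourceBatchBytes.sum  // one topic, one producer batch after deep iteration
  println(s"regrouped batch = $regroupedBytes bytes, limit = $maxMessageBytes, " +
          s"rejected = ${regroupedBytes > maxMessageBytes}")
}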

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

2013-08-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631
 ] 

Guozhang Wang commented on KAFKA-1011:
--

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key or have a null key, 
we can set the key of the compressed wrapper message according to their keys 
(currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.
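
To show how the pieces above fit together, here is a self-contained toy model of steps 2-5 in plain Scala (WrapperMessage, pipe and the send callback are illustrative names, not the Kafka 0.8 classes): a shallow iterator hands each compressed wrapper message through untouched, so the mirroring producer can forward it with its key and without re-compressing.

object ShallowMirrorSketch extends App {
  // Toy stand-in for a compressed wrapper message: a key plus opaque compressed bytes.
  case class WrapperMessage(key: Array[Byte], compressedPayload: Array[Byte])

  // Shallow iteration yields the wrappers as-is; deep iteration would expand each
  // wrapper into its inner messages and force re-batching and re-compression later.
  def shallowIterator(chunk: Seq[WrapperMessage]): Iterator[WrapperMessage] =
    chunk.iterator

  // Pipe a consumed chunk to a producer callback: the value stays a wrapper (step 4),
  // nothing is re-compressed (step 5), and the key is preserved (steps 1 and 6).
  def pipe(chunk: Seq[WrapperMessage])(send: (Array[Byte], WrapperMessage) => Unit): Unit =
    shallowIterator(chunk).foreach(w => send(w.key, w))

  val chunk = Seq(
    WrapperMessage("p0".getBytes("UTF-8"), Array[Byte](1, 2, 3)),
    WrapperMessage("p1".getBytes("UTF-8"), Array[Byte](4, 5))
  )
  pipe(chunk) { (key, wrapper) =>
    println(s"forwarding ${wrapper.compressedPayload.length} compressed bytes, " +
            s"key=${new String(key, "UTF-8")}")
  }
}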

 Decompression and re-compression on MirrorMaker could result in messages 
 being dropped in the pipeline
 --

 Key: KAFKA-1011
 URL: https://issues.apache.org/jira/browse/KAFKA-1011
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
 Fix For: 0.8.1


 The way MirrorMaker works today, its consumers use a deep iterator to 
 decompress messages received from the source brokers and its producers 
 re-compress those messages while sending them to the target brokers. 
 Since MirrorMaker uses a centralized data channel for its consumers to pipe 
 messages to its producers, and since the producers compress all messages of 
 the same topic within a batch into a single produce request, messages accepted 
 at the front end of the pipeline can be dropped at the MirrorMaker's target 
 brokers with a MessageSizeTooLargeException if one producer batch happens to 
 contain too many messages of the same topic. If we use a shallow iterator on 
 the MirrorMaker's consumer side to pipe the compressed messages through 
 directly, this issue can be fixed. 
 Also, as Swapnil pointed out, if the MirrorMaker lags and there are large 
 messages in the MirrorMaker queue (large after decompression), it can run 
 into an OutOfMemoryException. Shallow iteration will be very helpful in 
 avoiding this exception.
 The proposed solution to this issue is also related to KAFKA-527.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Created] (KAFKA-1019) kafka-preferred-replica-election.sh will fail without clear error message if /brokers/topics/[topic]/partitions does not exist

2013-08-22 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1019:


 Summary: kafka-preferred-replica-election.sh will fail without 
clear error message if /brokers/topics/[topic]/partitions does not exist
 Key: KAFKA-1019
 URL: https://issues.apache.org/jira/browse/KAFKA-1019
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1
Reporter: Guozhang Wang
 Fix For: 0.8.1


From Libo Yu:

I tried to run kafka-preferred-replica-election.sh on our kafka cluster.
But I got this exception:
Failed to start preferred replica election
org.I0Itec.zkclient.exception.ZkNoNodeException: 
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode 
for /brokers/topics/uattoqaaa.default/partitions

I checked ZooKeeper and there is no 
/brokers/topics/uattoqaaa.default/partitions. All I found is
/brokers/topics/uattoqaaa.default.
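
For anyone hitting this, a quick way to confirm the missing znode before running the election tool; this is a minimal sketch assuming the org.I0Itec.zkclient API that Kafka already uses (the connect string, timeouts and topic name are placeholders):

import org.I0Itec.zkclient.ZkClient

object CheckPartitionsPath extends App {
  val zkClient = new ZkClient("localhost:2181", 30000, 30000) // placeholder connect string
  try {
    val path = "/brokers/topics/uattoqaaa.default/partitions"
    println(s"$path exists: ${zkClient.exists(path)}")        // prints false in the reported cluster
  } finally {
    zkClient.close()
  }
}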


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Resolved] (KAFKA-1017) High number of open file handles in 0.8 producer

2013-08-22 Thread Jun Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-1017.


   Resolution: Fixed
Fix Version/s: 0.8

Good patch. +1. Committed to 0.8.

 High number of open file handles in 0.8 producer
 

 Key: KAFKA-1017
 URL: https://issues.apache.org/jira/browse/KAFKA-1017
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
 Fix For: 0.8

 Attachments: kafka-1017.patch


 Reported by Jun Rao:
 For over-partitioned topics, each broker could be the leader for at least 1 
 partition. In the producer, we randomly select a partition to send the data. 
 Pretty soon, each producer will establish a connection to each of the n 
 brokers. Effectively, we increased the # of socket connections by a factor of 
 n, compared to 0.7.
 The increased number of socket connections increases the number of open file 
 handles, which could come pretty close to the OS limit.
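
A toy simulation of the behaviour described above, in plain Scala with hypothetical numbers: with one leader per partition spread over n brokers and keyless sends picking a random partition, a producer touches every broker after a short burst, so its connection count grows to n.

object ConnectionFanOut extends App {
  val brokers = 20                                          // assumed cluster size
  val partitionLeader = (0 until 200).map(p => p % brokers) // partition -> leader broker id
  val rnd = new scala.util.Random(42)
  val contacted = (1 to 100).map(_ => partitionLeader(rnd.nextInt(partitionLeader.size))).toSet
  println(s"brokers contacted after 100 sends: ${contacted.size} of $brokers")
}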

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1017) High number of open file handles in 0.8 producer

2013-08-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747663#comment-13747663
 ] 

Guozhang Wang commented on KAFKA-1017:
--

Good catch, thanks Swapnil!

 High number of open file handles in 0.8 producer
 

 Key: KAFKA-1017
 URL: https://issues.apache.org/jira/browse/KAFKA-1017
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8
Reporter: Swapnil Ghike
Assignee: Swapnil Ghike
 Fix For: 0.8

 Attachments: kafka-1017.patch


 Reported by Jun Rao:
 For over-partitioned topics, each broker could be the leader for at least 1 
 partition. In the producer, we randomly select a partition to send the data. 
 Pretty soon, each producer will establish a connection to each of the n 
 brokers. Effectively, we increased the # of socket connections by a factor of 
 n, compared to 0.7.
 The increased number of socket connections increases the number of open file 
 handles, which could come pretty close to the OS limit.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-22 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747692#comment-13747692
 ] 

Jun Rao commented on KAFKA-955:
---

Magnus, thanks for your comment. What you suggested is interesting and could be 
a more effective way of communicating between the producer and the broker. It 
does require that the producer be able to receive requests initiated by the 
broker. We do plan to make the producer-side processing selector-based for 
efficiency reasons. However, this will be a post-0.8 item. We could consider 
your suggestion then. Regarding your concern about dropped messages, my take is 
the following. If a client chooses not to receive an ack, it probably means 
that losing a few batches of messages is not that important. If a client does 
care about data loss, it can choose an ack of 1 or -1. The throughput will be 
lower. However, there are other ways to improve throughput (e.g., using a 
larger batch size and/or more producer instances).

Guozhang, patch v3 looks good to me overall. A few more comments:

30. SyncProducerTest.testMessagesizeTooLargeWithAckZero(): You hardcoded the 
sleep to 500ms. Could you change it to a waitUntil-style wait so that the 
test can finish early once the condition has been met?

31. KafkaApi.handleProducerRequest(): The logging should probably be at debug 
level since this doesn't indicate an error at the broker. It's really an error 
for the client.
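
For point 30, the waitUntil-style wait could look like the generic sketch below (not the exact helper in Kafka's TestUtils): poll the condition and return as soon as it holds instead of always sleeping a fixed 500 ms.

object WaitUntil {
  def waitUntilTrue(condition: () => Boolean, maxWaitMs: Long, pollMs: Long = 10L): Boolean = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    while (System.currentTimeMillis() < deadline) {
      if (condition()) return true
      Thread.sleep(pollMs)
    }
    condition() // one final check at the deadline
  }
  // usage with a placeholder condition: waitUntilTrue(() => socketIsClosed, maxWaitMs = 5000)
}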





 After a leader change, messages sent with ack=0 are lost
 

 Key: KAFKA-955
 URL: https://issues.apache.org/jira/browse/KAFKA-955
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Assignee: Guozhang Wang
 Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
 KAFKA-955.v2.patch, KAFKA-955.v3.patch


 If the leader changes for a partition, and a producer is sending messages 
 with ack=0, then messages will be lost, since the producer has no active way 
 of knowing that the leader has changed until its next metadata refresh.
 The broker receiving the message, which is no longer the leader, logs a 
 message like this:
 Produce request with correlation id 7136261 from client  on partition 
 [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
 broker 508818741
 This is exacerbated by the controlled shutdown mechanism, which forces an 
 immediate leader change.
 A possible solution would be for a broker that receives a message for a topic 
 it is no longer the leader for (when the ack level is 0) to silently forward 
 the message to the current leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Updated] (KAFKA-955) After a leader change, messages sent with ack=0 are lost

2013-08-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-955:


Attachment: KAFKA-955.v4.patch

Thanks for the comments, Jun.

30. Done.
31. On second thought, I realized that we do not need to sleep, since the 
second message is large enough to cause the socket buffer to flush 
immediately, and by then the socket close should already have been triggered 
by the server. This has been verified in the unit test.

Made some minor changes to the comments and rebased on 0.8.

 After a leader change, messages sent with ack=0 are lost
 

 Key: KAFKA-955
 URL: https://issues.apache.org/jira/browse/KAFKA-955
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Rosenberg
Assignee: Guozhang Wang
 Attachments: KAFKA-955.v1.patch, KAFKA-955.v1.patch, 
 KAFKA-955.v2.patch, KAFKA-955.v3.patch, KAFKA-955.v4.patch


 If the leader changes for a partition, and a producer is sending messages 
 with ack=0, then messages will be lost, since the producer has no active way 
 of knowing that the leader has changed until its next metadata refresh.
 The broker receiving the message, which is no longer the leader, logs a 
 message like this:
 Produce request with correlation id 7136261 from client  on partition 
 [mytopic,0] failed due to Leader not local for partition [mytopic,0] on 
 broker 508818741
 This is exacerbated by the controlled shutdown mechanism, which forces an 
 immediate leader change.
 A possible solution would be for a broker that receives a message for a topic 
 it is no longer the leader for (when the ack level is 0) to silently forward 
 the message to the current leader.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] Subscription: outstanding kafka patches

2013-08-22 Thread jira
Issue Subscription
Filter: outstanding kafka patches (69 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key Summary
KAFKA-1012  Implement an Offset Manager and hook offset requests to it
https://issues.apache.org/jira/browse/KAFKA-1012
KAFKA-1008  Unmap before resizing
https://issues.apache.org/jira/browse/KAFKA-1008
KAFKA-1005  kafka.perf.ConsumerPerformance not shutting down consumer
https://issues.apache.org/jira/browse/KAFKA-1005
KAFKA-1004  Handle topic event for trivial whitelist topic filters
https://issues.apache.org/jira/browse/KAFKA-1004
KAFKA-998   Producer should not retry on non-recoverable error codes
https://issues.apache.org/jira/browse/KAFKA-998
KAFKA-997   Provide a strict verification mode when reading configuration 
properties
https://issues.apache.org/jira/browse/KAFKA-997
KAFKA-996   Capitalize first letter for log entries
https://issues.apache.org/jira/browse/KAFKA-996
KAFKA-995   Enforce that the value for replica.fetch.max.bytes is always >= the 
value for message.max.bytes
https://issues.apache.org/jira/browse/KAFKA-995
KAFKA-990   Fix ReassignPartitionCommand and improve usability
https://issues.apache.org/jira/browse/KAFKA-990
KAFKA-984   Avoid a full rebalance in cases when a new topic is discovered but 
container/broker set stay the same
https://issues.apache.org/jira/browse/KAFKA-984
KAFKA-982   Logo for Kafka
https://issues.apache.org/jira/browse/KAFKA-982
KAFKA-981   Unable to pull Kafka binaries with Ivy
https://issues.apache.org/jira/browse/KAFKA-981
KAFKA-976   Order-Preserving Mirror Maker Testcase
https://issues.apache.org/jira/browse/KAFKA-976
KAFKA-967   Use key range in ProducerPerformance
https://issues.apache.org/jira/browse/KAFKA-967
KAFKA-956   High-level consumer fails to check topic metadata response for 
errors
https://issues.apache.org/jira/browse/KAFKA-956
KAFKA-955   After a leader change, messages sent with ack=0 are lost
https://issues.apache.org/jira/browse/KAFKA-955
KAFKA-946   Kafka Hadoop Consumer fails when verifying message checksum
https://issues.apache.org/jira/browse/KAFKA-946
KAFKA-923   Improve controller failover latency
https://issues.apache.org/jira/browse/KAFKA-923
KAFKA-917   Expose zk.session.timeout.ms in console consumer
https://issues.apache.org/jira/browse/KAFKA-917
KAFKA-885   sbt package builds two kafka jars
https://issues.apache.org/jira/browse/KAFKA-885
KAFKA-881   Kafka broker not respecting log.roll.hours
https://issues.apache.org/jira/browse/KAFKA-881
KAFKA-873   Consider replacing zkclient with curator (with zkclient-bridge)
https://issues.apache.org/jira/browse/KAFKA-873
KAFKA-868   System Test - add test case for rolling controlled shutdown
https://issues.apache.org/jira/browse/KAFKA-868
KAFKA-863   System Test - update 0.7 version of kafka-run-class.sh for 
Migration Tool test cases
https://issues.apache.org/jira/browse/KAFKA-863
KAFKA-859   support basic auth protection of mx4j console
https://issues.apache.org/jira/browse/KAFKA-859
KAFKA-855   Ant+Ivy build for Kafka
https://issues.apache.org/jira/browse/KAFKA-855
KAFKA-854   Upgrade dependencies for 0.8
https://issues.apache.org/jira/browse/KAFKA-854
KAFKA-815   Improve SimpleConsumerShell to take in a max messages config option
https://issues.apache.org/jira/browse/KAFKA-815
KAFKA-745   Remove getShutdownReceive() and other kafka specific code from the 
RequestChannel
https://issues.apache.org/jira/browse/KAFKA-745
KAFKA-735   Add looping and JSON output for ConsumerOffsetChecker
https://issues.apache.org/jira/browse/KAFKA-735
KAFKA-717   scala 2.10 build support
https://issues.apache.org/jira/browse/KAFKA-717
KAFKA-686   0.8 Kafka broker should give a better error message when running 
against 0.7 zookeeper
https://issues.apache.org/jira/browse/KAFKA-686
KAFKA-677   Retention process gives exception if an empty segment is chosen for 
collection
https://issues.apache.org/jira/browse/KAFKA-677
KAFKA-674   Clean Shutdown Testing - Log segments checksums mismatch
https://issues.apache.org/jira/browse/KAFKA-674
KAFKA-652   Create testcases for clean shut-down
https://issues.apache.org/jira/browse/KAFKA-652
KAFKA-649   Cleanup log4j logging
https://issues.apache.org/jira/browse/KAFKA-649
KAFKA-645   Create a shell script to run System Test with DEBUG details and 
tee console output to a file
https://issues.apache.org/jira/browse/KAFKA-645
KAFKA-598   decouple fetch size from max message size
https://issues.apache.org/jira/browse/KAFKA-598
KAFKA-583   SimpleConsumerShell may 

[jira] [Created] (KAFKA-1020) Remove getAllReplicasOnBroker from KafkaController

2013-08-22 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-1020:


 Summary: Remove getAllReplicasOnBroker from KafkaController
 Key: KAFKA-1020
 URL: https://issues.apache.org/jira/browse/KAFKA-1020
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang


Today KafkaController calls getAllReplicasOnBroker on broker failure and on new 
broker startup to get all the replicas that the broker is holding (or is 
supposed to hold). This function actually issues a read on each topic's 
partition znodes. With a large number of topics/partitions this can seriously 
increase the latency of handling broker failures and new broker startups.

On the other hand, ControllerContext maintains a partitionReplicaAssignment 
cache, which is designed to keep the most up-to-date partition replica 
assignment according to ZK. So instead of reading from ZK, we could just read 
from the local cache, given that partitionReplicaAssignment is guaranteed to be 
up-to-date.
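
A minimal sketch of the proposed lookup, assuming the cached assignment maps (topic, partition) to its ordered replica list the way ControllerContext.partitionReplicaAssignment does; the types are simplified stand-ins, not the controller's actual signatures.

object ReplicasFromCache {
  // All partitions for which the given broker appears in the cached replica list,
  // computed without any ZooKeeper reads.
  def replicasOnBroker(assignment: Map[(String, Int), Seq[Int]], brokerId: Int): Set[(String, Int)] =
    assignment.collect {
      case (topicAndPartition, replicas) if replicas.contains(brokerId) => topicAndPartition
    }.toSet
}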

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-949) Integrate kafka into YARN

2013-08-22 Thread Kam Kasravi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13748147#comment-13748147
 ] 

Kam Kasravi commented on KAFKA-949:
---

This work is available at https://github.com/kkasravi/kafka-yarn. I can 
provide a patch that adds the project to 
https://github.com/apache/kafka/tree/0.8/contrib/yarn. Kafka-yarn has a 
dependency on BIGTOP-989, which installs Kafka 0.8 beta1 as a service on Linux 
(deb, rpm). Please advise.
Kam

 Integrate kafka into YARN
 -

 Key: KAFKA-949
 URL: https://issues.apache.org/jira/browse/KAFKA-949
 Project: Kafka
  Issue Type: New Feature
  Components: contrib
Affects Versions: 0.8
 Environment: hadoop 2-0.X
Reporter: Kam Kasravi

 Kafka is being added to Bigtop (BIGTOP-989). Having Kafka services available 
 under YARN will enable a number of cluster operations for Kafka that YARN 
 handles.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Commented] (KAFKA-1008) Unmap before resizing

2013-08-22 Thread David Lao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13748206#comment-13748206
 ] 

David Lao commented on KAFKA-1008:
--

Hi Jay,
The patch does not seem to apply cleanly on the 0.8 branch (see below). Can you 
look into generating a new patch for 0.8? 

git apply --check KAFKA-1008-v6.patch
error: patch failed: core/src/main/scala/kafka/log/OffsetIndex.scala:52
error: core/src/main/scala/kafka/log/OffsetIndex.scala: patch does not apply
error: patch failed: core/src/main/scala/kafka/utils/Utils.scala:21
error: core/src/main/scala/kafka/utils/Utils.scala: patch does not apply
error: patch failed: core/src/test/scala/unit/kafka/utils/UtilsTest.scala:18
error: core/src/test/scala/unit/kafka/utils/UtilsTest.scala: patch does not 
apply


 Unmap before resizing
 -

 Key: KAFKA-1008
 URL: https://issues.apache.org/jira/browse/KAFKA-1008
 Project: Kafka
  Issue Type: Bug
  Components: core, log
Affects Versions: 0.8
 Environment: Windows, Linux, Mac OS
Reporter: Elizabeth Wei
Assignee: Jay Kreps
  Labels: patch
 Fix For: 0.8

 Attachments: KAFKA-1008-v6.patch, unmap-v5.patch

   Original Estimate: 1h
  Remaining Estimate: 1h

 While I was studying how MappedByteBuffer works, I saw a sharing runtime 
 exception on Windows. I applied what I learned to generate a patch that uses 
 an internal JDK API to solve this problem.
 Following Jay's advice, I made a helper method called tryUnmap().
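
The idea behind tryUnmap() can be sketched as below (reflection over the non-public cleaner available on pre-Java-9 JVMs; an illustration, not the exact code in the attached patch): force-release the mapping so the underlying file can be resized or deleted on Windows instead of waiting for the GC to unmap it.

import java.nio.MappedByteBuffer

object UnmapSketch {
  def tryUnmap(buffer: MappedByteBuffer): Unit =
    try {
      val cleanerMethod = buffer.getClass.getMethod("cleaner")
      cleanerMethod.setAccessible(true)           // the concrete buffer class is not public
      val cleaner = cleanerMethod.invoke(buffer)
      if (cleaner != null)
        cleaner.getClass.getMethod("clean").invoke(cleaner)
    } catch {
      case _: Throwable => () // best effort: otherwise the GC unmaps the buffer eventually
    }
}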

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

2013-08-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631
 ] 

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 2:30 AM:
---

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key hash value or have 
a null key, we can set the key of the compressed wrapper message according to 
the partition id (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.

  was (Author: guozhang):
Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key or have a null key, 
we can set the key of the compressed wrapper message according to their keys 
(currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.
  
 Decompression and re-compression on MirrorMaker could result in messages 
 being dropped in the pipeline
 --

 Key: KAFKA-1011
 URL: https://issues.apache.org/jira/browse/KAFKA-1011
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.1


 The way MirrorMaker works today, its consumers use a deep iterator to 
 decompress messages received from the source brokers and its producers 
 re-compress those messages while sending them to the target brokers. 
 Since MirrorMaker uses a centralized data channel for its consumers to pipe 
 messages to its producers, and since the producers compress all messages of 
 the same topic within a batch into a single produce request, messages accepted 
 at the front end of the pipeline can be dropped at the MirrorMaker's target 
 brokers with a MessageSizeTooLargeException if one producer batch happens to 
 contain too many messages of the same topic. If we use a shallow iterator on 
 the MirrorMaker's consumer side to pipe the compressed messages through 
 directly, this issue can be fixed. 
 Also, as Swapnil pointed out, if the MirrorMaker lags and there are large 
 messages in the MirrorMaker queue (large after decompression), it can run 
 into an OutOfMemoryException. Shallow iteration will be very helpful in 
 avoiding this exception.
 The proposed solution to this issue is also related to KAFKA-527.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

2013-08-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631
 ] 

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 2:32 AM:
---

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key hash value or have 
a null key, we can set the key of the compressed wrapper message according to 
their destination broker id (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.

  was (Author: guozhang):
Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key hash value or have 
a null key, we can set the key of the compressed wrapper message according to 
the partition id (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.
  
 Decompression and re-compression on MirrorMaker could result in messages 
 being dropped in the pipeline
 --

 Key: KAFKA-1011
 URL: https://issues.apache.org/jira/browse/KAFKA-1011
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.1


 The way MirrorMaker works today, its consumers use a deep iterator to 
 decompress messages received from the source brokers and its producers 
 re-compress those messages while sending them to the target brokers. 
 Since MirrorMaker uses a centralized data channel for its consumers to pipe 
 messages to its producers, and since the producers compress all messages of 
 the same topic within a batch into a single produce request, messages accepted 
 at the front end of the pipeline can be dropped at the MirrorMaker's target 
 brokers with a MessageSizeTooLargeException if one producer batch happens to 
 contain too many messages of the same topic. If we use a shallow iterator on 
 the MirrorMaker's consumer side to pipe the compressed messages through 
 directly, this issue can be fixed. 
 Also, as Swapnil pointed out, if the MirrorMaker lags and there are large 
 messages in the MirrorMaker queue (large after decompression), it can run 
 into an OutOfMemoryException. Shallow iteration will be very helpful in 
 avoiding this exception.
 The proposed solution to this issue is also related to KAFKA-527.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira


[jira] [Comment Edited] (KAFKA-1011) Decompression and re-compression on MirrorMaker could result in messages being dropped in the pipeline

2013-08-22 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13747631#comment-13747631
 ] 

Guozhang Wang edited comment on KAFKA-1011 at 8/23/13 3:13 AM:
---

Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key hash value or have 
a null key, we can set the key of the compressed wrapper message according to 
their destination partition id (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.

  was (Author: guozhang):
Proposed Approach:

1. Since the compression function ByteBufferMessageSet.create will only be 
called over a set of messages that either share the same key hash value or have 
a null key, we can set the key of the compressed wrapper message according to 
their destination broker id (currently it is always written as null).

2. Add an isShallow parameter to ConsumerIterator and KafkaStream, and pass 
the parameter from KafkaStream to ConsumerIterator; in ConsumerIterator, if 
isShallow is true call currentDataChunk.messages.shallowIterator, otherwise call 
currentDataChunk.messages.iterator.

3. Also in ConsumerIterator, if shallowIterator is true, construct 
MessageAndMetadata with the value directly assigned as message: Message instead of 
fromBytes(Utils.readBytes(item.message.payload)).

4. In MirrorMaker, set shallowIterator to true, and upon reading each 
msgAndMetadata from the stream, create KeyedMessage[Array[Byte], Message] instead 
of KeyedMessage[Array[Byte], Array[Byte]].

5. Also in MirrorMaker, set CompressionCodec to NoCompression to avoid a second 
compression of the already-compressed messages.

6. Ordering in MirrorMaker will be automatically preserved, since the MirrorMaker 
producer's event handler uses the message key to decide the outgoing partition; 
hence compressed messages with the same key will go to the same partition.
  
 Decompression and re-compression on MirrorMaker could result in messages 
 being dropped in the pipeline
 --

 Key: KAFKA-1011
 URL: https://issues.apache.org/jira/browse/KAFKA-1011
 Project: Kafka
  Issue Type: Bug
Reporter: Guozhang Wang
Assignee: Guozhang Wang
 Fix For: 0.8.1


 The way MirrorMaker works today, its consumers use a deep iterator to 
 decompress messages received from the source brokers and its producers 
 re-compress those messages while sending them to the target brokers. 
 Since MirrorMaker uses a centralized data channel for its consumers to pipe 
 messages to its producers, and since the producers compress all messages of 
 the same topic within a batch into a single produce request, messages accepted 
 at the front end of the pipeline can be dropped at the MirrorMaker's target 
 brokers with a MessageSizeTooLargeException if one producer batch happens to 
 contain too many messages of the same topic. If we use a shallow iterator on 
 the MirrorMaker's consumer side to pipe the compressed messages through 
 directly, this issue can be fixed. 
 Also, as Swapnil pointed out, if the MirrorMaker lags and there are large 
 messages in the MirrorMaker queue (large after decompression), it can run 
 into an OutOfMemoryException. Shallow iteration will be very helpful in 
 avoiding this exception.
 The proposed solution to this issue is also related to KAFKA-527.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira