Re: Consumer behavior when message exceeds fetch.message.max.bytes
Yes. It's good to enforce that. Could you file a jira and attach your patch there?

Thanks,

Jun

On Thu, Aug 1, 2013 at 7:39 AM, Sam Meder sam.me...@jivesoftware.com wrote:

Seems like a good idea to enforce this? Maybe something like this:

    diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
    index a64b210..1c3bfdd 100644
    --- a/core/src/main/scala/kafka/server/KafkaConfig.scala
    +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
    @@ -198,7 +198,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
       val replicaSocketReceiveBufferBytes = props.getInt(ReplicaSocketReceiveBufferBytesProp, ConsumerConfig.SocketBufferSize)

       /* the number of byes of messages to attempt to fetch */
    -  val replicaFetchMaxBytes = props.getInt(ReplicaFetchMaxBytesProp, ConsumerConfig.FetchSize)
    +  val replicaFetchMaxBytes = props.getIntInRange(ReplicaFetchMaxBytesProp, ConsumerConfig.FetchSize, (messageMaxBytes, Int.MaxValue))

       /* max wait time for each fetcher request issued by follower replicas */
       val replicaFetchWaitMaxMs = props.getInt(ReplicaFetchWaitMaxMsProp, 500)

Not sure if message.max.bytes counts only the payload or the whole message plus any headers, so it may be that the lower bound should be a bit larger even.

/Sam

On Aug 1, 2013, at 7:04 AM, Jun Rao jun...@gmail.com wrote:

On the server, replica.fetch.max.bytes should be >= message.max.bytes. Otherwise, the follower will get stuck when replicating data from the leader.

Thanks,

Jun

On Wed, Jul 31, 2013 at 10:10 AM, Sam Meder sam.me...@jivesoftware.com wrote:

I also noticed that there are two properties related to message size on the server: replica.fetch.max.bytes and message.max.bytes. What happens when replica.fetch.max.bytes is lower than message.max.bytes? Should there even be two properties?

/Sam

On Jul 31, 2013, at 5:25 PM, Sam Meder sam.me...@jivesoftware.com wrote:

We're expecting to occasionally have to deal with pretty large messages being sent to Kafka. We will of course set the fetch size appropriately high, but are concerned about the behavior when a message exceeds the fetch size. As far as I can tell, the current behavior when a too-large message is encountered is to pretend it is not there and not notify the consumer in any way. IMO it would be better to throw an exception than to silently ignore the issue (with the current code one can't really distinguish a large message from no data at all). Thoughts?

/Sam
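[Editor's note: for readers unfamiliar with the pattern, below is a minimal sketch of what a range-checked getter like the getIntInRange call in the patch could look like. The method name comes from the diff above; the class name and body are illustrative assumptions, not Kafka's actual VerifiableProperties implementation.]

    import java.util.Properties

    // Sketch: an int property lookup that fails fast when the configured
    // value falls outside [min, max]. For the enforcement discussed above,
    // the lower bound would be messageMaxBytes, guaranteeing
    // replica.fetch.max.bytes >= message.max.bytes at broker startup.
    class VerifiableProps(props: Properties) {
      def getInt(name: String, default: Int): Int =
        Option(props.getProperty(name)).map(_.trim.toInt).getOrElse(default)

      def getIntInRange(name: String, default: Int, range: (Int, Int)): Int = {
        val (min, max) = range
        val v = getInt(name, default)
        require(v >= min && v <= max,
          "%s has value %d which is not in the range [%d, %d]".format(name, v, min, max))
        v
      }
    }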
[jira] [Updated] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-994:
----------------------------

    Description:

The high level consumer code is supposed to throw an exception when it encounters a message that exceeds its configured max message size. The relevant code from ConsumerIterator.scala is:

    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if(currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
        "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
        .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))

The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue:

     def enqueue(messages: ByteBufferMessageSet) {
    -  val size = messages.sizeInBytes
    +  val size = messages.validBytes
       if(size > 0) {

i.e. chunks that contain only messages that are too big (validBytes == 0) will never even be enqueued, so they won't ever hit the too-large message check in ConsumerIterator... I've attached a patch that passes our tests...

    was: (the same description, except that it ended with the suggestion: I think that just changing if(size > 0) { to if(messages.sizeInBytes > 0) { should do the trick? instead of the note about the attached patch)

High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
------------------------------------------------------------------------------------------------------------------------

                Key: KAFKA-994
                URL: https://issues.apache.org/jira/browse/KAFKA-994
            Project: Kafka
         Issue Type: Bug
         Components: consumer
   Affects Versions: 0.8
           Reporter: Sam Meder
           Assignee: Neha Narkhede
        Attachments: messageSize.patch

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira
[jira] [Updated] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-994:
----------------------------
    Fix Version/s: 0.8
           Status: Patch Available  (was: Open)
[jira] [Created] (KAFKA-995) Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
Sam Meder created KAFKA-995:
-------------------------------

             Summary: Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
                 Key: KAFKA-995
                 URL: https://issues.apache.org/jira/browse/KAFKA-995
             Project: Kafka
          Issue Type: Improvement
          Components: config
    Affects Versions: 0.8
            Reporter: Sam Meder
            Priority: Minor
             Fix For: 0.8

replica.fetch.max.bytes must always be >= message.max.bytes for replication to work correctly. This ticket adds enforcement of the constraint.
[jira] [Updated] (KAFKA-995) Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
[ https://issues.apache.org/jira/browse/KAFKA-995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-995:
----------------------------
         Status: Patch Available  (was: Open)
    Attachments: replica_fetch_size_config.patch
[jira] [Commented] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726525#comment-13726525 ]

Jun Rao commented on KAFKA-994:
-------------------------------

Thanks for filing this jira. Looks like a real issue. Just changing validBytes to sizeInBytes may not be enough. In enqueue(), we currently expect the chunk to have at least one message in order to get the next fetch offset. Of course, if we hit a large message, that expectation won't hold. So we have to change the logic a bit such that if there is not a single message in the chunk, we don't move the fetch offset, but still insert the chunk into the queue (so that the consumer thread can see it).

Thanks,

Jun
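[Editor's note: for illustration, here is one way the enqueue logic Jun describes could look: enqueue the chunk even when it contains no complete message, and only advance the fetch offset when at least one message was decoded. The surrounding members (chunkQueue, fetchedOffset, FetchedDataChunk) follow the snippets quoted in this thread, but their exact shapes here are assumptions; this is a sketch, not the committed patch.]

    // Sketch of PartitionTopicInfo.enqueue after the proposed fix.
    def enqueue(messages: ByteBufferMessageSet) {
      val size = messages.validBytes
      if (size > 0) {
        // Normal case: at least one complete message fits in the fetch.
        val next = messages.shallowIterator.toSeq.last.nextOffset
        chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
        fetchedOffset.set(next) // advance past the messages we enqueued
      } else if (messages.sizeInBytes > 0) {
        // Oversized message: bytes came back but no complete message could
        // be decoded. Enqueue the chunk WITHOUT advancing the fetch offset,
        // so ConsumerIterator sees validBytes == 0 and can throw
        // MessageSizeTooLargeException instead of silently stalling.
        chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
      }
    }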
[jira] [Commented] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726535#comment-13726535 ]

Sam Meder commented on KAFKA-994:
---------------------------------

Yea, I realized that after the initial write-up; the attached patch actually does what you describe.
[jira] [Commented] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726536#comment-13726536 ]

Jay Kreps commented on KAFKA-994:
---------------------------------

Ack, nice catch.
[jira] [Updated] (KAFKA-996) Capitalize first letter for log entries
[ https://issues.apache.org/jira/browse/KAFKA-996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-996:
--------------------------------
    Attachment: KAFKA-996.v1.patch

Needs to be rebased once the 0.8 fixes are merged to trunk.

Capitalize first letter for log entries
---------------------------------------

                Key: KAFKA-996
                URL: https://issues.apache.org/jira/browse/KAFKA-996
            Project: Kafka
         Issue Type: Improvement
           Reporter: Guozhang Wang
           Assignee: Guozhang Wang
        Attachments: KAFKA-996.v1.patch
[jira] [Updated] (KAFKA-996) Capitalize first letter for log entries
[ https://issues.apache.org/jira/browse/KAFKA-996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-996:
--------------------------------
    Status: Patch Available  (was: Open)
[jira] [Commented] (KAFKA-967) Use key range in ProducerPerformance
[ https://issues.apache.org/jira/browse/KAFKA-967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726576#comment-13726576 ]

Guozhang Wang commented on KAFKA-967:
-------------------------------------

Jun, before this patch each message would use a different key in ProducerPerformance, so are you suggesting that if messageKeyRangeOpt is not specified we should still use a distinct key for each message? Currently, if messageKeyRangeOpt is not set, it uses a default key range.

Use key range in ProducerPerformance
------------------------------------

                Key: KAFKA-967
                URL: https://issues.apache.org/jira/browse/KAFKA-967
            Project: Kafka
         Issue Type: Improvement
           Reporter: Guozhang Wang
           Assignee: Guozhang Wang
        Attachments: KAFKA-967.v1.patch, KAFKA-967.v2.patch

Currently in ProducerPerformance, the key of the message is set to the MessageID. It would be better to set it to a specific key within a key range (Integer type) so that we can test the semantic partitioning case. This is related to KAFKA-957.
[jira] [Updated] (KAFKA-992) Double Check on Broker Registration to Avoid False NodeExist Exception
[ https://issues.apache.org/jira/browse/KAFKA-992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-992:
--------------------------------
    Attachment: KAFKA-992.v2.patch

Unit tests passed.

Double Check on Broker Registration to Avoid False NodeExist Exception
-----------------------------------------------------------------------

                Key: KAFKA-992
                URL: https://issues.apache.org/jira/browse/KAFKA-992
            Project: Kafka
         Issue Type: Bug
           Reporter: Guozhang Wang
           Assignee: Guozhang Wang
        Attachments: KAFKA-992.v1.patch, KAFKA-992.v2.patch

There is a potential bug in Zookeeper: when the ZK leader processes a lot of session expiration events (this could be due to a long GC, an fsync operation, etc.), it marks the session as expired but does not delete the corresponding ephemeral znode at the same time. Meanwhile, a new session event will be fired on the kafka server, and the server will request the same ephemeral node to be created on handling the new session. When that request enters the zookeeper processing queue, it receives a NodeExists error, since the zookeeper leader has not finished deleting the ephemeral znode and still thinks the previous session holds it. Kafka assumes that a NodeExists error on ephemeral node creation is ok, since that is a legitimate condition during session disconnects on zookeeper. However, a NodeExists error is only valid if the owner session id also matches the Kafka server's current zookeeper session id. The bug is that before sending a NodeExists error, Zookeeper should check whether the ephemeral node in question is held by a session that has been marked as expired.
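[Editor's note: to make the double check concrete, below is a small sketch using the raw ZooKeeper client: on a NodeExists error, compare the existing znode's ephemeral owner with our own session id before deciding what to do. The backoff and the helper name are illustrative assumptions, not the attached patch.]

    import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

    // Sketch: register an ephemeral broker znode, tolerating NodeExists only
    // when the existing node is owned by OUR current session.
    def registerBroker(zk: ZooKeeper, path: String, data: Array[Byte]): Unit = {
      try {
        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
      } catch {
        case _: KeeperException.NodeExistsException =>
          val stat = zk.exists(path, false)
          // Node owned by a stale session (or already gone): the ZK leader
          // has not finished expiring the old session yet, so wait and retry.
          if (stat == null || stat.getEphemeralOwner != zk.getSessionId) {
            Thread.sleep(1000) // illustrative backoff
            registerBroker(zk, path, data)
          }
          // Otherwise the node belongs to our current session; nothing to do.
      }
    }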
[jira] Subscription: outstanding kafka patches
Issue Subscription
Filter: outstanding kafka patches (70 issues)
The list of outstanding kafka patches
Subscriber: kafka-mailing-list

Key        Summary
KAFKA-996  Capitalize first letter for log entries
           https://issues.apache.org/jira/browse/KAFKA-996
KAFKA-995  Enforce that the value for replica.fetch.max.bytes is always >= the value for message.max.bytes
           https://issues.apache.org/jira/browse/KAFKA-995
KAFKA-994  High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
           https://issues.apache.org/jira/browse/KAFKA-994
KAFKA-992  Double Check on Broker Registration to Avoid False NodeExist Exception
           https://issues.apache.org/jira/browse/KAFKA-992
KAFKA-991  Reduce the queue size in hadoop producer
           https://issues.apache.org/jira/browse/KAFKA-991
KAFKA-985  Increasing log retention quickly overflows scala Int
           https://issues.apache.org/jira/browse/KAFKA-985
KAFKA-984  Avoid a full rebalance in cases when a new topic is discovered but container/broker set stay the same
           https://issues.apache.org/jira/browse/KAFKA-984
KAFKA-982  Logo for Kafka
           https://issues.apache.org/jira/browse/KAFKA-982
KAFKA-981  Unable to pull Kafka binaries with Ivy
           https://issues.apache.org/jira/browse/KAFKA-981
KAFKA-976  Order-Preserving Mirror Maker Testcase
           https://issues.apache.org/jira/browse/KAFKA-976
KAFKA-974  can't use public release maven repo because of failure of downloaded dependency
           https://issues.apache.org/jira/browse/KAFKA-974
KAFKA-967  Use key range in ProducerPerformance
           https://issues.apache.org/jira/browse/KAFKA-967
KAFKA-956  High-level consumer fails to check topic metadata response for errors
           https://issues.apache.org/jira/browse/KAFKA-956
KAFKA-955  After a leader change, messages sent with ack=0 are lost
           https://issues.apache.org/jira/browse/KAFKA-955
KAFKA-946  Kafka Hadoop Consumer fails when verifying message checksum
           https://issues.apache.org/jira/browse/KAFKA-946
KAFKA-943  Move all configuration key string to constants
           https://issues.apache.org/jira/browse/KAFKA-943
KAFKA-923  Improve controller failover latency
           https://issues.apache.org/jira/browse/KAFKA-923
KAFKA-917  Expose zk.session.timeout.ms in console consumer
           https://issues.apache.org/jira/browse/KAFKA-917
KAFKA-915  System Test - Mirror Maker testcase_5001 failed
           https://issues.apache.org/jira/browse/KAFKA-915
KAFKA-885  sbt package builds two kafka jars
           https://issues.apache.org/jira/browse/KAFKA-885
KAFKA-881  Kafka broker not respecting log.roll.hours
           https://issues.apache.org/jira/browse/KAFKA-881
KAFKA-873  Consider replacing zkclient with curator (with zkclient-bridge)
           https://issues.apache.org/jira/browse/KAFKA-873
KAFKA-868  System Test - add test case for rolling controlled shutdown
           https://issues.apache.org/jira/browse/KAFKA-868
KAFKA-863  System Test - update 0.7 version of kafka-run-class.sh for Migration Tool test cases
           https://issues.apache.org/jira/browse/KAFKA-863
KAFKA-859  support basic auth protection of mx4j console
           https://issues.apache.org/jira/browse/KAFKA-859
KAFKA-855  Ant+Ivy build for Kafka
           https://issues.apache.org/jira/browse/KAFKA-855
KAFKA-854  Upgrade dependencies for 0.8
           https://issues.apache.org/jira/browse/KAFKA-854
KAFKA-815  Improve SimpleConsumerShell to take in a max messages config option
           https://issues.apache.org/jira/browse/KAFKA-815
KAFKA-745  Remove getShutdownReceive() and other kafka specific code from the RequestChannel
           https://issues.apache.org/jira/browse/KAFKA-745
KAFKA-735  Add looping and JSON output for ConsumerOffsetChecker
           https://issues.apache.org/jira/browse/KAFKA-735
KAFKA-718  kafka-run-class.sh should use reasonable gc settings
           https://issues.apache.org/jira/browse/KAFKA-718
KAFKA-717  scala 2.10 build support
           https://issues.apache.org/jira/browse/KAFKA-717
KAFKA-686  0.8 Kafka broker should give a better error message when running against 0.7 zookeeper
           https://issues.apache.org/jira/browse/KAFKA-686
KAFKA-677  Retention process gives exception if an empty segment is chosen for collection
           https://issues.apache.org/jira/browse/KAFKA-677
KAFKA-674  Clean Shutdown Testing - Log segments checksums mismatch
           https://issues.apache.org/jira/browse/KAFKA-674
KAFKA-652  Create testcases for clean shut-down
           https://issues.apache.org/jira/browse/KAFKA-652
KAFKA-649  Cleanup log4j logging
           https://issues.apache.org/jira/browse/KAFKA-649
KAFKA-645  Create a shell script to run System Test with DEBUG details and tee console output to a file
[jira] [Updated] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Meder updated KAFKA-881:
----------------------------
    Attachment: kafka-roll-0.8.patch

Kafka broker not respecting log.roll.hours
-------------------------------------------

                Key: KAFKA-881
                URL: https://issues.apache.org/jira/browse/KAFKA-881
            Project: Kafka
         Issue Type: Bug
         Components: log
   Affects Versions: 0.7.2
           Reporter: Dan F
           Assignee: Jay Kreps
        Attachments: kafka-roll-0.8.patch, kafka-roll.again.patch, kafka_roll.patch

We are running Kafka 0.7.2. We set log.roll.hours=1. I hoped that meant logs would be rolled every hour or more often. Instead, logs that are many hours (sometimes days) old sometimes have more data added to them. This perturbs our systems for reasons I won't get into.

I don't know Scala or Kafka well, but I have a proposal for why this might happen: upon restart, a broker forgets when its log files were last appended to (firstAppendTime). Then, a potentially infinite amount of time later, the restarted broker receives another message for the particular (topic, partition) and starts the clock again. It will then roll over that log after an hour.

https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/server/KafkaConfig.scala says:

    /* the maximum time before a new log segment is rolled out */
    val logRollHours = Utils.getIntInRange(props, "log.roll.hours", 24*7, (1, Int.MaxValue))

https://svn.apache.org/repos/asf/kafka/branches/0.7/core/src/main/scala/kafka/log/Log.scala has maybeRoll, which needs segment.firstAppendTime defined. It also has updateFirstAppendTime(), which sets it only if it is empty.

If my hypothesis about why this happens is correct, here is a case where rolling takes longer than an hour, even on a high-volume topic:

- write to a topic for 20 minutes
- restart the broker
- wait for 5 days
- write to a topic for 20 minutes
- restart the broker
- write to a topic for an hour

The rollover time was now 5 days, 1 hour, 40 minutes. You can make it as long as you want.

Proposed solutions: The very easiest thing to do would be to have Kafka re-initialize firstAppendTime from the file creation time. Unfortunately, there is no file creation time in UNIX. There is ctime (change time), updated when a file's inode information is changed.

One solution is to embed the firstAppendTime in the filename (say, seconds since epoch). Then when you open the file you could reset firstAppendTime to exactly what it really was. This ignores clock drift or resetting; one could set firstAppendTime to min(filename-based time, current time).

A second solution is to make the Kafka log roll over at specific times, regardless of when the file was created. Conceptually, time can be divided into windows of size log.rollover.hours since epoch (UNIX time 0, 1970). So, when firstAppendTime is empty, compute the next rollover time (say, next = (hours since epoch) % (log.rollover.hours) + log.rollover.hours). If the file mtime (last modified) is before the current rollover window ( (next - log.rollover.hours) .. next ), roll it over right away. Otherwise, roll over when you cross next, and reset next.

A third solution (not perfect, but an approximation at least) would be not to write to a segment if firstAppendTime is not defined and the timestamp on the file is more than log.roll.hours old.

There are probably other solutions.
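[Editor's note: as a concrete illustration of the second proposal, below is a small sketch of epoch-aligned rollover windows. It anchors the window to absolute time (a slight restatement of the pseudocode above); all names are illustrative assumptions, not Kafka code.]

    // Sketch: divide time into fixed windows of rollHours since the epoch and
    // roll a segment whenever the current window boundary is crossed,
    // regardless of when the segment was created or the broker restarted.
    def nextRolloverMs(nowMs: Long, rollHours: Int): Long = {
      val windowMs = rollHours * 60L * 60L * 1000L
      val windowStart = nowMs - (nowMs % windowMs) // start of current window
      windowStart + windowMs                       // absolute time of next roll
    }

    def shouldRoll(segmentMtimeMs: Long, nowMs: Long, rollHours: Int): Boolean = {
      val windowMs = rollHours * 60L * 60L * 1000L
      val windowStart = nowMs - (nowMs % windowMs)
      // A segment last modified before the current window began has already
      // lived across a rollover boundary: roll it right away.
      segmentMtimeMs < windowStart
    }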
[jira] [Commented] (KAFKA-881) Kafka broker not respecting log.roll.hours
[ https://issues.apache.org/jira/browse/KAFKA-881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726789#comment-13726789 ]

Sam Meder commented on KAFKA-881:
---------------------------------

Attached a version of this patch that applies against 0.8.
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726891#comment-13726891 ]

Jay Kreps commented on KAFKA-991:
---------------------------------

+1

Reduce the queue size in hadoop producer
----------------------------------------

                Key: KAFKA-991
                URL: https://issues.apache.org/jira/browse/KAFKA-991
            Project: Kafka
         Issue Type: Bug
   Affects Versions: 0.8
           Reporter: Swapnil Ghike
           Assignee: Swapnil Ghike
             Labels: bugs
            Fix For: 0.8
        Attachments: kafka-991-v1.patch

Currently the queue.size in the hadoop producer is 10MB. This means that the KafkaRecordWriter will hit the send button on the kafka producer after the size of uncompressed queued messages becomes greater than 10MB. (The other condition on which the messages are sent is if their number exceeds SHORT.MAX_VALUE.)

Considering that the server accepts a (compressed) batch of messages of size up to 1 million bytes minus the log overhead, we should probably reduce the queue size in the hadoop producer. We should do two things:

1. Change the max message size on the broker to 1 million + log overhead, because that will make the client message size easy to remember. Right now the maximum number of bytes that can be accepted from a client in a batch of messages is an awkward 88. (I don't have a stronger reason.) We have set the fetch size on the consumer to 1MB; this gives us a lot of room even if the log overhead increases in future versions.

2. Set the default number of bytes on the hadoop producer to 1 million bytes. Anyone who wants higher throughput can override this config using kafka.output.queue.size.
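[Editor's note: for intuition, below is a minimal sketch of the batching policy being discussed: buffer records and flush once either the byte threshold or the message-count cap would be exceeded. The class name and the 1,000,000-byte default come from the discussion; everything else (the send callback, field names) is an illustrative assumption, not the actual contrib/hadoop code.]

    import scala.collection.mutable.ArrayBuffer

    // Sketch of a byte-bounded record writer buffer. The sync producer is
    // modeled as a callback taking the whole batch, so send failures can
    // propagate and fail the Hadoop job.
    class KafkaRecordWriter(send: Seq[Array[Byte]] => Unit,
                            queueBytes: Int = 1000000) {
      private val msgList = new ArrayBuffer[Array[Byte]]()
      private var totalBytes = 0

      def write(value: Array[Byte]): Unit = {
        // Flush BEFORE adding, so no batch ever exceeds the byte threshold
        // (the check ordering Joel's review comment asks for below).
        if (totalBytes + value.length > queueBytes || msgList.size >= Short.MaxValue)
          sendMsgList()
        msgList += value
        totalBytes += value.length
      }

      def sendMsgList(): Unit = {
        if (msgList.nonEmpty) {
          send(msgList.toSeq)
          msgList.clear()
          totalBytes = 0 // reset the counter together with the buffer
        }
      }
    }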
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726955#comment-13726955 ]

Swapnil Ghike commented on KAFKA-991:
-------------------------------------

Also reviewed by Jun, Neha. The reason we use the sync producer in the hadoop producer is that the caller may want to get the exceptions from the kafka producer and kill their job.
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726958#comment-13726958 ]

Joel Koshy commented on KAFKA-991:
----------------------------------

+1. Committed to 0.8.

Minor comments:
- "queue size" is unintuitive: it sounds like a number of messages, but it is bytes.
- The totalSize > queueSize check should ideally be done before adding to msgList.
Re: [VOTE] Logo runoff vote
+1 296 (changing my vote from the one in the jira)

On Wed, Jul 31, 2013 at 10:00 PM, Jun Rao jun...@gmail.com wrote:

+1 for 296

Thanks,

Jun

On Wed, Jul 31, 2013 at 1:34 PM, Jay Kreps jay.kr...@gmail.com wrote:

Okay folks, we did a survey on the JIRA (KAFKA-982) and it was more or less a tie between two logos:

https://issues.apache.org/jira/secure/attachment/12593545/296.png
https://issues.apache.org/jira/secure/attachment/12593547/298.jpeg

Personally I like both, but we need to pick one. So let's quickly do a binding vote from the PMC on these two. Please indicate whether you are voting for 298 or 296 (i.e. the image name).

To help visualize it a little better, I plopped them onto the web site and took a screenshot (don't get hung up on the site design, this is just to give an idea). Here are the screenshots:

https://issues.apache.org/jira/secure/attachment/12595266/296-screenshot.png
https://issues.apache.org/jira/secure/attachment/12595269/298-screenshot.png

Cheers,

-Jay
[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Swapnil Ghike updated KAFKA-991:
--------------------------------
    Attachment: kafka-991-followup.patch
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13726995#comment-13726995 ]

Swapnil Ghike commented on KAFKA-991:
-------------------------------------

Thanks Joel. Would you mind committing the follow-up patch as well, so that we can complete the hadoop producer queue size/queue bytes change?
[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Swapnil Ghike updated KAFKA-991:
--------------------------------
    Attachment: kafka-991-followup-v2.patch

Fixed README.
[jira] [Commented] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13727092#comment-13727092 ]

Joel Koshy commented on KAFKA-991:
----------------------------------

Thanks for the follow-up patch. totalBytes is set to zero in sendMsgList, so the next batch's totalBytes will be short (incorrect) by valBytes.
[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Swapnil Ghike updated KAFKA-991:
--------------------------------
    Attachment: kafka-991-followup-3.patch

Thanks, attached a new patch to address that.
Re: [VOTE] Logo runoff vote
I am changing my vote too: +1 296. It's an awesome logo and looks better on the web page.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
/

On Aug 1, 2013, at 6:10 PM, Joel Koshy jjkosh...@gmail.com wrote:

+1 296 (changing my vote from the one in the jira)

On Wed, Jul 31, 2013 at 10:00 PM, Jun Rao jun...@gmail.com wrote:

+1 for 296

Thanks,

Jun
[jira] [Updated] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-991:
-----------------------------
    Resolution: Fixed
        Status: Resolved  (was: Patch Available)
[jira] [Closed] (KAFKA-991) Reduce the queue size in hadoop producer
[ https://issues.apache.org/jira/browse/KAFKA-991?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy closed KAFKA-991.
----------------------------

+1. Committed to 0.8.
Re: [VOTE] Logo runoff vote
Cool, I think it is (nearly) unanimous, and we have the whole PMC except for Jakob, who is on vacation. So I am calling it for 296.

-Jay

On Thu, Aug 1, 2013 at 5:08 PM, Joe Stein crypt...@gmail.com wrote:

I am changing my vote too: +1 296. It's an awesome logo and looks better on the web page.
[jira] [Updated] (KAFKA-994) High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size
[ https://issues.apache.org/jira/browse/KAFKA-994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-994:
--------------------------
    Resolution: Fixed
      Assignee: Sam Meder  (was: Neha Narkhede)
        Status: Resolved  (was: Patch Available)

Thanks for the patch. Committed to 0.8.