Sam Meder created KAFKA-994: ------------------------------- Summary: High level consumer doesn't throw an exception when the message it is trying to fetch exceeds the configured fetch size Key: KAFKA-994 URL: https://issues.apache.org/jira/browse/KAFKA-994 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 0.8 Reporter: Sam Meder Assignee: Neha Narkhede
The high level consumer code is supposed to throw an exception when it encounters a message that exceeds its configured max message size. The relevant code form ConsumerIterator.scala is: // if we just updated the current chunk and it is empty that means the fetch size is too small! if(currentDataChunk.messages.validBytes == 0) throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) } The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue: def enqueue(messages: ByteBufferMessageSet) { - val size = messages.sizeInBytes + val size = messages.validBytes if(size > 0) { i.e. chunks that contain messages that are too big (validBytes = 0) will never even be enqueued, so won't ever hit the too-large message check in ConsumerIterator... I think that just changing "if(size > 0) {" to if(messages.sizeInBytes > 0) {" should do the trick? -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira