Sam Meder created KAFKA-994:
-------------------------------

             Summary: High level consumer doesn't throw an exception when the 
message it is trying to fetch exceeds the configured fetch size
                 Key: KAFKA-994
                 URL: https://issues.apache.org/jira/browse/KAFKA-994
             Project: Kafka
          Issue Type: Bug
          Components: consumer
    Affects Versions: 0.8
            Reporter: Sam Meder
            Assignee: Neha Narkhede


The high level consumer code is supposed to throw an exception when it 
encounters a message that exceeds its configured max message size. The relevant 
code form ConsumerIterator.scala is:

      // if we just updated the current chunk and it is empty that means the 
fetch size is too small!
      if(currentDataChunk.messages.validBytes == 0)
        throw new MessageSizeTooLargeException("Found a message larger than the 
maximum fetch size of this consumer on topic " +
                                               "%s partition %d at fetch offset 
%d. Increase the fetch size, or decrease the maximum message size the broker 
will allow."
                                               
.format(currentDataChunk.topicInfo.topic, 
currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
    }

The problem is that KAFKA-846 changed PartitionTopicInfo.enqueue:

   def enqueue(messages: ByteBufferMessageSet) {
-    val size = messages.sizeInBytes
+    val size = messages.validBytes
     if(size > 0) {

i.e. chunks that contain messages that are too big (validBytes = 0) will never 
even be enqueued, so won't ever hit the too-large message check in 
ConsumerIterator... 

I think that just changing "if(size > 0) {" to if(messages.sizeInBytes > 0) {" 
should do the trick?

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to