[ 
https://issues.apache.org/jira/browse/STORM-1863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiasheng Wang updated STORM-1863:
---------------------------------
    Description: 
In Kafka's ConsumerIterator, there is code like this:

    // if we just updated the current chunk and it is empty that means the fetch size is too small!
    if (currentDataChunk.messages.validBytes == 0)
      throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
        "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
        .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))

When the "fetch.message.max.bytes" config is smaller than the actual message size in the topic, ConsumerIterator throws an exception to notify the user.
But storm-kafka has no such logic. As a result, if KafkaConfig.fetchSizeBytes is smaller than the actual message size, the topology fetches no data but keeps running.
To prevent this situation, storm-kafka should throw MessageSizeTooLargeException as well.
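The proposed check could look like the following minimal sketch. This is a hypothetical, self-contained illustration, not the actual storm-kafka KafkaUtils code: the method name checkFetchResult and the local MessageSizeTooLargeException stand-in (in the real fix it would be kafka.common.MessageSizeTooLargeException) are assumptions.

```java
// Sketch of the proposed check (hypothetical names, not the real
// storm-kafka API). If a fetch returned zero valid bytes, the next
// message is larger than the configured fetch size, so we fail loudly
// instead of silently fetching nothing forever.
public class FetchSizeCheck {

    // Stand-in for kafka.common.MessageSizeTooLargeException.
    static class MessageSizeTooLargeException extends RuntimeException {
        MessageSizeTooLargeException(String msg) { super(msg); }
    }

    static void checkFetchResult(int validBytes, String topic, int partition,
                                 long fetchOffset, int fetchSizeBytes) {
        if (validBytes == 0) {
            throw new MessageSizeTooLargeException(String.format(
                "Found a message larger than the maximum fetch size (%d bytes) "
                + "on topic %s partition %d at fetch offset %d. Increase "
                + "KafkaConfig.fetchSizeBytes, or decrease the maximum message "
                + "size the broker will allow.",
                fetchSizeBytes, topic, partition, fetchOffset));
        }
    }

    public static void main(String[] args) {
        // A non-empty fetch passes through silently.
        checkFetchResult(1024, "events", 0, 42L, 1048576);

        // An empty fetch (validBytes == 0) triggers the exception.
        try {
            checkFetchResult(0, "events", 0, 42L, 1048576);
            System.out.println("no exception");
        } catch (MessageSizeTooLargeException e) {
            System.out.println("threw: " + e.getMessage().startsWith("Found a message"));
        }
    }
}
```

The point of the check is that an empty-but-successful fetch is the only signal the simple consumer gives when fetchSizeBytes is too small, so it must be translated into an exception at the spout rather than retried.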



> Throw exception if messages fetched by storm-kafka are empty
> ------------------------------------------------------------
>
>                 Key: STORM-1863
>                 URL: https://issues.apache.org/jira/browse/STORM-1863
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>            Reporter: Jiasheng Wang
>
> In Kafka's ConsumerIterator, there is code like this:
>     // if we just updated the current chunk and it is empty that means the fetch size is too small!
>     if (currentDataChunk.messages.validBytes == 0)
>       throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
>         "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
>         .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
> When the "fetch.message.max.bytes" config is smaller than the actual message size in the topic, ConsumerIterator throws an exception to notify the user.
> But storm-kafka has no such logic. As a result, if KafkaConfig.fetchSizeBytes is smaller than the actual message size, the topology fetches no data but keeps running.
> To prevent this situation, storm-kafka should throw MessageSizeTooLargeException as well.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
