[ https://issues.apache.org/jira/browse/KAFKA-1718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14179485#comment-14179485 ]

Evan Huus edited comment on KAFKA-1718 at 10/22/14 2:26 AM:
------------------------------------------------------------

??I guess your go producer can send multiple compressed messages for a single 
partition??

Yes, that's exactly what it's doing. If it collects enough messages for a 
partition that they would exceed {{message.max.bytes}} when compressed 
together, it splits them into batches and sends each batch as its own 
compressed message within the same messageSet.
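
To make that concrete, here is a rough sketch of the batching behaviour 
(illustrative Go only, not Sarama's actual code: the function name is made up, 
the real producer also adds per-message offset/CRC framing that I've omitted, 
and I'm assuming a snappy binding with an {{Encode(dst, src)}} helper):

{code}
package example

import (
	"github.com/golang/snappy" // or whichever snappy binding the producer uses
)

// maxMessageBytes mirrors the broker's default message.max.bytes (~1 MB).
const maxMessageBytes = 1000012

// splitIntoCompressedBatches greedily packs raw payloads into batches whose
// snappy-compressed size stays under maxMessageBytes, then compresses each
// batch into one "wrapper" message. All wrappers are destined for the same
// messageSet of the partition.
func splitIntoCompressedBatches(payloads [][]byte) [][]byte {
	var wrappers [][]byte
	var current []byte
	for _, p := range payloads {
		candidate := append(append([]byte(nil), current...), p...)
		if len(current) > 0 && len(snappy.Encode(nil, candidate)) > maxMessageBytes {
			// Adding this payload would push the compressed batch over the
			// limit: close the current batch and start a new one with it.
			wrappers = append(wrappers, snappy.Encode(nil, current))
			current = append([]byte(nil), p...)
			continue
		}
		current = candidate
	}
	if len(current) > 0 {
		wrappers = append(wrappers, snappy.Encode(nil, current))
	}
	return wrappers
}
{code}

Each returned wrapper then goes into the partition's messageSet as a single 
snappy-compressed message.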

??Is there any benefit in doing that???

More-or-less to get around the limit on message sizes, which I guess doesn't 
work so well :)

A few points on this then:
* Currently (with default broker settings) you can produce just under 100MiB 
(socket.request.max.bytes) of messages to the broker uncompressed in a single 
request, but you can't produce that same batch of messages in compressed form, 
since the resulting compressed message would almost certainly be larger than 
1MB (message.max.bytes). This discrepancy seems odd to me.
* I understand the desire to limit "real" message sizes to prevent misbehaving 
producers from causing problems. However, I don't think the limit is 
particularly useful when applied to the compressed "meta-messages"; why 
shouldn't they be arbitrarily large, within the limits of 
{{socket.request.max.bytes}}?
* I don't think the broker should assume there's only one compressed message 
per message-set; if a message-set contains multiple compressed messages, it 
should process them one-at-a-time and store each individually, rather than 
trying to do them all at once (a rough sketch of what I mean follows this 
list).
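
To be concrete about that last point, the check I'd expect is per wrapper 
message rather than over the whole combined set, roughly like this 
(illustrative Go rather than the broker's actual Scala; the names and the 
standalone constant are made up for the example):

{code}
package example

import "fmt"

// Broker defaults, for reference:
//   message.max.bytes        = 1000012   (~1 MB)
//   socket.request.max.bytes = 104857600 (100 MiB)
const brokerMaxMessageBytes = 1000012

// validateMessageSet rejects a compressed wrapper message only if that
// individual message exceeds message.max.bytes, instead of summing every
// wrapper in the set and comparing the combined size against the limit.
func validateMessageSet(wrappers [][]byte) error {
	for i, w := range wrappers {
		if len(w) > brokerMaxMessageBytes {
			return fmt.Errorf("message %d is %d bytes, which exceeds message.max.bytes (%d)",
				i, len(w), brokerMaxMessageBytes)
		}
	}
	return nil
}
{code}

With a check like that, each compressed batch is held to {{message.max.bytes}} 
on its own, and the overall request is still bounded by 
{{socket.request.max.bytes}}.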

Thanks for all your help!

Edit: If for some reason you decide to keep the current behaviour as-is, please 
document it in the protocol spec on the wiki, since as far as I can tell the 
spec gives no reason to believe that multiple compressed messages will be 
combined, or that the _combined_ length will be relevant.


> "Message Size Too Large" error when only small messages produced with Snappy
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-1718
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1718
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.1.1
>            Reporter: Evan Huus
>            Priority: Critical
>
> I'm the primary author of the Go bindings, and while I originally received 
> this as a bug against my bindings, I'm coming to the conclusion that it's a 
> bug in the broker somehow.
> Specifically, take a look at the last two kafka packets in the following 
> packet capture: https://dl.dropboxusercontent.com/u/171647/kafka.pcapng (you 
> will need a trunk build of Wireshark to fully decode the kafka part of the 
> packets).
> The produce request contains two partitions on one topic. Each partition has 
> one message set (sizes 977205 bytes and 967362 bytes respectively). Each 
> message set is a sequential collection of snappy-compressed messages, each 
> message of size 46899. When uncompressed, each message contains a message set 
> of 999600 bytes, containing a sequence of uncompressed 1024-byte messages.
> However, the broker responds to this with a MessageSizeTooLarge error, full 
> stacktrace from the broker logs being:
> kafka.common.MessageSizeTooLargeException: Message size is 1070127 bytes 
> which exceeds the maximum configured message size of 1000012.
>       at kafka.log.Log$$anonfun$append$1.apply(Log.scala:267)
>       at kafka.log.Log$$anonfun$append$1.apply(Log.scala:265)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:32)
>       at kafka.log.Log.append(Log.scala:265)
>       at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:354)
>       at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:376)
>       at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:366)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>       at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>       at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>       at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>       at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:366)
>       at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:292)
>       at kafka.server.KafkaApis.handle(KafkaApis.scala:185)
>       at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:42)
>       at java.lang.Thread.run(Thread.java:695)
> Since as far as I can tell none of the sizes in the actual produced packet 
> exceed the defined maximum, I can only assume that the broker is 
> miscalculating something somewhere and throwing the exception improperly.
> ---
> This issue can be reliably reproduced using an out-of-the-box binary download 
> of 0.8.1.1 and the following gist: 
> https://gist.github.com/eapache/ce0f15311c605a165ce7 (you will need to use 
> the `producer-ng` branch of the Sarama library).
> ---
> I am happy to provide any more information you might need, or to do relevant 
> experiments etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
