Thanks Jun. I'll create a jira and try to provide a patch. I think this is pretty serious.
On Friday, August 29, 2014, Jun Rao <jun...@gmail.com> wrote:

> The goal of batching is mostly to reduce the # of RPC calls to the broker.
> If compression is enabled, a larger batch typically implies a better
> compression ratio.
>
> The reason that we have to fail the whole batch is that the error code in
> the produce response is per partition, instead of per message.
>
> Retrying individual messages on MessageSizeTooLarge seems reasonable.
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> alexis.mi...@airbedandbreakfast.com> wrote:
>
> > Could you explain the goals of batches? I was assuming this was simply a
> > performance optimization, but this behavior makes me think I'm missing
> > something. Is a batch more than a list of *independent* messages?
> >
> > Why would you reject the whole batch? One invalid message causes the
> > loss of batch.num.messages-1 messages :( It seems pretty critical to me.
> >
> > If ack=0, the producer will never know about it.
> > If ack != 0, the producer will retry the whole batch. If the issue was
> > related to data corruption (etc.), retries might work. But in the case
> > of a "big message", the batch will always be rejected and the producer
> > will give up.
> >
> > If the messages are indeed considered independent, I think this is a
> > pretty serious issue.
> >
> > I see 2 possible fix approaches:
> > - the broker could reject only the invalid messages
> > - the broker could reject the whole batch (like today) but the producer
> >   (if ack != 0) could retry messages one at a time on exceptions like
> >   MessageSizeTooLarge
> >
> > Opinions?
> >
> > Alexis
> >
> > ```
> > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages
> > after 3 tries.
> >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >   at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > ```
> >
> >
> > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > That's right. If one message in a batch exceeds the size limit, the
> > > whole batch is rejected.
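Jun's suggestion above, retrying individual messages on MessageSizeTooLarge, can be sketched client-side. Below is a minimal illustration in Python with a hypothetical `send_batch` stand-in (not the actual Kafka producer API): the broker-like check rejects a whole batch on any oversized message, and the fallback resends one at a time so only the truly oversized messages are lost.

```python
MAX_MESSAGE_BYTES = 1000  # stand-in for the broker's message.max.bytes

def send_batch(batch):
    """Simulates broker behavior: reject the WHOLE batch if any message is too large."""
    for msg in batch:
        if len(msg) > MAX_MESSAGE_BYTES:
            raise ValueError("MessageSizeTooLarge")
    # ... append all messages to the log ...

def send_with_fallback(batch):
    """Client-side fallback: on a size rejection, retry messages one at a time,
    so one oversized message no longer discards its batch-mates."""
    try:
        send_batch(batch)
        return []  # whole batch accepted, nothing rejected
    except ValueError:
        rejected = []
        for msg in batch:
            try:
                send_batch([msg])
            except ValueError:
                rejected.append(msg)  # only the truly oversized message is lost
        return rejected

batch = [b"x" * 10, b"x" * 2000, b"x" * 20]
print(len(send_with_fallback(batch)))  # prints 1: only the 2000-byte message is dropped
```

With plain `send_batch`, all three messages would be lost; with the fallback, the two valid ones survive at the cost of extra RPCs on the failure path only.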
> > >
> > > When determining message.max.bytes, the most important thing to
> > > consider is probably memory, since currently we need to allocate
> > > memory for a full message in the broker, and in the producer and
> > > consumer clients.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com> wrote:
> > >
> > > > Am I misreading this loop:
> > > >
> > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > >
> > > > It seems like all messages from `validMessages` (which is a
> > > > ByteBufferMessageSet) are NOT appended if one of the message sizes
> > > > exceeds the limit.
> > > >
> > > > I hope I'm missing something.
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > thanks for your answer.
> > > > > Unfortunately the size won't help much; I'd like to see the actual
> > > > > message data.
> > > > >
> > > > > By the way, what are the things to consider when deciding on the
> > > > > `message.max.bytes` value?
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > >> The message size check is currently only done on the broker. If
> > > > >> you enable trace-level logging in RequestChannel, you will see
> > > > >> the produce request, which includes the size of each partition.
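The two behaviors under discussion, the whole-batch rejection Alexis observes in the linked Log.scala loop versus the first proposed fix (reject only the invalid messages), can be contrasted in a small sketch. These are Python stand-ins for illustration, not the actual broker code:

```python
MAX_MESSAGE_BYTES = 1000  # stand-in for the broker's message.max.bytes

def append_reject_whole_batch(messages):
    """Current behavior, analogous to the validation loop in Log.scala:
    the first oversized message raises, and NOTHING from the batch is appended."""
    for payload in messages:
        if len(payload) > MAX_MESSAGE_BYTES:
            raise ValueError("MessageSizeTooLargeException")
    return list(messages)  # all appended

def append_reject_only_invalid(messages):
    """First proposed fix: append the valid messages and return the invalid
    ones, so the producer can be told exactly which messages were rejected."""
    appended = [p for p in messages if len(p) <= MAX_MESSAGE_BYTES]
    rejected = [p for p in messages if len(p) > MAX_MESSAGE_BYTES]
    return appended, rejected

batch = [b"a" * 100, b"b" * 5000, b"c" * 100]
appended, rejected = append_reject_only_invalid(batch)
print(len(appended), len(rejected))  # prints: 2 1
```

Note that the second variant would require a per-message (rather than per-partition) error code in the produce response, which is exactly the protocol limitation Jun points out above.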
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > >> alexis.mi...@airbedandbreakfast.com> wrote:
> > > > >>
> > > > >> > Hello,
> > > > >> >
> > > > >> > My brokers are reporting that some received messages exceed the
> > > > >> > `message.max.bytes` value.
> > > > >> > I'd like to know which producers are at fault, but it is pretty
> > > > >> > much impossible:
> > > > >> > - the brokers don't log the content of the rejected messages
> > > > >> > - the log messages do not contain the IP of the producers
> > > > >> > - on the consumer side, no exception is thrown (afaik this is
> > > > >> >   because ack=0 is used). The only kind of notification is that
> > > > >> >   the connection is closed.
> > > > >> >
> > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > >> > producers or find out the message content?
> > > > >> >
> > > > >> > Even though it makes total sense to have the limit defined and
> > > > >> > applied on the brokers, I was thinking that this check could
> > > > >> > also be applied by the producers. Some Google results suggest
> > > > >> > that `message.max.bytes` might be used by the producers, but I
> > > > >> > can't find any trace of that behavior in the code.
> > > > >> >
> > > > >> > The closest thing I have is
> > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > >> > but it simply logs the message size and content, and the log
> > > > >> > level is trace.
> > > > >> >
> > > > >> > [2] Could you please confirm whether such a producer-side check
> > > > >> > exists?
> > > > >> >
> > > > >> > Thanks!
> > > > >> >
> > > > >> > Alexis
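As the thread establishes, the size check in 0.8.1 happens only on the broker, which is why the guilty producers are so hard to find. A minimal sketch of the producer-side guard Alexis asks about in [2] could look like the following. All names here are hypothetical, `LOG_OVERHEAD` is an assumed per-message framing overhead, and the limit has to be kept in sync with the broker's `message.max.bytes` by hand:

```python
import logging

MESSAGE_MAX_BYTES = 1_000_000  # must mirror the broker's message.max.bytes
LOG_OVERHEAD = 26              # assumed per-message overhead (offset + size + header)

log = logging.getLogger("producer")

def check_before_send(key, value):
    """Producer-side guard: reject (and log) oversized messages before they
    ever reach the broker, so the offending payload and its origin are known
    locally instead of being silently dropped under ack=0."""
    size = LOG_OVERHEAD + (len(key) if key else 0) + len(value)
    if size > MESSAGE_MAX_BYTES:
        log.error("dropping oversized message: %d bytes (limit %d), key=%r",
                  size, MESSAGE_MAX_BYTES, key)
        return False
    return True

ok = check_before_send(b"user-123", b"small payload")
print(ok)  # True
```

The appeal of such a guard is precisely the diagnosability problem described above: the rejection happens where the message content and the producing host are both available for logging.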