Thanks Jun.

I'll create a jira and try to provide a patch. I think this is pretty
serious.

On Friday, August 29, 2014, Jun Rao <jun...@gmail.com> wrote:

> The goal of batching is mostly to reduce the # of RPC calls to the broker.
> If compression is enabled, a larger batch typically implies a better
> compression ratio.
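>
> For illustration, a hedged sketch of how batching and compression are
> typically enabled on the 0.8 producer (broker address and values are
> placeholders, not recommendations):
>
> ```
> import java.util.Properties
> import kafka.producer.{Producer, ProducerConfig}
>
> val props = new Properties()
> props.put("metadata.broker.list", "broker1:9092") // placeholder address
> props.put("serializer.class", "kafka.serializer.StringEncoder")
> props.put("producer.type", "async")       // buffer and send in batches
> props.put("batch.num.messages", "200")    // messages per batch (illustrative)
> props.put("compression.codec", "gzip")    // a batch compresses as one unit
> val producer = new Producer[String, String](new ProducerConfig(props))
> ```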
>
> The reason that we have to fail the whole batch is that the error code in
> the produce response is per partition, instead of per message.
>
> Retrying individual messages on MessageSizeTooLarge seems reasonable.
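>
> To make that concrete, here is a simplified sketch of the produce response
> shape (names follow the 0.8 Scala classes, lightly simplified): there is
> simply no field in which a per-message error could be reported.
>
> ```
> // Simplified sketch of the 0.8-era produce response (not the exact code):
> // errors are keyed by (topic, partition), never by message, so the broker
> // can only accept or reject a partition's message set as a whole.
> case class TopicAndPartition(topic: String, partition: Int)
> case class ProducerResponseStatus(error: Short, offset: Long)
> case class ProducerResponse(
>   correlationId: Int,
>   status: Map[TopicAndPartition, ProducerResponseStatus])
> ```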
>
> Thanks,
>
> Jun
>
>
> On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> alexis.mi...@airbedandbreakfast.com> wrote:
>
> > Could you explain the goals of batches? I was assuming this was simply a
> > performance optimization, but this behavior makes me think I'm missing
> > something.
> > Is a batch more than a list of *independent* messages?
> >
> > Why would you reject the whole batch? One invalid message causes the loss
> > of batch.num.messages-1 messages :(
> > It seems pretty critical to me.
> >
> > If ack=0, the producer will never know about it.
> > If ack != 0, the producer will retry the whole batch. If the issue was
> > related to data corruption (etc.), retries might work. But in the case of
> > a "big message", the batch will always be rejected and the producer will
> > give up.
> >
> > If the messages are indeed considered independent, I think this is a
> > pretty serious issue.
> >
> > I see 2 possible fix approaches (see the sketch below):
> > - the broker could reject only the invalid messages
> > - the broker could reject the whole batch (like today) but the producer
> > (if ack != 0) could retry messages one at a time on an exception like
> > "MessageSizeTooLarge".
> >
> > opinions?
> >
> > Alexis
> >
> > ```
> > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
> > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> >   at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > ```
> >
> >
> > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> > > That's right. If one message in a batch exceeds the size limit, the
> > > whole batch is rejected.
> > >
> > > When determining message.max.bytes, the most important thing to
> > > consider is probably memory, since currently we need to allocate memory
> > > for a full message in the broker, the producer, and the consumer client.
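> > >
> > > For reference, a hedged sketch of the settings that have to stay
> > > consistent with message.max.bytes in 0.8 (values are illustrative only):
> > >
> > > ```
> > > # broker (server.properties): hard limit, checked on append
> > > message.max.bytes=1000000
> > > # broker: replicas must be able to fetch the largest message
> > > replica.fetch.max.bytes=1048576
> > > # consumer: fetch size must also be >= the largest message
> > > fetch.message.max.bytes=1048576
> > > ```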
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > alexis.mi...@airbedandbreakfast.com> wrote:
> > >
> > > > Am I misreading this loop?
> > > >
> > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > >
> > > > It seems like all messages from `validMessages` (which is a
> > > > ByteBufferMessageSet) are NOT appended if one of the message sizes
> > > > exceeds the limit.
> > > >
> > > > I hope I'm missing something.
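> > > >
> > > > For readers following along, the linked check reads roughly like this
> > > > (lightly paraphrased from Log.scala in 0.8.1, not the verbatim code):
> > > > it throws on the first oversized message before anything in the set is
> > > > appended, which is why the whole batch fails.
> > > >
> > > > ```
> > > > // Iterate over every message in the set; one oversized message aborts
> > > > // the append of the entire set.
> > > > for (messageAndOffset <- validMessages.shallowIterator) {
> > > >   if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize)
> > > >     throw new MessageSizeTooLargeException(
> > > >       "Message size is %d bytes which exceeds the maximum configured message size of %d."
> > > >         .format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))
> > > > }
> > > > ```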
> > > >
> > > >
> > > >
> > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for your answer.
> > > > > Unfortunately the size won't help much; I'd like to see the actual
> > > > > message data.
> > > > >
> > > > > By the way, what are the things to consider when deciding on the
> > > > > `message.max.bytes` value?
> > > > >
> > > > >
> > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > >> The message size check is currently only done on the broker. If
> > > > >> you enable trace-level logging in RequestChannel, you will see the
> > > > >> produce request, which includes the size of each partition.
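> > > > >>
> > > > >> For example (assuming the logger names in the log4j.properties file
> > > > >> that ships with Kafka; adjust to your setup):
> > > > >>
> > > > >> ```
> > > > >> # log the full produce request, including per-partition sizes
> > > > >> log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
> > > > >> log4j.additivity.kafka.network.RequestChannel$=false
> > > > >> ```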
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Jun
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > >> alexis.mi...@airbedandbreakfast.com> wrote:
> > > > >>
> > > > >> > Hello,
> > > > >> >
> > > > >> > my brokers are reporting that some received messages exceed the
> > > > >> > `message.max.bytes` value.
> > > > >> > I'd like to know which producers are at fault, but it is pretty
> > > > >> > much impossible:
> > > > >> > - the brokers don't log the content of the rejected messages
> > > > >> > - the log messages do not contain the IP of the producers
> > > > >> > - on the consumer side, no exception is thrown (afaik it is
> > > > >> > because ack=0 is used). The only kind of notification is that the
> > > > >> > connection is closed.
> > > > >> >
> > > > >> > [1] Do you have any suggestions to track down the guilty
> > > > >> > producers or find out the message content?
> > > > >> >
> > > > >> > Even though it makes total sense to have the limit defined and
> > > > >> > applied on the brokers, I was thinking that this check could also
> > > > >> > be applied by the producers. Some google results suggest that
> > > > >> > `message.max.bytes` might be used by the producers, but I can't
> > > > >> > find any trace of that behavior in the code.
> > > > >> >
> > > > >> > The closest thing I have is
> > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > >> > but it simply logs the message size and content, and only at
> > > > >> > trace level.
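> > > > >> >
> > > > >> > A hypothetical client-side guard (no such check exists in the 0.8
> > > > >> > producer; the size estimate ignores the key and wire overhead):
> > > > >> >
> > > > >> > ```
> > > > >> > // Reject oversized payloads before sending, so the offending
> > > > >> > // message can be logged with full context at the producer.
> > > > >> > val maxMessageBytes = 1000000 // mirror the broker's message.max.bytes
> > > > >> >
> > > > >> > def sendChecked(producer: kafka.producer.Producer[String, String],
> > > > >> >                 msg: kafka.producer.KeyedMessage[String, String]): Unit = {
> > > > >> >   val size = msg.message.getBytes("UTF-8").length
> > > > >> >   if (size > maxMessageBytes)
> > > > >> >     System.err.println(s"refusing oversized message ($size bytes) for ${msg.topic}")
> > > > >> >   else
> > > > >> >     producer.send(msg)
> > > > >> > }
> > > > >> > ```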
> > > > >> >
> > > > >> > [2] Could you please confirm whether such a producer-side check
> > > > >> > exists?
> > > > >> >
> > > > >> >
> > > > >> > thanks!
> > > > >> >
> > > > >> > Alexis
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
