You just need to set batch.size to be less than 2MB. Also, max.request.size in
the producer has to be less than socket.request.max.bytes in the broker.
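For reference, the ordering described above can be sketched as a simple chain of inequalities. The concrete numbers below are illustrative assumptions, not values from this thread (104857600 bytes is the usual socket.request.max.bytes default):

```java
// Sketch of the constraint chain between producer and broker size settings.
// All numeric values are assumptions for illustration only.
public class SizeLimitCheck {
    public static void main(String[] args) {
        long batchSize = 1_000_000L;               // producer batch.size, ~1MB
        long maxRequestSize = 2_000_000L;          // producer max.request.size, ~2MB
        long socketRequestMaxBytes = 104_857_600L; // broker socket.request.max.bytes
        // Each limit must stay below the next one up the chain.
        boolean ok = batchSize < maxRequestSize
                && maxRequestSize < socketRequestMaxBytes;
        System.out.println(ok); // prints "true"
    }
}
```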

Thanks,

Jun

On Wed, Sep 10, 2014 at 2:22 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Jun,
>
> Thanks for highlighting this.  Please correct my understanding if it is
> wrong: if max.request.size <= message.max.bytes, then the batch size will be
> decided optimally (the batch size will be determined by either the
> max.request.size limit or batch.size, whichever is less).
>
> Here are the configuration parameters I was referring to on the new producer
> side and the broker side, effectively eliminating the data loss issue for
> large batches of messages (except for the ones that really exceed the
> limit).  That is what I had asked for in a previous email.
>
> *New Producer Config:*
> max.request.size=2MB
> batch.size=1000
>
> *Broker Config:*
> message.max.bytes=2MB
>
> Thanks,
> Bhavesh
>
> On Wed, Sep 10, 2014 at 10:10 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > Actually, with the new producer, you can configure the batch size in
> > bytes. If you set the batch size to be smaller than the max message size,
> > messages exceeding the max message limit will be in their own batches.
> > Then, only one message will be rejected by the broker.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Sep 9, 2014 at 5:51 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Is there any pluggability so that a developer can customize the batching
> > > logic or inject custom code for this?  Shall I file a Jira for this
> > > issue?
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Tue, Sep 9, 2014 at 3:52 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > No, the new producer doesn't address that problem.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Sep 9, 2014 at 12:59 PM, Bhavesh Mistry <
> > > > mistry.p.bhav...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for the clarification.  A follow-up question: does the new
> > > > > producer solve the issues highlighted?  With compression and async
> > > > > mode in the new producer, will it break messages down to this upper
> > > > > limit before submitting, or does the new producer strictly honor the
> > > > > batch size?  I am just asking whether, if a compressed batch reaches
> > > > > this configured limit, the batch is broken down into sub-batches that
> > > > > are within the limit.  I would like to minimize the data loss due to
> > > > > this limit.
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > Are you using compression in the producer? If so, message.max.bytes
> > > > > > applies to the compressed size of a batch of messages. Otherwise,
> > > > > > message.max.bytes applies to the size of each individual message.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > I am referring to the wiki
> > > > > > > http://kafka.apache.org/08/configuration.html and, as far as I
> > > > > > > know, the following parameter controls the max batch message
> > > > > > > bytes.  Kafka community, please correct me if I am wrong; I do not
> > > > > > > want to create confusion for the Kafka user community here.  Also,
> > > > > > > if you increase this limit then you have to set a corresponding
> > > > > > > limit increase on the consumer side as well
> > > > > > > (fetch.message.max.bytes).
> > > > > > >
> > > > > > > Since we are using batch async mode, our messages sometimes get
> > > > > > > dropped if the entire batch's bytes exceed this limit, so I was
> > > > > > > asking the Kafka developers whether there is an optimal way to
> > > > > > > determine the batch size based on this limit to minimize the data
> > > > > > > loss, because the entire batch is rejected by the brokers.
> > > > > > >
> > > > > > > message.max.bytes (default 1000000): The maximum size of a
> > > > > > > message that the server can receive. It is important that this
> > > > > > > property be in sync with the maximum fetch size your consumers
> > > > > > > use, or else an unruly producer will be able to publish messages
> > > > > > > too large for consumers to consume.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > > > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > >
> > > > > > > > Hi Bhavesh
> > > > > > > >
> > > > > > > > Can you explain what limit you're referring to?
> > > > > > > > I'm asking because `message.max.bytes` is applied per message,
> > > > > > > > not per batch.
> > > > > > > > Is there another limit I should be aware of?
> > > > > > > >
> > > > > > > > thanks
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Jun,
> > > > > > > > >
> > > > > > > > > We have a similar problem.  We have variable-length messages,
> > > > > > > > > so with a fixed batch size, the batch sometimes exceeds the
> > > > > > > > > limit set on the brokers (2MB).
> > > > > > > > >
> > > > > > > > > So can the producer have some extra logic to determine the
> > > > > > > > > optimal batch size by looking at the configured
> > > > > > > > > message.max.bytes value?
> > > > > > > > >
> > > > > > > > > During the metadata update, the producer will get this value
> > > > > > > > > from the broker for each topic, and the producer will check
> > > > > > > > > whether the current batch size reaches this limit; if so, it
> > > > > > > > > will break the batch into smaller chunks such that each would
> > > > > > > > > not exceed the limit (unless a single message exceeds the
> > > > > > > > > limit). Basically, try to avoid data loss as much as possible.
> > > > > > > > >
> > > > > > > > > Please let me know your opinion on this...
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > > > > > > alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > >
> > > > > > > > > > Thanks Jun.
> > > > > > > > > >
> > > > > > > > > > I'll create a jira and try to provide a patch. I think this
> > > > > > > > > > is pretty serious.
> > > > > > > > > >
> > > > > > > > > > On Friday, August 29, 2014, Jun Rao <jun...@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > The goal of batching is mostly to reduce the # of RPC
> > > > > > > > > > > calls to the broker. If compression is enabled, a larger
> > > > > > > > > > > batch typically implies a better compression ratio.
> > > > > > > > > > >
> > > > > > > > > > > The reason that we have to fail the whole batch is that
> > > > > > > > > > > the error code in the produce response is per partition,
> > > > > > > > > > > instead of per message.
> > > > > > > > > > >
> > > > > > > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > > > > > > > > reasonable.
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Could you explain the goals of batches? I was assuming
> > > > > > > > > > > > this was simply a performance optimization, but this
> > > > > > > > > > > > behavior makes me think I'm missing something.
> > > > > > > > > > > > Is a batch more than a list of *independent* messages?
> > > > > > > > > > > >
> > > > > > > > > > > > Why would you reject the whole batch? One invalid
> > > > > > > > > > > > message causes the loss of batch.num.messages-1
> > > > > > > > > > > > messages :(
> > > > > > > > > > > > It seems pretty critical to me.
> > > > > > > > > > > >
> > > > > > > > > > > > If ack=0, the producer will never know about it.
> > > > > > > > > > > > If ack != 0, the producer will retry the whole batch. If
> > > > > > > > > > > > the issue was related to data corruption (etc.), retries
> > > > > > > > > > > > might work. But in the case of a "big message", the
> > > > > > > > > > > > batch will always be rejected and the producer will give
> > > > > > > > > > > > up.
> > > > > > > > > > > >
> > > > > > > > > > > > If the messages are indeed considered independent, I
> > > > > > > > > > > > think this is a pretty serious issue.
> > > > > > > > > > > >
> > > > > > > > > > > > I see 2 possible fix approaches:
> > > > > > > > > > > > - the broker could reject only the invalid messages
> > > > > > > > > > > > - the broker could reject the whole batch (like today)
> > > > > > > > > > > > but the producer (if ack != 0) could retry messages one
> > > > > > > > > > > > at a time on exceptions like "MessageSizeTooLarge".
> > > > > > > > > > > >
> > > > > > > > > > > > opinions?
> > > > > > > > > > > >
> > > > > > > > > > > > Alexis
> > > > > > > > > > > >
> > > > > > > > > > > > ```
> > > > > > > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56 failed due to [test,0]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61 failed due to [test,1]: kafka.common.MessageSizeTooLargeException (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> > > > > > > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events (kafka.producer.async.ProducerSendThread)
> > > > > > > > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > > > > > > > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > > > > > > >   at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > > > > > > ```
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > That's right. If one message in a batch exceeds the
> > > > > > > > > > > > > size limit, the whole batch is rejected.
> > > > > > > > > > > > >
> > > > > > > > > > > > > When determining message.max.bytes, the most important
> > > > > > > > > > > > > thing to consider is probably memory, since currently
> > > > > > > > > > > > > we need to allocate memory for a full message in the
> > > > > > > > > > > > > broker, the producer, and the consumer client.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Jun
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > am I misreading this loop:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > it seems like all messages from `validMessages`
> > > > > > > > > > > > > > (which is a ByteBufferMessageSet) are NOT appended
> > > > > > > > > > > > > > if one of the message sizes exceeds the limit.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I hope I'm missing something.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Jun,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > thanks for your answer.
> > > > > > > > > > > > > > > Unfortunately the size won't help much; I'd like
> > > > > > > > > > > > > > > to see the actual message data.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > By the way, what are the things to consider when
> > > > > > > > > > > > > > > deciding on the `message.max.bytes` value?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >> The message size check is currently only done on
> > > > > > > > > > > > > > >> the broker. If you enable trace level logging in
> > > > > > > > > > > > > > >> RequestChannel, you will see the produce request,
> > > > > > > > > > > > > > >> which includes the size of each partition.
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Thanks,
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> Jun
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <alexis.mi...@airbedandbreakfast.com> wrote:
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >> > Hello,
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > my brokers are reporting that some received
> > > > > > > > > > > > > > >> > messages exceed the `message.max.bytes` value.
> > > > > > > > > > > > > > >> > I'd like to know which producers are at fault,
> > > > > > > > > > > > > > >> > but it is pretty much impossible:
> > > > > > > > > > > > > > >> > - the brokers don't log the content of the
> > > > > > > > > > > > > > >> > rejected messages
> > > > > > > > > > > > > > >> > - the log messages do not contain the IP of the
> > > > > > > > > > > > > > >> > producers
> > > > > > > > > > > > > > >> > - on the consumer side, no exception is thrown
> > > > > > > > > > > > > > >> > (afaik it is because Ack-0 is used). The only
> > > > > > > > > > > > > > >> > kind of notification is to close the
> > > > > > > > > > > > > > >> > connection.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > [1] Do you have any suggestions to track down
> > > > > > > > > > > > > > >> > the guilty producers or find out the message
> > > > > > > > > > > > > > >> > content?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Even though it makes total sense to have the
> > > > > > > > > > > > > > >> > limit defined and applied on the brokers, I was
> > > > > > > > > > > > > > >> > thinking that this check could also be applied
> > > > > > > > > > > > > > >> > by the producers. Some google results suggest
> > > > > > > > > > > > > > >> > that `message.max.bytes` might be used by the
> > > > > > > > > > > > > > >> > producers, but I can't find any trace of that
> > > > > > > > > > > > > > >> > behavior in the code.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > The closest thing I have is
> > > > > > > > > > > > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > > > > > > >> > but it simply logs the message size and
> > > > > > > > > > > > > > >> > content, and the log level is trace.
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > [2] Could you please confirm whether such a
> > > > > > > > > > > > > > >> > producer-side check exists?
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > thanks!
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >> > Alexis
> > > > > > > > > > > > > > >> >
> > > > > > > > > > > > > > >>
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
