Hi Jun,

I was thinking of the last try. In that case the remaining
delivery.timeout.ms is smaller than request timeout.ms. I am thinking that
we can always wait up to request timeout after sending that request to
avoid potential unnecessary PID reset. The concern of use remaining
delivery.timeout.ms in this case is that if there are many batches to be
expired in the queue, we may end up with continuous expirations and PID
reset.

Thanks,

Jiangjie (Becket) Qin



On Sun, Aug 27, 2017 at 12:08 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Jiangjie,
>
> If we want to enforce delivery.timeout.ms, we need to take the min right?
> Also, if a user sets a large delivery.timeout.ms, we probably don't want
> to
> wait for an inflight request longer than request.timeout.ms.
>
> Thanks,
>
> Jun
>
> On Fri, Aug 25, 2017 at 5:19 PM, Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Jason,
> >
> > I see what you mean. That makes sense. So in the above case after the
> > producer resets PID, when it retry batch_0_tp1, the batch will still have
> > the old PID even if the producer has already got a new PID.
> >
> > @Jun, do you mean max(remaining delivery.timeout.ms, request.timeout.ms)
> > instead of min(remaining delivery.timeout.ms, request.timeout.ms)?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Aug 25, 2017 at 9:34 AM, Jun Rao <j...@confluent.io> wrote:
> >
> > > Hi, Becket,
> > >
> > > Good point on expiring inflight requests. Perhaps we can expire an
> > inflight
> > > request after min(remaining delivery.timeout.ms, request.timeout.ms).
> > This
> > > way, if a user sets a high delivery.timeout.ms, we can still recover
> > from
> > > broker power outage sooner.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Aug 24, 2017 at 12:52 PM, Becket Qin <becket....@gmail.com>
> > wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > delivery.timeout.ms sounds good to me.
> > > >
> > > > I was referring to the case that we are resetting the PID/sequence
> > after
> > > > expire a batch. This is more about the sending the batches after the
> > > > expired batch.
> > > >
> > > > The scenario being discussed is expiring one of the batches in a
> > > in-flight
> > > > request and retry the other batches in the that in-flight request. So
> > > > consider the following case:
> > > > 1. Producer sends request_0 with two batches (batch_0_tp0 and
> > > batch_0_tp1).
> > > > 2. Broker receives the request enqueued the request to the log.
> > > > 3. Before the producer receives the response from the broker,
> > batch_0_tp0
> > > > expires. The producer will expire batch_0_tp0 immediately, resets
> PID,
> > > and
> > > > then resend batch_0_tp1, and maybe send batch_1_tp0 (i.e. the next
> > batch
> > > to
> > > > the expired batch) as well.
> > > >
> > > > For batch_0_tp1, it is OK to reuse PID and and sequence number. The
> > > problem
> > > > is for batch_1_tp0, If we reuse the same PID and the broker has
> already
> > > > appended batch_0_tp0, the broker will think batch_1_tp0 is a
> duplicate
> > > with
> > > > the same sequence number. As a result broker will drop batch_0_tp1.
> > That
> > > is
> > > > why we have to either bump up sequence number or reset PID. To avoid
> > this
> > > > complexity, I was suggesting not expire the in-flight batch
> > immediately,
> > > > but wait for the produce response. If the batch has been successfully
> > > > appended, we do not expire it. Otherwise, we expire it.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > >
> > > >
> > > > On Thu, Aug 24, 2017 at 11:26 AM, Jason Gustafson <
> ja...@confluent.io>
> > > > wrote:
> > > >
> > > > > @Becket
> > > > >
> > > > > Good point about unnecessarily resetting the PID in cases where we
> > know
> > > > the
> > > > > request has failed. Might be worth opening a JIRA to try and
> improve
> > > > this.
> > > > >
> > > > > So if we expire the batch prematurely and resend all
> > > > > > the other batches in the same request, chances are there will be
> > > > > > duplicates. If we wait for the response instead, it is less
> likely
> > to
> > > > > > introduce duplicates, and we may not need to reset the PID.
> > > > >
> > > > >
> > > > > Not sure I follow this. Are you assuming that we change the batch
> > > > > PID/sequence of the retried batches after resetting the PID? I
> think
> > we
> > > > > probably need to ensure that when we retry a batch, we always use
> the
> > > > same
> > > > > PID/sequence.
> > > > >
> > > > > By the way, as far as naming, `max.message.delivery.wait.ms` is
> > quite
> > > a
> > > > > mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`?
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > If TCP timeout is longer than request.timeout.ms, the producer
> > will
> > > > > always
> > > > > > hit request.timeout.ms before hitting TCP timeout, right? That
> is
> > > why
> > > > we
> > > > > > added request.timeout.ms in the first place.
> > > > > >
> > > > > > You are right. Currently we are reset the PID and resend the
> > batches
> > > to
> > > > > > avoid OutOfOrderSequenceException when the expired batches are in
> > > > retry.
> > > > > >
> > > > > > This does not distinguish the reasons that caused the retry.
> There
> > > are
> > > > > two
> > > > > > cases:
> > > > > > 1. If the batch was in retry because it received an error
> response
> > > > (e.g.
> > > > > > NotLeaderForPartition), we actually don't need to reset PID in
> this
> > > > case
> > > > > > because we know that broker did not accept it.
> > > > > > 2. If the batch was in retry because it hit a timeout earlier,
> then
> > > we
> > > > > > should reset the PID (or optimistically send and only reset PID
> > when
> > > > > > receive OutOfOrderSequenceException?)
> > > > > > Case 1 is probably the most common case, so it looks that we are
> > > > > resetting
> > > > > > the PID more often than necessary. But because in case 1 the
> broker
> > > > does
> > > > > > not have the batch, there isn't much impact on resting PID and
> > resend
> > > > > other
> > > > > > than the additional round trip.
> > > > > >
> > > > > > Now we are introducing another case:
> > > > > > 3. A batch is in retry because we expired an in-flight request
> > before
> > > > it
> > > > > > hits request.timeout.ms.
> > > > > >
> > > > > > The difference between 2 and 3 is that in case 3 likely the
> broker
> > > has
> > > > > > appended the messages. So if we expire the batch prematurely and
> > > resend
> > > > > all
> > > > > > the other batches in the same request, chances are there will be
> > > > > > duplicates. If we wait for the response instead, it is less
> likely
> > to
> > > > > > introduce duplicates, and we may not need to reset the PID.
> > > > > >
> > > > > > That said, given that batch expiration is probably already rare
> > > enough,
> > > > > so
> > > > > > it may not be necessary to optimize for that.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jiangjie (Becket) Qin
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > > > >
> > > > > > > Hi, Becket,
> > > > > > >
> > > > > > > If a message expires while it's in an inflight produce request,
> > the
> > > > > > > producer will get a new PID if idempotent is enabled. This will
> > > > prevent
> > > > > > > subsequent messages from hitting OutOfOrderSequenceException.
> The
> > > > issue
> > > > > > of
> > > > > > > not expiring an inflight request is that if a broker server
> goes
> > > down
> > > > > > hard
> > > > > > > (e.g. power outage), the time that it takes for the client to
> > > detect
> > > > > the
> > > > > > > socket level error (this will be sth like 8+ minutes with the
> > > default
> > > > > TCP
> > > > > > > setting) is much longer than the default request.timeout.ms.
> > > > > > >
> > > > > > > Hi, Sumant,
> > > > > > >
> > > > > > > We can probably just default max.message.delivery.wait.ms to
> 30
> > > > secs,
> > > > > > the
> > > > > > > current default for request.timeout.ms.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <
> suta...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > OK. Looks like starting the clock after closing the batch has
> > > > quite a
> > > > > > few
> > > > > > > > pitfalls. I can't think of a way of to work around it without
> > > > adding
> > > > > > yet
> > > > > > > > another config. So I won't discuss that here. Anyone? As I
> said
> > > > > > earlier,
> > > > > > > > I'm not hung up on super-accurate notification times.
> > > > > > > >
> > > > > > > > If we are going down the max.message.delievery.wait.ms
> route,
> > > what
> > > > > > would
> > > > > > > > be
> > > > > > > > the default? There seem to be a few options.
> > > > > > > >
> > > > > > > > 1. max.message.delievery.wait.ms=null. Nothing changes for
> > those
> > > > who
> > > > > > > don't
> > > > > > > > set it. I.e., batches expire after request.timeout.ms in
> > > > > accumulator.
> > > > > > If
> > > > > > > > they are past the accumulator stage, timeout after retries*(
> > > > > > > > request.timeout.ms+backoff).
> > > > > > > >
> > > > > > > > 2. max.message.delivery.wait.ms=request.timeout.ms. No
> > obervable
> > > > > > > > behavioral
> > > > > > > > change at the accumulator level as timeout value is same as
> > > before.
> > > > > > > Retries
> > > > > > > > will be done if as long as batch is under
> > > > > max.message.delivery.wait.ms
> > > > > > .
> > > > > > > > However, a batch can expire just after one try. That's ok IMO
> > > > because
> > > > > > > > request.timeout.ms tend to be large (Default 30000).
> > > > > > > >
> > > > > > > > 3. max.message.delivery.wait.ms=2*request.timeout.ms. Give
> > > > > opportunity
> > > > > > > for
> > > > > > > > two retries but warn that retries may not happen at all in
> some
> > > > rare
> > > > > > > > cases and a batch could expire before any attempt.
> > > > > > > >
> > > > > > > > 4. max.message.delivery.wait.ms=something else (a constant?)
> > > > > > > >
> > > > > > > > Thoughts?
> > > > > > > >
> > > > > > > > On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk>
> > > wrote:
> > > > > > > >
> > > > > > > > > Thanks Becket, that seems reasonable. Sumant, would you be
> > > > willing
> > > > > to
> > > > > > > > > update the KIP based on the discussion or are you still not
> > > > > > convinced?
> > > > > > > > >
> > > > > > > > > Ismael
> > > > > > > > >
> > > > > > > > > On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin <
> > > > becket....@gmail.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > In general max.message.delivery.wait.ms is a cleaner
> > > approach.
> > > > > > That
> > > > > > > > > would
> > > > > > > > > > make the guarantee clearer. That said, there seem
> > subtleties
> > > in
> > > > > > some
> > > > > > > > > > scenarios:
> > > > > > > > > >
> > > > > > > > > > 1. I agree with Sumante that it is a little weird that a
> > > > message
> > > > > > > could
> > > > > > > > be
> > > > > > > > > > expired immediately if it happens to enter a batch that
> is
> > > > about
> > > > > to
> > > > > > > be
> > > > > > > > > > expired. But as Jun said, as long as we have multiple
> > > messages
> > > > > in a
> > > > > > > > > batch,
> > > > > > > > > > there isn't a cheap way to achieve a precise timeout. So
> > the
> > > > > > question
> > > > > > > > > > actually becomes whether it is more user-friendly to
> expire
> > > > early
> > > > > > > > (based
> > > > > > > > > on
> > > > > > > > > > the batch creation time) or expire late (based on the
> batch
> > > > close
> > > > > > > > time).
> > > > > > > > > I
> > > > > > > > > > think both are acceptable. Personally I think most users
> do
> > > not
> > > > > > > really
> > > > > > > > > care
> > > > > > > > > > about expire a little late as long as it eventually
> > expires.
> > > > So I
> > > > > > > would
> > > > > > > > > use
> > > > > > > > > > batch close time as long as there is a bound on that. But
> > it
> > > > > looks
> > > > > > > that
> > > > > > > > > we
> > > > > > > > > > do not really have a bound on when we will close a batch.
> > So
> > > > > > > expiration
> > > > > > > > > > based on batch create time may be the only option if we
> > don't
> > > > > want
> > > > > > to
> > > > > > > > > > introduce complexity.
> > > > > > > > > >
> > > > > > > > > > 2. If we timeout a batch in a request when it is still in
> > > > flight,
> > > > > > the
> > > > > > > > end
> > > > > > > > > > result of that batch is unclear to the users. It would be
> > > weird
> > > > > > that
> > > > > > > > user
> > > > > > > > > > receive exception saying those messages are expired while
> > > they
> > > > > > > actually
> > > > > > > > > > have been sent successfully. Also if idempotence is set
> to
> > > > true,
> > > > > > what
> > > > > > > > > would
> > > > > > > > > > the next sequence ID be after the expired batch? Reusing
> > the
> > > > same
> > > > > > > > > sequence
> > > > > > > > > > Id may result in data loss, and increment the sequence ID
> > may
> > > > > cause
> > > > > > > > > > OutOfOrderSequenceException. Besides, extracting an
> expired
> > > > batch
> > > > > > > from
> > > > > > > > a
> > > > > > > > > > request also introduces some complexity. Again,
> personally
> > I
> > > > > think
> > > > > > it
> > > > > > > > is
> > > > > > > > > > fine to expire a little bit late. So maybe we don't need
> to
> > > > > expire
> > > > > > a
> > > > > > > > > batch
> > > > > > > > > > that is already in flight. In the worst case we will
> expire
> > > it
> > > > > with
> > > > > > > > delay
> > > > > > > > > > of request.timeout.ms.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma <
> > > > ism...@juma.me.uk>
> > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> Hi all,
> > > > > > > > > >>
> > > > > > > > > >> The discussion has been going on for a while, would it
> > help
> > > to
> > > > > > have
> > > > > > > a
> > > > > > > > > >> call to discuss this? I'd like to start a vote soonish
> so
> > > that
> > > > > we
> > > > > > > can
> > > > > > > > > >> include this in 1.0.0. I personally prefer
> > > > > > > > max.message.delivery.wait.ms
> > > > > > > > > .
> > > > > > > > > >> It seems like Jun, Apurva and Jason also prefer that.
> > > Sumant,
> > > > it
> > > > > > > seems
> > > > > > > > > like
> > > > > > > > > >> you still prefer a batch.expiry.ms, is that right? What
> > are
> > > > > your
> > > > > > > > > >> thoughts Joel and Becket?
> > > > > > > > > >>
> > > > > > > > > >> Ismael
> > > > > > > > > >>
> > > > > > > > > >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <
> > j...@confluent.io>
> > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >>> Hi, Sumant,
> > > > > > > > > >>>
> > > > > > > > > >>> The semantics of linger.ms is a bit subtle. The
> > reasoning
> > > > for
> > > > > > the
> > > > > > > > > >>> current
> > > > > > > > > >>> implementation is the following. Let's say one sets
> > > > linger.ms
> > > > > > to 0
> > > > > > > > > (our
> > > > > > > > > >>> current default value). Creating a batch for every
> > message
> > > > will
> > > > > > be
> > > > > > > > bad
> > > > > > > > > >>> for
> > > > > > > > > >>> throughput. Instead, the current implementation only
> > forms
> > > a
> > > > > > batch
> > > > > > > > when
> > > > > > > > > >>> the
> > > > > > > > > >>> batch is sendable (i.e., broker is available, inflight
> > > > request
> > > > > > > limit
> > > > > > > > is
> > > > > > > > > >>> not
> > > > > > > > > >>> exceeded, etc). That way, the producer has more chance
> > for
> > > > > > > batching.
> > > > > > > > > The
> > > > > > > > > >>> implication is that a batch could be closed longer than
> > > > > > linger.ms.
> > > > > > > > > >>>
> > > > > > > > > >>> Now, on your concern about not having a precise way to
> > > > control
> > > > > > > delay
> > > > > > > > in
> > > > > > > > > >>> the
> > > > > > > > > >>> accumulator. It seems the batch.expiry.ms approach
> will
> > > have
> > > > > the
> > > > > > > > same
> > > > > > > > > >>> issue. If you start the clock when a batch is
> > initialized,
> > > > you
> > > > > > may
> > > > > > > > > expire
> > > > > > > > > >>> some messages in the same batch early than
> > batch.expiry.ms
> > > .
> > > > If
> > > > > > you
> > > > > > > > > start
> > > > > > > > > >>> the clock when the batch is closed, the expiration time
> > > could
> > > > > be
> > > > > > > > > >>> unbounded
> > > > > > > > > >>> because of the linger.ms implementation described
> above.
> > > > > > Starting
> > > > > > > > the
> > > > > > > > > >>> expiration clock on batch initialization will at least
> > > > > guarantee
> > > > > > > the
> > > > > > > > > time
> > > > > > > > > >>> to expire the first message is precise, which is
> probably
> > > > good
> > > > > > > > enough.
> > > > > > > > > >>>
> > > > > > > > > >>> Thanks,
> > > > > > > > > >>>
> > > > > > > > > >>> Jun
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>>
> > > > > > > > > >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe <
> > > > > suta...@gmail.com
> > > > > > >
> > > > > > > > > wrote:
> > > > > > > > > >>>
> > > > > > > > > >>> > Question about "the closing of a batch can be delayed
> > > > longer
> > > > > > than
> > > > > > > > > >>> > linger.ms":
> > > > > > > > > >>> > Is it possible to cause an indefinite delay? At some
> > > point
> > > > > > bytes
> > > > > > > > > limit
> > > > > > > > > >>> > might kick in. Also, why is closing of a batch
> coupled
> > > with
> > > > > > > > > >>> availability of
> > > > > > > > > >>> > its destination? In this approach a batch chosen for
> > > > eviction
> > > > > > due
> > > > > > > > to
> > > > > > > > > >>> delay
> > > > > > > > > >>> > needs to "close" anyway, right (without regards to
> > > > > destination
> > > > > > > > > >>> > availability)?
> > > > > > > > > >>> >
> > > > > > > > > >>> > I'm not too worried about notifying at super-exact
> time
> > > > > > specified
> > > > > > > > in
> > > > > > > > > >>> the
> > > > > > > > > >>> > configs. But expiring before the full wait-span has
> > > elapsed
> > > > > > > sounds
> > > > > > > > a
> > > > > > > > > >>> little
> > > > > > > > > >>> > weird. So expiration time has a +/- spread. It works
> > more
> > > > > like
> > > > > > a
> > > > > > > > hint
> > > > > > > > > >>> than
> > > > > > > > > >>> > max. So why not message.delivery.wait.hint.ms?
> > > > > > > > > >>> >
> > > > > > > > > >>> > Yeah, cancellable future will be similar in
> complexity.
> > > > > > > > > >>> >
> > > > > > > > > >>> > I'm unsure if max.message.delivery.wait.ms will the
> > > final
> > > > > nail
> > > > > > > for
> > > > > > > > > >>> > producer
> > > > > > > > > >>> > timeouts. We still won't have a precise way to
> control
> > > > delay
> > > > > in
> > > > > > > > just
> > > > > > > > > >>> the
> > > > > > > > > >>> > accumulator segment. batch.expiry.ms does not try to
> > > > > abstract.
> > > > > > > > It's
> > > > > > > > > >>> very
> > > > > > > > > >>> > specific.
> > > > > > > > > >>> >
> > > > > > > > > >>> > My biggest concern at the moment is implementation
> > > > > complexity.
> > > > > > > > > >>> >
> > > > > > > > > >>> > At this state, I would like to encourage other
> > > independent
> > > > > > > > opinions.
> > > > > > > > > >>> >
> > > > > > > > > >>> > Regards,
> > > > > > > > > >>> > Sumant
> > > > > > > > > >>> >
> > > > > > > > > >>> > On 11 August 2017 at 17:35, Jun Rao <
> j...@confluent.io>
> > > > > wrote:
> > > > > > > > > >>> >
> > > > > > > > > >>> > > Hi, Sumant,
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > 1. Yes, it's probably reasonable to require
> > > > > > > > > >>> max.message.delivery.wait.ms
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > linger.ms. As for retries, perhaps we can set the
> > > > default
> > > > > > > > retries
> > > > > > > > > to
> > > > > > > > > >>> > > infinite or just ignore it. Then the latency will
> be
> > > > > bounded
> > > > > > by
> > > > > > > > > >>> > > max.message.delivery.wait.ms. request.timeout.ms
> is
> > > the
> > > > > max
> > > > > > > time
> > > > > > > > > the
> > > > > > > > > >>> > > request will be spending on the server. The client
> > can
> > > > > expire
> > > > > > > an
> > > > > > > > > >>> inflight
> > > > > > > > > >>> > > request early if needed.
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > 2. Well, since max.message.delivery.wait.ms
> > specifies
> > > > the
> > > > > > max,
> > > > > > > > > >>> calling
> > > > > > > > > >>> > the
> > > > > > > > > >>> > > callback a bit early may be ok? Note that
> > > > > > > > > >>> max.message.delivery.wait.ms
> > > > > > > > > >>> > > only
> > > > > > > > > >>> > > comes into play in the rare error case. So, I am
> not
> > > sure
> > > > > if
> > > > > > we
> > > > > > > > > need
> > > > > > > > > >>> to
> > > > > > > > > >>> > be
> > > > > > > > > >>> > > very precise. The issue with starting the clock on
> > > > closing
> > > > > a
> > > > > > > > batch
> > > > > > > > > is
> > > > > > > > > >>> > that
> > > > > > > > > >>> > > currently if the leader is not available, the
> closing
> > > of
> > > > a
> > > > > > > batch
> > > > > > > > > can
> > > > > > > > > >>> be
> > > > > > > > > >>> > > delayed longer than linger.ms.
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > 4. As you said, future.get(timeout) itself doesn't
> > > solve
> > > > > the
> > > > > > > > > problem
> > > > > > > > > >>> > since
> > > > > > > > > >>> > > you still need a way to expire the record in the
> > > sender.
> > > > > The
> > > > > > > > amount
> > > > > > > > > >>> of
> > > > > > > > > >>> > work
> > > > > > > > > >>> > > to implement a cancellable future is probably the
> > same?
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Overall, my concern with patch work is that we have
> > > > > iterated
> > > > > > on
> > > > > > > > the
> > > > > > > > > >>> > produce
> > > > > > > > > >>> > > request timeout multiple times and new issues keep
> > > coming
> > > > > > back.
> > > > > > > > > >>> Ideally,
> > > > > > > > > >>> > > this time, we want to have a solution that covers
> all
> > > > > cases,
> > > > > > > even
> > > > > > > > > >>> though
> > > > > > > > > >>> > > that requires a bit more work.
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Thanks,
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > Jun
> > > > > > > > > >>> > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe <
> > > > > > > > suta...@gmail.com>
> > > > > > > > > >>> > wrote:
> > > > > > > > > >>> > >
> > > > > > > > > >>> > > > Hi Jun,
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > Thanks for looking into it.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > Yes, we did consider this message-level timeout
> > > > approach
> > > > > > and
> > > > > > > > > >>> expiring
> > > > > > > > > >>> > > > batches selectively in a request but rejected it
> > due
> > > to
> > > > > the
> > > > > > > > > >>> reasons of
> > > > > > > > > >>> > > > added complexity without a strong benefit to
> > > > > counter-weigh
> > > > > > > > that.
> > > > > > > > > >>> Your
> > > > > > > > > >>> > > > proposal is a slight variation so I'll mention
> some
> > > > > issues
> > > > > > > > here.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > 1. It sounds like max.message.delivery.wait.ms
> > will
> > > > > > overlap
> > > > > > > > with
> > > > > > > > > >>> "time
> > > > > > > > > >>> > > > segments" of both linger.ms and retries * (
> > > > > > > request.timeout.ms
> > > > > > > > +
> > > > > > > > > >>> > > > retry.backoff.ms). In that case, which config
> set
> > > > takes
> > > > > > > > > >>> precedence? It
> > > > > > > > > >>> > > > would not make sense to configure configs from
> both
> > > > sets.
> > > > > > > > > >>> Especially,
> > > > > > > > > >>> > we
> > > > > > > > > >>> > > > discussed exhaustively internally that retries
> and
> > > > > > > > > >>> > > > max.message.delivery.wait.ms can't / shouldn't
> be
> > > > > > configured
> > > > > > > > > >>> together.
> > > > > > > > > >>> > > > Retires become moot as you already mention. I
> think
> > > > > that's
> > > > > > > > going
> > > > > > > > > >>> to be
> > > > > > > > > >>> > > > surprising to anyone wanting to use
> > > > > > > > max.message.delivery.wait.ms
> > > > > > > > > .
> > > > > > > > > >>> We
> > > > > > > > > >>> > > > probably need max.message.delivery.wait.ms >
> > > linger.ms
> > > > > or
> > > > > > > > > >>> something
> > > > > > > > > >>> > like
> > > > > > > > > >>> > > > that.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > 2. If clock starts when a batch is created and
> > expire
> > > > > when
> > > > > > > > > >>> > > > max.message.delivery.wait.ms is over in the
> > > > accumulator,
> > > > > > the
> > > > > > > > > last
> > > > > > > > > >>> few
> > > > > > > > > >>> > > > messages in the expiring batch may not have lived
> > > long
> > > > > > > enough.
> > > > > > > > As
> > > > > > > > > >>> the
> > > > > > > > > >>> > > > config seems to suggests per-message timeout,
> it's
> > > > > > incorrect
> > > > > > > to
> > > > > > > > > >>> expire
> > > > > > > > > >>> > > > messages prematurely. On the other hand if clock
> > > starts
> > > > > > after
> > > > > > > > > >>> batch is
> > > > > > > > > >>> > > > closed (which also implies that linger.ms is not
> > > > covered
> > > > > > by
> > > > > > > > the
> > > > > > > > > >>> > > > max.message.delivery.wait.ms config), no message
> > > would
> > > > > be
> > > > > > be
> > > > > > > > > >>> expired
> > > > > > > > > >>> > too
> > > > > > > > > >>> > > > soon. Yeah, expiration may be little bit too late
> > but
> > > > > hey,
> > > > > > > this
> > > > > > > > > >>> ain't
> > > > > > > > > >>> > > > real-time service.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > 3. I agree that steps #3, #4, (and #5) are
> complex
> > to
> > > > > > > > implement.
> > > > > > > > > >>> On the
> > > > > > > > > >>> > > > other hand, batch.expiry.ms is next to trivial
> to
> > > > > > implement.
> > > > > > > > We
> > > > > > > > > >>> just
> > > > > > > > > >>> > > pass
> > > > > > > > > >>> > > > the config all the way down to
> > > > ProducerBatch.maybeExpire
> > > > > > and
> > > > > > > be
> > > > > > > > > >>> done
> > > > > > > > > >>> > with
> > > > > > > > > >>> > > > it.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > 4. Do you think the effect of
> > > > > max.message.delivery.wait.ms
> > > > > > > can
> > > > > > > > > be
> > > > > > > > > >>> > > > simulated
> > > > > > > > > >>> > > > with future.get(timeout) method? Copying excerpt
> > from
> > > > the
> > > > > > > > kip-91:
> > > > > > > > > >>> An
> > > > > > > > > >>> > > > end-to-end timeout may be partially emulated
> using
> > > the
> > > > > > > > > >>> > > future.get(timeout).
> > > > > > > > > >>> > > > The timeout must be greater than (
> batch.expiry.ms
> > +
> > > > > > nRetries
> > > > > > > > * (
> > > > > > > > > >>> > > > request.timeout.ms + retry.backoff.ms)). Note
> that
> > > > when
> > > > > > > future
> > > > > > > > > >>> times
> > > > > > > > > >>> > > out,
> > > > > > > > > >>> > > > Sender may continue to send the records in the
> > > > > background.
> > > > > > To
> > > > > > > > > avoid
> > > > > > > > > >>> > that,
> > > > > > > > > >>> > > > implementing a cancellable future is a
> possibility.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > For simplicity, we could just implement a trivial
> > > > method
> > > > > in
> > > > > > > > > >>> producer
> > > > > > > > > >>> > > > ProducerConfigs.maxMessageDeliveryWaitMs() and
> > > return
> > > > a
> > > > > > > number
> > > > > > > > > >>> based
> > > > > > > > > >>> > on
> > > > > > > > > >>> > > > this formula? Users of future.get can use this
> > > timeout
> > > > > > value.
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > Thoughts?
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > Regards,
> > > > > > > > > >>> > > > Sumant
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > On 11 August 2017 at 07:50, Sumant Tambe <
> > > > > > suta...@gmail.com>
> > > > > > > > > >>> wrote:
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > > > Thanks for the KIP. Nice documentation on all
> > > current
> > > > > > > issues
> > > > > > > > > >>> with the
> > > > > > > > > >>> > > > >> timeout.
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > > > For the KIP writeup, all credit goes to Joel
> > Koshy.
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > > > I'll follow up on your comments a little later.
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> You also brought up a good use case for timing
> > > out a
> > > > > > > > message.
> > > > > > > > > >>> For
> > > > > > > > > >>> > > > >> applications that collect and send sensor data
> > to
> > > > > Kafka,
> > > > > > > if
> > > > > > > > > the
> > > > > > > > > >>> data
> > > > > > > > > >>> > > > can't
> > > > > > > > > >>> > > > >> be sent to Kafka for some reason, the
> > application
> > > > may
> > > > > > > prefer
> > > > > > > > > to
> > > > > > > > > >>> > buffer
> > > > > > > > > >>> > > > the
> > > > > > > > > >>> > > > >> more recent data in the accumulator. Without a
> > > > > timeout,
> > > > > > > the
> > > > > > > > > >>> > > accumulator
> > > > > > > > > >>> > > > >> will be filled with old records and new
> records
> > > > can't
> > > > > be
> > > > > > > > > added.
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> Your proposal makes sense for a developer who
> is
> > > > > > familiar
> > > > > > > > with
> > > > > > > > > >>> how
> > > > > > > > > >>> > the
> > > > > > > > > >>> > > > >> producer works. I am not sure if this is very
> > > > > intuitive
> > > > > > to
> > > > > > > > the
> > > > > > > > > >>> users
> > > > > > > > > >>> > > > since
> > > > > > > > > >>> > > > >> it may not be very easy for them to figure out
> > how
> > > > to
> > > > > > > > > configure
> > > > > > > > > >>> the
> > > > > > > > > >>> > > new
> > > > > > > > > >>> > > > >> knob to bound the amount of the time when a
> > > message
> > > > is
> > > > > > > > > >>> completed.
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> From users' perspective, Apurva's suggestion
> of
> > > > > > > > > >>> > > > >> max.message.delivery.wait.ms (which
> > > > > > > > > >>> > > > >> bounds the time when a message is in the
> > > accumulator
> > > > > to
> > > > > > > the
> > > > > > > > > time
> > > > > > > > > >>> > when
> > > > > > > > > >>> > > > the
> > > > > > > > > >>> > > > >> callback is called) seems more intuition. You
> > > listed
> > > > > > this
> > > > > > > in
> > > > > > > > > the
> > > > > > > > > >>> > > > rejected
> > > > > > > > > >>> > > > >> section since it requires additional logic to
> > > > rebatch
> > > > > > > when a
> > > > > > > > > >>> produce
> > > > > > > > > >>> > > > >> request expires. However, this may not be too
> > bad.
> > > > The
> > > > > > > > > >>> following are
> > > > > > > > > >>> > > the
> > > > > > > > > >>> > > > >> things that we have to do.
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> 1. The clock starts when a batch is created.
> > > > > > > > > >>> > > > >> 2. If the batch can't be drained within
> > > > > > > > > >>> > max.message.delivery.wait.ms,
> > > > > > > > > >>> > > > all
> > > > > > > > > >>> > > > >> messages in the batch will fail and the
> callback
> > > > will
> > > > > be
> > > > > > > > > called.
> > > > > > > > > >>> > > > >> 3. When sending a produce request, we
> calculate
> > an
> > > > > > > > expireTime
> > > > > > > > > >>> for
> > > > > > > > > >>> > the
> > > > > > > > > >>> > > > >> request that equals to the remaining
> expiration
> > > time
> > > > > for
> > > > > > > the
> > > > > > > > > >>> oldest
> > > > > > > > > >>> > > > batch
> > > > > > > > > >>> > > > >> in the request.
> > > > > > > > > >>> > > > >> 4. We set the minimum of the expireTime of all
> > > > > inflight
> > > > > > > > > >>> requests as
> > > > > > > > > >>> > > the
> > > > > > > > > >>> > > > >> timeout in the selector poll call (so that the
> > > > > selector
> > > > > > > can
> > > > > > > > > >>> wake up
> > > > > > > > > >>> > > > before
> > > > > > > > > >>> > > > >> the expiration time).
> > > > > > > > > >>> > > > >> 5. If the produce response can't be received
> > > within
> > > > > > > > > expireTime,
> > > > > > > > > >>> we
> > > > > > > > > >>> > > > expire
> > > > > > > > > >>> > > > >> all batches in the produce request whose
> > > expiration
> > > > > time
> > > > > > > has
> > > > > > > > > >>> been
> > > > > > > > > >>> > > > reached.
> > > > > > > > > >>> > > > >> For the rest of the batches, we resend them
> in a
> > > new
> > > > > > > produce
> > > > > > > > > >>> > request.
> > > > > > > > > >>> > > > >> 6. If the producer response has a retriable
> > error,
> > > > we
> > > > > > just
> > > > > > > > > >>> backoff a
> > > > > > > > > >>> > > bit
> > > > > > > > > >>> > > > >> and then retry the produce request as today.
> The
> > > > > number
> > > > > > of
> > > > > > > > > >>> retries
> > > > > > > > > >>> > > > doesn't
> > > > > > > > > >>> > > > >> really matter now. We just keep retrying until
> > the
> > > > > > > > expiration
> > > > > > > > > >>> time
> > > > > > > > > >>> > is
> > > > > > > > > >>> > > > >> reached. It's possible that a produce request
> is
> > > > never
> > > > > > > > retried
> > > > > > > > > >>> due
> > > > > > > > > >>> > to
> > > > > > > > > >>> > > > >> expiration. However, this seems the right
> thing
> > to
> > > > do
> > > > > > > since
> > > > > > > > > the
> > > > > > > > > >>> > users
> > > > > > > > > >>> > > > want
> > > > > > > > > >>> > > > >> to timeout the message at this time.
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> Implementation wise, there will be a bit more
> > > > > complexity
> > > > > > > in
> > > > > > > > > >>> step 3
> > > > > > > > > >>> > and
> > > > > > > > > >>> > > > 4,
> > > > > > > > > >>> > > > >> but probably not too bad. The benefit is that
> > this
> > > > is
> > > > > > more
> > > > > > > > > >>> intuitive
> > > > > > > > > >>> > > to
> > > > > > > > > >>> > > > >> the
> > > > > > > > > >>> > > > >> end user.
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> Does that sound reasonable to you?
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> Thanks,
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> Jun
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe
> <
> > > > > > > > > >>> suta...@gmail.com>
> > > > > > > > > >>> > > > wrote:
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta
> <
> > > > > > > > > >>> apu...@confluent.io>
> > > > > > > > > >>> > > > >> wrote:
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > > > > There seems to be no relationship with
> > > > cluster
> > > > > > > > > metadata
> > > > > > > > > >>> > > > >> availability
> > > > > > > > > >>> > > > >> > or
> > > > > > > > > >>> > > > >> > > > > staleness. Expiry is just based on the
> > > time
> > > > > > since
> > > > > > > > the
> > > > > > > > > >>> batch
> > > > > > > > > >>> > > has
> > > > > > > > > >>> > > > >> been
> > > > > > > > > >>> > > > >> > > > ready.
> > > > > > > > > >>> > > > >> > > > > Please correct me if I am wrong.
> > > > > > > > > >>> > > > >> > > > >
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > > I was not very specific about where we
> do
> > > > > > > expiration.
> > > > > > > > I
> > > > > > > > > >>> > glossed
> > > > > > > > > >>> > > > over
> > > > > > > > > >>> > > > >> > some
> > > > > > > > > >>> > > > >> > > > details because (again) we've other
> > > mechanisms
> > > > > to
> > > > > > > > detect
> > > > > > > > > >>> non
> > > > > > > > > >>> > > > >> progress.
> > > > > > > > > >>> > > > >> > > The
> > > > > > > > > >>> > > > >> > > > condition (!muted.contains(tp) &&
> > > > > (isMetadataStale
> > > > > > > ||
> > > > > > > > > >>> > > > >> > > > > cluster.leaderFor(tp) == null)) is
> used
> > in
> > > > > > > > > >>> > > > >> > > > RecordAccumualtor.expiredBatches:
> > > > > > > > > >>> > > > >> > > > https://github.com/apache/
> > > > > > > > kafka/blob/trunk/clients/src/
> > > > > > > > > >>> > > > >> > > > main/java/org/apache/kafka/
> > > > > > > > clients/producer/internals/
> > > > > > > > > >>> > > > >> > > > RecordAccumulator.java#L443
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > > Effectively, we expire in all the
> > following
> > > > > cases
> > > > > > > > > >>> > > > >> > > > 1) producer is partitioned from the
> > brokers.
> > > > > When
> > > > > > > > > >>> metadata age
> > > > > > > > > >>> > > > grows
> > > > > > > > > >>> > > > >> > > beyond
> > > > > > > > > >>> > > > >> > > > 3x it's max value. It's safe to say that
> > > we're
> > > > > not
> > > > > > > > > >>> talking to
> > > > > > > > > >>> > > the
> > > > > > > > > >>> > > > >> > brokers
> > > > > > > > > >>> > > > >> > > > at all. Report.
> > > > > > > > > >>> > > > >> > > > 2) fresh metadata && leader for a
> > partition
> > > is
> > > > > not
> > > > > > > > known
> > > > > > > > > >>> && a
> > > > > > > > > >>> > > > batch
> > > > > > > > > >>> > > > >> is
> > > > > > > > > >>> > > > >> > > > sitting there for longer than
> > > > > request.timeout.ms.
> > > > > > > > This
> > > > > > > > > >>> is one
> > > > > > > > > >>> > > > case
> > > > > > > > > >>> > > > >> we
> > > > > > > > > >>> > > > >> > > > would
> > > > > > > > > >>> > > > >> > > > like to improve and use batch.expiry.ms
> > > > because
> > > > > > > > > >>> > > > request.timeout.ms
> > > > > > > > > >>> > > > >> is
> > > > > > > > > >>> > > > >> > > too
> > > > > > > > > >>> > > > >> > > > small.
> > > > > > > > > >>> > > > >> > > > 3) fresh metadata && leader for a
> > partition
> > > is
> > > > > > known
> > > > > > > > &&
> > > > > > > > > >>> batch
> > > > > > > > > >>> > is
> > > > > > > > > >>> > > > >> > sitting
> > > > > > > > > >>> > > > >> > > > there for longer than batch.expiry.ms.
> > This
> > > > is
> > > > > a
> > > > > > > new
> > > > > > > > > case
> > > > > > > > > >>> > that
> > > > > > > > > >>> > > is
> > > > > > > > > >>> > > > >> > > > different
> > > > > > > > > >>> > > > >> > > > from #2. This is the catch-up mode case.
> > > > Things
> > > > > > are
> > > > > > > > > >>> moving too
> > > > > > > > > >>> > > > >> slowly.
> > > > > > > > > >>> > > > >> > > > Pipeline SLAs are broken. Report and
> > > shutdown
> > > > > kmm.
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > > The second and the third cases are
> useful
> > > to a
> > > > > > > > real-time
> > > > > > > > > >>> app
> > > > > > > > > >>> > > for a
> > > > > > > > > >>> > > > >> > > > completely different reason. Report,
> > forget
> > > > > about
> > > > > > > the
> > > > > > > > > >>> batch,
> > > > > > > > > >>> > and
> > > > > > > > > >>> > > > >> just
> > > > > > > > > >>> > > > >> > > move
> > > > > > > > > >>> > > > >> > > > on (without shutting down).
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > >
> > > > > > > > > >>> > > > >> > > If I understand correctly, you are talking
> > > > about a
> > > > > > > fork
> > > > > > > > of
> > > > > > > > > >>> > apache
> > > > > > > > > >>> > > > >> kafka
> > > > > > > > > >>> > > > >> > > which has these additional conditions?
> > Because
> > > > > that
> > > > > > > > check
> > > > > > > > > >>> > doesn't
> > > > > > > > > >>> > > > >> exist
> > > > > > > > > >>> > > > >> > on
> > > > > > > > > >>> > > > >> > > trunk today.
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > Right. It is our internal release in
> LinkedIn.
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > Or are you proposing to change the behavior
> of
> > > > > expiry
> > > > > > to
> > > > > > > > > >>> > > > >> > > account for stale metadata and partitioned
> > > > > producers
> > > > > > > as
> > > > > > > > > >>> part of
> > > > > > > > > >>> > > this
> > > > > > > > > >>> > > > >> KIP?
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > No. It's our temporary solution in the
> absence
> > > of
> > > > > > > kip-91.
> > > > > > > > > Note
> > > > > > > > > >>> > that
> > > > > > > > > >>> > > we
> > > > > > > > > >>> > > > >> dont
> > > > > > > > > >>> > > > >> > like increasing request.timeout.ms. Without
> > our
> > > > > extra
> > > > > > > > > >>> conditions
> > > > > > > > > >>> > > our
> > > > > > > > > >>> > > > >> > batches expire too soon--a problem in kmm
> > > catchup
> > > > > > mode.
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > If we get batch.expiry.ms, we will
> configure
> > it
> > > > to
> > > > > 20
> > > > > > > > mins.
> > > > > > > > > >>> > > > maybeExpire
> > > > > > > > > >>> > > > >> > will use the config instead of r.t.ms. The
> > > extra
> > > > > > > > conditions
> > > > > > > > > >>> will
> > > > > > > > > >>> > be
> > > > > > > > > >>> > > > >> > unnecessary. All three cases shall be
> covered
> > > via
> > > > > the
> > > > > > > > > >>> batch.expiry
> > > > > > > > > >>> > > > >> timeout.
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >> > >
> > > > > > > > > >>> > > > >> > >
> > > > > > > > > >>> > > > >> >
> > > > > > > > > >>> > > > >>
> > > > > > > > > >>> > > > >
> > > > > > > > > >>> > > >
> > > > > > > > > >>> > >
> > > > > > > > > >>> >
> > > > > > > > > >>>
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to