IMO, having 4 different timeouts makes it confusing for the user and
requires the client to understand the internals of Kafka. We should have a
single timeout from the user's perspective and handle the other timeouts,
like a batch timeout, internally.

Mayuresh

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hey Jay,
>
> That is also a viable solution.
>
> I think the main purpose is to let user know how long they can block,
> which is important.
>
> I have a few questions about the proposal, though. Will users still need
> to set linger.ms? Will the request timeout cover linger.ms as well?
> My concern with letting the request timeout also cover the time spent in
> the accumulator is that it makes the actual request timeout
> nondeterministic.
> Also, implementation-wise, a request can contain multiple batches, and the
> time spent in the accumulator can vary a lot between them. If one of the
> batches times out, what should we do with the rest of the batches?
> I think we probably want to separate the batch timeout and the request
> timeout.
>
> Maybe we can do this:
> max.send.block.ms
> request.timeout
> batch.timeout
> replication.timeout
>
> So in send() we use max.send.block.ms only. In the accumulator we use
> batch.timeout, and in NetworkClient we use request.timeout. The
> replication timeout is needed anyway.
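> 
> As a rough sketch, configuring a producer under this proposal might look
> like the following. All four timeout names here are just the proposal
> above; none of them exist in the producer today, so treat this as
> illustrative only:
> 
>   import java.util.Properties;
>   import org.apache.kafka.clients.producer.KafkaProducer;
> 
>   Properties props = new Properties();
>   props.put("bootstrap.servers", "broker1:9092");
>   props.put("key.serializer",
>       "org.apache.kafka.common.serialization.ByteArraySerializer");
>   props.put("value.serializer",
>       "org.apache.kafka.common.serialization.ByteArraySerializer");
>   // Bound on how long send() itself may block (metadata, buffer allocation).
>   props.put("max.send.block.ms", "60000");
>   // Bound on how long a batch may sit in the accumulator before failing.
>   props.put("batch.timeout", "30000");
>   // Bound on how long NetworkClient waits for a response to a request.
>   props.put("request.timeout", "30000");
>   // Server-side wait for replica acknowledgement (today's timeout.ms).
>   props.put("replication.timeout", "10000");
>   KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);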
>
> This looks more understandable from what I can see.
>
> What do you think?
>
> Jiangjie (Becket) Qin
>
> On 5/19/15, 11:48 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> >So the alternative to consider would be to instead have
> >   max.block.ms (or something)
> >   request.timeout
> >   replication.timeout
> >
> >I think this better captures what the user cares about. Here is how it
> >would work.
> >
> >*max.send.block.ms* is the bound on the maximum time the
> >producer.send() call can block.
> >This subsumes the existing metadata timeout use case but not the proposed
> >use for the time in the accumulator. It *also* acts as a bound on the time
> >you can block on BufferPool allocation (we'd have to add this but that
> >should be easy).
> >
> >*request.timeout* is the bound on the time after send() completes until
> >you get an acknowledgement. This covers the connection timeout and the
> >time in the accumulator. So to implement this, the timeout we set in the
> >request sent via NetworkClient would have already subtracted off the time
> >spent in the accumulator, and if the request is retried we would include
> >both the time in the accumulator and the time taken for the first
> >request, etc. In other words this is the upper bound on the time to the
> >Future being satisfied.
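> >
> >A sketch of the bookkeeping this implies (the field and method names here
> >are made up for illustration, not the actual producer internals):
> >
> >  // When a batch is drained into a request, charge the time it already
> >  // spent queued in the accumulator against the request.timeout budget.
> >  long elapsedInAccumulatorMs = nowMs - batch.createdMs;
> >  long remainingTimeoutMs = requestTimeoutMs - elapsedInAccumulatorMs;
> >  if (remainingTimeoutMs <= 0) {
> >      // Expired before it was ever sent; fail the Future with a timeout
> >      // (org.apache.kafka.common.errors.TimeoutException).
> >      batch.done(new TimeoutException("expired in the accumulator"));
> >  } else {
> >      // The per-request deadline shrinks by whatever was already spent.
> >      networkClient.send(request, remainingTimeoutMs);
> >  }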
> >
> >*replication.timeout* will default to something reasonable but maybe you
> >can override it if you want?
> >
> >Thoughts?
> >
> >-Jay
> >
> >On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat <
> >gharatmayures...@gmail.com> wrote:
> >
> >> So what I understand is that we would have 3 timeouts:
> >> 1) replication timeout
> >> 2) request timeout
> >> 3) metadata timeout (existing)
> >>
> >> The request timeout has to be greater than the replication timeout. The
> >> request timeout is for messages already sent to Kafka that the producer
> >> is waiting on.
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Tue, May 19, 2015 at 11:12 AM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> >>
> >> > I think this looks good. What I think is missing is an overview of the
> >> > timeouts from the user's perspective.
> >> >
> >> > My worry is that it is quite complicated to reason about the current
> >> > set of timeouts. Currently we have
> >> >    timeout.ms
> >> >    metadata.fetch.timeout.ms
> >> >
> >> > The proposed settings I think are:
> >> >    batch.expiration.ms
> >> >    request.timeout.ms
> >> >    replication.timeout.ms
> >> >
> >> > I think maybe we can skip the batch.expiration.ms. Instead maybe we
> >> > can somehow combine these into a single request timeout so that we
> >> > subtract the time you spent waiting from the request timeout and/or
> >> > replication timeout? I don't have an explicit proposal but my
> >> > suspicion is that from the user's point of view there is just one
> >> > timeout related to the request after which they don't care, and we can
> >> > split that up between the batch time and the request time. Thoughts?
> >> >
> >> > How are we handling connection timeouts? If a machine hard fails in
> >> > the middle of connection establishment there will be no outstanding
> >> > requests. I think this may be okay because connections are established
> >> > when we want to send a request, and presumably we will begin the timer
> >> > then?
> >> >
> >> > To that end I suggest we do two things:
> >> > 1. Include KAFKA-1788. I know that technically these two things are
> >> > different but from the user's point of view they aren't.
> >> > 2. Include in the KIP the explanation to the user of the full set of
> >> > timeouts, what they mean, how we will default them, and when to
> >> > override which.
> >> >
> >> > I know this is a hassle but I think the end experience will be a lot
> >> > better if we go through this thought process.
> >> >
> >> > -Jay
> >> >
> >> > On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
> >><j...@linkedin.com.invalid
> >> >
> >> > wrote:
> >> >
> >> > > I modified the wiki page to incorporate the feedback from the
> >> > > mailing list and the KIP hangout.
> >> > >
> >> > > - Added the deprecation plan for TIMEOUT_CONFIG
> >> > > - Added the actions to take after a request timeout
> >> > >
> >> > > I finally chose to create a new connection if requests time out. The
> >> > > reason is:
> >> > > 1. In most cases, if a broker is just slow, as long as we set the
> >> > > request timeout to a reasonable value, we should not see many new
> >> > > connections get created.
> >> > > 2. If a broker is down, hopefully a metadata refresh will find the
> >> > > new broker and we will not try to reconnect to the old broker
> >> > > anymore.
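> >> > >
> >> > > For illustration, the timeout handling could look roughly like the
> >> > > sketch below (the method names are illustrative, not the actual
> >> > > NetworkClient API):
> >> > >
> >> > >   // On request timeout: drop the connection instead of retrying on
> >> > >   // it, and force a metadata refresh so a dead broker gets routed
> >> > >   // around; the normal retry path then resends to the new leader.
> >> > >   if (nowMs - request.sendTimeMs > requestTimeoutMs) {
> >> > >       selector.close(request.destination());
> >> > >       metadata.requestUpdate();
> >> > >   }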
> >> > >
> >> > > Comments are welcome!
> >> > >
> >> > > Thanks.
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > > On 5/12/15, 2:59 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com>
> >> > wrote:
> >> > >
> >> > > >+1 Becket. That would give enough time for clients to move. We
> >> > > >should make this change very clear.
> >> > > >
> >> > > >Thanks,
> >> > > >
> >> > > >Mayuresh
> >> > > >
> >> > > >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin
> >> <j...@linkedin.com.invalid
> >> > >
> >> > > >wrote:
> >> > > >
> >> > > >> Hey Ewen,
> >> > > >>
> >> > > >> Very good summary of the compatibility issues. What you proposed
> >> > > >> makes sense. So basically we can do the following:
> >> > > >>
> >> > > >> In the next release, i.e. 0.8.3:
> >> > > >> 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
> >> > > >> 2. Mark TIMEOUT_CONFIG as deprecated
> >> > > >> 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it
> >> > > >> is defined and give a warning about the deprecation.
> >> > > >> In the release after 0.8.3, we remove TIMEOUT_CONFIG.
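> >> > > >>
> >> > > >> A sketch of what step 3 might look like when the producer parses
> >> > > >> its configs (the config map and logger here are illustrative
> >> > > >> only, not the actual producer code):
> >> > > >>
> >> > > >>   // If the deprecated timeout.ms is set, let it win and warn;
> >> > > >>   // otherwise replication.timeout.ms (or its default) is used.
> >> > > >>   String deprecated = (String) userConfig.get("timeout.ms");
> >> > > >>   if (deprecated != null) {
> >> > > >>       log.warn("timeout.ms is deprecated; please use "
> >> > > >>           + "replication.timeout.ms instead");
> >> > > >>       userConfig.put("replication.timeout.ms", deprecated);
> >> > > >>   }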
> >> > > >>
> >> > > >> This should give enough buffer for this change.
> >> > > >>
> >> > > >> The request timeout is a completely new thing we are adding to
> >> > > >> fix a bug, and I’m with you that it does not make sense to have it
> >> > > >> maintain the old buggy behavior. So we can set it to a reasonable
> >> > > >> value instead of infinite.
> >> > > >>
> >> > > >> Jiangjie (Becket) Qin
> >> > > >>
> >> > > >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" <e...@confluent.io
> >
> >> > > wrote:
> >> > > >>
> >> > > >> >I think my confusion is coming from this:
> >> > > >> >
> >> > > >> >> So in this KIP, we only address (3). The only public interface
> >> > > >> >> change is a new configuration of request timeout (and maybe
> >> > > >> >> change the configuration name of TIMEOUT_CONFIG to
> >> > > >> >> REPLICATION_TIMEOUT_CONFIG).
> >> > > >> >
> >> > > >> >There are 3 possible compatibility issues I see here:
> >> > > >> >
> >> > > >> >* I assumed this meant the constants also change, so
> >> > > >> >"timeout.ms" becomes "replication.timeout.ms". This breaks config
> >> > > >> >files that worked on the previous version, and the only warning
> >> > > >> >would be in the release notes. We do warn about unused configs,
> >> > > >> >so they might notice the problem.
> >> > > >> >
> >> > > >> >* Binary and source compatibility if someone configures their
> >> > > >> >client in code and uses the TIMEOUT_CONFIG variable. Renaming it
> >> > > >> >will cause existing jars to break if you try to run against an
> >> > > >> >updated client (which seems not very significant since I doubt
> >> > > >> >people upgrade these without recompiling, but maybe I'm wrong
> >> > > >> >about that). And it breaks builds without having deprecated that
> >> > > >> >field first, which again is probably not the biggest issue but is
> >> > > >> >annoying for users; when we accidentally changed the API we
> >> > > >> >received a complaint about breaking builds.
> >> > > >> >
> >> > > >> >* Behavior compatibility as Jay mentioned on the call -- setting
> >> > > >> >the config (even if the name changed) doesn't have the same
> >> > > >> >effect it used to.
> >> > > >> >
> >> > > >> >One solution, which admittedly is more painful to implement and
> >> > > >> >maintain, would be to keep the timeout.ms config, have it
> >> > > >> >override the others if it is specified (including an infinite
> >> > > >> >request timeout I guess?), and if it isn't specified, just use
> >> > > >> >the new config variables. Given a real deprecation schedule,
> >> > > >> >users would have better warning of changes and a window to make
> >> > > >> >the changes.
> >> > > >> >
> >> > > >> >I actually think it might not be necessary to maintain the old
> >> > > >> >behavior precisely, although maybe for some code it is an issue
> >> > > >> >if they start seeing timeout exceptions that they wouldn't have
> >> > > >> >seen before?
> >> > > >> >
> >> > > >> >-Ewen
> >> > > >> >
> >> > > >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io>
> >>wrote:
> >> > > >> >
> >> > > >> >> Jiangjie,
> >> > > >> >>
> >> > > >> >> Yes, I think using the metadata timeout to expire batches in
> >> > > >> >> the record accumulator makes sense.
> >> > > >> >>
> >> > > >> >> Thanks,
> >> > > >> >>
> >> > > >> >> Jun
> >> > > >> >>
> >> > > >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin
> >> > > >> >><j...@linkedin.com.invalid>
> >> > > >> >> wrote:
> >> > > >> >>
> >> > > >> >> > I incorporated Ewen and Guozhang’s comments in the KIP page.
> >> > > >> >> > I want to speed up this KIP because currently the mirror
> >> > > >> >> > maker is very likely to hang when a broker is down.
> >> > > >> >> >
> >> > > >> >> > I also took a shot at solving KAFKA-1788 in KAFKA-2142. I
> >> > > >> >> > used the metadata timeout to expire the batches sitting in
> >> > > >> >> > the accumulator without leader info. I did that because the
> >> > > >> >> > situation there is essentially missing metadata.
> >> > > >> >> >
> >> > > >> >> > As a summary of what I am thinking about the timeouts in the
> >> > > >> >> > new producer:
> >> > > >> >> >
> >> > > >> >> > 1. Metadata timeout:
> >> > > >> >> >   - Used in send(), blocking
> >> > > >> >> >   - Used in the accumulator to expire batches with a timeout
> >> > > >> >> >     exception
> >> > > >> >> > 2. Linger.ms:
> >> > > >> >> >   - Used in the accumulator to ready the batch for drain
> >> > > >> >> > 3. Request timeout:
> >> > > >> >> >   - Used in NetworkClient to expire a batch and retry if no
> >> > > >> >> >     response is received for a request before the timeout
> >> > > >> >> >
> >> > > >> >> > So in this KIP, we only address (3). The only public
> >> > > >> >> > interface change is a new configuration for the request
> >> > > >> >> > timeout (and maybe changing the configuration name of
> >> > > >> >> > TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >> > > >> >> >
> >> > > >> >> > I would like to see what people think of the above approach.
> >> > > >> >> >
> >> > > >> >> > Jiangjie (Becket) Qin
> >> > > >> >> >
> >> > > >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com>
> >>wrote:
> >> > > >> >> >
> >> > > >> >> > >Jun,
> >> > > >> >> > >
> >> > > >> >> > >I thought about this a little bit differently.
> >> > > >> >> > >Intuitively, if a partition is offline, the metadata for
> >> > > >> >> > >that partition should be considered not ready because we
> >> > > >> >> > >don’t know which broker we should send the message to. So
> >> > > >> >> > >those sends need to be blocked on the metadata timeout.
> >> > > >> >> > >Another thing I’m wondering is in which scenario an offline
> >> > > >> >> > >partition will come back online within a short period of
> >> > > >> >> > >time, and how likely that is to occur. My understanding is
> >> > > >> >> > >that the batch timeout for batches sitting in the
> >> > > >> >> > >accumulator should be larger than linger.ms but should not
> >> > > >> >> > >be too long (e.g. less than 60 seconds). Otherwise it will
> >> > > >> >> > >exhaust the shared buffer with batches waiting to be
> >> > > >> >> > >aborted.
> >> > > >> >> > >
> >> > > >> >> > >That said, I do agree it is reasonable to buffer the
> >> > > >> >> > >message for some time so messages to other partitions can
> >> > > >> >> > >still get sent. But adding another expiration in addition to
> >> > > >> >> > >linger.ms - which is essentially a timeout - sounds a little
> >> > > >> >> > >bit confusing. Maybe we can do this: let the batch sit in
> >> > > >> >> > >the accumulator for up to linger.ms, then fail it if
> >> > > >> >> > >necessary.
> >> > > >> >> > >
> >> > > >> >> > >What do you think?
> >> > > >> >> > >
> >> > > >> >> > >Thanks,
> >> > > >> >> > >
> >> > > >> >> > >Jiangjie (Becket) Qin
> >> > > >> >> > >
> >> > > >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> >> > > >> >> > >
> >> > > >> >> > >>Jiangjie,
> >> > > >> >> > >>
> >> > > >> >> > >>Allowing messages to be accumulated in an offline
> >> > > >> >> > >>partition could be useful since the partition may become
> >> > > >> >> > >>available before the request timeout or linger time is
> >> > > >> >> > >>reached. Now that we are planning to add a new timeout, it
> >> > > >> >> > >>would be useful to think through whether/how that applies
> >> > > >> >> > >>to messages in the accumulator too.
> >> > > >> >> > >>
> >> > > >> >> > >>Thanks,
> >> > > >> >> > >>
> >> > > >> >> > >>Jun
> >> > > >> >> > >>
> >> > > >> >> > >>
> >> > > >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin
> >> > > >> >> <j...@linkedin.com.invalid
> >> > > >> >> > >
> >> > > >> >> > >>wrote:
> >> > > >> >> > >>
> >> > > >> >> > >>> Hi Harsha,
> >> > > >> >> > >>>
> >> > > >> >> > >>> Took a quick look at the patch. I think it is still a
> >> > > >> >> > >>> little bit different. KAFKA-1788 only handles the case
> >> > > >> >> > >>> where a batch sits in the accumulator for too long. This
> >> > > >> >> > >>> KIP is trying to solve the issue where a batch has
> >> > > >> >> > >>> already been drained from the accumulator and sent to the
> >> > > >> >> > >>> broker. We might be able to apply the timeout at the
> >> > > >> >> > >>> batch level to merge those two cases, as Ewen suggested.
> >> > > >> >> > >>> But I’m not sure it is a good idea to allow messages
> >> > > >> >> > >>> whose target partition is offline to sit in the
> >> > > >> >> > >>> accumulator in the first place.
> >> > > >> >> > >>>
> >> > > >> >> > >>> Jiangjie (Becket) Qin
> >> > > >> >> > >>>
> >> > > >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani"
> >> > > >><ka...@harsha.io>
> >> > > >> >> > wrote:
> >> > > >> >> > >>>
> >> > > >> >> > >>> >Guozhang and Jiangjie,
> >> > > >> >> > >>> >                 Isn’t this work being covered in
> >> > > >> >> > >>> >https://issues.apache.org/jira/browse/KAFKA-1788 ? Can
> >> > > >> >> > >>> >you please review the patch there.
> >> > > >> >> > >>> >Thanks,
> >> > > >> >> > >>> >Harsha
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang
> >> > > >> >>(wangg...@gmail.com
> >> > > >> >> )
> >> > > >> >> > >>> >wrote:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >Thanks for the update Jiangjie,
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >I think it is actually NOT expected that a hardware
> >> > > >> >> > >>> >disconnection will be detected by the selector; rather
> >> > > >> >> > >>> >it will only be revealed upon TCP timeout, which could
> >> > > >> >> > >>> >be hours.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >A couple of comments on the wiki:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >1. "For KafkaProducer.close() and
> >>KafkaProducer.flush() we
> >> > > >>need
> >> > > >> >>the
> >> > > >> >> > >>> >request
> >> > > >> >> > >>> >timeout as implict timeout." I am not very clear what
> >>does
> >> > > >>this
> >> > > >> >> mean?
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >2. Currently the producer already has a
> >>"TIMEOUT_CONFIG"
> >> > which
> >> > > >> >> should
> >> > > >> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we
> >>decide to
> >> > > >>add "
> >> > > >> >> > >>> >REQUEST_TIMEOUT_CONFIG", I suggest we also make this
> >> > renaming:
> >> > > >> >> > >>>admittedly
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >it will change the config names but will reduce
> >>confusions
> >> > > >>moving
> >> > > >> >> > >>> >forward.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >Guozhang
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
> >> > > >> >> > >>><j...@linkedin.com.invalid>
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >wrote:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >> Checked the code again. It seems that a disconnected
> >> > > >> >> > >>> >> channel is not detected by the selector as expected.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Currently we are depending on the
> >> > > >> >> > >>> >> o.a.k.common.network.Selector.disconnected set to see
> >> > > >> >> > >>> >> if we need to do something for a disconnected channel.
> >> > > >> >> > >>> >> However, the Selector.disconnected set is only updated
> >> > > >> >> > >>> >> when:
> >> > > >> >> > >>> >> 1. A write/read/connect to the channel failed.
> >> > > >> >> > >>> >> 2. A key is canceled.
> >> > > >> >> > >>> >> So when a broker goes down before it sends back the
> >> > > >> >> > >>> >> response, the client does not seem able to detect the
> >> > > >> >> > >>> >> failure.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> I did a simple test:
> >> > > >> >> > >>> >> 1. Run a selector on one machine and an echo server on
> >> > > >> >> > >>> >> another machine. Connect the selector to the echo
> >> > > >> >> > >>> >> server.
> >> > > >> >> > >>> >> 2. Send a message to the echo server using the
> >> > > >> >> > >>> >> selector, then let the selector poll() every 10
> >> > > >> >> > >>> >> seconds.
> >> > > >> >> > >>> >> 3. After the server received the message, unplug the
> >> > > >> >> > >>> >> cable on the echo server.
> >> > > >> >> > >>> >> 4. After waiting for 45 minutes, the selector still
> >> > > >> >> > >>> >> had not detected the network failure.
> >> > > >> >> > >>> >> Lsof on the selector machine showed that the TCP
> >> > > >> >> > >>> >> connection was still considered ESTABLISHED.
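> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> A minimal version of that experiment in plain java.nio
> >> > > >> >> > >>> >> (no Kafka code; the echo host and port are assumptions,
> >> > > >> >> > >>> >> and this would live in a main() that throws Exception):
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >>   import java.net.InetSocketAddress;
> >> > > >> >> > >>> >>   import java.nio.ByteBuffer;
> >> > > >> >> > >>> >>   import java.nio.channels.SelectionKey;
> >> > > >> >> > >>> >>   import java.nio.channels.Selector;
> >> > > >> >> > >>> >>   import java.nio.channels.SocketChannel;
> >> > > >> >> > >>> >>   import java.nio.charset.StandardCharsets;
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >>   Selector selector = Selector.open();
> >> > > >> >> > >>> >>   // Blocking connect, then switch to non-blocking mode.
> >> > > >> >> > >>> >>   SocketChannel ch =
> >> > > >> >> > >>> >>       SocketChannel.open(new InetSocketAddress("echo-host", 7));
> >> > > >> >> > >>> >>   ch.configureBlocking(false);
> >> > > >> >> > >>> >>   ch.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
> >> > > >> >> > >>> >>   ch.register(selector, SelectionKey.OP_READ);
> >> > > >> >> > >>> >>   ByteBuffer buf = ByteBuffer.allocate(64);
> >> > > >> >> > >>> >>   while (true) {
> >> > > >> >> > >>> >>       int ready = selector.select(10000); // poll every 10s
> >> > > >> >> > >>> >>       if (ready > 0) {
> >> > > >> >> > >>> >>           ch.read(buf); // drain the one echoed response
> >> > > >> >> > >>> >>           selector.selectedKeys().clear();
> >> > > >> >> > >>> >>       }
> >> > > >> >> > >>> >>       // After the cable is pulled, select() just times
> >> > > >> >> > >>> >>       // out forever: no read fires, no error is raised,
> >> > > >> >> > >>> >>       // and the OS keeps the socket ESTABLISHED.
> >> > > >> >> > >>> >>       System.out.println("ready = " + ready);
> >> > > >> >> > >>> >>   }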
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> I’m not sure what we should expect from
> >> > > >> >> > >>> >> java.nio.channels.Selector in this case. According to
> >> > > >> >> > >>> >> the documentation, the selector does not verify the
> >> > > >> >> > >>> >> status of the associated channel. In my test it looks
> >> > > >> >> > >>> >> even worse: the OS did not consider the socket
> >> > > >> >> > >>> >> disconnected at all.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Anyway, it seems adding the client-side request
> >> > > >> >> > >>> >> timeout is necessary. I’ve updated the KIP page to
> >> > > >> >> > >>> >> clarify the problem we want to solve according to
> >> > > >> >> > >>> >> Ewen’s comments.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Thanks.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Jiangjie (Becket) Qin
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava"
> >> > > >> >><e...@confluent.io>
> >> > > >> >> > >>>wrote:
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin
> >> > > >> >> > >>> >><j...@linkedin.com.invalid>
> >> > > >> >> > >>> >> >wrote:
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very good points!
> >> > > >> >> > >>> >> >> Please see replies inline.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <
> >> > > >> >> e...@confluent.io
> >> > > >> >> > >
> >> > > >> >> > >>> >> wrote:
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> >Jiangjie,
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >Great start. I have a couple of comments.
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >Under the motivation section, is it really true
> >> > > >> >> > >>> >> >> >that the request will never be completed?
> >> > > >> >> > >>> >> >> >Presumably if the broker goes down the connection
> >> > > >> >> > >>> >> >> >will be severed, at worst by a TCP timeout, which
> >> > > >> >> > >>> >> >> >should clean up the connection and any outstanding
> >> > > >> >> > >>> >> >> >requests, right? I think the real reason we need a
> >> > > >> >> > >>> >> >> >different timeout is that the default TCP timeouts
> >> > > >> >> > >>> >> >> >are ridiculously long in this context.
> >> > > >> >> > >>> >> >> Yes, when the broker is completely down the request
> >> > > >> >> > >>> >> >> should be cleared as you said. In the case we
> >> > > >> >> > >>> >> >> encountered, though, the broker was just not
> >> > > >> >> > >>> >> >> responding but the TCP connection was still alive.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >Ok, that makes sense.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >My second question is about whether this is the
> >> > > >> >> > >>> >> >> >right level to tackle the issue at, and what
> >> > > >> >> > >>> >> >> >user-facing changes need to be made. A related
> >> > > >> >> > >>> >> >> >problem came up in
> >> > > >> >> > >>> >> >> >https://issues.apache.org/jira/browse/KAFKA-1788
> >> > > >> >> > >>> >> >> >where producer records get stuck indefinitely
> >> > > >> >> > >>> >> >> >because there's no client-side timeout. This KIP
> >> > > >> >> > >>> >> >> >wouldn't fix that problem or any problems caused
> >> > > >> >> > >>> >> >> >by lack of connectivity, since this would only
> >> > > >> >> > >>> >> >> >apply to in-flight requests, which by definition
> >> > > >> >> > >>> >> >> >must have been sent on an active connection.
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >I suspect both types of problems probably need to
> >> > > >> >> > >>> >> >> >be addressed separately by introducing explicit
> >> > > >> >> > >>> >> >> >timeouts. However, because the settings introduced
> >> > > >> >> > >>> >> >> >here are very much about the internal
> >> > > >> >> > >>> >> >> >implementations of the clients, I'm wondering if
> >> > > >> >> > >>> >> >> >this even needs to be a user-facing setting,
> >> > > >> >> > >>> >> >> >especially if we have to add other timeouts anyway.
> >> > > >> >> > >>> >> >> >For example, would a fixed, generous value that's
> >> > > >> >> > >>> >> >> >still much shorter than a TCP timeout, say 15s, be
> >> > > >> >> > >>> >> >> >good enough? If other timeouts would allow, for
> >> > > >> >> > >>> >> >> >example, the clients to properly exit even if
> >> > > >> >> > >>> >> >> >requests have not hit their timeout, then what's
> >> > > >> >> > >>> >> >> >the benefit of being able to configure the
> >> > > >> >> > >>> >> >> >request-level timeout?
> >> > > >> >> > >>> >> >> That is a very good point. There are three places
> >> > > >> >> > >>> >> >> where we might be able to enforce a timeout for a
> >> > > >> >> > >>> >> >> message send:
> >> > > >> >> > >>> >> >> 1. Before append to the accumulator - handled by
> >> > > >> >> > >>> >> >> the metadata timeout, at the per-message level.
> >> > > >> >> > >>> >> >> 2. A batch of messages inside the accumulator - no
> >> > > >> >> > >>> >> >> timeout mechanism now.
> >> > > >> >> > >>> >> >> 3. A request of batches after the messages leave
> >> > > >> >> > >>> >> >> the accumulator - we have a broker-side timeout but
> >> > > >> >> > >>> >> >> no client-side timeout for now.
> >> > > >> >> > >>> >> >> My current proposal only addresses (3), not (2).
> >> > > >> >> > >>> >> >> Honestly I do not have a very clear idea about what
> >> > > >> >> > >>> >> >> we should do with (2) right now. But I am with you
> >> > > >> >> > >>> >> >> that we should not expose too many configurations
> >> > > >> >> > >>> >> >> to users. What I am thinking now to handle (2) is:
> >> > > >> >> > >>> >> >> when the user calls send(), if we know that a
> >> > > >> >> > >>> >> >> partition is offline, we should throw an exception
> >> > > >> >> > >>> >> >> immediately instead of putting the message into the
> >> > > >> >> > >>> >> >> accumulator. This would prevent further memory
> >> > > >> >> > >>> >> >> consumption. We might also want to fail all the
> >> > > >> >> > >>> >> >> batches in the deque once we find a partition is
> >> > > >> >> > >>> >> >> offline. That said, I feel a timeout might not be
> >> > > >> >> > >>> >> >> quite applicable to (2).
> >> > > >> >> > >>> >> >> Do you have any suggestions on this?
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >Right, I didn't actually mean to solve 2 here, but
> >> > > >> >> > >>> >> >was trying to figure out if a solution to 2 would
> >> > > >> >> > >>> >> >reduce what we needed to do to address 3. (And
> >> > > >> >> > >>> >> >depending on how they are implemented, fixing 1 might
> >> > > >> >> > >>> >> >also address 2.) It sounds like you hit a hang that I
> >> > > >> >> > >>> >> >wasn't really expecting. This probably just means the
> >> > > >> >> > >>> >> >KIP motivation needs to be a bit clearer about what
> >> > > >> >> > >>> >> >type of situation this addresses. The cause of the
> >> > > >> >> > >>> >> >hang may also be relevant -- if it was something like
> >> > > >> >> > >>> >> >a deadlock then that's something that should just be
> >> > > >> >> > >>> >> >fixed, but if it's something outside our control then
> >> > > >> >> > >>> >> >a timeout makes a lot more sense.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >I know we have a similar setting,
> >> > > >> >> > >>> >> >> >max.in.flight.requests.per.connection, exposed
> >> > > >> >> > >>> >> >> >publicly (which I just discovered is missing from
> >> > > >> >> > >>> >> >> >the new producer configs documentation). But it
> >> > > >> >> > >>> >> >> >looks like the new consumer is not exposing that
> >> > > >> >> > >>> >> >> >option, using a fixed value instead. I think we
> >> > > >> >> > >>> >> >> >should default to hiding these implementation
> >> > > >> >> > >>> >> >> >values unless there's a strong case for a scenario
> >> > > >> >> > >>> >> >> >that requires customization.
> >> > > >> >> > >>> >> >> For the producer,
> >> > > >> >> > >>> >> >> max.in.flight.requests.per.connection really
> >> > > >> >> > >>> >> >> matters. If people do not want messages reordered,
> >> > > >> >> > >>> >> >> they have to use
> >> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=1. On the
> >> > > >> >> > >>> >> >> other hand, if throughput is more of a concern, it
> >> > > >> >> > >>> >> >> can be set higher. For the new consumer, I checked
> >> > > >> >> > >>> >> >> the value and I am not sure the hard-coded
> >> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=100 is the
> >> > > >> >> > >>> >> >> right value. Without the response to the previous
> >> > > >> >> > >>> >> >> request, what offsets should be put into the next
> >> > > >> >> > >>> >> >> fetch request? It seems to me the value will
> >> > > >> >> > >>> >> >> naturally be one regardless of the setting, unless
> >> > > >> >> > >>> >> >> we are sending fetch requests to different
> >> > > >> >> > >>> >> >> partitions, which does not look like the case.
> >> > > >> >> > >>> >> >> Anyway, it looks to be a separate issue orthogonal
> >> > > >> >> > >>> >> >> to the request timeout.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> >In other words, since the only user-facing change
> >> > > >> >> > >>> >> >> >was the addition of the setting, I'm wondering if
> >> > > >> >> > >>> >> >> >we can avoid the KIP altogether by just choosing a
> >> > > >> >> > >>> >> >> >good default value for the timeout.
> >> > > >> >> > >>> >> >> The problem is that we have a server-side request
> >> > > >> >> > >>> >> >> timeout exposed as a public configuration. We
> >> > > >> >> > >>> >> >> cannot set the client timeout smaller than that
> >> > > >> >> > >>> >> >> value, so a hard-coded value probably won't work
> >> > > >> >> > >>> >> >> here.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >That makes sense, although it's worth keeping in
> >> > > >> >> > >>> >> >mind that even if you use "correct" values, they
> >> > > >> >> > >>> >> >could still be violated due to, e.g., a GC pause that
> >> > > >> >> > >>> >> >causes the broker to process a request after it is
> >> > > >> >> > >>> >> >supposed to have expired.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >-Ewen
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >-Ewen
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin
> >> > > >> >> > >>> >> >><j...@linkedin.com.invalid>
> >> > > >> >> > >>> >> >> >wrote:
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >> Hi,
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> I just created a KIP to add a request timeout to
> >> > > >> >> > >>> >> >> >> NetworkClient for the new Kafka clients:
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Comments and suggestions are welcome!
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Thanks.
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Jiangjie (Becket) Qin
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >--
> >> > > >> >> > >>> >> >> >Thanks,
> >> > > >> >> > >>> >> >> >Ewen
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >--
> >> > > >> >> > >>> >> >Thanks,
> >> > > >> >> > >>> >> >Ewen
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >--
> >> > > >> >> > >>> >-- Guozhang
> >> > > >> >> > >>>
> >> > > >> >> > >>>
> >> > > >> >> > >
> >> > > >> >> >
> >> > > >> >> >
> >> > > >> >>
> >> > > >> >
> >> > > >> >
> >> > > >> >
> >> > > >> >--
> >> > > >> >Thanks,
> >> > > >> >Ewen
> >> > > >>
> >> > > >>
> >> > > >
> >> > > >
> >> > > >--
> >> > > >-Regards,
> >> > > >Mayuresh R. Gharat
> >> > > >(862) 250-7125
> >> > >
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125
> >>
>
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
