Option 3 seems a lot better than previous options, especially from the
user's perspective. I think it gives reasonable balance between control and
fewer options, and the only implementation details it's exposing are that
there is a buffer and there is a network request. Making the request
timeout only start after enqueuing still allows you to compute a maximum
timeout for a request by adding the two values, but doesn't have annoying
artifacts like sometimes issuing a network request when there's only a
fraction of a millisecond left for it to complete.

REQUEST_TIMEOUT_DOC could probably add something about the retries, e.g.
something like "This timeout is per retry, so the maximum time spent
waiting for a request to complete will be (retries+1) *
network.request.timeout.ms".

There's also one other use of the metadata fetch timeout in partitionsFor.
Are we converting that to use MAX_ENQUEUE_TIMEOUT_MS_CONFIG? The naming is
a bit awkward, but we need to use something there.

Finally, just a nit, but the naming conventions for variables are getting
inconsistent. Some have _MS in them, some don't, and some of the _DOC names
are inconsistent with the _CONFIG names.

-Ewen


On Mon, Jun 1, 2015 at 9:44 PM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Bump up this thread.
>
> After several discussions in LinkedIn, we came up with three options. I
> have updated the KIP-19 wiki page to summarize the three options and
> stated our preference. We can discuss on them in tomorrow’s KIP hangout.
> Please let us know what do you think.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On 5/21/15, 5:54 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
>
> >Based on the discussion we have, I just updated the KIP with the following
> >proposal and want to see if there is further comments.
> >
> >The proposal is to have the following four timeout as end state.
> >
> >1. max.buffer.full.block.ms   - To replace block.on.buffer.full. The max
> >time to block when buffer is full.
> >2. metadata.fetch.timeout.ms  - reuse metadata timeout as
> batch.timeout.ms
> >because it is essentially metadata not available.
> >3. replication.timeout.ms     - It defines how long a server will wait
> for
> >the records to be replicated to followers.
> >4. network.request.timeout.ms - This timeout is used when producer sends
> >request to brokers through TCP connections. It specifies how long the
> >producer should wait for the response.
> >
> >With the above approach, we can achieve the following.
> >* We can have bounded blocking time for send() = (1) + (2).
> >* The time after send() until response got received is generally bounded
> >by linger.ms + (2) + (4), not taking retries into consideration.
> >
> >So from user’s perspective. Send() depends on metadata of a topic and
> >buffer space. I am not sure if user would really care about how long it
> >takes to receive the response because it is async anyway and we have so
> >many things to consider (retries, linger.ms, retry backoff time, request
> >timeout, etc).
> >
> >I think these configurations are clear enough to let user understand at
> >the first glance. Please let me know what do you think.
> >
> >Thanks.
> >
> >Jiangjie (Becket) Qin
> >
> >
> >
> >On 5/20/15, 9:55 AM, "Joel Koshy" <jjkosh...@gmail.com> wrote:
> >
> >>> The fact that I understand the producer internals and am still
> >>>struggling
> >>> to understand the implications of the different settings, how I would
> >>>set
> >>> them, and how they potentially interact such that I could set invalid
> >>> combinations seems like a red flag to me... Being able to say "I want
> >>> produce requests to timeout in 5s" shouldn't require adjusting 3 or 4
> >>> configs if the defaults would normally timeout out in something like
> >>>30s.
> >>>
> >>> Setting aside compatibility issues and focusing on the best set of
> >>>configs,
> >>> I agree with Jay that there are two things I actually want out of the
> >>>API.
> >>> The key thing is a per-request timeout, which should be enforced client
> >>> side. I would just expect this to follow the request through any
> >>>internals
> >>> so it can be enforced no matter where in the pipeline the request is.
> >>> Within each component in the pipeline we might have to compute how much
> >>> time we have left for the request in order to create a timeout within
> >>>that
> >>> setting. The second setting is to bound the amount of time spent
> >>>blocking
> >>> on send(). This is really an implementation detail, but one that people
> >>>are
> >>> complaining about enough that it seems worthwhile to provide control
> >>>over
> >>> it (and fixing it would just make that setting superfluous, not break
> >>> anything).
> >>>
> >>> Exposing a lot more settings also exposes a lot about the
> >>>implementation
> >>> and makes it harder to improve the implementation in the future, but I
> >>> don't think we have listed good use cases for setting each of them
> >>> individually. Why would the user specifically care about how much time
> >>>the
> >>> request spends in the accumulator vs. some other component (assuming
> >>>they
> >>> have the overall timeout)? Same for requests in flight, as long as I
> >>>have
> >>> that client side timeout? And if they care about what component is the
> >>> bottleneck, could that be better exposed by the exceptions that are
> >>> returned rather than a ton of different settings?
> >>
> >>Agreed with the above. I'm also extremely wary of configs that are
> >>inherently unintuitive, or can interact to yield unintuitive behavior.
> >>OTOH I think it is okay if a config is categorized as "advanced" or if
> >>it requires deeper knowledge of the internals of the producer (or the
> >>configured system in general). i.e., as long as we think long and hard
> >>and agree on necessity (driven by clear use cases) before adding such
> >>configs. We should also consider how we can simplify or even eliminate
> >>existing configs.
> >>
> >>Re: requests in flight may be a good example: Becket had given a valid
> >>use-case i.e., support strict ordering. Maybe we can replace it with a
> >>"enable.strict.ordering" config which is clearer in intent and would
> >>internally ensure only one in-flight request per partition and default
> >>to a fixed in-flight requests (say, five or 10) if set to false. If we
> >>implement idempotence then we won't even need that.
> >>
> >>> On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin
> >>><j...@linkedin.com.invalid>
> >>> wrote:
> >>>
> >>> > Hi Jay,
> >>> >
> >>> > I updated what I think int KIP wiki. Just a short summary here.
> >>>Because we
> >>> > need timeout for:
> >>> > 1. Send()
> >>> > 2. Batches in accumulator
> >>> > 3. Requests in flight.
> >>> > That means we need to have at least three configurations if we do not
> >>> > reuse configurations.
> >>> >
> >>> > I think we probably want to also separate the configurations for
> >>>exception
> >>> > handling and SLA purposes as well.
> >>> > My understanding of the configurations we are discussing here is they
> >>>are
> >>> > for exception handling but not for SLA purposes. It looks to me that
> >>> > exception handling is more component oriented while SLA is more of
> >>> > systematic tuning. What you suggested sounds more like to set
> >>> > configurations to meet a user defined SLA. I am not sure if this is
> >>>the
> >>> > things we want to do here.
> >>> >
> >>> > Thanks.
> >>> >
> >>> > Jiangjie (Becket) Qin
> >>> >
> >>> > On 5/19/15, 5:42 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>> >
> >>> > >Yeah I think linger.ms remains separate, setting that is a
> >>>performance
> >>> > >optimization rather than failure handling thing. We should ideally
> >>>sanity
> >>> > >check this, though, in my proposal, since if they set linger.ms >
> >>> > >request.timeout then that won't work.
> >>> > >
> >>> > >It's true that in my proposal that the actual replication timeout we
> >>>set
> >>> > >on
> >>> > >the request would be non-deterministic. However the flip side of
> >>>that
> >>> > >argument is that in the existing proposal the actual time until an
> >>> > >acknowledgement is non-deterministic, right? So I think the argument
> >>>I am
> >>> > >trying to construct is that the two things the user cares about are
> >>>the
> >>> > >time to block and the time to ack and any other timeout we use
> >>>internally
> >>> > >is basically an implementation detail of ensuring this.
> >>> > >
> >>> > >Your point about the difference between batches and requests is a
> >>>good
> >>> > >one.
> >>> > >I hadn't thought of that. So to make my proposal  work we would need
> >>>to do
> >>> > >something like base the request time off the oldest batch. Let me
> >>>think
> >>> > >about the implications of that, it's definitely a problem.
> >>> > >
> >>> > >-Jay
> >>> > >
> >>> > >On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin
> >>><j...@linkedin.com.invalid
> >>> > >
> >>> > >wrote:
> >>> > >
> >>> > >> Hey Jay,
> >>> > >>
> >>> > >> That is also a viable solution.
> >>> > >>
> >>> > >> I think the main purpose is to let user know how long they can
> >>>block,
> >>> > >> which is important.
> >>> > >>
> >>> > >> I have some question over the proposal, though. Will user still
> >>>need to
> >>> > >> send linger.ms? Will request timeout cover linger.ms as well?
> >>> > >> My concern of letting request timeout also cover the time spent in
> >>> > >> accumulator is that this will result in the actually request
> >>>timeout
> >>> > >> indeterministic.
> >>> > >> Also, implementation wise, a request can have multiple batches,
> >>>the time
> >>> > >> spent in the accumulator could vary a lot. If one of the batch
> >>>times
> >>> > >>out,
> >>> > >> what should we do the the rest of the batches?
> >>> > >> I think we probably want to separate batch timeout and request
> >>>timeout.
> >>> > >>
> >>> > >> Maybe we can do this:
> >>> > >> Max.send.block.ms
> >>> > >> Request.timeout
> >>> > >> Batch.timeout
> >>> > >> Replication.timeout
> >>> > >>
> >>> > >> So in send() we use max.send.block.ms only. In accumulator, we
> use
> >>> > >> batch.timeout, in NetWorkClient, we use request.timeout.
> >>>Replication
> >>> > >> timeout is needed anyway.
> >>> > >>
> >>> > >> This looks more understandable from what I can see.
> >>> > >>
> >>> > >> What do you think?
> >>> > >>
> >>> > >> Jiangjie (Becket) Qin
> >>> > >>
> >>> > >> On 5/19/15, 11:48 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>> > >>
> >>> > >> >So the alternative to consider would be to instead have
> >>> > >> >   max.block.ms (or something)
> >>> > >> >   request.timeout
> >>> > >> >   replication.timeout
> >>> > >> >
> >>> > >> >I think this better captures what the user cares about. Here is
> >>>how it
> >>> > >> >would work.
> >>> > >> >
> >>> > >> >*max.send.block.ms <http://max.send.block.ms>* is the bound on
> >>>the
> >>> > >> maximum
> >>> > >> >time the producer.send() call can block.
> >>> > >> >This subsumes the existing metadata timeout use case but not the
> >>> > >>proposed
> >>> > >> >use for the time in the accumulator. It *also* acts as a bound on
> >>>the
> >>> > >>time
> >>> > >> >you can block on BufferPool allocation (we'd have to add this but
> >>>that
> >>> > >> >should be easy).
> >>> > >> >
> >>> > >> >*request.timeout* is the bound on the time after send() complete
> >>>until
> >>> > >>you
> >>> > >> >get an acknowledgement. This covers the connection timeout, and
> >>>the
> >>> > >>time
> >>> > >> >in
> >>> > >> >the accumulator. So to implement this, the time we set in the
> >>>request
> >>> > >>sent
> >>> > >> >via NetworkClient would have already subtracted off the time
> >>>spent in
> >>> > >>the
> >>> > >> >accumulator, and if the request retried we would include both the
> >>>time
> >>> > >>in
> >>> > >> >the accumulator an the time taken for the first request, etc. In
> >>>other
> >>> > >> >words this is the upper bound on the time to the Future being
> >>> > >>satisfied.
> >>> > >> >
> >>> > >> >*replication.timeout* will default to something reasonable but
> >>>maybe
> >>> > >>you
> >>> > >> >can override it if you want?
> >>> > >> >
> >>> > >> >Thoughts?
> >>> > >> >
> >>> > >> >-Jay
> >>> > >> >
> >>> > >> >On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat <
> >>> > >> >gharatmayures...@gmail.com> wrote:
> >>> > >> >
> >>> > >> >> So what I understand is that, we would have 3 time outs :
> >>> > >> >> 1) replication timeout
> >>> > >> >> 2) request timeout
> >>> > >> >> 3) metadata timeout (existing)
> >>> > >> >>
> >>> > >> >> The request timeout has to be greater than the replication
> >>>timeout.
> >>> > >> >> request timeout is for messages already sent to kafka and the
> >>> > >>producer
> >>> > >> >>is
> >>> > >> >> waiting for them.
> >>> > >> >>
> >>> > >> >> Thanks,
> >>> > >> >>
> >>> > >> >> Mayuresh
> >>> > >> >>
> >>> > >> >> On Tue, May 19, 2015 at 11:12 AM, Jay Kreps
> >>><jay.kr...@gmail.com>
> >>> > >> wrote:
> >>> > >> >>
> >>> > >> >> > I think this looks good. What I think is missing is an
> >>>overview of
> >>> > >>the
> >>> > >> >> > timeouts from the user's perspective.
> >>> > >> >> >
> >>> > >> >> > My worry is that it is quite complicated to reason about the
> >>> > >>current
> >>> > >> >>set
> >>> > >> >> of
> >>> > >> >> > timeouts. Currently we have
> >>> > >> >> >    timeout.ms
> >>> > >> >> >    metadata.fetch.timeout.ms
> >>> > >> >> >
> >>> > >> >> > The proposed settings I think are:
> >>> > >> >> >   batch.expiration.ms
> >>> > >> >> > request.timeout.ms
> >>> > >> >> > replication.timeout.ms
> >>> > >> >> >
> >>> > >> >> > I think maybe we can skip the batch.expiration.ms. Instead
> >>>maybe
> >>> > we
> >>> > >> >>can
> >>> > >> >> > somehow combine these into a single request timeout so that
> >>>we
> >>> > >> >>subtract
> >>> > >> >> the
> >>> > >> >> > time you spent waiting from the request timeout and/or
> >>>replication
> >>> > >> >> timeout
> >>> > >> >> > somehow? I don't have an explicit proposal but my suspicion
> >>>is that
> >>> > >> >>from
> >>> > >> >> > the user's point of view there is just one timeout related to
> >>>the
> >>> > >> >>request
> >>> > >> >> > after which they don't care, and we can split that up between
> >>>the
> >>> > >> >>batch
> >>> > >> >> > time and the request time. Thoughts?
> >>> > >> >> >
> >>> > >> >> > How are we handling connection timeouts? If a machine hard
> >>>fails in
> >>> > >> >>the
> >>> > >> >> > middle of connection establishment there will be no
> >>>outstanding
> >>> > >> >> requests. I
> >>> > >> >> > think this may be okay because connections are established
> >>>when we
> >>> > >> >>want
> >>> > >> >> to
> >>> > >> >> > send a request and presumably we will begin the timer then?
> >>> > >> >> >
> >>> > >> >> > To that end I suggest we do two things:
> >>> > >> >> > 1. Include KAKFA-1788. I know that technically these two
> >>>things are
> >>> > >> >> > different but from the user's point of view they aren't.
> >>> > >> >> > 2. Include in the KIP the explanation to the user of the full
> >>>set
> >>> > >>of
> >>> > >> >> > timeouts, what they mean, how we will default them, and when
> >>>to
> >>> > >> >>override
> >>> > >> >> > which.
> >>> > >> >> >
> >>> > >> >> > I know this is a hassle but I think the end experience will
> >>>be a
> >>> > >>lot
> >>> > >> >> better
> >>> > >> >> > if we go through this thought process.
> >>> > >> >> >
> >>> > >> >> > -Jay
> >>> > >> >> >
> >>> > >> >> > On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin
> >>> > >> >><j...@linkedin.com.invalid
> >>> > >> >> >
> >>> > >> >> > wrote:
> >>> > >> >> >
> >>> > >> >> > > I modified the WIKI page to incorporate the feedbacks from
> >>> > >>mailing
> >>> > >> >>list
> >>> > >> >> > > and KIP hangout.
> >>> > >> >> > >
> >>> > >> >> > > - Added the deprecation plan for TIMEOUT_CONFIG
> >>> > >> >> > > - Added the actions to take after request timeout
> >>> > >> >> > >
> >>> > >> >> > > I finally chose to create a new connection if requests
> >>>timeout.
> >>> > >>The
> >>> > >> >> > reason
> >>> > >> >> > > is:
> >>> > >> >> > > 1. In most cases, if a broker is just slow, as long as we
> >>>set
> >>> > >> >>request
> >>> > >> >> > > timeout to be a reasonable value, we should not see many
> >>>new
> >>> > >> >> connections
> >>> > >> >> > > get created.
> >>> > >> >> > > 2. If a broker is down, hopefully metadata refresh will
> >>>find the
> >>> > >>new
> >>> > >> >> > > broker and we will not try to reconnect to the broker
> >>>anymore.
> >>> > >> >> > >
> >>> > >> >> > > Comments are welcome!
> >>> > >> >> > >
> >>> > >> >> > > Thanks.
> >>> > >> >> > >
> >>> > >> >> > > Jiangjie (Becket) Qin
> >>> > >> >> > >
> >>> > >> >> > > On 5/12/15, 2:59 PM, "Mayuresh Gharat"
> >>> > >><gharatmayures...@gmail.com>
> >>> > >> >> > wrote:
> >>> > >> >> > >
> >>> > >> >> > > >+1 Becket. That would give enough time for clients to
> >>>move. We
> >>> > >> >>should
> >>> > >> >> > make
> >>> > >> >> > > >this change very clear.
> >>> > >> >> > > >
> >>> > >> >> > > >Thanks,
> >>> > >> >> > > >
> >>> > >> >> > > >Mayuresh
> >>> > >> >> > > >
> >>> > >> >> > > >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin
> >>> > >> >> <j...@linkedin.com.invalid
> >>> > >> >> > >
> >>> > >> >> > > >wrote:
> >>> > >> >> > > >
> >>> > >> >> > > >> Hey Ewen,
> >>> > >> >> > > >>
> >>> > >> >> > > >> Very good summary about the compatibility. What you
> >>>proposed
> >>> > >> >>makes
> >>> > >> >> > > >>sense.
> >>> > >> >> > > >> So basically we can do the following:
> >>> > >> >> > > >>
> >>> > >> >> > > >> In next release, i.e. 0.8.3:
> >>> > >> >> > > >> 1. Add REPLICATION_TIMEOUT_CONFIG
> >>>(“replication.timeout.ms”)
> >>> > >> >> > > >> 2. Mark TIMEOUT_CONFIG as deprecated
> >>> > >> >> > > >> 3. Override REPLICATION_TIMEOUT_CONFIG with
> >>>TIMEOUT_CONFIG if
> >>> > >>it
> >>> > >> >>is
> >>> > >> >> > > >> defined and give a warning about deprecation.
> >>> > >> >> > > >> In the release after 0.8.3, we remove TIMEOUT_CONFIG.
> >>> > >> >> > > >>
> >>> > >> >> > > >> This should give enough buffer for this change.
> >>> > >> >> > > >>
> >>> > >> >> > > >> Request timeout is a complete new thing we add to fix a
> >>>bug,
> >>> > >>I’m
> >>> > >> >> with
> >>> > >> >> > > >>you
> >>> > >> >> > > >> it does not make sense to have it maintain the old buggy
> >>> > >> >>behavior.
> >>> > >> >> So
> >>> > >> >> > we
> >>> > >> >> > > >> can set it to a reasonable value instead of infinite.
> >>> > >> >> > > >>
> >>> > >> >> > > >> Jiangjie (Becket) Qin
> >>> > >> >> > > >>
> >>> > >> >> > > >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava"
> >>> > >><e...@confluent.io
> >>> > >> >
> >>> > >> >> > > wrote:
> >>> > >> >> > > >>
> >>> > >> >> > > >> >I think my confusion is coming from this:
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >> So in this KIP, we only address (3). The only public
> >>> > >>interface
> >>> > >> >> > change
> >>> > >> >> > > >> >>is a
> >>> > >> >> > > >> >> new configuration of request timeout (and maybe
> >>>change the
> >>> > >> >> > > >>configuration
> >>> > >> >> > > >> >> name of TIMEOUT_CONFIG to
> >>>REPLICATION_TIMEOUT_CONFIG).
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >There are 3 possible compatibility issues I see here:
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >* I assumed this meant the constants also change, so
> >>> > >> >>"timeout.ms"
> >>> > >> >> > > >>becomes
> >>> > >> >> > > >> >"
> >>> > >> >> > > >> >replication.timeout.ms". This breaks config files that
> >>> > worked
> >>> > >> on
> >>> > >> >> the
> >>> > >> >> > > >> >previous version and the only warning would be in
> >>>release
> >>> > >> >>notes. We
> >>> > >> >> > do
> >>> > >> >> > > >> >warn
> >>> > >> >> > > >> >about unused configs so they might notice the problem.
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >* Binary and source compatibility if someone configures
> >>>their
> >>> > >> >> client
> >>> > >> >> > in
> >>> > >> >> > > >> >code and uses the TIMEOUT_CONFIG variable. Renaming it
> >>>will
> >>> > >> >>cause
> >>> > >> >> > > >>existing
> >>> > >> >> > > >> >jars to break if you try to run against an updated
> >>>client
> >>> > >>(which
> >>> > >> >> > seems
> >>> > >> >> > > >>not
> >>> > >> >> > > >> >very significant since I doubt people upgrade these
> >>>without
> >>> > >> >> > recompiling
> >>> > >> >> > > >> >but
> >>> > >> >> > > >> >maybe I'm wrong about that). And it breaks builds
> >>>without
> >>> > >>have
> >>> > >> >> > > >>deprecated
> >>> > >> >> > > >> >that field first, which again, is probably not the
> >>>biggest
> >>> > >>issue
> >>> > >> >> but
> >>> > >> >> > is
> >>> > >> >> > > >> >annoying for users and when we accidentally changed the
> >>>API
> >>> > >>we
> >>> > >> >> > > >>received a
> >>> > >> >> > > >> >complaint about breaking builds.
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >* Behavior compatibility as Jay mentioned on the call
> >>>--
> >>> > >>setting
> >>> > >> >> the
> >>> > >> >> > > >> >config
> >>> > >> >> > > >> >(even if the name changed) doesn't have the same effect
> >>>it
> >>> > >>used
> >>> > >> >>to.
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >One solution, which admittedly is more painful to
> >>>implement
> >>> > >>and
> >>> > >> >> > > >>maintain,
> >>> > >> >> > > >> >would be to maintain the timeout.ms config, have it
> >>>override
> >>> > >> the
> >>> > >> >> > > others
> >>> > >> >> > > >> if
> >>> > >> >> > > >> >it is specified (including an infinite request timeout
> >>>I
> >>> > >> >>guess?),
> >>> > >> >> and
> >>> > >> >> > > >>if
> >>> > >> >> > > >> >it
> >>> > >> >> > > >> >isn't specified, we can just use the new config
> >>>variables.
> >>> > >> >>Given a
> >>> > >> >> > real
> >>> > >> >> > > >> >deprecation schedule, users would have better warning
> >>>of
> >>> > >>changes
> >>> > >> >> and
> >>> > >> >> > a
> >>> > >> >> > > >> >window to make the changes.
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >I actually think it might not be necessary to maintain
> >>>the
> >>> > >>old
> >>> > >> >> > behavior
> >>> > >> >> > > >> >precisely, although maybe for some code it is an issue
> >>>if
> >>> > >>they
> >>> > >> >> start
> >>> > >> >> > > >> >seeing
> >>> > >> >> > > >> >timeout exceptions that they wouldn't have seen before?
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >-Ewen
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao
> >>><j...@confluent.io>
> >>> > >> >>wrote:
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >> Jiangjie,
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >> Yes, I think using metadata timeout to expire batches
> >>>in
> >>> > >>the
> >>> > >> >> record
> >>> > >> >> > > >> >> accumulator makes sense.
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >> Thanks,
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >> Jun
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin
> >>> > >> >> > > >> >><j...@linkedin.com.invalid>
> >>> > >> >> > > >> >> wrote:
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >> > I incorporated Ewen and Guozhang’s comments in the
> >>>KIP
> >>> > >>page.
> >>> > >> >> Want
> >>> > >> >> > > >>to
> >>> > >> >> > > >> >> speed
> >>> > >> >> > > >> >> > up on this KIP because currently we experience
> >>> > >>mirror-maker
> >>> > >> >> hung
> >>> > >> >> > > >>very
> >>> > >> >> > > >> >> > likely when a broker is down.
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > I also took a shot to solve KAFKA-1788 in
> >>>KAFKA-2142. I
> >>> > >>used
> >>> > >> >> > > >>metadata
> >>> > >> >> > > >> >> > timeout to expire the batches which are sitting in
> >>> > >> >>accumulator
> >>> > >> >> > > >>without
> >>> > >> >> > > >> >> > leader info. I did that because the situation there
> >>>is
> >>> > >> >> > essentially
> >>> > >> >> > > >> >> missing
> >>> > >> >> > > >> >> > metadata.
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > As a summary of what I am thinking about the
> >>>timeout in
> >>> > >>new
> >>> > >> >> > > >>Producer:
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > 1. Metadata timeout:
> >>> > >> >> > > >> >> >   - used in send(), blocking
> >>> > >> >> > > >> >> >   - used in accumulator to expire batches with
> >>>timeout
> >>> > >> >> exception.
> >>> > >> >> > > >> >> > 2. Linger.ms
> >>> > >> >> > > >> >> >   - Used in accumulator to ready the batch for
> >>>drain
> >>> > >> >> > > >> >> > 3. Request timeout
> >>> > >> >> > > >> >> >   - Used in NetworkClient to expire a batch and
> >>>retry if
> >>> > >>no
> >>> > >> >> > > >>response
> >>> > >> >> > > >> >>is
> >>> > >> >> > > >> >> > received for a request before timeout.
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > So in this KIP, we only address (3). The only
> >>>public
> >>> > >> >>interface
> >>> > >> >> > > >>change
> >>> > >> >> > > >> >>is
> >>> > >> >> > > >> >> a
> >>> > >> >> > > >> >> > new configuration of request timeout (and maybe
> >>>change
> >>> > >>the
> >>> > >> >> > > >> >>configuration
> >>> > >> >> > > >> >> > name of TIMEOUT_CONFIG to
> >>>REPLICATION_TIMEOUT_CONFIG).
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > Would like to see what people think of above
> >>>approach?
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > Jiangjie (Becket) Qin
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin"
> >>><j...@linkedin.com>
> >>> > >> >>wrote:
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> > >Jun,
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >I thought a little bit differently on this.
> >>> > >> >> > > >> >> > >Intuitively, I am thinking that if a partition is
> >>> > >>offline,
> >>> > >> >>the
> >>> > >> >> > > >> >>metadata
> >>> > >> >> > > >> >> > >for that partition should be considered not ready
> >>> > >>because
> >>> > >> >>we
> >>> > >> >> > don’t
> >>> > >> >> > > >> >>know
> >>> > >> >> > > >> >> > >which broker we should send the message to. So
> >>>those
> >>> > >>sends
> >>> > >> >> need
> >>> > >> >> > > >>to be
> >>> > >> >> > > >> >> > >blocked on metadata timeout.
> >>> > >> >> > > >> >> > >Another thing I’m wondering is in which scenario
> >>>an
> >>> > >>offline
> >>> > >> >> > > >>partition
> >>> > >> >> > > >> >> will
> >>> > >> >> > > >> >> > >become online again in a short period of time and
> >>>how
> >>> > >> >>likely
> >>> > >> >> it
> >>> > >> >> > > >>will
> >>> > >> >> > > >> >> > >occur. My understanding is that the batch timeout
> >>>for
> >>> > >> >>batches
> >>> > >> >> > > >> >>sitting in
> >>> > >> >> > > >> >> > >accumulator should be larger than linger.ms but
> >>>should
> >>> > >>not
> >>> > >> >>be
> >>> > >> >> > too
> >>> > >> >> > > >> >>long
> >>> > >> >> > > >> >> > >(e.g. less than 60 seconds). Otherwise it will
> >>>exhaust
> >>> > >>the
> >>> > >> >> > shared
> >>> > >> >> > > >> >>buffer
> >>> > >> >> > > >> >> > >with batches to be aborted.
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >That said, I do agree it is reasonable to buffer
> >>>the
> >>> > >> >>message
> >>> > >> >> for
> >>> > >> >> > > >>some
> >>> > >> >> > > >> >> time
> >>> > >> >> > > >> >> > >so messages to other partitions can still get
> >>>sent. But
> >>> > >> >>adding
> >>> > >> >> > > >> >>another
> >>> > >> >> > > >> >> > >expiration in addition to linger.ms - which is
> >>> > >>essentially
> >>> > >> >>a
> >>> > >> >> > > >>timeout
> >>> > >> >> > > >> >>-
> >>> > >> >> > > >> >> > >sounds a little bit confusing. Maybe we can do
> >>>this, let
> >>> > >> >>the
> >>> > >> >> > batch
> >>> > >> >> > > >> >>sit
> >>> > >> >> > > >> >> in
> >>> > >> >> > > >> >> > >accumulator up to linger.ms, then fail it if
> >>>necessary.
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >What do you think?
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >Thanks,
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >Jiangjie (Becket) Qin
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io>
> >>> > wrote:
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >>Jiangjie,
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>Allowing messages to be accumulated in an offline
> >>> > >> >>partition
> >>> > >> >> > > >>could be
> >>> > >> >> > > >> >> > >>useful
> >>> > >> >> > > >> >> > >>since the partition may become available before
> >>>the
> >>> > >> >>request
> >>> > >> >> > > >>timeout
> >>> > >> >> > > >> >>or
> >>> > >> >> > > >> >> > >>linger time is reached. Now that we are planning
> >>>to
> >>> > >>add a
> >>> > >> >>new
> >>> > >> >> > > >> >>timeout,
> >>> > >> >> > > >> >> it
> >>> > >> >> > > >> >> > >>would be useful to think through whether/how that
> >>> > >>applies
> >>> > >> >>to
> >>> > >> >> > > >> >>messages
> >>> > >> >> > > >> >> in
> >>> > >> >> > > >> >> > >>the accumulator too.
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>Thanks,
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>Jun
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin
> >>> > >> >> > > >> >> <j...@linkedin.com.invalid
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >>wrote:
> >>> > >> >> > > >> >> > >>
> >>> > >> >> > > >> >> > >>> Hi Harsha,
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>> Took a quick look at the patch. I think it is
> >>>still a
> >>> > >> >> little
> >>> > >> >> > > >>bit
> >>> > >> >> > > >> >> > >>> different. KAFKA-1788 only handles the case
> >>>where a
> >>> > >> >>batch
> >>> > >> >> > > >>sitting
> >>> > >> >> > > >> >>in
> >>> > >> >> > > >> >> > >>> accumulator for too long. The KIP is trying to
> >>>solve
> >>> > >>the
> >>> > >> >> > issue
> >>> > >> >> > > >> >>where
> >>> > >> >> > > >> >> a
> >>> > >> >> > > >> >> > >>> batch has already been drained from accumulator
> >>>and
> >>> > >> >>sent to
> >>> > >> >> > > >> >>broker.
> >>> > >> >> > > >> >> > >>> We might be able to apply timeout on batch
> >>>level to
> >>> > >> >>merge
> >>> > >> >> > those
> >>> > >> >> > > >> >>two
> >>> > >> >> > > >> >> > >>>cases
> >>> > >> >> > > >> >> > >>> as Ewen suggested. But I’m not sure if it is a
> >>>good
> >>> > >> >>idea to
> >>> > >> >> > > >>allow
> >>> > >> >> > > >> >> > >>>messages
> >>> > >> >> > > >> >> > >>> whose target partition is offline to sit in
> >>> > >>accumulator
> >>> > >> >>in
> >>> > >> >> > the
> >>> > >> >> > > >> >>first
> >>> > >> >> > > >> >> > >>>place.
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>> Jiangjie (Becket) Qin
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani"
> >>> > >> >> > > >><ka...@harsha.io>
> >>> > >> >> > > >> >> > wrote:
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>> >Guozhang and Jiangjie,
> >>> > >> >> > > >> >> > >>> >                 Isn’t this work being covered
> >>>in
> >>> > >> >> > > >> >> > >>>
> >>>>https://issues.apache.org/jira/browse/KAFKA-1788 .
> >>> > >>Can
> >>> > >> >> you
> >>> > >> >> > > >> please
> >>> > >> >> > > >> >> the
> >>> > >> >> > > >> >> > >>> >review the patch there.
> >>> > >> >> > > >> >> > >>> >Thanks,
> >>> > >> >> > > >> >> > >>> >Harsha
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang
> >>>Wang
> >>> > >> >> > > >> >>(wangg...@gmail.com
> >>> > >> >> > > >> >> )
> >>> > >> >> > > >> >> > >>> >wrote:
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >Thanks for the update Jiangjie,
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >I think it is actually NOT expected that
> >>>hardware
> >>> > >> >> > > >>disconnection
> >>> > >> >> > > >> >>will
> >>> > >> >> > > >> >> > >>>be
> >>> > >> >> > > >> >> > >>> >detected by the selector, but rather will only
> >>>be
> >>> > >> >>revealed
> >>> > >> >> > > >>upon
> >>> > >> >> > > >> >>TCP
> >>> > >> >> > > >> >> > >>> >timeout, which could be hours.
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >A couple of comments on the wiki:
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >1. "For KafkaProducer.close() and
> >>> > >> >>KafkaProducer.flush() we
> >>> > >> >> > > >>need
> >>> > >> >> > > >> >>the
> >>> > >> >> > > >> >> > >>> >request
> >>> > >> >> > > >> >> > >>> >timeout as implict timeout." I am not very
> >>>clear
> >>> > >>what
> >>> > >> >>does
> >>> > >> >> > > >>this
> >>> > >> >> > > >> >> mean?
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >2. Currently the producer already has a
> >>> > >> >>"TIMEOUT_CONFIG"
> >>> > >> >> > which
> >>> > >> >> > > >> >> should
> >>> > >> >> > > >> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if
> >>>we
> >>> > >> >>decide to
> >>> > >> >> > > >>add "
> >>> > >> >> > > >> >> > >>> >REQUEST_TIMEOUT_CONFIG", I suggest we also
> >>>make this
> >>> > >> >> > renaming:
> >>> > >> >> > > >> >> > >>>admittedly
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >it will change the config names but will
> >>>reduce
> >>> > >> >>confusions
> >>> > >> >> > > >>moving
> >>> > >> >> > > >> >> > >>> >forward.
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >Guozhang
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin
> >>> > >> >> > > >> >> > >>><j...@linkedin.com.invalid>
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >wrote:
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >> Checked the code again. It seems that the
> >>> > >> >>disconnected
> >>> > >> >> > > >>channel
> >>> > >> >> > > >> >>is
> >>> > >> >> > > >> >> > >>>not
> >>> > >> >> > > >> >> > >>> >> detected by selector as expected.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> Currently we are depending on the
> >>> > >> >> > > >> >> > >>> >> o.a.k.common.network.Selector.disconnected
> >>>set to
> >>> > >> >>see if
> >>> > >> >> > we
> >>> > >> >> > > >> >>need
> >>> > >> >> > > >> >> to
> >>> > >> >> > > >> >> > >>>do
> >>> > >> >> > > >> >> > >>> >> something for a disconnected channel.
> >>> > >> >> > > >> >> > >>> >> However Selector.disconnected set is only
> >>>updated
> >>> > >> >>when:
> >>> > >> >> > > >> >> > >>> >> 1. A write/read/connect to channel failed.
> >>> > >> >> > > >> >> > >>> >> 2. A Key is canceled
> >>> > >> >> > > >> >> > >>> >> However when a broker is down before it
> >>>sends back
> >>> > >> >>the
> >>> > >> >> > > >> >>response,
> >>> > >> >> > > >> >> the
> >>> > >> >> > > >> >> > >>> >> client seems not be able to detect this
> >>>failure.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> I did a simple test below:
> >>> > >> >> > > >> >> > >>> >> 1. Run a selector on one machine and an echo
> >>> > >>server
> >>> > >> >>on
> >>> > >> >> > > >>another
> >>> > >> >> > > >> >> > >>>machine.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> Connect a selector to an echo server
> >>> > >> >> > > >> >> > >>> >> 2. Send a message to echo server using
> >>>selector,
> >>> > >>then
> >>> > >> >> let
> >>> > >> >> > > >>the
> >>> > >> >> > > >> >> > >>>selector
> >>> > >> >> > > >> >> > >>> >> poll() every 10 seconds.
> >>> > >> >> > > >> >> > >>> >> 3. After the sever received the message,
> >>>unplug
> >>> > >> >>cable on
> >>> > >> >> > the
> >>> > >> >> > > >> >>echo
> >>> > >> >> > > >> >> > >>> >>server.
> >>> > >> >> > > >> >> > >>> >> 4. After waiting for 45 min. The selector
> >>>still
> >>> > >>did
> >>> > >> >>not
> >>> > >> >> > > >> >>detected
> >>> > >> >> > > >> >> the
> >>> > >> >> > > >> >> > >>> >> network failure.
> >>> > >> >> > > >> >> > >>> >> Lsof on selector machine shows that the TCP
> >>> > >> >>connection
> >>> > >> >> is
> >>> > >> >> > > >>still
> >>> > >> >> > > >> >> > >>> >>considered
> >>> > >> >> > > >> >> > >>> >> ESTABLISHED.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> I’m not sure in this case what should we
> >>>expect
> >>> > >>from
> >>> > >> >>the
> >>> > >> >> > > >> >> > >>> >> java.nio.channels.Selector. According to the
> >>> > >> >>document,
> >>> > >> >> the
> >>> > >> >> > > >> >> selector
> >>> > >> >> > > >> >> > >>> >>does
> >>> > >> >> > > >> >> > >>> >> not verify the status of the associated
> >>>channel.
> >>> > >>In
> >>> > >> >>my
> >>> > >> >> > test
> >>> > >> >> > > >> >>case
> >>> > >> >> > > >> >> it
> >>> > >> >> > > >> >> > >>> >>looks
> >>> > >> >> > > >> >> > >>> >> even worse that OS did not think of the
> >>>socket has
> >>> > >> >>been
> >>> > >> >> > > >> >> > >>>disconnected.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> Anyway. It seems adding the client side
> >>>request
> >>> > >> >>timeout
> >>> > >> >> is
> >>> > >> >> > > >> >> > >>>necessary.
> >>> > >> >> > > >> >> > >>> >>I’ve
> >>> > >> >> > > >> >> > >>> >> updated the KIP page to clarify the problem
> >>>we
> >>> > >>want
> >>> > >> >>to
> >>> > >> >> > solve
> >>> > >> >> > > >> >> > >>>according
> >>> > >> >> > > >> >> > >>> >>to
> >>> > >> >> > > >> >> > >>> >> Ewen’s comments.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> Thanks.
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> Jiangjie (Becket) Qin
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava"
> >>> > >> >> > > >> >><e...@confluent.io>
> >>> > >> >> > > >> >> > >>>wrote:
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie
> >>>Qin
> >>> > >> >> > > >> >> > >>> >><j...@linkedin.com.invalid>
> >>> > >> >> > > >> >> > >>> >> >wrote:
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very
> >>>good
> >>> > >> >>points!
> >>> > >> >> > > >>Please
> >>> > >> >> > > >> >>see
> >>> > >> >> > > >> >> > >>> >>replies
> >>> > >> >> > > >> >> > >>> >> >> inline.
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen
> >>>Cheslack-Postava" <
> >>> > >> >> > > >> >> e...@confluent.io
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> > >>> >> wrote:
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >Jiangjie,
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >Great start. I have a couple of
> >>>comments.
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >Under the motivation section, is it
> >>>really
> >>> > >>true
> >>> > >> >>that
> >>> > >> >> > the
> >>> > >> >> > > >> >> request
> >>> > >> >> > > >> >> > >>> >>will
> >>> > >> >> > > >> >> > >>> >> >> >never
> >>> > >> >> > > >> >> > >>> >> >> >be completed? Presumably if the broker
> >>>goes
> >>> > >>down
> >>> > >> >>the
> >>> > >> >> > > >> >> connection
> >>> > >> >> > > >> >> > >>> >>will be
> >>> > >> >> > > >> >> > >>> >> >> >severed, at worst by a TCP timeout,
> >>>which
> >>> > >>should
> >>> > >> >> clean
> >>> > >> >> > > >>up
> >>> > >> >> > > >> >>the
> >>> > >> >> > > >> >> > >>> >> >>connection
> >>> > >> >> > > >> >> > >>> >> >> >and any outstanding requests, right? I
> >>>think
> >>> > >>the
> >>> > >> >> real
> >>> > >> >> > > >> >>reason
> >>> > >> >> > > >> >> we
> >>> > >> >> > > >> >> > >>> >>need a
> >>> > >> >> > > >> >> > >>> >> >> >different timeout is that the default
> >>>TCP
> >>> > >> >>timeouts
> >>> > >> >> are
> >>> > >> >> > > >> >> > >>>ridiculously
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> >>long
> >>> > >> >> > > >> >> > >>> >> >> >in
> >>> > >> >> > > >> >> > >>> >> >> >this context.
> >>> > >> >> > > >> >> > >>> >> >> Yes, when broker is completely down the
> >>>request
> >>> > >> >> should
> >>> > >> >> > be
> >>> > >> >> > > >> >> cleared
> >>> > >> >> > > >> >> > >>>as
> >>> > >> >> > > >> >> > >>> >>you
> >>> > >> >> > > >> >> > >>> >> >> said. The case we encountered looks like
> >>>the
> >>> > >> >>broker
> >>> > >> >> was
> >>> > >> >> > > >>just
> >>> > >> >> > > >> >> not
> >>> > >> >> > > >> >> > >>> >> >> responding but TCP connection was still
> >>>alive
> >>> > >> >>though.
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >Ok, that makes sense.
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >My second question is about whether
> >>>this
> >>>is
> >>> > >>the
> >>> > >> >> right
> >>> > >> >> > > >> >>level to
> >>> > >> >> > > >> >> > >>> >>tackle
> >>> > >> >> > > >> >> > >>> >> >>the
> >>> > >> >> > > >> >> > >>> >> >> >issue/what user-facing changes need to
> >>>be
> >>> > >>made. A
> >>> > >> >> > > >>related
> >>> > >> >> > > >> >> > >>>problem
> >>> > >> >> > > >> >> > >>> >>came
> >>> > >> >> > > >> >> > >>> >> >>up
> >>> > >> >> > > >> >> > >>> >> >> >in
> >>> > >> >>https://issues.apache.org/jira/browse/KAFKA-1788
> >>> > >> >> > > >>where
> >>> > >> >> > > >> >> > >>>producer
> >>> > >> >> > > >> >> > >>> >> >> records
> >>> > >> >> > > >> >> > >>> >> >> >get stuck indefinitely because there's
> >>>no
> >>> > >> >> client-side
> >>> > >> >> > > >> >>timeout.
> >>> > >> >> > > >> >> > >>>This
> >>> > >> >> > > >> >> > >>> >>KIP
> >>> > >> >> > > >> >> > >>> >> >> >wouldn't fix that problem or any
> >>>problems
> >>> > >>caused
> >>> > >> >>by
> >>> > >> >> > > >>lack of
> >>> > >> >> > > >> >> > >>> >> >>connectivity
> >>> > >> >> > > >> >> > >>> >> >> >since this would only apply to in flight
> >>> > >> >>requests,
> >>> > >> >> > > >>which by
> >>> > >> >> > > >> >> > >>> >>definition
> >>> > >> >> > > >> >> > >>> >> >> >must
> >>> > >> >> > > >> >> > >>> >> >> >have been sent on an active connection.
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >I suspect both types of problems
> >>>probably need
> >>> > >> >>to be
> >>> > >> >> > > >> >>addressed
> >>> > >> >> > > >> >> > >>> >> >>separately
> >>> > >> >> > > >> >> > >>> >> >> >by introducing explicit timeouts.
> >>>However,
> >>> > >> >>because
> >>> > >> >> the
> >>> > >> >> > > >> >> settings
> >>> > >> >> > > >> >> > >>> >> >>introduced
> >>> > >> >> > > >> >> > >>> >> >> >here are very much about the internal
> >>> > >> >> implementations
> >>> > >> >> > of
> >>> > >> >> > > >> >>the
> >>> > >> >> > > >> >> > >>> >>clients,
> >>> > >> >> > > >> >> > >>> >> >>I'm
> >>> > >> >> > > >> >> > >>> >> >> >wondering if this even needs to be a
> >>> > >>user-facing
> >>> > >> >> > > >>setting,
> >>> > >> >> > > >> >> > >>> >>especially
> >>> > >> >> > > >> >> > >>> >> >>if we
> >>> > >> >> > > >> >> > >>> >> >> >have to add other timeouts anyway. For
> >>> > >>example,
> >>> > >> >> would
> >>> > >> >> > a
> >>> > >> >> > > >> >>fixed,
> >>> > >> >> > > >> >> > >>> >>generous
> >>> > >> >> > > >> >> > >>> >> >> >value that's still much shorter than a
> >>>TCP
> >>> > >> >>timeout,
> >>> > >> >> > say
> >>> > >> >> > > >> >>15s,
> >>> > >> >> > > >> >> be
> >>> > >> >> > > >> >> > >>> >>good
> >>> > >> >> > > >> >> > >>> >> >> >enough? If other timeouts would allow,
> >>>for
> >>> > >> >>example,
> >>> > >> >> > the
> >>> > >> >> > > >> >> clients
> >>> > >> >> > > >> >> > >>>to
> >>> > >> >> > > >> >> > >>> >> >> >properly
> >>> > >> >> > > >> >> > >>> >> >> >exit even if requests have not hit their
> >>> > >>timeout,
> >>> > >> >> then
> >>> > >> >> > > >> >>what's
> >>> > >> >> > > >> >> > >>>the
> >>> > >> >> > > >> >> > >>> >> >>benefit
> >>> > >> >> > > >> >> > >>> >> >> >of being able to configure the
> >>>request-level
> >>> > >> >> timeout?
> >>> > >> >> > > >> >> > >>> >> >> That is a very good point. We have three
> >>>places
> >>> > >> >>that
> >>> > >> >> we
> >>> > >> >> > > >> >>might
> >>> > >> >> > > >> >> be
> >>> > >> >> > > >> >> > >>> >>able to
> >>> > >> >> > > >> >> > >>> >> >> enforce timeout for a message send:
> >>> > >> >> > > >> >> > >>> >> >> 1. Before append to accumulator -
> >>>handled
> >>>by
> >>> > >> >>metadata
> >>> > >> >> > > >> >>timeout
> >>> > >> >> > > >> >> on
> >>> > >> >> > > >> >> > >>>per
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> >> message level.
> >>> > >> >> > > >> >> > >>> >> >> 2. Batch of messages inside accumulator
> >>>-
> >>>no
> >>> > >> >>timeout
> >>> > >> >> > > >> >>mechanism
> >>> > >> >> > > >> >> > >>>now.
> >>> > >> >> > > >> >> > >>> >> >> 3. Request of batches after messages
> >>>leave the
> >>> > >> >> > > >>accumulator
> >>> > >> >> > > >> >>- we
> >>> > >> >> > > >> >> > >>>have
> >>> > >> >> > > >> >> > >>> >>a
> >>> > >> >> > > >> >> > >>> >> >> broker side timeout but no client side
> >>>timeout
> >>> > >>for
> >>> > >> >> now.
> >>> > >> >> > > >> >> > >>> >> >> My current proposal only address (3) but
> >>>not
> >>> > >>(2).
> >>> > >> >> > > >> >> > >>> >> >> Honestly I do not have a very clear idea
> >>>about
> >>> > >> >>what
> >>> > >> >> > > >>should
> >>> > >> >> > > >> >>we
> >>> > >> >> > > >> >> do
> >>> > >> >> > > >> >> > >>> >>with
> >>> > >> >> > > >> >> > >>> >> >>(2)
> >>> > >> >> > > >> >> > >>> >> >> right now. But I am with you that we
> >>>should not
> >>> > >> >> expose
> >>> > >> >> > > >>too
> >>> > >> >> > > >> >>many
> >>> > >> >> > > >> >> > >>> >> >> configurations to users. What I am
> >>>thinking
> >>> > >>now to
> >>> > >> >> > handle
> >>> > >> >> > > >> >>(2)
> >>> > >> >> > > >> >> is
> >>> > >> >> > > >> >> > >>> >>when
> >>> > >> >> > > >> >> > >>> >> >>user
> >>> > >> >> > > >> >> > >>> >> >> call send, if we know that a partition is
> >>> > >> >>offline, we
> >>> > >> >> > > >>should
> >>> > >> >> > > >> >> > >>>throw
> >>> > >> >> > > >> >> > >>> >> >> exception immediately instead of putting
> >>>it
> >>> > >>into
> >>> > >> >> > > >> >>accumulator.
> >>> > >> >> > > >> >> > >>>This
> >>> > >> >> > > >> >> > >>> >>would
> >>> > >> >> > > >> >> > >>> >> >> protect further memory consumption. We
> >>>might
> >>> > >>also
> >>> > >> >> want
> >>> > >> >> > to
> >>> > >> >> > > >> >>fail
> >>> > >> >> > > >> >> > >>>all
> >>> > >> >> > > >> >> > >>> >>the
> >>> > >> >> > > >> >> > >>> >> >> batches in the dequeue once we found a
> >>> > >>partition
> >>> > >> >>is
> >>> > >> >> > > >>offline.
> >>> > >> >> > > >> >> That
> >>> > >> >> > > >> >> > >>> >> >>said, I
> >>> > >> >> > > >> >> > >>> >> >> feel timeout might not be quite
> >>>applicable to
> >>> > >>(2).
> >>> > >> >> > > >> >> > >>> >> >> Do you have any suggestion on this?
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >Right, I didn't actually mean to solve 2
> >>>here,
> >>> > >>but
> >>> > >> >>was
> >>> > >> >> > > >>trying
> >>> > >> >> > > >> >>to
> >>> > >> >> > > >> >> > >>> >>figure
> >>> > >> >> > > >> >> > >>> >> >out
> >>> > >> >> > > >> >> > >>> >> >if a solution to 2 would reduce what we
> >>>needed to
> >>> > >> >>do to
> >>> > >> >> > > >> >>address
> >>> > >> >> > > >> >> 3.
> >>> > >> >> > > >> >> > >>> >>(And
> >>> > >> >> > > >> >> > >>> >> >depending on how they are implemented,
> >>>fixing 1
> >>> > >> >>might
> >>> > >> >> > also
> >>> > >> >> > > >> >> address
> >>> > >> >> > > >> >> > >>>2).
> >>> > >> >> > > >> >> > >>> >>It
> >>> > >> >> > > >> >> > >>> >> >sounds like you hit hang that I wasn't
> >>>really
> >>> > >> >> expecting.
> >>> > >> >> > > >>This
> >>> > >> >> > > >> >> > >>>probably
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> >just
> >>> > >> >> > > >> >> > >>> >> >means the KIP motivation needs to be a bit
> >>> > >>clearer
> >>> > >> >> about
> >>> > >> >> > > >>what
> >>> > >> >> > > >> >> type
> >>> > >> >> > > >> >> > >>>of
> >>> > >> >> > > >> >> > >>> >> >situation this addresses. The cause of the
> >>>hang
> >>> > >>may
> >>> > >> >> also
> >>> > >> >> > be
> >>> > >> >> > > >> >> > >>>relevant
> >>> > >> >> > > >> >> > >>> >>-- if
> >>> > >> >> > > >> >> > >>> >> >it was something like a deadlock then
> >>>that's
> >>> > >> >>something
> >>> > >> >> > that
> >>> > >> >> > > >> >> should
> >>> > >> >> > > >> >> > >>> >>just be
> >>> > >> >> > > >> >> > >>> >> >fixed, but if it's something outside our
> >>>control
> >>> > >> >>then a
> >>> > >> >> > > >> >>timeout
> >>> > >> >> > > >> >> > >>>makes
> >>> > >> >> > > >> >> > >>> >>a
> >>> > >> >> > > >> >> > >>> >> >lot
> >>> > >> >> > > >> >> > >>> >> >more sense.
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >I know we have a similar setting,
> >>> > >> >> > > >> >> > >>> >> >>max.in.flights.requests.per.connection,
> >>> > >> >> > > >> >> > >>> >> >> >exposed publicly (which I just
> >>>discovered is
> >>> > >> >>missing
> >>> > >> >> > > >>from
> >>> > >> >> > > >> >>the
> >>> > >> >> > > >> >> > >>>new
> >>> > >> >> > > >> >> > >>> >> >>producer
> >>> > >> >> > > >> >> > >>> >> >> >configs documentation). But it looks
> >>>like the
> >>> > >>new
> >>> > >> >> > > >>consumer
> >>> > >> >> > > >> >>is
> >>> > >> >> > > >> >> > >>>not
> >>> > >> >> > > >> >> > >>> >> >>exposing
> >>> > >> >> > > >> >> > >>> >> >> >that option, using a fixed value
> >>>instead. I
> >>> > >> >>think we
> >>> > >> >> > > >>should
> >>> > >> >> > > >> >> > >>>default
> >>> > >> >> > > >> >> > >>> >>to
> >>> > >> >> > > >> >> > >>> >> >> >hiding these implementation values
> >>>unless
> >>> > >> >>there's a
> >>> > >> >> > > >>strong
> >>> > >> >> > > >> >> case
> >>> > >> >> > > >> >> > >>>for
> >>> > >> >> > > >> >> > >>> >>a
> >>> > >> >> > > >> >> > >>> >> >> >scenario that requires customization.
> >>> > >> >> > > >> >> > >>> >> >> For producer,
> >>> > >> >>max.in.flight.requests.per.connection
> >>> > >> >> > > >>really
> >>> > >> >> > > >> >> > >>>matters.
> >>> > >> >> > > >> >> > >>> >>If
> >>> > >> >> > > >> >> > >>> >> >> people do not want to have reorder of
> >>>messages,
> >>> > >> >>they
> >>> > >> >> > > >>have to
> >>> > >> >> > > >> >> use
> >>> > >> >> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=1.
> >>>On the
> >>> > >> >>other
> >>> > >> >> > > >>hand,
> >>> > >> >> > > >> >>if
> >>> > >> >> > > >> >> > >>> >> >>throughput
> >>> > >> >> > > >> >> > >>> >> >> is more of a concern, it could be set to
> >>> > >>higher.
> >>> > >> >>For
> >>> > >> >> > the
> >>> > >> >> > > >>new
> >>> > >> >> > > >> >> > >>> >>consumer, I
> >>> > >> >> > > >> >> > >>> >> >> checked the value and I am not sure if
> >>>the hard
> >>> > >> >>coded
> >>> > >> >> > > >> >> > >>> >> >>
> >>>max.in.flight.requests.per.connection=100
> >>>is
> >>> > >>the
> >>> > >> >> right
> >>> > >> >> > > >> >>value.
> >>> > >> >> > > >> >> > >>> >>Without
> >>> > >> >> > > >> >> > >>> >> >>the
> >>> > >> >> > > >> >> > >>> >> >> response to the previous request, what
> >>>offsets
> >>> > >> >>should
> >>> > >> >> > be
> >>> > >> >> > > >>put
> >>> > >> >> > > >> >> into
> >>> > >> >> > > >> >> > >>> >>the
> >>> > >> >> > > >> >> > >>> >> >>next
> >>> > >> >> > > >> >> > >>> >> >> fetch request? It seems to me the value
> >>>will be
> >>> > >> >>one
> >>> > >> >> > > >>natively
> >>> > >> >> > > >> >> > >>> >>regardless
> >>> > >> >> > > >> >> > >>> >> >>of
> >>> > >> >> > > >> >> > >>> >> >> the setting unless we are sending fetch
> >>> > >>request to
> >>> > >> >> > > >>different
> >>> > >> >> > > >> >> > >>> >>partitions,
> >>> > >> >> > > >> >> > >>> >> >> which does not look like the case.
> >>> > >> >> > > >> >> > >>> >> >> Anyway, it looks to be a separate issue
> >>> > >> >>orthogonal to
> >>> > >> >> > the
> >>> > >> >> > > >> >> request
> >>> > >> >> > > >> >> > >>> >> >>timeout.
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >In other words, since the only
> >>>user-facing
> >>> > >>change
> >>> > >> >> was
> >>> > >> >> > > >>the
> >>> > >> >> > > >> >> > >>>addition
> >>> > >> >> > > >> >> > >>> >>of
> >>> > >> >> > > >> >> > >>> >> >>the
> >>> > >> >> > > >> >> > >>> >> >> >setting, I'm wondering if we can avoid
> >>>the KIP
> >>> > >> >> > > >>altogether
> >>> > >> >> > > >> >>by
> >>> > >> >> > > >> >> > >>>just
> >>> > >> >> > > >> >> > >>> >> >>choosing
> >>> > >> >> > > >> >> > >>> >> >> >a good default value for the timeout.
> >>> > >> >> > > >> >> > >>> >> >> The problem is that we have a server side
> >>> > >>request
> >>> > >> >> > timeout
> >>> > >> >> > > >> >> exposed
> >>> > >> >> > > >> >> > >>>as
> >>> > >> >> > > >> >> > >>> >>a
> >>> > >> >> > > >> >> > >>> >> >> public configuration. We cannot set the
> >>>client
> >>> > >> >> timeout
> >>> > >> >> > > >> >>smaller
> >>> > >> >> > > >> >> > >>>than
> >>> > >> >> > > >> >> > >>> >>that
> >>> > >> >> > > >> >> > >>> >> >> value, so a hard coded value probably
> >>>won¹t
> >>> > >>work
> >>> > >> >> here.
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >That makes sense, although it's worth
> >>>keeping in
> >>> > >> >>mind
> >>> > >> >> > that
> >>> > >> >> > > >> >>even
> >>> > >> >> > > >> >> if
> >>> > >> >> > > >> >> > >>>you
> >>> > >> >> > > >> >> > >>> >>use
> >>> > >> >> > > >> >> > >>> >> >"correct" values, they could still be
> >>>violated
> >>> > >>due
> >>> > >> >>to,
> >>> > >> >> > > >>e.g.,
> >>> > >> >> > > >> >>a GC
> >>> > >> >> > > >> >> > >>> >>pause
> >>> > >> >> > > >> >> > >>> >> >that causes the broker to process a
> >>>request
> >>>after
> >>> > >> >>it is
> >>> > >> >> > > >> >>supposed
> >>> > >> >> > > >> >> to
> >>> > >> >> > > >> >> > >>> >>have
> >>> > >> >> > > >> >> > >>> >> >expired.
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >-Ewen
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >-Ewen
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM,
> >>>Jiangjie Qin
> >>> > >> >> > > >> >> > >>> >> >><j...@linkedin.com.invalid>
> >>> > >> >> > > >> >> > >>> >> >> >wrote:
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >> Hi,
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >> I just created a KIP to add a request
> >>> > >>timeout
> >>> > >> >>to
> >>> > >> >> > > >> >> NetworkClient
> >>> > >> >> > > >> >> > >>> >>for
> >>> > >> >> > > >> >> > >>> >> >>new
> >>> > >> >> > > >> >> > >>> >> >> >> Kafka clients.
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >>
> >>> > >> >> > > >>>>
> >>> > >> >> > >
> >>> > >> >>
> >>> >
> >>>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+reque
> >>>>>s
> >>> > >> >> > > >>>>t
> >>> > >> >> > > >> >> > >>>+
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >> >> >>timeout+to+NetworkClient
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >> Comments and suggestions are welcome!
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >> Thanks.
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >> Jiangjie (Becket) Qin
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >>
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >
> >>> > >> >> > > >> >> > >>> >> >> >--
> >>> > >> >> > > >> >> > >>> >> >> >Thanks,
> >>> > >> >> > > >> >> > >>> >> >> >Ewen
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >>
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >
> >>> > >> >> > > >> >> > >>> >> >--
> >>> > >> >> > > >> >> > >>> >> >Thanks,
> >>> > >> >> > > >> >> > >>> >> >Ewen
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >>
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >
> >>> > >> >> > > >> >> > >>> >--
> >>> > >> >> > > >> >> > >>> >-- Guozhang
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >>>
> >>> > >> >> > > >> >> > >
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >> >
> >>> > >> >> > > >> >>
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >
> >>> > >> >> > > >> >--
> >>> > >> >> > > >> >Thanks,
> >>> > >> >> > > >> >Ewen
> >>> > >> >> > > >>
> >>> > >> >> > > >>
> >>> > >> >> > > >
> >>> > >> >> > > >
> >>> > >> >> > > >--
> >>> > >> >> > > >-Regards,
> >>> > >> >> > > >Mayuresh R. Gharat
> >>> > >> >> > > >(862) 250-7125
> >>> > >> >> > >
> >>> > >> >> > >
> >>> > >> >> >
> >>> > >> >>
> >>> > >> >>
> >>> > >> >>
> >>> > >> >> --
> >>> > >> >> -Regards,
> >>> > >> >> Mayuresh R. Gharat
> >>> > >> >> (862) 250-7125
> >>> > >> >>
> >>> > >>
> >>> > >>
> >>> >
> >>> >
> >>>
> >>>
> >>> --
> >>> Thanks,
> >>> Ewen
> >>
> >>--
> >>Joel
> >>
> >
>
>


-- 
Thanks,
Ewen

Reply via email to