Option 3 seems a lot better than the previous options, especially from the user's perspective. I think it strikes a reasonable balance between control and keeping the number of options small, and the only implementation details it exposes are that there is a buffer and that there is a network request. Starting the request timeout only after enqueuing still lets you compute a maximum timeout for a request by adding the two values, but avoids annoying artifacts like sometimes issuing a network request when only a fraction of a millisecond is left for it to complete.
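To make the arithmetic concrete, here is a minimal, self-contained Java sketch of that worst-case bound. The config names and values are just the ones proposed in this thread plus the retry multiplier mentioned below; they are assumptions for illustration, not final producer configs.

    // Rough sketch of the worst-case bound under Option 3, using the config
    // names proposed in this thread (they may change before the KIP is final).
    public class TimeoutBoundSketch {
        public static void main(String[] args) {
            long maxEnqueueTimeoutMs = 60_000;      // hypothetical max.enqueue.timeout.ms
            long networkRequestTimeoutMs = 30_000;  // hypothetical network.request.timeout.ms
            int retries = 2;                        // producer retries setting

            // The request timeout only starts counting once the batch is enqueued,
            // so the two bounds simply add up; each retry gets a fresh request timeout.
            long maxTotalMs = maxEnqueueTimeoutMs + (retries + 1) * networkRequestTimeoutMs;
            System.out.println("Worst-case time until the send future completes: "
                    + maxTotalMs + " ms"); // 60000 + 3 * 30000 = 150000 ms
        }
    }

With those example numbers the bound works out to 150 seconds, which is the kind of figure it would be nice for the config docs to let users derive for themselves.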
REQUEST_TIMEOUT_DOC could probably add something about the retries, e.g. something like "This timeout is per retry, so the maximum time spent waiting for a request to complete will be (retries+1) * network.request.timeout.ms". There's also one other use of the metadata fetch timeout in partitionsFor. Are we converting that to use MAX_ENQUEUE_TIMEOUT_MS_CONFIG? The naming is a bit awkward, but we need to use something there. Finally, just a nit, but the naming conventions for variables are getting inconsistent. Some have _MS in them, some don't, and some of the _DOC names are inconsistent with the _CONFIG names. -Ewen On Mon, Jun 1, 2015 at 9:44 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote: > Bump up this thread. > > After several discussions in LinkedIn, we came up with three options. I > have updated the KIP-19 wiki page to summarize the three options and > stated our preference. We can discuss on them in tomorrow’s KIP hangout. > Please let us know what do you think. > > Thanks, > > Jiangjie (Becket) Qin > > On 5/21/15, 5:54 PM, "Jiangjie Qin" <j...@linkedin.com> wrote: > > >Based on the discussion we have, I just updated the KIP with the following > >proposal and want to see if there is further comments. > > > >The proposal is to have the following four timeout as end state. > > > >1. max.buffer.full.block.ms - To replace block.on.buffer.full. The max > >time to block when buffer is full. > >2. metadata.fetch.timeout.ms - reuse metadata timeout as > batch.timeout.ms > >because it is essentially metadata not available. > >3. replication.timeout.ms - It defines how long a server will wait > for > >the records to be replicated to followers. > >4. network.request.timeout.ms - This timeout is used when producer sends > >request to brokers through TCP connections. It specifies how long the > >producer should wait for the response. > > > >With the above approach, we can achieve the following. > >* We can have bounded blocking time for send() = (1) + (2). > >* The time after send() until response got received is generally bounded > >by linger.ms + (2) + (4), not taking retries into consideration. > > > >So from user’s perspective. Send() depends on metadata of a topic and > >buffer space. I am not sure if user would really care about how long it > >takes to receive the response because it is async anyway and we have so > >many things to consider (retries, linger.ms, retry backoff time, request > >timeout, etc). > > > >I think these configurations are clear enough to let user understand at > >the first glance. Please let me know what do you think. > > > >Thanks. > > > >Jiangjie (Becket) Qin > > > > > > > >On 5/20/15, 9:55 AM, "Joel Koshy" <jjkosh...@gmail.com> wrote: > > > >>> The fact that I understand the producer internals and am still > >>>struggling > >>> to understand the implications of the different settings, how I would > >>>set > >>> them, and how they potentially interact such that I could set invalid > >>> combinations seems like a red flag to me... Being able to say "I want > >>> produce requests to timeout in 5s" shouldn't require adjusting 3 or 4 > >>> configs if the defaults would normally timeout out in something like > >>>30s. > >>> > >>> Setting aside compatibility issues and focusing on the best set of > >>>configs, > >>> I agree with Jay that there are two things I actually want out of the > >>>API. > >>> The key thing is a per-request timeout, which should be enforced client > >>> side. 
I would just expect this to follow the request through any > >>>internals > >>> so it can be enforced no matter where in the pipeline the request is. > >>> Within each component in the pipeline we might have to compute how much > >>> time we have left for the request in order to create a timeout within > >>>that > >>> setting. The second setting is to bound the amount of time spent > >>>blocking > >>> on send(). This is really an implementation detail, but one that people > >>>are > >>> complaining about enough that it seems worthwhile to provide control > >>>over > >>> it (and fixing it would just make that setting superfluous, not break > >>> anything). > >>> > >>> Exposing a lot more settings also exposes a lot about the > >>>implementation > >>> and makes it harder to improve the implementation in the future, but I > >>> don't think we have listed good use cases for setting each of them > >>> individually. Why would the user specifically care about how much time > >>>the > >>> request spends in the accumulator vs. some other component (assuming > >>>they > >>> have the overall timeout)? Same for requests in flight, as long as I > >>>have > >>> that client side timeout? And if they care about what component is the > >>> bottleneck, could that be better exposed by the exceptions that are > >>> returned rather than a ton of different settings? > >> > >>Agreed with the above. I'm also extremely wary of configs that are > >>inherently unintuitive, or can interact to yield unintuitive behavior. > >>OTOH I think it is okay if a config is categorized as "advanced" or if > >>it requires deeper knowledge of the internals of the producer (or the > >>configured system in general). i.e., as long as we think long and hard > >>and agree on necessity (driven by clear use cases) before adding such > >>configs. We should also consider how we can simplify or even eliminate > >>existing configs. > >> > >>Re: requests in flight may be a good example: Becket had given a valid > >>use-case i.e., support strict ordering. Maybe we can replace it with a > >>"enable.strict.ordering" config which is clearer in intent and would > >>internally ensure only one in-flight request per partition and default > >>to a fixed in-flight requests (say, five or 10) if set to false. If we > >>implement idempotence then we won't even need that. > >> > >>> On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin > >>><j...@linkedin.com.invalid> > >>> wrote: > >>> > >>> > Hi Jay, > >>> > > >>> > I updated what I think int KIP wiki. Just a short summary here. > >>>Because we > >>> > need timeout for: > >>> > 1. Send() > >>> > 2. Batches in accumulator > >>> > 3. Requests in flight. > >>> > That means we need to have at least three configurations if we do not > >>> > reuse configurations. > >>> > > >>> > I think we probably want to also separate the configurations for > >>>exception > >>> > handling and SLA purposes as well. > >>> > My understanding of the configurations we are discussing here is they > >>>are > >>> > for exception handling but not for SLA purposes. It looks to me that > >>> > exception handling is more component oriented while SLA is more of > >>> > systematic tuning. What you suggested sounds more like to set > >>> > configurations to meet a user defined SLA. I am not sure if this is > >>>the > >>> > things we want to do here. > >>> > > >>> > Thanks. 
> >>> > > >>> > Jiangjie (Becket) Qin > >>> > > >>> > On 5/19/15, 5:42 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote: > >>> > > >>> > >Yeah I think linger.ms remains separate, setting that is a > >>>performance > >>> > >optimization rather than failure handling thing. We should ideally > >>>sanity > >>> > >check this, though, in my proposal, since if they set linger.ms > > >>> > >request.timeout then that won't work. > >>> > > > >>> > >It's true that in my proposal that the actual replication timeout we > >>>set > >>> > >on > >>> > >the request would be non-deterministic. However the flip side of > >>>that > >>> > >argument is that in the existing proposal the actual time until an > >>> > >acknowledgement is non-deterministic, right? So I think the argument > >>>I am > >>> > >trying to construct is that the two things the user cares about are > >>>the > >>> > >time to block and the time to ack and any other timeout we use > >>>internally > >>> > >is basically an implementation detail of ensuring this. > >>> > > > >>> > >Your point about the difference between batches and requests is a > >>>good > >>> > >one. > >>> > >I hadn't thought of that. So to make my proposal work we would need > >>>to do > >>> > >something like base the request time off the oldest batch. Let me > >>>think > >>> > >about the implications of that, it's definitely a problem. > >>> > > > >>> > >-Jay > >>> > > > >>> > >On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin > >>><j...@linkedin.com.invalid > >>> > > > >>> > >wrote: > >>> > > > >>> > >> Hey Jay, > >>> > >> > >>> > >> That is also a viable solution. > >>> > >> > >>> > >> I think the main purpose is to let user know how long they can > >>>block, > >>> > >> which is important. > >>> > >> > >>> > >> I have some question over the proposal, though. Will user still > >>>need to > >>> > >> send linger.ms? Will request timeout cover linger.ms as well? > >>> > >> My concern of letting request timeout also cover the time spent in > >>> > >> accumulator is that this will result in the actually request > >>>timeout > >>> > >> indeterministic. > >>> > >> Also, implementation wise, a request can have multiple batches, > >>>the time > >>> > >> spent in the accumulator could vary a lot. If one of the batch > >>>times > >>> > >>out, > >>> > >> what should we do the the rest of the batches? > >>> > >> I think we probably want to separate batch timeout and request > >>>timeout. > >>> > >> > >>> > >> Maybe we can do this: > >>> > >> Max.send.block.ms > >>> > >> Request.timeout > >>> > >> Batch.timeout > >>> > >> Replication.timeout > >>> > >> > >>> > >> So in send() we use max.send.block.ms only. In accumulator, we > use > >>> > >> batch.timeout, in NetWorkClient, we use request.timeout. > >>>Replication > >>> > >> timeout is needed anyway. > >>> > >> > >>> > >> This looks more understandable from what I can see. > >>> > >> > >>> > >> What do you think? > >>> > >> > >>> > >> Jiangjie (Becket) Qin > >>> > >> > >>> > >> On 5/19/15, 11:48 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote: > >>> > >> > >>> > >> >So the alternative to consider would be to instead have > >>> > >> > max.block.ms (or something) > >>> > >> > request.timeout > >>> > >> > replication.timeout > >>> > >> > > >>> > >> >I think this better captures what the user cares about. Here is > >>>how it > >>> > >> >would work. > >>> > >> > > >>> > >> >*max.send.block.ms <http://max.send.block.ms>* is the bound on > >>>the > >>> > >> maximum > >>> > >> >time the producer.send() call can block. 
> >>> > >> >This subsumes the existing metadata timeout use case but not the > >>> > >>proposed > >>> > >> >use for the time in the accumulator. It *also* acts as a bound on > >>>the > >>> > >>time > >>> > >> >you can block on BufferPool allocation (we'd have to add this but > >>>that > >>> > >> >should be easy). > >>> > >> > > >>> > >> >*request.timeout* is the bound on the time after send() complete > >>>until > >>> > >>you > >>> > >> >get an acknowledgement. This covers the connection timeout, and > >>>the > >>> > >>time > >>> > >> >in > >>> > >> >the accumulator. So to implement this, the time we set in the > >>>request > >>> > >>sent > >>> > >> >via NetworkClient would have already subtracted off the time > >>>spent in > >>> > >>the > >>> > >> >accumulator, and if the request retried we would include both the > >>>time > >>> > >>in > >>> > >> >the accumulator an the time taken for the first request, etc. In > >>>other > >>> > >> >words this is the upper bound on the time to the Future being > >>> > >>satisfied. > >>> > >> > > >>> > >> >*replication.timeout* will default to something reasonable but > >>>maybe > >>> > >>you > >>> > >> >can override it if you want? > >>> > >> > > >>> > >> >Thoughts? > >>> > >> > > >>> > >> >-Jay > >>> > >> > > >>> > >> >On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat < > >>> > >> >gharatmayures...@gmail.com> wrote: > >>> > >> > > >>> > >> >> So what I understand is that, we would have 3 time outs : > >>> > >> >> 1) replication timeout > >>> > >> >> 2) request timeout > >>> > >> >> 3) metadata timeout (existing) > >>> > >> >> > >>> > >> >> The request timeout has to be greater than the replication > >>>timeout. > >>> > >> >> request timeout is for messages already sent to kafka and the > >>> > >>producer > >>> > >> >>is > >>> > >> >> waiting for them. > >>> > >> >> > >>> > >> >> Thanks, > >>> > >> >> > >>> > >> >> Mayuresh > >>> > >> >> > >>> > >> >> On Tue, May 19, 2015 at 11:12 AM, Jay Kreps > >>><jay.kr...@gmail.com> > >>> > >> wrote: > >>> > >> >> > >>> > >> >> > I think this looks good. What I think is missing is an > >>>overview of > >>> > >>the > >>> > >> >> > timeouts from the user's perspective. > >>> > >> >> > > >>> > >> >> > My worry is that it is quite complicated to reason about the > >>> > >>current > >>> > >> >>set > >>> > >> >> of > >>> > >> >> > timeouts. Currently we have > >>> > >> >> > timeout.ms > >>> > >> >> > metadata.fetch.timeout.ms > >>> > >> >> > > >>> > >> >> > The proposed settings I think are: > >>> > >> >> > batch.expiration.ms > >>> > >> >> > request.timeout.ms > >>> > >> >> > replication.timeout.ms > >>> > >> >> > > >>> > >> >> > I think maybe we can skip the batch.expiration.ms. Instead > >>>maybe > >>> > we > >>> > >> >>can > >>> > >> >> > somehow combine these into a single request timeout so that > >>>we > >>> > >> >>subtract > >>> > >> >> the > >>> > >> >> > time you spent waiting from the request timeout and/or > >>>replication > >>> > >> >> timeout > >>> > >> >> > somehow? I don't have an explicit proposal but my suspicion > >>>is that > >>> > >> >>from > >>> > >> >> > the user's point of view there is just one timeout related to > >>>the > >>> > >> >>request > >>> > >> >> > after which they don't care, and we can split that up between > >>>the > >>> > >> >>batch > >>> > >> >> > time and the request time. Thoughts? > >>> > >> >> > > >>> > >> >> > How are we handling connection timeouts? 
If a machine hard > >>>fails in > >>> > >> >>the > >>> > >> >> > middle of connection establishment there will be no > >>>outstanding > >>> > >> >> requests. I > >>> > >> >> > think this may be okay because connections are established > >>>when we > >>> > >> >>want > >>> > >> >> to > >>> > >> >> > send a request and presumably we will begin the timer then? > >>> > >> >> > > >>> > >> >> > To that end I suggest we do two things: > >>> > >> >> > 1. Include KAKFA-1788. I know that technically these two > >>>things are > >>> > >> >> > different but from the user's point of view they aren't. > >>> > >> >> > 2. Include in the KIP the explanation to the user of the full > >>>set > >>> > >>of > >>> > >> >> > timeouts, what they mean, how we will default them, and when > >>>to > >>> > >> >>override > >>> > >> >> > which. > >>> > >> >> > > >>> > >> >> > I know this is a hassle but I think the end experience will > >>>be a > >>> > >>lot > >>> > >> >> better > >>> > >> >> > if we go through this thought process. > >>> > >> >> > > >>> > >> >> > -Jay > >>> > >> >> > > >>> > >> >> > On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin > >>> > >> >><j...@linkedin.com.invalid > >>> > >> >> > > >>> > >> >> > wrote: > >>> > >> >> > > >>> > >> >> > > I modified the WIKI page to incorporate the feedbacks from > >>> > >>mailing > >>> > >> >>list > >>> > >> >> > > and KIP hangout. > >>> > >> >> > > > >>> > >> >> > > - Added the deprecation plan for TIMEOUT_CONFIG > >>> > >> >> > > - Added the actions to take after request timeout > >>> > >> >> > > > >>> > >> >> > > I finally chose to create a new connection if requests > >>>timeout. > >>> > >>The > >>> > >> >> > reason > >>> > >> >> > > is: > >>> > >> >> > > 1. In most cases, if a broker is just slow, as long as we > >>>set > >>> > >> >>request > >>> > >> >> > > timeout to be a reasonable value, we should not see many > >>>new > >>> > >> >> connections > >>> > >> >> > > get created. > >>> > >> >> > > 2. If a broker is down, hopefully metadata refresh will > >>>find the > >>> > >>new > >>> > >> >> > > broker and we will not try to reconnect to the broker > >>>anymore. > >>> > >> >> > > > >>> > >> >> > > Comments are welcome! > >>> > >> >> > > > >>> > >> >> > > Thanks. > >>> > >> >> > > > >>> > >> >> > > Jiangjie (Becket) Qin > >>> > >> >> > > > >>> > >> >> > > On 5/12/15, 2:59 PM, "Mayuresh Gharat" > >>> > >><gharatmayures...@gmail.com> > >>> > >> >> > wrote: > >>> > >> >> > > > >>> > >> >> > > >+1 Becket. That would give enough time for clients to > >>>move. We > >>> > >> >>should > >>> > >> >> > make > >>> > >> >> > > >this change very clear. > >>> > >> >> > > > > >>> > >> >> > > >Thanks, > >>> > >> >> > > > > >>> > >> >> > > >Mayuresh > >>> > >> >> > > > > >>> > >> >> > > >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin > >>> > >> >> <j...@linkedin.com.invalid > >>> > >> >> > > > >>> > >> >> > > >wrote: > >>> > >> >> > > > > >>> > >> >> > > >> Hey Ewen, > >>> > >> >> > > >> > >>> > >> >> > > >> Very good summary about the compatibility. What you > >>>proposed > >>> > >> >>makes > >>> > >> >> > > >>sense. > >>> > >> >> > > >> So basically we can do the following: > >>> > >> >> > > >> > >>> > >> >> > > >> In next release, i.e. 0.8.3: > >>> > >> >> > > >> 1. Add REPLICATION_TIMEOUT_CONFIG > >>>(“replication.timeout.ms”) > >>> > >> >> > > >> 2. Mark TIMEOUT_CONFIG as deprecated > >>> > >> >> > > >> 3. Override REPLICATION_TIMEOUT_CONFIG with > >>>TIMEOUT_CONFIG if > >>> > >>it > >>> > >> >>is > >>> > >> >> > > >> defined and give a warning about deprecation. 
> >>> > >> >> > > >> In the release after 0.8.3, we remove TIMEOUT_CONFIG. > >>> > >> >> > > >> > >>> > >> >> > > >> This should give enough buffer for this change. > >>> > >> >> > > >> > >>> > >> >> > > >> Request timeout is a complete new thing we add to fix a > >>>bug, > >>> > >>I’m > >>> > >> >> with > >>> > >> >> > > >>you > >>> > >> >> > > >> it does not make sense to have it maintain the old buggy > >>> > >> >>behavior. > >>> > >> >> So > >>> > >> >> > we > >>> > >> >> > > >> can set it to a reasonable value instead of infinite. > >>> > >> >> > > >> > >>> > >> >> > > >> Jiangjie (Becket) Qin > >>> > >> >> > > >> > >>> > >> >> > > >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" > >>> > >><e...@confluent.io > >>> > >> > > >>> > >> >> > > wrote: > >>> > >> >> > > >> > >>> > >> >> > > >> >I think my confusion is coming from this: > >>> > >> >> > > >> > > >>> > >> >> > > >> >> So in this KIP, we only address (3). The only public > >>> > >>interface > >>> > >> >> > change > >>> > >> >> > > >> >>is a > >>> > >> >> > > >> >> new configuration of request timeout (and maybe > >>>change the > >>> > >> >> > > >>configuration > >>> > >> >> > > >> >> name of TIMEOUT_CONFIG to > >>>REPLICATION_TIMEOUT_CONFIG). > >>> > >> >> > > >> > > >>> > >> >> > > >> >There are 3 possible compatibility issues I see here: > >>> > >> >> > > >> > > >>> > >> >> > > >> >* I assumed this meant the constants also change, so > >>> > >> >>"timeout.ms" > >>> > >> >> > > >>becomes > >>> > >> >> > > >> >" > >>> > >> >> > > >> >replication.timeout.ms". This breaks config files that > >>> > worked > >>> > >> on > >>> > >> >> the > >>> > >> >> > > >> >previous version and the only warning would be in > >>>release > >>> > >> >>notes. We > >>> > >> >> > do > >>> > >> >> > > >> >warn > >>> > >> >> > > >> >about unused configs so they might notice the problem. > >>> > >> >> > > >> > > >>> > >> >> > > >> >* Binary and source compatibility if someone configures > >>>their > >>> > >> >> client > >>> > >> >> > in > >>> > >> >> > > >> >code and uses the TIMEOUT_CONFIG variable. Renaming it > >>>will > >>> > >> >>cause > >>> > >> >> > > >>existing > >>> > >> >> > > >> >jars to break if you try to run against an updated > >>>client > >>> > >>(which > >>> > >> >> > seems > >>> > >> >> > > >>not > >>> > >> >> > > >> >very significant since I doubt people upgrade these > >>>without > >>> > >> >> > recompiling > >>> > >> >> > > >> >but > >>> > >> >> > > >> >maybe I'm wrong about that). And it breaks builds > >>>without > >>> > >>have > >>> > >> >> > > >>deprecated > >>> > >> >> > > >> >that field first, which again, is probably not the > >>>biggest > >>> > >>issue > >>> > >> >> but > >>> > >> >> > is > >>> > >> >> > > >> >annoying for users and when we accidentally changed the > >>>API > >>> > >>we > >>> > >> >> > > >>received a > >>> > >> >> > > >> >complaint about breaking builds. > >>> > >> >> > > >> > > >>> > >> >> > > >> >* Behavior compatibility as Jay mentioned on the call > >>>-- > >>> > >>setting > >>> > >> >> the > >>> > >> >> > > >> >config > >>> > >> >> > > >> >(even if the name changed) doesn't have the same effect > >>>it > >>> > >>used > >>> > >> >>to. 
> >>> > >> >> > > >> > > >>> > >> >> > > >> >One solution, which admittedly is more painful to > >>>implement > >>> > >>and > >>> > >> >> > > >>maintain, > >>> > >> >> > > >> >would be to maintain the timeout.ms config, have it > >>>override > >>> > >> the > >>> > >> >> > > others > >>> > >> >> > > >> if > >>> > >> >> > > >> >it is specified (including an infinite request timeout > >>>I > >>> > >> >>guess?), > >>> > >> >> and > >>> > >> >> > > >>if > >>> > >> >> > > >> >it > >>> > >> >> > > >> >isn't specified, we can just use the new config > >>>variables. > >>> > >> >>Given a > >>> > >> >> > real > >>> > >> >> > > >> >deprecation schedule, users would have better warning > >>>of > >>> > >>changes > >>> > >> >> and > >>> > >> >> > a > >>> > >> >> > > >> >window to make the changes. > >>> > >> >> > > >> > > >>> > >> >> > > >> >I actually think it might not be necessary to maintain > >>>the > >>> > >>old > >>> > >> >> > behavior > >>> > >> >> > > >> >precisely, although maybe for some code it is an issue > >>>if > >>> > >>they > >>> > >> >> start > >>> > >> >> > > >> >seeing > >>> > >> >> > > >> >timeout exceptions that they wouldn't have seen before? > >>> > >> >> > > >> > > >>> > >> >> > > >> >-Ewen > >>> > >> >> > > >> > > >>> > >> >> > > >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao > >>><j...@confluent.io> > >>> > >> >>wrote: > >>> > >> >> > > >> > > >>> > >> >> > > >> >> Jiangjie, > >>> > >> >> > > >> >> > >>> > >> >> > > >> >> Yes, I think using metadata timeout to expire batches > >>>in > >>> > >>the > >>> > >> >> record > >>> > >> >> > > >> >> accumulator makes sense. > >>> > >> >> > > >> >> > >>> > >> >> > > >> >> Thanks, > >>> > >> >> > > >> >> > >>> > >> >> > > >> >> Jun > >>> > >> >> > > >> >> > >>> > >> >> > > >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin > >>> > >> >> > > >> >><j...@linkedin.com.invalid> > >>> > >> >> > > >> >> wrote: > >>> > >> >> > > >> >> > >>> > >> >> > > >> >> > I incorporated Ewen and Guozhang’s comments in the > >>>KIP > >>> > >>page. > >>> > >> >> Want > >>> > >> >> > > >>to > >>> > >> >> > > >> >> speed > >>> > >> >> > > >> >> > up on this KIP because currently we experience > >>> > >>mirror-maker > >>> > >> >> hung > >>> > >> >> > > >>very > >>> > >> >> > > >> >> > likely when a broker is down. > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > I also took a shot to solve KAFKA-1788 in > >>>KAFKA-2142. I > >>> > >>used > >>> > >> >> > > >>metadata > >>> > >> >> > > >> >> > timeout to expire the batches which are sitting in > >>> > >> >>accumulator > >>> > >> >> > > >>without > >>> > >> >> > > >> >> > leader info. I did that because the situation there > >>>is > >>> > >> >> > essentially > >>> > >> >> > > >> >> missing > >>> > >> >> > > >> >> > metadata. > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > As a summary of what I am thinking about the > >>>timeout in > >>> > >>new > >>> > >> >> > > >>Producer: > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > 1. Metadata timeout: > >>> > >> >> > > >> >> > - used in send(), blocking > >>> > >> >> > > >> >> > - used in accumulator to expire batches with > >>>timeout > >>> > >> >> exception. > >>> > >> >> > > >> >> > 2. Linger.ms > >>> > >> >> > > >> >> > - Used in accumulator to ready the batch for > >>>drain > >>> > >> >> > > >> >> > 3. Request timeout > >>> > >> >> > > >> >> > - Used in NetworkClient to expire a batch and > >>>retry if > >>> > >>no > >>> > >> >> > > >>response > >>> > >> >> > > >> >>is > >>> > >> >> > > >> >> > received for a request before timeout. 
> >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > So in this KIP, we only address (3). The only > >>>public > >>> > >> >>interface > >>> > >> >> > > >>change > >>> > >> >> > > >> >>is > >>> > >> >> > > >> >> a > >>> > >> >> > > >> >> > new configuration of request timeout (and maybe > >>>change > >>> > >>the > >>> > >> >> > > >> >>configuration > >>> > >> >> > > >> >> > name of TIMEOUT_CONFIG to > >>>REPLICATION_TIMEOUT_CONFIG). > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > Would like to see what people think of above > >>>approach? > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > Jiangjie (Becket) Qin > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" > >>><j...@linkedin.com> > >>> > >> >>wrote: > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > >Jun, > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >I thought a little bit differently on this. > >>> > >> >> > > >> >> > >Intuitively, I am thinking that if a partition is > >>> > >>offline, > >>> > >> >>the > >>> > >> >> > > >> >>metadata > >>> > >> >> > > >> >> > >for that partition should be considered not ready > >>> > >>because > >>> > >> >>we > >>> > >> >> > don’t > >>> > >> >> > > >> >>know > >>> > >> >> > > >> >> > >which broker we should send the message to. So > >>>those > >>> > >>sends > >>> > >> >> need > >>> > >> >> > > >>to be > >>> > >> >> > > >> >> > >blocked on metadata timeout. > >>> > >> >> > > >> >> > >Another thing I’m wondering is in which scenario > >>>an > >>> > >>offline > >>> > >> >> > > >>partition > >>> > >> >> > > >> >> will > >>> > >> >> > > >> >> > >become online again in a short period of time and > >>>how > >>> > >> >>likely > >>> > >> >> it > >>> > >> >> > > >>will > >>> > >> >> > > >> >> > >occur. My understanding is that the batch timeout > >>>for > >>> > >> >>batches > >>> > >> >> > > >> >>sitting in > >>> > >> >> > > >> >> > >accumulator should be larger than linger.ms but > >>>should > >>> > >>not > >>> > >> >>be > >>> > >> >> > too > >>> > >> >> > > >> >>long > >>> > >> >> > > >> >> > >(e.g. less than 60 seconds). Otherwise it will > >>>exhaust > >>> > >>the > >>> > >> >> > shared > >>> > >> >> > > >> >>buffer > >>> > >> >> > > >> >> > >with batches to be aborted. > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >That said, I do agree it is reasonable to buffer > >>>the > >>> > >> >>message > >>> > >> >> for > >>> > >> >> > > >>some > >>> > >> >> > > >> >> time > >>> > >> >> > > >> >> > >so messages to other partitions can still get > >>>sent. But > >>> > >> >>adding > >>> > >> >> > > >> >>another > >>> > >> >> > > >> >> > >expiration in addition to linger.ms - which is > >>> > >>essentially > >>> > >> >>a > >>> > >> >> > > >>timeout > >>> > >> >> > > >> >>- > >>> > >> >> > > >> >> > >sounds a little bit confusing. Maybe we can do > >>>this, let > >>> > >> >>the > >>> > >> >> > batch > >>> > >> >> > > >> >>sit > >>> > >> >> > > >> >> in > >>> > >> >> > > >> >> > >accumulator up to linger.ms, then fail it if > >>>necessary. > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >What do you think? 
> >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >Thanks, > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >Jiangjie (Becket) Qin > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> > >>> > wrote: > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >>Jiangjie, > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >>Allowing messages to be accumulated in an offline > >>> > >> >>partition > >>> > >> >> > > >>could be > >>> > >> >> > > >> >> > >>useful > >>> > >> >> > > >> >> > >>since the partition may become available before > >>>the > >>> > >> >>request > >>> > >> >> > > >>timeout > >>> > >> >> > > >> >>or > >>> > >> >> > > >> >> > >>linger time is reached. Now that we are planning > >>>to > >>> > >>add a > >>> > >> >>new > >>> > >> >> > > >> >>timeout, > >>> > >> >> > > >> >> it > >>> > >> >> > > >> >> > >>would be useful to think through whether/how that > >>> > >>applies > >>> > >> >>to > >>> > >> >> > > >> >>messages > >>> > >> >> > > >> >> in > >>> > >> >> > > >> >> > >>the accumulator too. > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >>Thanks, > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >>Jun > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin > >>> > >> >> > > >> >> <j...@linkedin.com.invalid > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >>wrote: > >>> > >> >> > > >> >> > >> > >>> > >> >> > > >> >> > >>> Hi Harsha, > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> Took a quick look at the patch. I think it is > >>>still a > >>> > >> >> little > >>> > >> >> > > >>bit > >>> > >> >> > > >> >> > >>> different. KAFKA-1788 only handles the case > >>>where a > >>> > >> >>batch > >>> > >> >> > > >>sitting > >>> > >> >> > > >> >>in > >>> > >> >> > > >> >> > >>> accumulator for too long. The KIP is trying to > >>>solve > >>> > >>the > >>> > >> >> > issue > >>> > >> >> > > >> >>where > >>> > >> >> > > >> >> a > >>> > >> >> > > >> >> > >>> batch has already been drained from accumulator > >>>and > >>> > >> >>sent to > >>> > >> >> > > >> >>broker. > >>> > >> >> > > >> >> > >>> We might be able to apply timeout on batch > >>>level to > >>> > >> >>merge > >>> > >> >> > those > >>> > >> >> > > >> >>two > >>> > >> >> > > >> >> > >>>cases > >>> > >> >> > > >> >> > >>> as Ewen suggested. But I’m not sure if it is a > >>>good > >>> > >> >>idea to > >>> > >> >> > > >>allow > >>> > >> >> > > >> >> > >>>messages > >>> > >> >> > > >> >> > >>> whose target partition is offline to sit in > >>> > >>accumulator > >>> > >> >>in > >>> > >> >> > the > >>> > >> >> > > >> >>first > >>> > >> >> > > >> >> > >>>place. > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> Jiangjie (Becket) Qin > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" > >>> > >> >> > > >><ka...@harsha.io> > >>> > >> >> > > >> >> > wrote: > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> >Guozhang and Jiangjie, > >>> > >> >> > > >> >> > >>> > Isn’t this work being covered > >>>in > >>> > >> >> > > >> >> > >>> > >>>>https://issues.apache.org/jira/browse/KAFKA-1788 . > >>> > >>Can > >>> > >> >> you > >>> > >> >> > > >> please > >>> > >> >> > > >> >> the > >>> > >> >> > > >> >> > >>> >review the patch there. 
> >>> > >> >> > > >> >> > >>> >Thanks, > >>> > >> >> > > >> >> > >>> >Harsha > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang > >>>Wang > >>> > >> >> > > >> >>(wangg...@gmail.com > >>> > >> >> > > >> >> ) > >>> > >> >> > > >> >> > >>> >wrote: > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >Thanks for the update Jiangjie, > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >I think it is actually NOT expected that > >>>hardware > >>> > >> >> > > >>disconnection > >>> > >> >> > > >> >>will > >>> > >> >> > > >> >> > >>>be > >>> > >> >> > > >> >> > >>> >detected by the selector, but rather will only > >>>be > >>> > >> >>revealed > >>> > >> >> > > >>upon > >>> > >> >> > > >> >>TCP > >>> > >> >> > > >> >> > >>> >timeout, which could be hours. > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >A couple of comments on the wiki: > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >1. "For KafkaProducer.close() and > >>> > >> >>KafkaProducer.flush() we > >>> > >> >> > > >>need > >>> > >> >> > > >> >>the > >>> > >> >> > > >> >> > >>> >request > >>> > >> >> > > >> >> > >>> >timeout as implict timeout." I am not very > >>>clear > >>> > >>what > >>> > >> >>does > >>> > >> >> > > >>this > >>> > >> >> > > >> >> mean? > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >2. Currently the producer already has a > >>> > >> >>"TIMEOUT_CONFIG" > >>> > >> >> > which > >>> > >> >> > > >> >> should > >>> > >> >> > > >> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if > >>>we > >>> > >> >>decide to > >>> > >> >> > > >>add " > >>> > >> >> > > >> >> > >>> >REQUEST_TIMEOUT_CONFIG", I suggest we also > >>>make this > >>> > >> >> > renaming: > >>> > >> >> > > >> >> > >>>admittedly > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >it will change the config names but will > >>>reduce > >>> > >> >>confusions > >>> > >> >> > > >>moving > >>> > >> >> > > >> >> > >>> >forward. > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >Guozhang > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin > >>> > >> >> > > >> >> > >>><j...@linkedin.com.invalid> > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >wrote: > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >> Checked the code again. It seems that the > >>> > >> >>disconnected > >>> > >> >> > > >>channel > >>> > >> >> > > >> >>is > >>> > >> >> > > >> >> > >>>not > >>> > >> >> > > >> >> > >>> >> detected by selector as expected. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> Currently we are depending on the > >>> > >> >> > > >> >> > >>> >> o.a.k.common.network.Selector.disconnected > >>>set to > >>> > >> >>see if > >>> > >> >> > we > >>> > >> >> > > >> >>need > >>> > >> >> > > >> >> to > >>> > >> >> > > >> >> > >>>do > >>> > >> >> > > >> >> > >>> >> something for a disconnected channel. > >>> > >> >> > > >> >> > >>> >> However Selector.disconnected set is only > >>>updated > >>> > >> >>when: > >>> > >> >> > > >> >> > >>> >> 1. A write/read/connect to channel failed. > >>> > >> >> > > >> >> > >>> >> 2. 
A Key is canceled > >>> > >> >> > > >> >> > >>> >> However when a broker is down before it > >>>sends back > >>> > >> >>the > >>> > >> >> > > >> >>response, > >>> > >> >> > > >> >> the > >>> > >> >> > > >> >> > >>> >> client seems not be able to detect this > >>>failure. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> I did a simple test below: > >>> > >> >> > > >> >> > >>> >> 1. Run a selector on one machine and an echo > >>> > >>server > >>> > >> >>on > >>> > >> >> > > >>another > >>> > >> >> > > >> >> > >>>machine. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> Connect a selector to an echo server > >>> > >> >> > > >> >> > >>> >> 2. Send a message to echo server using > >>>selector, > >>> > >>then > >>> > >> >> let > >>> > >> >> > > >>the > >>> > >> >> > > >> >> > >>>selector > >>> > >> >> > > >> >> > >>> >> poll() every 10 seconds. > >>> > >> >> > > >> >> > >>> >> 3. After the sever received the message, > >>>unplug > >>> > >> >>cable on > >>> > >> >> > the > >>> > >> >> > > >> >>echo > >>> > >> >> > > >> >> > >>> >>server. > >>> > >> >> > > >> >> > >>> >> 4. After waiting for 45 min. The selector > >>>still > >>> > >>did > >>> > >> >>not > >>> > >> >> > > >> >>detected > >>> > >> >> > > >> >> the > >>> > >> >> > > >> >> > >>> >> network failure. > >>> > >> >> > > >> >> > >>> >> Lsof on selector machine shows that the TCP > >>> > >> >>connection > >>> > >> >> is > >>> > >> >> > > >>still > >>> > >> >> > > >> >> > >>> >>considered > >>> > >> >> > > >> >> > >>> >> ESTABLISHED. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> I’m not sure in this case what should we > >>>expect > >>> > >>from > >>> > >> >>the > >>> > >> >> > > >> >> > >>> >> java.nio.channels.Selector. According to the > >>> > >> >>document, > >>> > >> >> the > >>> > >> >> > > >> >> selector > >>> > >> >> > > >> >> > >>> >>does > >>> > >> >> > > >> >> > >>> >> not verify the status of the associated > >>>channel. > >>> > >>In > >>> > >> >>my > >>> > >> >> > test > >>> > >> >> > > >> >>case > >>> > >> >> > > >> >> it > >>> > >> >> > > >> >> > >>> >>looks > >>> > >> >> > > >> >> > >>> >> even worse that OS did not think of the > >>>socket has > >>> > >> >>been > >>> > >> >> > > >> >> > >>>disconnected. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> Anyway. It seems adding the client side > >>>request > >>> > >> >>timeout > >>> > >> >> is > >>> > >> >> > > >> >> > >>>necessary. > >>> > >> >> > > >> >> > >>> >>I’ve > >>> > >> >> > > >> >> > >>> >> updated the KIP page to clarify the problem > >>>we > >>> > >>want > >>> > >> >>to > >>> > >> >> > solve > >>> > >> >> > > >> >> > >>>according > >>> > >> >> > > >> >> > >>> >>to > >>> > >> >> > > >> >> > >>> >> Ewen’s comments. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> Thanks. > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> Jiangjie (Becket) Qin > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" > >>> > >> >> > > >> >><e...@confluent.io> > >>> > >> >> > > >> >> > >>>wrote: > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie > >>>Qin > >>> > >> >> > > >> >> > >>> >><j...@linkedin.com.invalid> > >>> > >> >> > > >> >> > >>> >> >wrote: > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very > >>>good > >>> > >> >>points! 
> >>> > >> >> > > >>Please > >>> > >> >> > > >> >>see > >>> > >> >> > > >> >> > >>> >>replies > >>> > >> >> > > >> >> > >>> >> >> inline. > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen > >>>Cheslack-Postava" < > >>> > >> >> > > >> >> e...@confluent.io > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > >>> >> wrote: > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> >Jiangjie, > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >Great start. I have a couple of > >>>comments. > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >Under the motivation section, is it > >>>really > >>> > >>true > >>> > >> >>that > >>> > >> >> > the > >>> > >> >> > > >> >> request > >>> > >> >> > > >> >> > >>> >>will > >>> > >> >> > > >> >> > >>> >> >> >never > >>> > >> >> > > >> >> > >>> >> >> >be completed? Presumably if the broker > >>>goes > >>> > >>down > >>> > >> >>the > >>> > >> >> > > >> >> connection > >>> > >> >> > > >> >> > >>> >>will be > >>> > >> >> > > >> >> > >>> >> >> >severed, at worst by a TCP timeout, > >>>which > >>> > >>should > >>> > >> >> clean > >>> > >> >> > > >>up > >>> > >> >> > > >> >>the > >>> > >> >> > > >> >> > >>> >> >>connection > >>> > >> >> > > >> >> > >>> >> >> >and any outstanding requests, right? I > >>>think > >>> > >>the > >>> > >> >> real > >>> > >> >> > > >> >>reason > >>> > >> >> > > >> >> we > >>> > >> >> > > >> >> > >>> >>need a > >>> > >> >> > > >> >> > >>> >> >> >different timeout is that the default > >>>TCP > >>> > >> >>timeouts > >>> > >> >> are > >>> > >> >> > > >> >> > >>>ridiculously > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> >>long > >>> > >> >> > > >> >> > >>> >> >> >in > >>> > >> >> > > >> >> > >>> >> >> >this context. > >>> > >> >> > > >> >> > >>> >> >> Yes, when broker is completely down the > >>>request > >>> > >> >> should > >>> > >> >> > be > >>> > >> >> > > >> >> cleared > >>> > >> >> > > >> >> > >>>as > >>> > >> >> > > >> >> > >>> >>you > >>> > >> >> > > >> >> > >>> >> >> said. The case we encountered looks like > >>>the > >>> > >> >>broker > >>> > >> >> was > >>> > >> >> > > >>just > >>> > >> >> > > >> >> not > >>> > >> >> > > >> >> > >>> >> >> responding but TCP connection was still > >>>alive > >>> > >> >>though. > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >Ok, that makes sense. > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >My second question is about whether > >>>this > >>>is > >>> > >>the > >>> > >> >> right > >>> > >> >> > > >> >>level to > >>> > >> >> > > >> >> > >>> >>tackle > >>> > >> >> > > >> >> > >>> >> >>the > >>> > >> >> > > >> >> > >>> >> >> >issue/what user-facing changes need to > >>>be > >>> > >>made. A > >>> > >> >> > > >>related > >>> > >> >> > > >> >> > >>>problem > >>> > >> >> > > >> >> > >>> >>came > >>> > >> >> > > >> >> > >>> >> >>up > >>> > >> >> > > >> >> > >>> >> >> >in > >>> > >> >>https://issues.apache.org/jira/browse/KAFKA-1788 > >>> > >> >> > > >>where > >>> > >> >> > > >> >> > >>>producer > >>> > >> >> > > >> >> > >>> >> >> records > >>> > >> >> > > >> >> > >>> >> >> >get stuck indefinitely because there's > >>>no > >>> > >> >> client-side > >>> > >> >> > > >> >>timeout. 
> >>> > >> >> > > >> >> > >>>This > >>> > >> >> > > >> >> > >>> >>KIP > >>> > >> >> > > >> >> > >>> >> >> >wouldn't fix that problem or any > >>>problems > >>> > >>caused > >>> > >> >>by > >>> > >> >> > > >>lack of > >>> > >> >> > > >> >> > >>> >> >>connectivity > >>> > >> >> > > >> >> > >>> >> >> >since this would only apply to in flight > >>> > >> >>requests, > >>> > >> >> > > >>which by > >>> > >> >> > > >> >> > >>> >>definition > >>> > >> >> > > >> >> > >>> >> >> >must > >>> > >> >> > > >> >> > >>> >> >> >have been sent on an active connection. > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >I suspect both types of problems > >>>probably need > >>> > >> >>to be > >>> > >> >> > > >> >>addressed > >>> > >> >> > > >> >> > >>> >> >>separately > >>> > >> >> > > >> >> > >>> >> >> >by introducing explicit timeouts. > >>>However, > >>> > >> >>because > >>> > >> >> the > >>> > >> >> > > >> >> settings > >>> > >> >> > > >> >> > >>> >> >>introduced > >>> > >> >> > > >> >> > >>> >> >> >here are very much about the internal > >>> > >> >> implementations > >>> > >> >> > of > >>> > >> >> > > >> >>the > >>> > >> >> > > >> >> > >>> >>clients, > >>> > >> >> > > >> >> > >>> >> >>I'm > >>> > >> >> > > >> >> > >>> >> >> >wondering if this even needs to be a > >>> > >>user-facing > >>> > >> >> > > >>setting, > >>> > >> >> > > >> >> > >>> >>especially > >>> > >> >> > > >> >> > >>> >> >>if we > >>> > >> >> > > >> >> > >>> >> >> >have to add other timeouts anyway. For > >>> > >>example, > >>> > >> >> would > >>> > >> >> > a > >>> > >> >> > > >> >>fixed, > >>> > >> >> > > >> >> > >>> >>generous > >>> > >> >> > > >> >> > >>> >> >> >value that's still much shorter than a > >>>TCP > >>> > >> >>timeout, > >>> > >> >> > say > >>> > >> >> > > >> >>15s, > >>> > >> >> > > >> >> be > >>> > >> >> > > >> >> > >>> >>good > >>> > >> >> > > >> >> > >>> >> >> >enough? If other timeouts would allow, > >>>for > >>> > >> >>example, > >>> > >> >> > the > >>> > >> >> > > >> >> clients > >>> > >> >> > > >> >> > >>>to > >>> > >> >> > > >> >> > >>> >> >> >properly > >>> > >> >> > > >> >> > >>> >> >> >exit even if requests have not hit their > >>> > >>timeout, > >>> > >> >> then > >>> > >> >> > > >> >>what's > >>> > >> >> > > >> >> > >>>the > >>> > >> >> > > >> >> > >>> >> >>benefit > >>> > >> >> > > >> >> > >>> >> >> >of being able to configure the > >>>request-level > >>> > >> >> timeout? > >>> > >> >> > > >> >> > >>> >> >> That is a very good point. We have three > >>>places > >>> > >> >>that > >>> > >> >> we > >>> > >> >> > > >> >>might > >>> > >> >> > > >> >> be > >>> > >> >> > > >> >> > >>> >>able to > >>> > >> >> > > >> >> > >>> >> >> enforce timeout for a message send: > >>> > >> >> > > >> >> > >>> >> >> 1. Before append to accumulator - > >>>handled > >>>by > >>> > >> >>metadata > >>> > >> >> > > >> >>timeout > >>> > >> >> > > >> >> on > >>> > >> >> > > >> >> > >>>per > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> >> message level. > >>> > >> >> > > >> >> > >>> >> >> 2. Batch of messages inside accumulator > >>>- > >>>no > >>> > >> >>timeout > >>> > >> >> > > >> >>mechanism > >>> > >> >> > > >> >> > >>>now. > >>> > >> >> > > >> >> > >>> >> >> 3. Request of batches after messages > >>>leave the > >>> > >> >> > > >>accumulator > >>> > >> >> > > >> >>- we > >>> > >> >> > > >> >> > >>>have > >>> > >> >> > > >> >> > >>> >>a > >>> > >> >> > > >> >> > >>> >> >> broker side timeout but no client side > >>>timeout > >>> > >>for > >>> > >> >> now. 
> >>> > >> >> > > >> >> > >>> >> >> My current proposal only address (3) but > >>>not > >>> > >>(2). > >>> > >> >> > > >> >> > >>> >> >> Honestly I do not have a very clear idea > >>>about > >>> > >> >>what > >>> > >> >> > > >>should > >>> > >> >> > > >> >>we > >>> > >> >> > > >> >> do > >>> > >> >> > > >> >> > >>> >>with > >>> > >> >> > > >> >> > >>> >> >>(2) > >>> > >> >> > > >> >> > >>> >> >> right now. But I am with you that we > >>>should not > >>> > >> >> expose > >>> > >> >> > > >>too > >>> > >> >> > > >> >>many > >>> > >> >> > > >> >> > >>> >> >> configurations to users. What I am > >>>thinking > >>> > >>now to > >>> > >> >> > handle > >>> > >> >> > > >> >>(2) > >>> > >> >> > > >> >> is > >>> > >> >> > > >> >> > >>> >>when > >>> > >> >> > > >> >> > >>> >> >>user > >>> > >> >> > > >> >> > >>> >> >> call send, if we know that a partition is > >>> > >> >>offline, we > >>> > >> >> > > >>should > >>> > >> >> > > >> >> > >>>throw > >>> > >> >> > > >> >> > >>> >> >> exception immediately instead of putting > >>>it > >>> > >>into > >>> > >> >> > > >> >>accumulator. > >>> > >> >> > > >> >> > >>>This > >>> > >> >> > > >> >> > >>> >>would > >>> > >> >> > > >> >> > >>> >> >> protect further memory consumption. We > >>>might > >>> > >>also > >>> > >> >> want > >>> > >> >> > to > >>> > >> >> > > >> >>fail > >>> > >> >> > > >> >> > >>>all > >>> > >> >> > > >> >> > >>> >>the > >>> > >> >> > > >> >> > >>> >> >> batches in the dequeue once we found a > >>> > >>partition > >>> > >> >>is > >>> > >> >> > > >>offline. > >>> > >> >> > > >> >> That > >>> > >> >> > > >> >> > >>> >> >>said, I > >>> > >> >> > > >> >> > >>> >> >> feel timeout might not be quite > >>>applicable to > >>> > >>(2). > >>> > >> >> > > >> >> > >>> >> >> Do you have any suggestion on this? > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >Right, I didn't actually mean to solve 2 > >>>here, > >>> > >>but > >>> > >> >>was > >>> > >> >> > > >>trying > >>> > >> >> > > >> >>to > >>> > >> >> > > >> >> > >>> >>figure > >>> > >> >> > > >> >> > >>> >> >out > >>> > >> >> > > >> >> > >>> >> >if a solution to 2 would reduce what we > >>>needed to > >>> > >> >>do to > >>> > >> >> > > >> >>address > >>> > >> >> > > >> >> 3. > >>> > >> >> > > >> >> > >>> >>(And > >>> > >> >> > > >> >> > >>> >> >depending on how they are implemented, > >>>fixing 1 > >>> > >> >>might > >>> > >> >> > also > >>> > >> >> > > >> >> address > >>> > >> >> > > >> >> > >>>2). > >>> > >> >> > > >> >> > >>> >>It > >>> > >> >> > > >> >> > >>> >> >sounds like you hit hang that I wasn't > >>>really > >>> > >> >> expecting. > >>> > >> >> > > >>This > >>> > >> >> > > >> >> > >>>probably > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> >just > >>> > >> >> > > >> >> > >>> >> >means the KIP motivation needs to be a bit > >>> > >>clearer > >>> > >> >> about > >>> > >> >> > > >>what > >>> > >> >> > > >> >> type > >>> > >> >> > > >> >> > >>>of > >>> > >> >> > > >> >> > >>> >> >situation this addresses. 
The cause of the > >>>hang > >>> > >>may > >>> > >> >> also > >>> > >> >> > be > >>> > >> >> > > >> >> > >>>relevant > >>> > >> >> > > >> >> > >>> >>-- if > >>> > >> >> > > >> >> > >>> >> >it was something like a deadlock then > >>>that's > >>> > >> >>something > >>> > >> >> > that > >>> > >> >> > > >> >> should > >>> > >> >> > > >> >> > >>> >>just be > >>> > >> >> > > >> >> > >>> >> >fixed, but if it's something outside our > >>>control > >>> > >> >>then a > >>> > >> >> > > >> >>timeout > >>> > >> >> > > >> >> > >>>makes > >>> > >> >> > > >> >> > >>> >>a > >>> > >> >> > > >> >> > >>> >> >lot > >>> > >> >> > > >> >> > >>> >> >more sense. > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >I know we have a similar setting, > >>> > >> >> > > >> >> > >>> >> >>max.in.flights.requests.per.connection, > >>> > >> >> > > >> >> > >>> >> >> >exposed publicly (which I just > >>>discovered is > >>> > >> >>missing > >>> > >> >> > > >>from > >>> > >> >> > > >> >>the > >>> > >> >> > > >> >> > >>>new > >>> > >> >> > > >> >> > >>> >> >>producer > >>> > >> >> > > >> >> > >>> >> >> >configs documentation). But it looks > >>>like the > >>> > >>new > >>> > >> >> > > >>consumer > >>> > >> >> > > >> >>is > >>> > >> >> > > >> >> > >>>not > >>> > >> >> > > >> >> > >>> >> >>exposing > >>> > >> >> > > >> >> > >>> >> >> >that option, using a fixed value > >>>instead. I > >>> > >> >>think we > >>> > >> >> > > >>should > >>> > >> >> > > >> >> > >>>default > >>> > >> >> > > >> >> > >>> >>to > >>> > >> >> > > >> >> > >>> >> >> >hiding these implementation values > >>>unless > >>> > >> >>there's a > >>> > >> >> > > >>strong > >>> > >> >> > > >> >> case > >>> > >> >> > > >> >> > >>>for > >>> > >> >> > > >> >> > >>> >>a > >>> > >> >> > > >> >> > >>> >> >> >scenario that requires customization. > >>> > >> >> > > >> >> > >>> >> >> For producer, > >>> > >> >>max.in.flight.requests.per.connection > >>> > >> >> > > >>really > >>> > >> >> > > >> >> > >>>matters. > >>> > >> >> > > >> >> > >>> >>If > >>> > >> >> > > >> >> > >>> >> >> people do not want to have reorder of > >>>messages, > >>> > >> >>they > >>> > >> >> > > >>have to > >>> > >> >> > > >> >> use > >>> > >> >> > > >> >> > >>> >> >> max.in.flight.requests.per.connection=1. > >>>On the > >>> > >> >>other > >>> > >> >> > > >>hand, > >>> > >> >> > > >> >>if > >>> > >> >> > > >> >> > >>> >> >>throughput > >>> > >> >> > > >> >> > >>> >> >> is more of a concern, it could be set to > >>> > >>higher. > >>> > >> >>For > >>> > >> >> > the > >>> > >> >> > > >>new > >>> > >> >> > > >> >> > >>> >>consumer, I > >>> > >> >> > > >> >> > >>> >> >> checked the value and I am not sure if > >>>the hard > >>> > >> >>coded > >>> > >> >> > > >> >> > >>> >> >> > >>>max.in.flight.requests.per.connection=100 > >>>is > >>> > >>the > >>> > >> >> right > >>> > >> >> > > >> >>value. > >>> > >> >> > > >> >> > >>> >>Without > >>> > >> >> > > >> >> > >>> >> >>the > >>> > >> >> > > >> >> > >>> >> >> response to the previous request, what > >>>offsets > >>> > >> >>should > >>> > >> >> > be > >>> > >> >> > > >>put > >>> > >> >> > > >> >> into > >>> > >> >> > > >> >> > >>> >>the > >>> > >> >> > > >> >> > >>> >> >>next > >>> > >> >> > > >> >> > >>> >> >> fetch request? 
It seems to me the value > >>>will be > >>> > >> >>one > >>> > >> >> > > >>natively > >>> > >> >> > > >> >> > >>> >>regardless > >>> > >> >> > > >> >> > >>> >> >>of > >>> > >> >> > > >> >> > >>> >> >> the setting unless we are sending fetch > >>> > >>request to > >>> > >> >> > > >>different > >>> > >> >> > > >> >> > >>> >>partitions, > >>> > >> >> > > >> >> > >>> >> >> which does not look like the case. > >>> > >> >> > > >> >> > >>> >> >> Anyway, it looks to be a separate issue > >>> > >> >>orthogonal to > >>> > >> >> > the > >>> > >> >> > > >> >> request > >>> > >> >> > > >> >> > >>> >> >>timeout. > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> >In other words, since the only > >>>user-facing > >>> > >>change > >>> > >> >> was > >>> > >> >> > > >>the > >>> > >> >> > > >> >> > >>>addition > >>> > >> >> > > >> >> > >>> >>of > >>> > >> >> > > >> >> > >>> >> >>the > >>> > >> >> > > >> >> > >>> >> >> >setting, I'm wondering if we can avoid > >>>the KIP > >>> > >> >> > > >>altogether > >>> > >> >> > > >> >>by > >>> > >> >> > > >> >> > >>>just > >>> > >> >> > > >> >> > >>> >> >>choosing > >>> > >> >> > > >> >> > >>> >> >> >a good default value for the timeout. > >>> > >> >> > > >> >> > >>> >> >> The problem is that we have a server side > >>> > >>request > >>> > >> >> > timeout > >>> > >> >> > > >> >> exposed > >>> > >> >> > > >> >> > >>>as > >>> > >> >> > > >> >> > >>> >>a > >>> > >> >> > > >> >> > >>> >> >> public configuration. We cannot set the > >>>client > >>> > >> >> timeout > >>> > >> >> > > >> >>smaller > >>> > >> >> > > >> >> > >>>than > >>> > >> >> > > >> >> > >>> >>that > >>> > >> >> > > >> >> > >>> >> >> value, so a hard coded value probably > >>>won¹t > >>> > >>work > >>> > >> >> here. > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >That makes sense, although it's worth > >>>keeping in > >>> > >> >>mind > >>> > >> >> > that > >>> > >> >> > > >> >>even > >>> > >> >> > > >> >> if > >>> > >> >> > > >> >> > >>>you > >>> > >> >> > > >> >> > >>> >>use > >>> > >> >> > > >> >> > >>> >> >"correct" values, they could still be > >>>violated > >>> > >>due > >>> > >> >>to, > >>> > >> >> > > >>e.g., > >>> > >> >> > > >> >>a GC > >>> > >> >> > > >> >> > >>> >>pause > >>> > >> >> > > >> >> > >>> >> >that causes the broker to process a > >>>request > >>>after > >>> > >> >>it is > >>> > >> >> > > >> >>supposed > >>> > >> >> > > >> >> to > >>> > >> >> > > >> >> > >>> >>have > >>> > >> >> > > >> >> > >>> >> >expired. > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >-Ewen > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >-Ewen > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, > >>>Jiangjie Qin > >>> > >> >> > > >> >> > >>> >> >><j...@linkedin.com.invalid> > >>> > >> >> > > >> >> > >>> >> >> >wrote: > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >> Hi, > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> I just created a KIP to add a request > >>> > >>timeout > >>> > >> >>to > >>> > >> >> > > >> >> NetworkClient > >>> > >> >> > > >> >> > >>> >>for > >>> > >> >> > > >> >> > >>> >> >>new > >>> > >> >> > > >> >> > >>> >> >> >> Kafka clients. 
> >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > > >>> > >> >> > > >> > >>> > >> >> > > >>>> > >>> > >> >> > > > >>> > >> >> > >>> > > >>>>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+reque > >>>>>s > >>> > >> >> > > >>>>t > >>> > >> >> > > >> >> > >>>+ > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> >> >>timeout+to+NetworkClient > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> Comments and suggestions are welcome! > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> Thanks. > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> Jiangjie (Becket) Qin > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> >> > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> > > >>> > >> >> > > >> >> > >>> >> >> >-- > >>> > >> >> > > >> >> > >>> >> >> >Thanks, > >>> > >> >> > > >> >> > >>> >> >> >Ewen > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> >> > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> > > >>> > >> >> > > >> >> > >>> >> >-- > >>> > >> >> > > >> >> > >>> >> >Thanks, > >>> > >> >> > > >> >> > >>> >> >Ewen > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> >> > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> > > >>> > >> >> > > >> >> > >>> >-- > >>> > >> >> > > >> >> > >>> >-- Guozhang > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > >>> > >>> > >> >> > > >> >> > > > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > > >>> > >> >> > > >> >> > >>> > >> >> > > >> > > >>> > >> >> > > >> > > >>> > >> >> > > >> > > >>> > >> >> > > >> >-- > >>> > >> >> > > >> >Thanks, > >>> > >> >> > > >> >Ewen > >>> > >> >> > > >> > >>> > >> >> > > >> > >>> > >> >> > > > > >>> > >> >> > > > > >>> > >> >> > > >-- > >>> > >> >> > > >-Regards, > >>> > >> >> > > >Mayuresh R. Gharat > >>> > >> >> > > >(862) 250-7125 > >>> > >> >> > > > >>> > >> >> > > > >>> > >> >> > > >>> > >> >> > >>> > >> >> > >>> > >> >> > >>> > >> >> -- > >>> > >> >> -Regards, > >>> > >> >> Mayuresh R. Gharat > >>> > >> >> (862) 250-7125 > >>> > >> >> > >>> > >> > >>> > >> > >>> > > >>> > > >>> > >>> > >>> -- > >>> Thanks, > >>> Ewen > >> > >>-- > >>Joel > >> > > > > -- Thanks, Ewen