IMO, having 4 different timeouts makes it confusing for the user and requires the client to understand the internals of Kafka. We should have a single timeout from the user's perspective and handle the other timeouts, like a batch timeout, internally.

Mayuresh
On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hey Jay,
>
> That is also a viable solution. I think the main purpose is to let the user know how long they can block, which is important.
>
> I have some questions about the proposal, though. Will the user still need to set linger.ms? Will the request timeout cover linger.ms as well? My concern with letting the request timeout also cover the time spent in the accumulator is that it makes the actual request timeout nondeterministic. Also, implementation-wise, a request can contain multiple batches, and the time spent in the accumulator can vary a lot between them. If one of the batches times out, what should we do with the rest of the batches? I think we probably want to separate the batch timeout from the request timeout.
>
> Maybe we can do this:
> max.send.block.ms
> request.timeout
> batch.timeout
> replication.timeout
>
> So in send() we use max.send.block.ms only. In the accumulator we use batch.timeout, and in NetworkClient we use request.timeout. The replication timeout is needed anyway. This looks more understandable from what I can see.
>
> What do you think?
>
> Jiangjie (Becket) Qin
>
> On 5/19/15, 11:48 AM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>
> >So the alternative to consider would be to instead have
> > max.block.ms (or something)
> > request.timeout
> > replication.timeout
> >
> >I think this better captures what the user cares about. Here is how it would work.
> >
> >*max.send.block.ms* is the bound on the maximum time the producer.send() call can block. This subsumes the existing metadata timeout use case but not the proposed use for the time in the accumulator. It *also* acts as a bound on the time you can block on BufferPool allocation (we'd have to add this, but that should be easy).
> >
> >*request.timeout* is the bound on the time from send() completing until you get an acknowledgement. This covers the connection timeout and the time in the accumulator. So to implement this, the timeout set in the request sent via NetworkClient would have the time spent in the accumulator already subtracted off, and if the request is retried we would include both the time in the accumulator and the time taken by the first request, etc. In other words, this is the upper bound on the time to the Future being satisfied.
> >
> >*replication.timeout* will default to something reasonable, but maybe you can override it if you want?
> >
> >Thoughts?
> >
> >-Jay
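To make the scheme easier to picture, here is a minimal sketch of a producer configured under Jay's proposal. The three timeout property names are only the ones floated in this thread, not a settled API, so treat them as placeholders; the producer would warn about them as unused configs today.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProposedTimeoutsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Proposed (hypothetical) name: bound on how long send() itself may block,
            // covering the metadata fetch and BufferPool allocation.
            props.put("max.send.block.ms", "5000");
            // Proposed: bound from send() returning until the Future is satisfied,
            // covering time in the accumulator plus the in-flight request.
            props.put("request.timeout.ms", "30000");
            // Proposed: how long the broker waits for replica acknowledgement.
            props.put("replication.timeout.ms", "10000");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test", "key", "value"));
            }
        }
    }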
> >
> >On Tue, May 19, 2015 at 11:34 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> >
> >> So what I understand is that we would have 3 timeouts:
> >> 1) replication timeout
> >> 2) request timeout
> >> 3) metadata timeout (existing)
> >>
> >> The request timeout has to be greater than the replication timeout. The request timeout is for messages already sent to Kafka that the producer is waiting on.
> >>
> >> Thanks,
> >>
> >> Mayuresh
> >>
> >> On Tue, May 19, 2015 at 11:12 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >> > I think this looks good. What I think is missing is an overview of the timeouts from the user's perspective. My worry is that it is quite complicated to reason about the current set of timeouts.
> >> >
> >> > Currently we have
> >> > timeout.ms
> >> > metadata.fetch.timeout.ms
> >> >
> >> > The proposed settings I think are:
> >> > batch.expiration.ms
> >> > request.timeout.ms
> >> > replication.timeout.ms
> >> >
> >> > I think maybe we can skip batch.expiration.ms. Instead maybe we can combine these into a single request timeout, subtracting the time spent waiting from the request timeout and/or the replication timeout? I don't have an explicit proposal, but my suspicion is that from the user's point of view there is just one timeout related to the request, after which they don't care, and we can split that up between the batch time and the request time. Thoughts?
> >> >
> >> > How are we handling connection timeouts? If a machine hard-fails in the middle of connection establishment, there will be no outstanding requests. I think this may be okay because connections are established when we want to send a request, and presumably we will begin the timer then?
> >> >
> >> > To that end I suggest we do two things:
> >> > 1. Include KAFKA-1788. I know that technically these two things are different, but from the user's point of view they aren't.
> >> > 2. Include in the KIP the explanation to the user of the full set of timeouts: what they mean, how we will default them, and when to override which.
> >> >
> >> > I know this is a hassle, but I think the end experience will be a lot better if we go through this thought process.
> >> >
> >> > -Jay
> >> >
> >> > On Fri, May 15, 2015 at 2:14 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> >
> >> > > I modified the WIKI page to incorporate the feedback from the mailing list and the KIP hangout.
> >> > >
> >> > > - Added the deprecation plan for TIMEOUT_CONFIG
> >> > > - Added the actions to take after a request timeout
> >> > >
> >> > > I finally chose to create a new connection if requests time out. The reason is:
> >> > > 1. In most cases, if a broker is just slow, as long as we set the request timeout to a reasonable value we should not see many new connections get created.
> >> > > 2. If a broker is down, hopefully metadata refresh will find the new broker and we will not try to reconnect to the old broker anymore.
> >> > >
> >> > > Comments are welcome!
> >> > >
> >> > > Thanks.
> >> > >
> >> > > Jiangjie (Becket) Qin
> >> > >
> >> > > On 5/12/15, 2:59 PM, "Mayuresh Gharat" <gharatmayures...@gmail.com> wrote:
> >> > >
> >> > > >+1 Becket. That would give enough time for clients to move. We should make this change very clear.
> >> > > >
> >> > > >Thanks,
> >> > > >
> >> > > >Mayuresh
> >> > > >
> >> > > >On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >
> >> > > >> Hey Ewen,
> >> > > >>
> >> > > >> Very good summary of the compatibility issues. What you proposed makes sense. So basically we can do the following:
> >> > > >>
> >> > > >> In the next release, i.e. 0.8.3:
> >> > > >> 1. Add REPLICATION_TIMEOUT_CONFIG ("replication.timeout.ms")
> >> > > >> 2. Mark TIMEOUT_CONFIG as deprecated
> >> > > >> 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is defined, and give a warning about the deprecation.
> >> > > >> In the release after 0.8.3, we remove TIMEOUT_CONFIG.
> >> > > >>
> >> > > >> This should give enough buffer for this change.
> >> > > >>
> >> > > >> Request timeout is a completely new thing we are adding to fix a bug, and I'm with you that it does not make sense to have it maintain the old buggy behavior. So we can set it to a reasonable value instead of infinite.
> >> > > >>
> >> > > >> Jiangjie (Becket) Qin
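A sketch of the fallback in step 3 of that plan, with hypothetical method and config-resolution code (the real ProducerConfig wiring may look different):

    // Hypothetical sketch: let the deprecated timeout.ms (TIMEOUT_CONFIG) override
    // the new replication.timeout.ms (REPLICATION_TIMEOUT_CONFIG), with a warning.
    static long resolveReplicationTimeout(java.util.Map<String, String> userConfig, long defaultMs) {
        String deprecated = userConfig.get("timeout.ms");          // old TIMEOUT_CONFIG
        if (deprecated != null) {
            System.err.println("timeout.ms is deprecated; use replication.timeout.ms instead.");
            return Long.parseLong(deprecated);                     // deprecated value wins
        }
        String current = userConfig.get("replication.timeout.ms"); // new REPLICATION_TIMEOUT_CONFIG
        return current != null ? Long.parseLong(current) : defaultMs;
    }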
> >> > > >>
> >> > > >> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >> > > >>
> >> > > >> >I think my confusion is coming from this:
> >> > > >> >
> >> > > >> >> So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >> > > >> >
> >> > > >> >There are 3 possible compatibility issues I see here:
> >> > > >> >
> >> > > >> >* I assumed this meant the constants also change, so "timeout.ms" becomes "replication.timeout.ms". This breaks config files that worked on the previous version, and the only warning would be in the release notes. We do warn about unused configs, so they might notice the problem.
> >> > > >> >
> >> > > >> >* Binary and source compatibility if someone configures their client in code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing jars to break if you try to run against an updated client (which seems not very significant since I doubt people upgrade these without recompiling, but maybe I'm wrong about that). And it breaks builds without having deprecated that field first, which, again, is probably not the biggest issue but is annoying for users; when we accidentally changed the API we received a complaint about breaking builds.
> >> > > >> >
> >> > > >> >* Behavior compatibility, as Jay mentioned on the call -- setting the config (even if the name changed) doesn't have the same effect it used to.
> >> > > >> >
> >> > > >> >One solution, which admittedly is more painful to implement and maintain, would be to keep the timeout.ms config, have it override the others if it is specified (including an infinite request timeout, I guess?), and if it isn't specified, just use the new config variables. Given a real deprecation schedule, users would have better warning of changes and a window to make them.
> >> > > >> >
> >> > > >> >I actually think it might not be necessary to maintain the old behavior precisely, although maybe for some code it is an issue if they start seeing timeout exceptions that they wouldn't have seen before?
> >> > > >> >
> >> > > >> >-Ewen
> >> > > >> >
> >> > > >> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io> wrote:
> >> > > >> >
> >> > > >> >> Jiangjie,
> >> > > >> >>
> >> > > >> >> Yes, I think using the metadata timeout to expire batches in the record accumulator makes sense.
> >> > > >> >>
> >> > > >> >> Thanks,
> >> > > >> >>
> >> > > >> >> Jun
> >> > > >> >>
> >> > > >> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >> >>
> >> > > >> >> > I incorporated Ewen's and Guozhang's comments in the KIP page. I want to speed up on this KIP because currently mirror maker is very likely to hang when a broker is down.
> >> > > >> >> >
> >> > > >> >> > I also took a shot at solving KAFKA-1788 in KAFKA-2142. I used the metadata timeout to expire the batches which are sitting in the accumulator without leader info. I did that because the situation there is essentially missing metadata.
> >> > > >> >> >
> >> > > >> >> > As a summary of what I am thinking about the timeouts in the new producer:
> >> > > >> >> >
> >> > > >> >> > 1. Metadata timeout:
> >> > > >> >> >    - used in send(), blocking
> >> > > >> >> >    - used in the accumulator to expire batches with a timeout exception
> >> > > >> >> > 2. linger.ms
> >> > > >> >> >    - used in the accumulator to ready the batch for drain
> >> > > >> >> > 3. Request timeout
> >> > > >> >> >    - used in NetworkClient to expire a batch and retry if no response is received for a request before the timeout
> >> > > >> >> >
> >> > > >> >> > So in this KIP, we only address (3). The only public interface change is a new configuration of request timeout (and maybe change the configuration name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >> > > >> >> >
> >> > > >> >> > Would like to see what people think of the above approach?
> >> > > >> >> >
> >> > > >> >> > Jiangjie (Becket) Qin
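Expressed as configs, the three stages in that summary might look like the following sketch. The first two properties already exist in the producer at this point; request.timeout.ms is the new knob this KIP proposes, and the values are illustrative only:

    import java.util.Properties;

    // Illustrative mapping of the three timeout stages to producer configs.
    class TimeoutStages {
        static Properties stages() {
            Properties props = new Properties();
            props.put("metadata.fetch.timeout.ms", "60000"); // 1: bound on blocking in send(), and on batches lacking a leader
            props.put("linger.ms", "5");                     // 2: how long a batch waits in the accumulator before becoming drainable
            props.put("request.timeout.ms", "30000");        // 3: proposed client-side bound on an in-flight request in NetworkClient
            return props;
        }
    }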
> >> > > >> >> >
> >> > > >> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> > > >> >> >
> >> > > >> >> > >Jun,
> >> > > >> >> > >
> >> > > >> >> > >I thought about this a little bit differently. Intuitively, I am thinking that if a partition is offline, the metadata for that partition should be considered not ready, because we don't know which broker we should send the message to. So those sends need to be blocked on the metadata timeout.
> >> > > >> >> > >
> >> > > >> >> > >Another thing I'm wondering is in which scenario an offline partition will become online again in a short period of time, and how likely that will occur. My understanding is that the batch timeout for batches sitting in the accumulator should be larger than linger.ms but should not be too long (e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer with batches to be aborted.
> >> > > >> >> > >
> >> > > >> >> > >That said, I do agree it is reasonable to buffer the message for some time so messages to other partitions can still get sent. But adding another expiration in addition to linger.ms - which is essentially a timeout - sounds a little bit confusing. Maybe we can do this: let the batch sit in the accumulator up to linger.ms, then fail it if necessary.
> >> > > >> >> > >
> >> > > >> >> > >What do you think?
> >> > > >> >> > >
> >> > > >> >> > >Thanks,
> >> > > >> >> > >
> >> > > >> >> > >Jiangjie (Becket) Qin
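A minimal sketch of that suggestion, with hypothetical names (the real RecordAccumulator tracks more state than this):

    // Hypothetical check: a batch may sit in the accumulator for up to linger.ms;
    // past that, fail it rather than introduce a second, separate expiration config.
    final class BatchExpiry {
        static boolean expired(long batchCreatedMs, long lingerMs, long nowMs) {
            return nowMs - batchCreatedMs > lingerMs;
        }
    }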
> >> > > >> >> > >
> >> > > >> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> >> > > >> >> > >
> >> > > >> >> > >>Jiangjie,
> >> > > >> >> > >>
> >> > > >> >> > >>Allowing messages to be accumulated in an offline partition could be useful, since the partition may become available before the request timeout or linger time is reached. Now that we are planning to add a new timeout, it would be useful to think through whether/how that applies to messages in the accumulator too.
> >> > > >> >> > >>
> >> > > >> >> > >>Thanks,
> >> > > >> >> > >>
> >> > > >> >> > >>Jun
> >> > > >> >> > >>
> >> > > >> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >> >> > >>
> >> > > >> >> > >>> Hi Harsha,
> >> > > >> >> > >>>
> >> > > >> >> > >>> Took a quick look at the patch. I think it is still a little bit different. KAFKA-1788 only handles the case where a batch sits in the accumulator for too long. The KIP is trying to solve the issue where a batch has already been drained from the accumulator and sent to the broker. We might be able to apply the timeout at the batch level to merge those two cases, as Ewen suggested. But I'm not sure it is a good idea to allow messages whose target partition is offline to sit in the accumulator in the first place.
> >> > > >> >> > >>>
> >> > > >> >> > >>> Jiangjie (Becket) Qin
> >> > > >> >> > >>>
> >> > > >> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
> >> > > >> >> > >>>
> >> > > >> >> > >>> >Guozhang and Jiangjie,
> >> > > >> >> > >>> > Isn't this work being covered in https://issues.apache.org/jira/browse/KAFKA-1788? Can you please review the patch there?
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >Thanks,
> >> > > >> >> > >>> >Harsha
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com) wrote:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >Thanks for the update, Jiangjie.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >I think it is actually NOT expected that a hardware disconnection will be detected by the selector; rather, it will only be revealed upon TCP timeout, which could be hours.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >A couple of comments on the wiki:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the request timeout as implicit timeout." I am not very clear on what this means.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >2. Currently the producer already has a "TIMEOUT_CONFIG", which should really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add "REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly it will change the config names, but it will reduce confusion moving forward.
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >Guozhang
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >> Checked the code again. It seems that the disconnected channel is not detected by the selector as expected.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Currently we depend on the o.a.k.common.network.Selector.disconnected set to see if we need to do something for a disconnected channel. However, the Selector.disconnected set is only updated when:
> >> > > >> >> > >>> >> 1. A write/read/connect to the channel failed.
> >> > > >> >> > >>> >> 2. A key is canceled.
> >> > > >> >> > >>> >> So when a broker goes down before it sends back the response, the client seems unable to detect the failure.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> I did a simple test:
> >> > > >> >> > >>> >> 1. Run a selector on one machine and an echo server on another machine. Connect the selector to the echo server.
> >> > > >> >> > >>> >> 2. Send a message to the echo server using the selector, then let the selector poll() every 10 seconds.
> >> > > >> >> > >>> >> 3. After the server received the message, unplug the cable on the echo server.
> >> > > >> >> > >>> >> 4. After waiting for 45 minutes, the selector still had not detected the network failure. lsof on the selector machine showed that the TCP connection was still considered ESTABLISHED.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> I'm not sure what we should expect from the java.nio.channels.Selector in this case. According to the documentation, the selector does not verify the status of the associated channel. In my test it looks even worse: the OS did not consider the socket disconnected either.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Anyway, it seems adding the client-side request timeout is necessary. I've updated the KIP page to clarify the problem we want to solve, according to Ewen's comments.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Thanks.
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> Jiangjie (Becket) Qin
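The experiment is easy to approximate with raw java.nio. Below is a rough sketch, not the actual test code; the host, port, payload, and timings are placeholders. Once the write succeeds and the remote cable is pulled, select() keeps returning zero ready keys forever and no exception surfaces, because the OS still considers the socket ESTABLISHED:

    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.StandardCharsets;

    public class DeadPeerProbe {
        public static void main(String[] args) throws Exception {
            Selector selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("echo-server.example", 7)); // placeholder host/port
            channel.register(selector, SelectionKey.OP_CONNECT);

            while (true) {
                int ready = selector.select(10_000); // poll every 10 seconds, as in the test
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isConnectable() && channel.finishConnect()) {
                        k.interestOps(SelectionKey.OP_WRITE);
                    } else if (k.isWritable()) {
                        channel.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.UTF_8)));
                        k.interestOps(SelectionKey.OP_READ); // now wait for the echo
                    } else if (k.isReadable()) {
                        channel.read(ByteBuffer.allocate(64)); // echo received; nothing more ever arrives
                    }
                }
                selector.selectedKeys().clear();
                System.out.println("selected " + ready + " keys; no disconnect reported");
            }
        }
    }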
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >> > > >> >> > >>> >>
> >> > > >> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> Hi Ewen, thanks for the comments. Very good points! Please see replies inline.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> >Jiangjie,
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >Great start. I have a couple of comments.
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >Under the motivation section, is it really true that the request will never be completed? Presumably if the broker goes down the connection will be severed, at worst by a TCP timeout, which should clean up the connection and any outstanding requests, right? I think the real reason we need a different timeout is that the default TCP timeouts are ridiculously long in this context.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> Yes, when the broker is completely down the request should be cleared as you said. In the case we encountered, though, the broker was just not responding while the TCP connection was still alive.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >Ok, that makes sense.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> >My second question is about whether this is the right level to tackle the issue, and what user-facing changes need to be made. A related problem came up in https://issues.apache.org/jira/browse/KAFKA-1788, where producer records get stuck indefinitely because there's no client-side timeout. This KIP wouldn't fix that problem, or any problem caused by lack of connectivity, since it only applies to in-flight requests, which by definition must have been sent on an active connection.
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >I suspect both types of problems need to be addressed separately by introducing explicit timeouts. However, because the settings introduced here are very much about the internal implementation of the clients, I'm wondering if this even needs to be a user-facing setting, especially if we have to add other timeouts anyway. For example, would a fixed, generous value that's still much shorter than a TCP timeout, say 15s, be good enough? If other timeouts would allow, for example, the clients to properly exit even if requests have not hit their timeout, then what's the benefit of being able to configure the request-level timeout?
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> That is a very good point. We have three places where we might be able to enforce a timeout for a message send:
> >> > > >> >> > >>> >> >> 1. Before append to the accumulator - handled by the metadata timeout at the per-message level.
> >> > > >> >> > >>> >> >> 2. A batch of messages inside the accumulator - no timeout mechanism now.
> >> > > >> >> > >>> >> >> 3. A request of batches after the messages leave the accumulator - we have a broker-side timeout but no client-side timeout for now.
> >> > > >> >> > >>> >> >> My current proposal only addresses (3) but not (2).
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> Honestly, I do not have a very clear idea about what we should do with (2) right now. But I am with you that we should not expose too many configurations to users. What I am thinking now to handle (2) is: when the user calls send(), if we know that a partition is offline, we should throw an exception immediately instead of putting the message into the accumulator. This would prevent further memory consumption. We might also want to fail all the batches in the deque once we find a partition is offline. That said, I feel a timeout might not be quite applicable to (2).
> >> > > >> >> > >>> >> >> Do you have any suggestion on this?
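One way to picture the fail-fast idea for (2) is a guard at the top of send(). Cluster and TopicPartition are the real client classes, but this check is only an illustration of the suggestion above, not shipped code:

    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical guard: refuse to buffer records for a partition with no leader.
    final class OfflinePartitionGuard {
        static void ensureOnline(Cluster cluster, TopicPartition tp) {
            if (cluster.leaderFor(tp) == null) {
                throw new IllegalStateException("Partition " + tp
                        + " has no leader; failing fast instead of buffering in the accumulator");
            }
        }
    }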
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >Right, I didn't actually mean to solve 2 here, but was trying to figure out whether a solution to 2 would reduce what we needed to do to address 3. (And depending on how they are implemented, fixing 1 might also address 2.) It sounds like you hit a hang that I wasn't really expecting. This probably just means the KIP motivation needs to be a bit clearer about what type of situation this addresses. The cause of the hang may also be relevant -- if it was something like a deadlock then that's something that should just be fixed, but if it's something outside our control then a timeout makes a lot more sense.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> >I know we have a similar setting, max.in.flight.requests.per.connection, exposed publicly (which I just discovered is missing from the new producer configs documentation). But it looks like the new consumer is not exposing that option, using a fixed value instead. I think we should default to hiding these implementation values unless there's a strong case for a scenario that requires customization.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> For the producer, max.in.flight.requests.per.connection really matters. If people do not want reordering of messages, they have to use max.in.flight.requests.per.connection=1. On the other hand, if throughput is more of a concern, it can be set higher. For the new consumer, I checked the value and I am not sure the hard-coded max.in.flight.requests.per.connection=100 is the right value. Without the response to the previous request, what offsets should be put into the next fetch request? It seems to me the value will effectively be one regardless of the setting, unless we are sending fetch requests to different partitions, which does not look like the case.
> >> > > >> >> > >>> >> >> Anyway, it looks to be a separate issue, orthogonal to the request timeout.
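The ordering-versus-throughput trade-off described there, expressed as producer settings (the config name is the producer's real one; the values are illustrative):

    import java.util.Properties;

    class InFlightTradeoff {
        static Properties strictOrdering() {
            Properties props = new Properties();
            // One in-flight request per connection: a retried batch can never leapfrog a later one.
            props.put("max.in.flight.requests.per.connection", "1");
            return props;
        }
        static Properties higherThroughput() {
            Properties props = new Properties();
            // Pipeline several requests per connection; reordering becomes possible on retry.
            props.put("max.in.flight.requests.per.connection", "5");
            return props;
        }
    }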
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> >In other words, since the only user-facing change was the addition of the setting, I'm wondering if we can avoid the KIP altogether by just choosing a good default value for the timeout.
> >> > > >> >> > >>> >> >>
> >> > > >> >> > >>> >> >> The problem is that we have a server-side request timeout exposed as a public configuration. We cannot set the client timeout smaller than that value, so a hard-coded value probably won't work here.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >That makes sense, although it's worth keeping in mind that even if you use "correct" values, they could still be violated due to, e.g., a GC pause that causes the broker to process a request after it is supposed to have expired.
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >-Ewen
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >> Hi,
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for the new Kafka clients.
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Comments and suggestions are welcome!
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Thanks.
> >> > > >> >> > >>> >> >> >>
> >> > > >> >> > >>> >> >> >> Jiangjie (Becket) Qin
> >> > > >> >> > >>> >> >> >
> >> > > >> >> > >>> >> >> >--
> >> > > >> >> > >>> >> >> >Thanks,
> >> > > >> >> > >>> >> >> >Ewen
> >> > > >> >> > >>> >> >
> >> > > >> >> > >>> >> >--
> >> > > >> >> > >>> >> >Thanks,
> >> > > >> >> > >>> >> >Ewen
> >> > > >> >> > >>> >
> >> > > >> >> > >>> >--
> >> > > >> >> > >>> >-- Guozhang
> >> > > >> >
> >> > > >> >--
> >> > > >> >Thanks,
> >> > > >> >Ewen
> >> > > >
> >> > > >--
> >> > > >-Regards,
> >> > > >Mayuresh R. Gharat
> >> > > >(862) 250-7125
> >>
> >> --
> >> -Regards,
> >> Mayuresh R. Gharat
> >> (862) 250-7125

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125