+1 Becket. That would give enough time for clients to move. We should make this change very clear.
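A rough sketch of what step 3 of Becket's plan (override the new config with the deprecated one and warn) might look like. It is written in Python only for brevity; the function name and the 30000 ms default are assumptions made for illustration, not the actual client code.

```python
import warnings

TIMEOUT_CONFIG = "timeout.ms"                          # deprecated name
REPLICATION_TIMEOUT_CONFIG = "replication.timeout.ms"  # proposed new name
DEFAULT_REPLICATION_TIMEOUT_MS = 30000                 # assumed default, illustration only


def resolve_replication_timeout(props):
    """Return the effective replication timeout (ms) from a config dict.

    Hypothetical helper: if the deprecated key is present it wins and a
    deprecation warning is emitted; otherwise the new key (or the assumed
    default) is used.
    """
    if TIMEOUT_CONFIG in props:
        warnings.warn(
            f"{TIMEOUT_CONFIG} is deprecated; use {REPLICATION_TIMEOUT_CONFIG} instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return int(props[TIMEOUT_CONFIG])
    return int(props.get(REPLICATION_TIMEOUT_CONFIG, DEFAULT_REPLICATION_TIMEOUT_MS))
```

With this shape, existing config files that still set timeout.ms keep working for one release, and the warning is what makes the change visible to users.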
Thanks,
Mayuresh

On Tue, May 12, 2015 at 1:45 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Hey Ewen,
>
> Very good summary about the compatibility. What you proposed makes sense.
> So basically we can do the following:
>
> In next release, i.e. 0.8.3:
> 1. Add REPLICATION_TIMEOUT_CONFIG (“replication.timeout.ms”)
> 2. Mark TIMEOUT_CONFIG as deprecated
> 3. Override REPLICATION_TIMEOUT_CONFIG with TIMEOUT_CONFIG if it is
> defined and give a warning about deprecation.
> In the release after 0.8.3, we remove TIMEOUT_CONFIG.
>
> This should give enough buffer for this change.
>
> Request timeout is a complete new thing we add to fix a bug, I’m with you
> it does not make sense to have it maintain the old buggy behavior. So we
> can set it to a reasonable value instead of infinite.
>
> Jiangjie (Becket) Qin
>
> On 5/12/15, 12:03 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
>
> >I think my confusion is coming from this:
> >
> >> So in this KIP, we only address (3). The only public interface change is a
> >> new configuration of request timeout (and maybe change the configuration
> >> name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >
> >There are 3 possible compatibility issues I see here:
> >
> >* I assumed this meant the constants also change, so "timeout.ms" becomes
> >"replication.timeout.ms". This breaks config files that worked on the
> >previous version and the only warning would be in release notes. We do warn
> >about unused configs so they might notice the problem.
> >
> >* Binary and source compatibility if someone configures their client in
> >code and uses the TIMEOUT_CONFIG variable. Renaming it will cause existing
> >jars to break if you try to run against an updated client (which seems not
> >very significant since I doubt people upgrade these without recompiling but
> >maybe I'm wrong about that).
> >And it breaks builds without having deprecated
> >that field first, which again, is probably not the biggest issue but is
> >annoying for users and when we accidentally changed the API we received a
> >complaint about breaking builds.
> >
> >* Behavior compatibility as Jay mentioned on the call -- setting the config
> >(even if the name changed) doesn't have the same effect it used to.
> >
> >One solution, which admittedly is more painful to implement and maintain,
> >would be to maintain the timeout.ms config, have it override the others if
> >it is specified (including an infinite request timeout I guess?), and if it
> >isn't specified, we can just use the new config variables. Given a real
> >deprecation schedule, users would have better warning of changes and a
> >window to make the changes.
> >
> >I actually think it might not be necessary to maintain the old behavior
> >precisely, although maybe for some code it is an issue if they start seeing
> >timeout exceptions that they wouldn't have seen before?
> >
> >-Ewen
> >
> >On Wed, May 6, 2015 at 6:06 PM, Jun Rao <j...@confluent.io> wrote:
> >
> >> Jiangjie,
> >>
> >> Yes, I think using metadata timeout to expire batches in the record
> >> accumulator makes sense.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >> On Mon, May 4, 2015 at 10:32 AM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> wrote:
> >>
> >> > I incorporated Ewen and Guozhang’s comments in the KIP page. Want to speed
> >> > up on this KIP because currently mirror-maker is very likely to hang
> >> > when a broker is down.
> >> >
> >> > I also took a shot to solve KAFKA-1788 in KAFKA-2142. I used metadata
> >> > timeout to expire the batches which are sitting in accumulator without
> >> > leader info. I did that because the situation there is essentially missing
> >> > metadata.
> >> >
> >> > As a summary of what I am thinking about the timeout in new Producer:
> >> >
> >> > 1. Metadata timeout:
> >> >   - used in send(), blocking
> >> >   - used in accumulator to expire batches with timeout exception.
> >> > 2. Linger.ms
> >> >   - Used in accumulator to ready the batch for drain
> >> > 3. Request timeout
> >> >   - Used in NetworkClient to expire a batch and retry if no response is
> >> >     received for a request before timeout.
> >> >
> >> > So in this KIP, we only address (3). The only public interface change is a
> >> > new configuration of request timeout (and maybe change the configuration
> >> > name of TIMEOUT_CONFIG to REPLICATION_TIMEOUT_CONFIG).
> >> >
> >> > Would like to see what people think of above approach?
> >> >
> >> > Jiangjie (Becket) Qin
> >> >
> >> > On 4/20/15, 6:02 PM, "Jiangjie Qin" <j...@linkedin.com> wrote:
> >> >
> >> > >Jun,
> >> > >
> >> > >I thought a little bit differently on this.
> >> > >Intuitively, I am thinking that if a partition is offline, the metadata
> >> > >for that partition should be considered not ready because we don’t know
> >> > >which broker we should send the message to. So those sends need to be
> >> > >blocked on metadata timeout.
> >> > >Another thing I’m wondering is in which scenario an offline partition will
> >> > >become online again in a short period of time and how likely it will
> >> > >occur. My understanding is that the batch timeout for batches sitting in
> >> > >accumulator should be larger than linger.ms but should not be too long
> >> > >(e.g. less than 60 seconds). Otherwise it will exhaust the shared buffer
> >> > >with batches to be aborted.
> >> > >
> >> > >That said, I do agree it is reasonable to buffer the message for some time
> >> > >so messages to other partitions can still get sent. But adding another
> >> > >expiration in addition to linger.ms - which is essentially a timeout -
> >> > >sounds a little bit confusing.
> >> > >Maybe we can do this, let the batch sit in
> >> > >accumulator up to linger.ms, then fail it if necessary.
> >> > >
> >> > >What do you think?
> >> > >
> >> > >Thanks,
> >> > >
> >> > >Jiangjie (Becket) Qin
> >> > >
> >> > >On 4/20/15, 1:11 PM, "Jun Rao" <j...@confluent.io> wrote:
> >> > >
> >> > >>Jiangjie,
> >> > >>
> >> > >>Allowing messages to be accumulated in an offline partition could be useful
> >> > >>since the partition may become available before the request timeout or
> >> > >>linger time is reached. Now that we are planning to add a new timeout, it
> >> > >>would be useful to think through whether/how that applies to messages in
> >> > >>the accumulator too.
> >> > >>
> >> > >>Thanks,
> >> > >>
> >> > >>Jun
> >> > >>
> >> > >>
> >> > >>On Thu, Apr 16, 2015 at 1:02 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> > >>wrote:
> >> > >>
> >> > >>> Hi Harsha,
> >> > >>>
> >> > >>> Took a quick look at the patch. I think it is still a little bit
> >> > >>> different. KAFKA-1788 only handles the case where a batch has been
> >> > >>> sitting in the accumulator for too long. The KIP is trying to solve the
> >> > >>> issue where a batch has already been drained from accumulator and sent
> >> > >>> to broker.
> >> > >>> We might be able to apply timeout on batch level to merge those two cases
> >> > >>> as Ewen suggested. But I’m not sure if it is a good idea to allow messages
> >> > >>> whose target partition is offline to sit in accumulator in the first place.
> >> > >>>
> >> > >>> Jiangjie (Becket) Qin
> >> > >>>
> >> > >>> On 4/16/15, 10:19 AM, "Sriharsha Chintalapani" <ka...@harsha.io> wrote:
> >> > >>>
> >> > >>> >Guozhang and Jiangjie,
> >> > >>> >Isn’t this work being covered in
> >> > >>> >https://issues.apache.org/jira/browse/KAFKA-1788 ? Can you please
> >> > >>> >review the patch there?
> >> > >>> >Thanks,
> >> > >>> >Harsha
> >> > >>> >
> >> > >>> >
> >> > >>> >On April 15, 2015 at 10:39:40 PM, Guozhang Wang (wangg...@gmail.com)
> >> > >>> >wrote:
> >> > >>> >
> >> > >>> >Thanks for the update Jiangjie,
> >> > >>> >
> >> > >>> >I think it is actually NOT expected that hardware disconnection will be
> >> > >>> >detected by the selector, but rather will only be revealed upon TCP
> >> > >>> >timeout, which could be hours.
> >> > >>> >
> >> > >>> >A couple of comments on the wiki:
> >> > >>> >
> >> > >>> >1. "For KafkaProducer.close() and KafkaProducer.flush() we need the
> >> > >>> >request timeout as implicit timeout." I am not very clear on what this
> >> > >>> >means.
> >> > >>> >
> >> > >>> >2. Currently the producer already has a "TIMEOUT_CONFIG" which should
> >> > >>> >really be "REPLICATION_TIMEOUT_CONFIG". So if we decide to add
> >> > >>> >"REQUEST_TIMEOUT_CONFIG", I suggest we also make this renaming: admittedly
> >> > >>> >it will change the config names but will reduce confusion moving
> >> > >>> >forward.
> >> > >>> >
> >> > >>> >
> >> > >>> >Guozhang
> >> > >>> >
> >> > >>> >
> >> > >>> >On Wed, Apr 15, 2015 at 6:48 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> > >>> >wrote:
> >> > >>> >
> >> > >>> >> Checked the code again. It seems that the disconnected channel is not
> >> > >>> >> detected by selector as expected.
> >> > >>> >>
> >> > >>> >> Currently we are depending on the
> >> > >>> >> o.a.k.common.network.Selector.disconnected set to see if we need to do
> >> > >>> >> something for a disconnected channel.
> >> > >>> >> However Selector.disconnected set is only updated when:
> >> > >>> >> 1. A write/read/connect to channel failed.
> >> > >>> >> 2. A Key is canceled
> >> > >>> >> However when a broker is down before it sends back the response, the
> >> > >>> >> client seems not to be able to detect this failure.
> >> > >>> >>
> >> > >>> >> I did a simple test below:
> >> > >>> >> 1. Run a selector on one machine and an echo server on another machine.
> >> > >>> >> Connect a selector to an echo server
> >> > >>> >> 2. Send a message to echo server using selector, then let the selector
> >> > >>> >> poll() every 10 seconds.
> >> > >>> >> 3. After the server received the message, unplug cable on the echo
> >> > >>> >> server.
> >> > >>> >> 4. After waiting for 45 min, the selector still had not detected the
> >> > >>> >> network failure.
> >> > >>> >> Lsof on selector machine shows that the TCP connection is still
> >> > >>> >> considered ESTABLISHED.
> >> > >>> >>
> >> > >>> >> I’m not sure in this case what we should expect from the
> >> > >>> >> java.nio.channels.Selector. According to the document, the selector does
> >> > >>> >> not verify the status of the associated channel. In my test case it looks
> >> > >>> >> even worse: the OS did not consider the socket disconnected.
> >> > >>> >>
> >> > >>> >> Anyway. It seems adding the client side request timeout is necessary. I’ve
> >> > >>> >> updated the KIP page to clarify the problem we want to solve according to
> >> > >>> >> Ewen’s comments.
> >> > >>> >>
> >> > >>> >> Thanks.
> >> > >>> >>
> >> > >>> >> Jiangjie (Becket) Qin
> >> > >>> >>
> >> > >>> >> On 4/14/15, 3:38 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >> > >>> >>
> >> > >>> >>
> >> > >>> >> >On Tue, Apr 14, 2015 at 1:57 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> > >>> >> >wrote:
> >> > >>> >> >
> >> > >>> >> >> Hi Ewen, thanks for the comments. Very good points!
> >> > >>> >> >> Please see replies
> >> > >>> >> >> inline.
> >> > >>> >> >>
> >> > >>> >> >>
> >> > >>> >> >> On 4/13/15, 11:19 PM, "Ewen Cheslack-Postava" <e...@confluent.io> wrote:
> >> > >>> >> >>
> >> > >>> >> >> >Jiangjie,
> >> > >>> >> >> >
> >> > >>> >> >> >Great start. I have a couple of comments.
> >> > >>> >> >> >
> >> > >>> >> >> >Under the motivation section, is it really true that the request will
> >> > >>> >> >> >never be completed? Presumably if the broker goes down the connection
> >> > >>> >> >> >will be severed, at worst by a TCP timeout, which should clean up the
> >> > >>> >> >> >connection and any outstanding requests, right? I think the real reason
> >> > >>> >> >> >we need a different timeout is that the default TCP timeouts are
> >> > >>> >> >> >ridiculously long in this context.
> >> > >>> >> >> Yes, when broker is completely down the request should be cleared as you
> >> > >>> >> >> said. The case we encountered looks like the broker was just not
> >> > >>> >> >> responding but TCP connection was still alive though.
> >> > >>> >> >>
> >> > >>> >> >
> >> > >>> >> >Ok, that makes sense.
> >> > >>> >> >
> >> > >>> >> >
> >> > >>> >> >> >
> >> > >>> >> >> >My second question is about whether this is the right level to tackle the
> >> > >>> >> >> >issue/what user-facing changes need to be made. A related problem came up
> >> > >>> >> >> >in https://issues.apache.org/jira/browse/KAFKA-1788 where producer
> >> > >>> >> >> >records
> >> > >>> >> >> >get stuck indefinitely because there's no client-side timeout.
> >> > >>> >> >> >This KIP
> >> > >>> >> >> >wouldn't fix that problem or any problems caused by lack of connectivity
> >> > >>> >> >> >since this would only apply to in flight requests, which by definition
> >> > >>> >> >> >must have been sent on an active connection.
> >> > >>> >> >> >
> >> > >>> >> >> >I suspect both types of problems probably need to be addressed separately
> >> > >>> >> >> >by introducing explicit timeouts. However, because the settings introduced
> >> > >>> >> >> >here are very much about the internal implementations of the clients, I'm
> >> > >>> >> >> >wondering if this even needs to be a user-facing setting, especially if we
> >> > >>> >> >> >have to add other timeouts anyway. For example, would a fixed, generous
> >> > >>> >> >> >value that's still much shorter than a TCP timeout, say 15s, be good
> >> > >>> >> >> >enough? If other timeouts would allow, for example, the clients to
> >> > >>> >> >> >properly exit even if requests have not hit their timeout, then what's
> >> > >>> >> >> >the benefit of being able to configure the request-level timeout?
> >> > >>> >> >> That is a very good point. We have three places that we might be able to
> >> > >>> >> >> enforce timeout for a message send:
> >> > >>> >> >> 1. Before append to accumulator - handled by metadata timeout on per
> >> > >>> >> >> message level.
> >> > >>> >> >> 2. Batch of messages inside accumulator - no timeout mechanism now.
> >> > >>> >> >> 3. Request of batches after messages leave the accumulator - we have a
> >> > >>> >> >> broker side timeout but no client side timeout for now.
> >> > >>> >> >> My current proposal only addresses (3) but not (2).
> >> > >>> >> >> Honestly I do not have a very clear idea about what we should do with (2)
> >> > >>> >> >> right now. But I am with you that we should not expose too many
> >> > >>> >> >> configurations to users. What I am thinking now to handle (2) is when user
> >> > >>> >> >> calls send, if we know that a partition is offline, we should throw
> >> > >>> >> >> exception immediately instead of putting it into accumulator. This would
> >> > >>> >> >> protect against further memory consumption. We might also want to fail
> >> > >>> >> >> all the batches in the deque once we find a partition is offline. That
> >> > >>> >> >> said, I feel timeout might not be quite applicable to (2).
> >> > >>> >> >> Do you have any suggestion on this?
> >> > >>> >> >>
> >> > >>> >> >
> >> > >>> >> >Right, I didn't actually mean to solve 2 here, but was trying to figure out
> >> > >>> >> >if a solution to 2 would reduce what we needed to do to address 3. (And
> >> > >>> >> >depending on how they are implemented, fixing 1 might also address 2). It
> >> > >>> >> >sounds like you hit a hang that I wasn't really expecting. This probably
> >> > >>> >> >just means the KIP motivation needs to be a bit clearer about what type of
> >> > >>> >> >situation this addresses. The cause of the hang may also be relevant -- if
> >> > >>> >> >it was something like a deadlock then that's something that should just be
> >> > >>> >> >fixed, but if it's something outside our control then a timeout makes a lot
> >> > >>> >> >more sense.
> >> > >>> >> >
> >> > >>> >> >
> >> > >>> >> >> >
> >> > >>> >> >> >I know we have a similar setting, max.in.flight.requests.per.connection,
> >> > >>> >> >> >exposed publicly (which I just discovered is missing from the new producer
> >> > >>> >> >> >configs documentation). But it looks like the new consumer is not exposing
> >> > >>> >> >> >that option, using a fixed value instead. I think we should default to
> >> > >>> >> >> >hiding these implementation values unless there's a strong case for a
> >> > >>> >> >> >scenario that requires customization.
> >> > >>> >> >> For producer, max.in.flight.requests.per.connection really matters. If
> >> > >>> >> >> people do not want messages to be reordered, they have to use
> >> > >>> >> >> max.in.flight.requests.per.connection=1. On the other hand, if throughput
> >> > >>> >> >> is more of a concern, it could be set higher. For the new consumer, I
> >> > >>> >> >> checked the value and I am not sure if the hard coded
> >> > >>> >> >> max.in.flight.requests.per.connection=100 is the right value. Without the
> >> > >>> >> >> response to the previous request, what offsets should be put into the next
> >> > >>> >> >> fetch request? It seems to me the value will be one natively regardless of
> >> > >>> >> >> the setting unless we are sending fetch request to different partitions,
> >> > >>> >> >> which does not look like the case.
> >> > >>> >> >> Anyway, it looks to be a separate issue orthogonal to the request timeout.
> >> > >>> >> >>
> >> > >>> >> >
> >> > >>> >> >
> >> > >>> >> >> >In other words, since the only user-facing change was the addition of the
> >> > >>> >> >> >setting, I'm wondering if we can avoid the KIP altogether by just choosing
> >> > >>> >> >> >a good default value for the timeout.
> >> > >>> >> >> The problem is that we have a server side request timeout exposed as a
> >> > >>> >> >> public configuration. We cannot set the client timeout smaller than that
> >> > >>> >> >> value, so a hard coded value probably won’t work here.
> >> > >>> >> >>
> >> > >>> >> >
> >> > >>> >> >That makes sense, although it's worth keeping in mind that even if you use
> >> > >>> >> >"correct" values, they could still be violated due to, e.g., a GC pause
> >> > >>> >> >that causes the broker to process a request after it is supposed to have
> >> > >>> >> >expired.
> >> > >>> >> >
> >> > >>> >> >-Ewen
> >> > >>> >> >
> >> > >>> >> >
> >> > >>> >> >> >
> >> > >>> >> >> >-Ewen
> >> > >>> >> >> >
> >> > >>> >> >> >On Mon, Apr 13, 2015 at 2:35 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> >> > >>> >> >> >wrote:
> >> > >>> >> >> >
> >> > >>> >> >> >> Hi,
> >> > >>> >> >> >>
> >> > >>> >> >> >> I just created a KIP to add a request timeout to NetworkClient for new
> >> > >>> >> >> >> Kafka clients.
> >> > >>> >> >> >>
> >> > >>> >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-19+-+Add+a+request+timeout+to+NetworkClient
> >> > >>> >> >> >>
> >> > >>> >> >> >> Comments and suggestions are welcome!
> >> > >>> >> >> >>
> >> > >>> >> >> >> Thanks.
> >> > >>> >> >> >>
> >> > >>> >> >> >> Jiangjie (Becket) Qin
> >> > >>> >> >> >
> >> > >>> >> >> >--
> >> > >>> >> >> >Thanks,
> >> > >>> >> >> >Ewen
> >> > >>> >> >
> >> > >>> >> >--
> >> > >>> >> >Thanks,
> >> > >>> >> >Ewen
> >> > >>> >
> >> > >>> >--
> >> > >>> >-- Guozhang
> >
> >--
> >Thanks,
> >Ewen

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
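
For reference, the client-side request timeout discussed in this thread (the "Request timeout" item in Becket's summary) could be sketched roughly as follows. This is only an illustration in Python, not the NetworkClient code: the class and method names are hypothetical, and it assumes timestamps passed in never go backwards, so the oldest request is always at the head of the queue.

```python
from collections import deque


class InFlightRequests:
    """Hypothetical tracker of requests that were sent but not yet answered."""

    def __init__(self, request_timeout_ms):
        self.request_timeout_ms = request_timeout_ms
        self._queue = deque()  # (send_time_ms, request_id), oldest first

    def add(self, request_id, now_ms):
        # Record the send time when the request goes out on the wire.
        self._queue.append((now_ms, request_id))

    def complete(self, request_id):
        # A response arrived, so the request no longer needs a timeout.
        self._queue = deque(e for e in self._queue if e[1] != request_id)

    def expire(self, now_ms):
        # Called on every poll: pop requests that have waited at least
        # request_timeout_ms so the caller can retry or fail them.
        expired = []
        while self._queue and now_ms - self._queue[0][0] >= self.request_timeout_ms:
            expired.append(self._queue.popleft()[1])
        return expired
```

The point of the sketch is that expiry is driven by the client's own clock on each poll, so a broker that stops responding while the TCP connection still looks ESTABLISHED is noticed after the request timeout rather than after a TCP timeout.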