Guozhang, what you propose makes sense, but this seems to get into implementation detail territory already?
Thus, if there are no further change requests to the KIP wiki page
itself, I would like to proceed with the VOTE.

-Matthias

On 5/20/20 12:30 PM, Guozhang Wang wrote:
> Thanks Matthias,
>
> I agree with you on all the bullet points above. Regarding the admin-client
> outer-loop retries inside the partition assignor, I think we should treat
> the error codes differently for those two blocking calls:
>
> Describe-topic:
>  * unknown-topic (3): add this topic to the to-be-created topic list.
>  * leader-not-available (5): do not try to create; retry in the outer loop.
>  * request-timeout: break the current loop and retry in the outer loop.
>  * others: fatal error.
>
> Create-topic:
>  * topic-already-exists: retry in the outer loop to validate that
>    num.partitions matches the expectation.
>  * request-timeout: break the current loop and retry in the outer loop.
>  * others: fatal error.
>
> And in the outer loop, I think we can have a global timer for the whole
> "assign()" function, not only for the internal-topic-manager, and the timer
> can be hard-coded with, e.g., half of the rebalance.timeout to get rid of
> `retries`; if we cannot complete the assignment before the timeout runs
> out, we can return just a partial assignment (e.g., if there are two
> tasks, but we can only get the topic metadata for one of them, then just do
> the assignment for that one) while encoding in the error-code field a
> request for another rebalance.
>
> Guozhang
>
> On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> No worries Guozhang, any feedback is always very welcome! My reply is
>> going to be a little longer... Sorry.
>>
>>> 1) There are some inconsistent statements in the proposal regarding
>>> what to deprecate:
>>
>> The proposal of the KIP is to deprecate `retries` for producer, admin,
>> and Streams.
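[Editor's note: Guozhang's per-error-code handling above could be sketched roughly as below. This is an illustrative sketch, not the actual InternalTopicManager code; the class and method names are hypothetical, and the numeric constants are the standard Kafka protocol error codes (e.g. UNKNOWN_TOPIC_OR_PARTITION = 3).]

```java
// Illustrative sketch of the proposed per-error-code handling in the
// assignor's outer retry loop; not the actual Kafka Streams code.
public class TopicErrorPolicy {

    public enum Action { CREATE_TOPIC, RETRY_OUTER_LOOP, FATAL }

    // Kafka protocol error codes (cf. org.apache.kafka.common.protocol.Errors)
    static final short UNKNOWN_TOPIC_OR_PARTITION = 3;
    static final short LEADER_NOT_AVAILABLE = 5;
    static final short REQUEST_TIMED_OUT = 7;
    static final short TOPIC_ALREADY_EXISTS = 36;

    // Handling for errors returned from a describe-topics call.
    public static Action onDescribeError(final short errorCode) {
        switch (errorCode) {
            case UNKNOWN_TOPIC_OR_PARTITION: return Action.CREATE_TOPIC;
            case LEADER_NOT_AVAILABLE:       return Action.RETRY_OUTER_LOOP;
            case REQUEST_TIMED_OUT:          return Action.RETRY_OUTER_LOOP;
            default:                         return Action.FATAL;
        }
    }

    // Handling for errors returned from a create-topics call.
    public static Action onCreateError(final short errorCode) {
        switch (errorCode) {
            // re-check in the outer loop that num.partitions matches
            case TOPIC_ALREADY_EXISTS: return Action.RETRY_OUTER_LOOP;
            case REQUEST_TIMED_OUT:    return Action.RETRY_OUTER_LOOP;
            default:                   return Action.FATAL;
        }
    }
}
```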
>> Maybe the confusion is about the dependency of those settings within
>> Streams, and that we handle the deprecation somewhat differently for
>> plain clients vs Streams:
>>
>> For the plain producer/admin client, the default `retries` is set to
>> MAX_VALUE. The config will be deprecated but still be respected.
>>
>> For Streams, the default `retries` is set to zero; however, this default
>> does _not_ affect the embedded producer/admin clients -- both clients
>> stay on their own default of MAX_VALUE.
>>
>> Currently, this introduces the issue that if a user wants to increase
>> the Streams retries, she might by accident reduce the embedded client
>> retries, too. To avoid this issue, she would need to set
>>
>>   retries=100
>>   producer.retries=MAX_VALUE
>>   admin.retries=MAX_VALUE
>>
>> This KIP will fix this issue only in the long term though, i.e., when
>> `retries` is finally removed. Short term, using `retries` in
>> StreamsConfig would still affect the embedded clients, but Streams, as
>> well as both clients, would log a WARN message. This preserves backward
>> compatibility.
>>
>> Within Streams, `retries` is ignored and the new `task.timeout.ms` is
>> used instead. This increases the default resilience of Kafka Streams
>> itself. We could also achieve this by still respecting `retries` and
>> changing its default value. However, because we deprecate `retries`, it
>> seems better to just ignore it and switch to the new config directly.
>>
>> I updated the KIP with some more details.
>>
>>> 2) We should also document the related behavior change in
>>> PartitionAssignor that uses AdminClient.
>>
>> This is actually a good point. Originally, I looked into this only
>> briefly, but it raised an interesting question on how to handle it.
>>
>> Note that `TimeoutExceptions` are currently not handled in this retry
>> loop.
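[Editor's note: the workaround described above can be sketched as a plain `Properties` setup. This is a sketch only; the literal keys `producer.retries` / `admin.retries` correspond to what `StreamsConfig.producerPrefix("retries")` and `StreamsConfig.adminClientPrefix("retries")` would produce, and the values mirror the clients' own MAX_VALUE default.]

```java
import java.util.Properties;

public class RetriesConfigExample {

    // Build a Streams config that raises the Streams-level retries without
    // accidentally lowering the embedded producer/admin client retries.
    public static Properties streamsProps() {
        final Properties props = new Properties();
        // Streams-level retries (deprecated by KIP-572). Pre-KIP, this
        // value leaks into the embedded clients unless overridden:
        props.put("retries", "100");
        // Pin the embedded clients back to their own default of
        // Integer.MAX_VALUE via the prefixed configs:
        props.put("producer.retries", String.valueOf(Integer.MAX_VALUE));
        props.put("admin.retries", String.valueOf(Integer.MAX_VALUE));
        return props;
    }
}
```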
>> Also note that the default retries value for other errors would be
>> MAX_VALUE by default (inherited from `AdminClient#retries`, as mentioned
>> already by Guozhang).
>>
>> Applying the new `task.timeout.ms` config does not seem to be
>> appropriate, because the AdminClient is used during a rebalance in the
>> leader. We could introduce a new config just for this case, but it seems
>> to be a little bit too much. Furthermore, the group coordinator applies
>> a timeout on the leader anyway: if the assignment is not computed within
>> the timeout, the leader is removed from the group and another rebalance
>> is triggered.
>>
>> Overall, we make multiple admin client calls, and thus we should keep
>> some retry logic and not just rely on the admin client's internal
>> retries, as those would fall short of retrying different calls
>> interleaved. We could just retry infinitely and rely on the group
>> coordinator to remove the leader eventually. However, this does not seem
>> to be ideal, because the removed leader might be stuck forever.
>>
>> The question though is: if topic metadata cannot be obtained or internal
>> topics cannot be created, what should we do? We can't compute an
>> assignment anyway. We already have a rebalance error code to shut down
>> all instances for this case. Maybe we could break the retry loop before
>> the leader is kicked out of the group and send this error code? This
>> way, we don't need a new config, but piggy-back on the existing timeout
>> to compute the assignment. To be conservative, we could use a 50%
>> threshold?
>>
>>> BTW as I mentioned in the previous statement, today throwing an
>>> exception that kills one thread but not the whole instance is still an
>>> issue for monitoring purposes, but I suppose this is not going to be in
>>> this KIP but addressed by another KIP, right?
>>
>> Correct. This issue is out of scope.
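[Editor's note: the "break the retry loop before the leader is kicked out" idea above could look roughly like the sketch below. The method names are hypothetical and the 50% threshold is the conservative value floated in the mail, not a decided constant; the `attempt` supplier stands in for the combined describe/create admin-client work.]

```java
import java.util.function.BooleanSupplier;

// Sketch of a deadline-bounded outer retry loop: retry the admin-client
// work only until 50% of the rebalance timeout has elapsed, then give up
// so the caller can encode the rebalance error code instead of being
// removed from the group by the coordinator.
public class BoundedRetryLoop {

    public static boolean retryUntilDeadline(final BooleanSupplier attempt,
                                             final long rebalanceTimeoutMs) {
        final long deadline = System.currentTimeMillis() + rebalanceTimeoutMs / 2;
        while (System.currentTimeMillis() < deadline) {
            if (attempt.getAsBoolean()) { // e.g., describe/create internal topics
                return true;              // all admin calls succeeded
            }
            // transient error (timeout, leader-not-available, ...): loop again
        }
        return false; // deadline hit: caller sends the error code instead
    }
}
```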
>>
>>> for consumer, if it gets a TimeoutException polling records, would
>>> start timing all tasks since that single consumer would affect all
>>> tasks?
>>
>> Consumer#poll() would never throw a `TimeoutException`, and thus
>> `task.timeout.ms` does not apply.
>>
>>> For other blocking calls like `endOffsets()` etc, they are usually also
>>> issued on behalf of a batch of tasks, so if that gets a timeout
>>> exception should we start ticking all the corresponding tasks as well?
>>> Maybe worth clarifying a bit more in the wiki.
>>
>> Good point. I agree that the timer should tick for all affected tasks. I
>> clarified this in the KIP.
>>
>> About KAFKA-6520:
>>
>> There is already KIP-457, and I am not sure this KIP should subsume it.
>> It somewhat feels orthogonal. I am also not 100% sure if KIP-572
>> actually helps much, because a thread could be disconnected from the
>> brokers without throwing any timeout exception: if all tasks are RUNNING
>> and just polling for new data, but no progress is made because of a
>> network issue, `poll()` would just return no data but not throw.
>>
>> @Bruno
>>
>>> Wouldn't it be better to specify task.timeout.ms to -1 if no retry
>>> should be done
>>
>> Interesting idea. Personally, I find `-1` confusing, and it seems
>> intuitive to me that `0` implies no retries (this seems to be in
>> alignment with other APIs).
>>
>> -Matthias
>>
>> On 5/18/20 9:53 AM, Guozhang Wang wrote:
>>> Hi Matthias,
>>>
>>> Sorry for flooding the thread, but with this KIP I feel the design
>>> scope of https://issues.apache.org/jira/browse/KAFKA-6520 can be
>>> simplified a lot, and maybe the design can just be piggy-backed as part
>>> of this KIP, wdyt?
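[Editor's note: the `task.timeout.ms` semantics discussed above (timer starts on the first client-level timeout, progress resets it, and `0` means "try once, no retry") could be sketched as below. This is a hypothetical model of the behavior, not the actual Kafka Streams implementation.]

```java
// Sketch of the `task.timeout.ms` semantics: the per-task timer starts at
// the first TimeoutException from an embedded client; a value of 0 makes
// the first timeout immediately fatal, i.e. "try once, no retry".
public class TaskTimeoutSketch {

    private final long taskTimeoutMs;
    private long firstTimeoutAt = -1L; // -1 = no pending timeout

    public TaskTimeoutSketch(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Returns true if the task may keep retrying, false if it should fail.
    public boolean onTimeoutException(final long nowMs) {
        if (firstTimeoutAt == -1L) {
            firstTimeoutAt = nowMs; // start the timer on the first timeout
        }
        return nowMs - firstTimeoutAt < taskTimeoutMs;
    }

    // Any successful progress resets the timer.
    public void onProgress() {
        firstTimeoutAt = -1L;
    }
}
```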
>>>
>>> Guozhang
>>>
>>> On Mon, May 18, 2020 at 9:47 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Just to add one more meta comment: for consumer, if it gets a
>>>> TimeoutException polling records, would start timing all tasks since
>>>> that single consumer would affect all tasks? For other blocking calls
>>>> like `endOffsets()` etc, they are usually also issued on behalf of a
>>>> batch of tasks, so if that gets timeout exception should we start
>>>> ticking all the corresponding tasks as well? Maybe worth clarifying a
>>>> bit more in the wiki.
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, May 18, 2020 at 12:26 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> I am +1 (non-binding) on the KIP.
>>>>>
>>>>> Just one final remark: Wouldn't it be better to specify
>>>>> task.timeout.ms to -1 if no retry should be done? IMO it would make
>>>>> the config more intuitive because 0 would not have two possible
>>>>> meanings (i.e. try once and never try) anymore.
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On Sat, May 16, 2020 at 7:51 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>
>>>>>> Hello Matthias,
>>>>>>
>>>>>> Thanks for the updated KIP, overall I'm +1 on this proposal. Some
>>>>>> minor comments (I know gmail mixed that again for me so I'm leaving
>>>>>> it as a combo for both DISCUSS and VOTE :)
>>>>>>
>>>>>> 1) There are some inconsistent statements in the proposal regarding
>>>>>> what to deprecate: at the beginning it says "We propose to deprecate
>>>>>> the retries configuration parameter for the producer and admin
>>>>>> client" but later in compatibility we say "Producer and admin client
>>>>>> behavior does not change; both still respect retries config."
>>>>>> My understanding is that we will only deprecate the
>>>>>> StreamsConfig#retries while not touching
>>>>>> ProducerConfig/AdminClientConfig#retries, AND we will always override
>>>>>> the embedded producer / admin retries config to infinity so that we
>>>>>> never rely on those configs but are always bounded by the timeout
>>>>>> config. Is that right? If yes, could you clarify in the doc?
>>>>>>
>>>>>> 2) We should also document the related behavior change in
>>>>>> PartitionAssignor that uses AdminClient. More specifically, the admin
>>>>>> client's retries config is piggy-backed inside InternalTopicManager
>>>>>> as an outer-loop retry logic in addition to AdminClient's own inner
>>>>>> retry loop. There are some existing issues like KAFKA-9999 / 10006
>>>>>> that Sophie and Boyang have been working on. I exchanged some ideas
>>>>>> with them, and generally we should consider if the outer loop of
>>>>>> InternalTopicManager should just be removed, and if we got a
>>>>>> TimeoutException we should just trigger another rebalance etc.
>>>>>>
>>>>>> BTW as I mentioned in the previous statement, today throwing an
>>>>>> exception that kills one thread but not the whole instance is still
>>>>>> an issue for monitoring purposes, but I suppose this is not going to
>>>>>> be in this KIP but addressed by another KIP, right?
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Fri, May 15, 2020 at 1:14 PM Boyang Chen <reluctanthero...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Good, good.
>>>>>>>
>>>>>>> Read through the discussions, the KIP looks good to me, +1
>>>>>>> (non-binding)
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 11:51 AM Sophie Blee-Goldman <sop...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Called out!
>>>>>>>>
>>>>>>>> Seems like gmail struggles with [...] prefixed subjects.
>>>>>>>> You'd think they would adapt all their practices to conform to the
>>>>>>>> Apache Kafka mailing list standards, but no!
>>>>>>>>
>>>>>>>> +1 (non-binding) by the way
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 11:46 AM John Roesler <vvcep...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Boyang,
>>>>>>>>>
>>>>>>>>> It is a separate thread, and you have just revealed yourself as a
>>>>>>>>> gmail user ;)
>>>>>>>>>
>>>>>>>>> (Gmail sometimes conflates vote and discuss threads for no
>>>>>>>>> apparent reason)
>>>>>>>>>
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020, at 13:39, Boyang Chen wrote:
>>>>>>>>>> Hey Matthias, should this be on a separate VOTE thread?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 11:38 AM John Roesler <vvcep...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Matthias!
>>>>>>>>>>>
>>>>>>>>>>> I'm +1 (binding)
>>>>>>>>>>>
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020, at 11:55, Matthias J. Sax wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start the vote on KIP-572:
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> Attachments:
>>>>>>>>>>>> * signature.asc
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>
>>>> --
>>>> -- Guozhang