Thanks Matthias,

I agree with you on all the bullet points above. Regarding the admin-client
outer-loop retries inside partition assignor, I think we should treat error
codes differently from those two blocking calls:

Describe-topic:
* unknown-topic (3): add this topic to the to-be-created topic list.
* leader-not-available (5): do not try to create, retry in the outer loop.
* request-timeout: break the current loop and retry in the outer loop.
* others: fatal error.

Create-topic:
* topic-already-exists: retry in the outer loop to validate the
num.partitions match expectation.
* request-timeout: break the current loop and retry in the outer loop.
* others: fatal error.

And in the outer-loop, I think we can have a global timer for the whole
"assign()" function, not only for the internal-topic-manager, and the timer
can be hard-coded with, e.g. half of the rebalance.timeout to get rid of
the `retries`; if we cannot complete the assignment before the timeout runs
out, we can return just the partial assignment (e.g. if there are two
tasks, but we can only get the topic metadata for one of them, then just do
the assignment for that one only) while encoding in the error-code field to
request for another rebalance.

Guozhang



On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax <mj...@apache.org> wrote:

> No worries Guozhang, any feedback is always very welcome! My reply is
> going to be a little longer... Sorry.
>
>
>
> > 1) There are some inconsistent statements in the proposal regarding what
> to
> > deprecated:
>
> The proposal of the KIP is to deprecate `retries` for producer, admin,
> and Streams. Maybe the confusion is about the dependency of those
> settings within Streams and that we handle the deprecation somewhat
> different for plain clients vs Streams:
>
> For plain producer/admin the default `retries` is set to MAX_VALUE. The
> config will be deprecated but still be respected.
>
> For Streams, the default `retries` is set to zero, however, this default
> retry does _not_ affect the embedded producer/admin clients -- both
> clients stay on their own default of MAX_VALUES.
>
> Currently, this introduces the issue, that if a user wants to increase
> Streams retries, she might by accident reduce the embedded client
> retries, too. To avoid this issue, she would need to set
>
> retries=100
> producer.retires=MAX_VALUE
> admin.retries=MAX_VALUE
>
> This KIP will fix this issue only in the long term though, ie, when
> `retries` is finally removed. Short term, using `retries` in
> StreamsConfig would still affect the embedded clients, but Streams, as
> well as both client would log a WARN message. This preserves backward
> compatibility.
>
> Withing Streams `retries` is ignored and the new `task.timeout.ms` is
> used instead. This increase the default resilience of Kafka Streams
> itself. We could also achieve this by still respecting `retries` and to
> change it's default value. However, because we deprecate `retries` it
> seems better to just ignore it and switch to the new config directly.
>
> I updated the KIPs with some more details.
>
>
>
> > 2) We should also document the related behavior change in
> PartitionAssignor
> > that uses AdminClient.
>
> This is actually a good point. Originally, I looked into this only
> briefly, but it raised an interesting question on how to handle it.
>
> Note that `TimeoutExceptions` are currently not handled in this retry
> loop. Also note that the default retries value for other errors would be
> MAX_VALUE be default (inherited from `AdminClient#retries` as mentioned
> already by Guozhang).
>
> Applying the new `task.timeout.ms` config does not seem to be
> appropriate because the AdminClient is used during a rebalance in the
> leader. We could introduce a new config just for this case, but it seems
> to be a little bit too much. Furthermore, the group-coordinator applies
> a timeout on the leader anyway: if the assignment is not computed within
> the timeout, the leader is removed from the group and another rebalance
> is triggered.
>
> Overall, we make multiple admin client calls and thus we should keep
> some retry logic and not just rely on the admin client internal retries,
> as those would fall short to retry different calls interleaved. We could
> just retry infinitely and rely on the group coordinator to remove the
> leader eventually. However, this does not seem to be ideal because the
> removed leader might be stuck forever.
>
> The question though is: if topic metadata cannot be obtained or internal
> topics cannot be created, what should we do? We can't compute an
> assignment anyway. We have already an rebalance error code to shut down
> all instances for this case. Maybe we could break the retry loop before
> the leader is kicked out of the group and send this error code? This way
> we don't need a new config, but piggy-back on the existing timeout to
> compute the assignment. To be conservative, we could use a 50% threshold?
>
>
>
> > BTW as I mentioned in the previous statement, today throwing an exception
> > that kills one thread but not the whole instance is still an issue for
> > monitoring purposes, but I suppose this is not going to be in this KIP
> but
> > addressed by another KIP, right?
>
> Correct. This issue if out-of-scope.
>
>
>
> > for consumer, if it gets a
> > TimeoutException polling records, would start timing all tasks since that
> > single consumer would affect all tasks?
>
> Consumer#poll() would never throw a `TimeoutException` and thus
> `task.timeout.ms` does not apply.
>
>
>
> > For other blocking calls like
> > `endOffsets()` etc, they are usually also issued on behalf of a batch of
> > tasks, so if that gets timeout exception should we start ticking all the
> > corresponding tasks as well? Maybe worth clarifying a bit more in the
> wiki.
>
> Good point. I agree that the timer should tick for all affected tasks. I
> clarified in the KIP.
>
>
>
> About KAFKA-6520:
>
> There is already KIP-457 and I am not sure this KIP should subsume it.
> It somewhat feels orthogonal. I am also not 100% sure if KIP-572
> actually helps much, because a thread could be disconnected to the
> brokers without throwing any timeout exception: if all tasks are RUNNING
> and just polling for new data, but no progress is made because of a
> network issue, `poll()` would just return no data but not through.
>
>
>
> @Bruno
>
> > Wouldn't it be better to specify
> > task.timeout.ms to -1 if no retry should be done
>
> Interesting idea. Personally I find `-1` confusing. And it seems
> intuitive to me that `0` implies no retries (this seems to be in
> alignment to other APIs).
>
>
>
> -Matthias
>
>
> On 5/18/20 9:53 AM, Guozhang Wang wrote:
> > Hi Matthias,
> >
> > Sorry for flooding the thread, but with this KIP I feel the design scope
> of
> > https://issues.apache.org/jira/browse/KAFKA-6520 can be simplified a lot
> > and may it the design can be just piggy-backed as part of this KIP, wdyt?
> >
> > Guozhang
> >
> >
> > On Mon, May 18, 2020 at 9:47 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> Hi Matthias,
> >>
> >> Just to add one more meta comment: for consumer, if it gets a
> >> TimeoutException polling records, would start timing all tasks since
> that
> >> single consumer would affect all tasks? For other blocking calls like
> >> `endOffsets()` etc, they are usually also issued on behalf of a batch of
> >> tasks, so if that gets timeout exception should we start ticking all the
> >> corresponding tasks as well? Maybe worth clarifying a bit more in the
> wiki.
> >>
> >>
> >> Guozhang
> >>
> >> On Mon, May 18, 2020 at 12:26 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> I am +1 (non-binding) on the KIP.
> >>>
> >>> Just one final remark: Wouldn't it be better to specify
> >>> task.timeout.ms to -1 if no retry should be done? IMO it would make
> >>> the config more intuitive because 0 would not have two possible
> >>> meanings (i.e. try once and never try) anymore.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On Sat, May 16, 2020 at 7:51 PM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >>>>
> >>>> Hello Matthias,
> >>>>
> >>>> Thanks for the updated KIP, overall I'm +1 on this proposal. Some
> minor
> >>>> comments (I know gmail mixed that again for me so I'm leaving it as a
> >>> combo
> >>>> for both DISCUSS and VOTE :)
> >>>>
> >>>> 1) There are some inconsistent statements in the proposal regarding
> >>> what to
> >>>> deprecated: at the beginning it says "We propose to deprecate the
> >>> retries
> >>>> configuration parameter for the producer and admin client" but later
> in
> >>>> compatibility we say "Producer and admin client behavior does not
> >>> change;
> >>>> both still respect retries config." My understanding is that we will
> >>> only
> >>>> deprecate the StreamsConfig#retries while not touch on
> >>>> ProducerConfig/AdminClientConfig#retries, AND we will always override
> >>> the
> >>>> embedded producer / admin retries config to infinity so that we never
> >>> rely
> >>>> on those configs but always bounded by the timeout config. Is that
> >>>> right, if yes could you clarify in the doc?
> >>>>
> >>>> 2) We should also document the related behavior change in
> >>> PartitionAssignor
> >>>> that uses AdminClient. More specifically, the admin client's retries
> >>> config
> >>>> is piggy-backed inside InternalTopicManager as an outer-loop retry
> >>> logic in
> >>>> addition to AdminClient's own inner retry loop. There are some
> existing
> >>>> issues like KAFKA-9999 / 10006 that Sophie and Boyang has been working
> >>> on.
> >>>> I exchanged some ideas with them, and generally we should consider if
> >>> the
> >>>> outer-loop of InternalTopicManager should just be removed and if we
> got
> >>>> TimeoutException we should just trigger another rebalance etc.
> >>>>
> >>>> BTW as I mentioned in the previous statement, today throwing an
> >>> exception
> >>>> that kills one thread but not the whole instance is still an issue for
> >>>> monitoring purposes, but I suppose this is not going to be in this KIP
> >>> but
> >>>> addressed by another KIP, right?
> >>>>
> >>>>
> >>>> Guozhang
> >>>>
> >>>>
> >>>> On Fri, May 15, 2020 at 1:14 PM Boyang Chen <
> reluctanthero...@gmail.com
> >>>>
> >>>> wrote:
> >>>>
> >>>>> Good, good.
> >>>>>
> >>>>> Read through the discussions, the KIP looks good to me, +1
> >>> (non-binding)
> >>>>>
> >>>>> On Fri, May 15, 2020 at 11:51 AM Sophie Blee-Goldman <
> >>> sop...@confluent.io>
> >>>>> wrote:
> >>>>>
> >>>>>> Called out!
> >>>>>>
> >>>>>> Seems like gmail struggles with [...] prefixed subjects. You'd
> >>> think they
> >>>>>> would adapt
> >>>>>> all their practices to conform to the Apache Kafka mailing list
> >>>>> standards,
> >>>>>> but no!
> >>>>>>
> >>>>>> +1 (non-binding) by the way
> >>>>>>
> >>>>>> On Fri, May 15, 2020 at 11:46 AM John Roesler <vvcep...@apache.org>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi Boyang,
> >>>>>>>
> >>>>>>> It is a separate thread, and you have just revealed yourself as a
> >>> gmail
> >>>>>>> user ;)
> >>>>>>>
> >>>>>>> (Gmail sometimes conflates vote and discuss threads for no
> >>> apparent
> >>>>>> reason
> >>>>>>> )
> >>>>>>>
> >>>>>>> -John
> >>>>>>>
> >>>>>>> On Fri, May 15, 2020, at 13:39, Boyang Chen wrote:
> >>>>>>>> Hey Matthias, should this be on a separate VOTE thread?
> >>>>>>>>
> >>>>>>>> On Fri, May 15, 2020 at 11:38 AM John Roesler <
> >>> vvcep...@apache.org>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks, Matthias!
> >>>>>>>>>
> >>>>>>>>> I’m +1 (binding)
> >>>>>>>>>
> >>>>>>>>> -John
> >>>>>>>>>
> >>>>>>>>> On Fri, May 15, 2020, at 11:55, Matthias J. Sax wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I would like to start the vote on KIP-572:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Attachments:
> >>>>>>>>>> * signature.asc
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> -- Guozhang
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>
>

-- 
-- Guozhang

Reply via email to