Guozhang,

what you propose makes sense, but this seems to get into implementation
detail territory already?

Thus, if there are no further change requests to the KIP wiki page
itself, I would like to proceed with the VOTE.


-Matthias

On 5/20/20 12:30 PM, Guozhang Wang wrote:
> Thanks Matthias,
> 
> I agree with you on all the bullet points above. Regarding the admin-client
> outer-loop retries inside partition assignor, I think we should treat error
> codes differently from those two blocking calls:
> 
> Describe-topic:
> * unknown-topic (3): add this topic to the to-be-created topic list.
> * leader-not-available (5): do not try to create, retry in the outer loop.
> * request-timeout: break the current loop and retry in the outer loop.
> * others: fatal error.
> 
> Create-topic:
> * topic-already-exists: retry in the outer loop to validate the
> num.partitions match expectation.
> * request-timeout: break the current loop and retry in the outer loop.
> * others: fatal error.
> 
> And in the outer-loop, I think we can have a global timer for the whole
> "assign()" function, not only for the internal-topic-manager, and the timer
> can be hard-coded with, e.g. half of the rebalance.timeout to get rid of
> the `retries`; if we cannot complete the assignment before the timeout runs
> out, we can return just the partial assignment (e.g. if there are two
> tasks, but we can only get the topic metadata for one of them, then just do
> the assignment for that one only) while encoding in the error-code field to
> request for another rebalance.
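The per-error-code handling sketched above could look as follows (the numeric codes are the Kafka protocol error codes; the classifier itself is illustrative only, not actual Streams code):

```java
// Sketch of the describe-topic / create-topic error handling described above.
// Numeric codes follow the Kafka protocol; the classifier is hypothetical.
public class AssignorErrorPolicy {
    public enum Action { ADD_TO_CREATE_LIST, RETRY_OUTER_LOOP, FATAL }

    // Handling for describe-topic responses.
    public static Action onDescribeError(final int errorCode) {
        switch (errorCode) {
            case 3:  return Action.ADD_TO_CREATE_LIST; // UNKNOWN_TOPIC_OR_PARTITION
            case 5:  return Action.RETRY_OUTER_LOOP;   // LEADER_NOT_AVAILABLE
            case 7:  return Action.RETRY_OUTER_LOOP;   // REQUEST_TIMED_OUT
            default: return Action.FATAL;
        }
    }

    // Handling for create-topic responses.
    public static Action onCreateError(final int errorCode) {
        switch (errorCode) {
            case 36: return Action.RETRY_OUTER_LOOP;   // TOPIC_ALREADY_EXISTS: re-validate num.partitions
            case 7:  return Action.RETRY_OUTER_LOOP;   // REQUEST_TIMED_OUT
            default: return Action.FATAL;
        }
    }

    public static void main(final String[] args) {
        System.out.println(onDescribeError(3));  // ADD_TO_CREATE_LIST
        System.out.println(onCreateError(36));   // RETRY_OUTER_LOOP
        System.out.println(onCreateError(99));   // FATAL
    }
}
```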
> 
> Guozhang
> 
> 
> 
> On Mon, May 18, 2020 at 7:26 PM Matthias J. Sax <mj...@apache.org> wrote:
> 
>> No worries Guozhang, any feedback is always very welcome! My reply is
>> going to be a little longer... Sorry.
>>
>>
>>
>>> 1) There are some inconsistent statements in the proposal regarding what
>> to
>>> deprecate:
>>
>> The proposal of the KIP is to deprecate `retries` for producer, admin,
>> and Streams. Maybe the confusion is about the dependency of those
>> settings within Streams and that we handle the deprecation somewhat
>> differently for plain clients vs. Streams:
>>
>> For plain producer/admin the default `retries` is set to MAX_VALUE. The
>> config will be deprecated but still be respected.
>>
>> For Streams, the default `retries` is set to zero; however, this default
>> retry does _not_ affect the embedded producer/admin clients -- both
>> clients stay on their own default of MAX_VALUE.
>>
>> Currently, this introduces the issue that if a user wants to increase
>> Streams retries, she might by accident reduce the embedded client
>> retries, too. To avoid this issue, she would need to set
>>
>> retries=100
>> producer.retries=MAX_VALUE
>> admin.retries=MAX_VALUE
>>
>> This KIP will fix this issue only in the long term though, i.e., when
>> `retries` is finally removed. Short term, using `retries` in
>> StreamsConfig would still affect the embedded clients, but Streams, as
>> well as both clients would log a WARN message. This preserves backward
>> compatibility.
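The workaround above, expressed in code (illustrative only; `producer.` and `admin.` are the standard StreamsConfig prefixes for overriding embedded client configs):

```java
// Illustrative: raise the Streams-level `retries` while explicitly pinning
// the embedded clients back to their own default of MAX_VALUE, per the
// workaround described above.
import java.util.Properties;

public class RetriesConfigExample {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.setProperty("retries", "100");                                      // Streams-level retries
        props.setProperty("producer.retries", String.valueOf(Integer.MAX_VALUE)); // embedded producer
        props.setProperty("admin.retries", String.valueOf(Integer.MAX_VALUE));    // embedded admin client
        return props;
    }

    public static void main(final String[] args) {
        System.out.println(streamsProps().getProperty("producer.retries")); // 2147483647
    }
}
```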
>>
>> Within Streams, `retries` is ignored and the new `task.timeout.ms` is
>> used instead. This increases the default resilience of Kafka Streams
>> itself. We could also achieve this by still respecting `retries` and
>> changing its default value. However, because we deprecate `retries` it
>> seems better to just ignore it and switch to the new config directly.
>>
>> I updated the KIPs with some more details.
>>
>>
>>
>>> 2) We should also document the related behavior change in
>> PartitionAssignor
>>> that uses AdminClient.
>>
>> This is actually a good point. Originally, I looked into this only
>> briefly, but it raised an interesting question on how to handle it.
>>
>> Note that `TimeoutExceptions` are currently not handled in this retry
>> loop. Also note that the default retries value for other errors would be
>> MAX_VALUE by default (inherited from `AdminClient#retries` as mentioned
>> already by Guozhang).
>>
>> Applying the new `task.timeout.ms` config does not seem to be
>> appropriate because the AdminClient is used during a rebalance in the
>> leader. We could introduce a new config just for this case, but it seems
>> to be a little bit too much. Furthermore, the group-coordinator applies
>> a timeout on the leader anyway: if the assignment is not computed within
>> the timeout, the leader is removed from the group and another rebalance
>> is triggered.
>>
>> Overall, we make multiple admin client calls and thus we should keep
>> some retry logic and not just rely on the admin client internal retries,
>> as those would fall short of retrying different calls in an interleaved
>> fashion. We could
>> just retry infinitely and rely on the group coordinator to remove the
>> leader eventually. However, this does not seem to be ideal because the
>> removed leader might be stuck forever.
>>
>> The question though is: if topic metadata cannot be obtained or internal
>> topics cannot be created, what should we do? We can't compute an
>> assignment anyway. We already have a rebalance error code to shut down
>> all instances for this case. Maybe we could break the retry loop before
>> the leader is kicked out of the group and send this error code? This way
>> we don't need a new config, but piggy-back on the existing timeout to
>> compute the assignment. To be conservative, we could use a 50% threshold?
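A sketch of that idea, bounding the retry loop by half the rebalance timeout and falling back to the existing shutdown error code (all names and the error-code value are hypothetical):

```java
// Sketch: bound the whole assignment retry loop by a 50% threshold of the
// rebalance timeout; if metadata still cannot be obtained, return the
// existing "shutdown" rebalance error code instead of letting the group
// coordinator evict the leader. All names are hypothetical.
public class AssignmentDeadline {
    static final int OK = 0;
    static final int SHUTDOWN_ERROR_CODE = 1; // placeholder for the existing error code

    static int assignWithDeadline(final long rebalanceTimeoutMs,
                                  final java.util.function.BooleanSupplier tryComputeAssignment) {
        final long deadline = System.currentTimeMillis() + rebalanceTimeoutMs / 2; // 50% threshold
        while (System.currentTimeMillis() < deadline) {
            if (tryComputeAssignment.getAsBoolean()) {
                return OK; // assignment computed successfully
            }
            // transient failure (e.g. a TimeoutException): retry until the deadline
        }
        return SHUTDOWN_ERROR_CODE; // give up before the coordinator kicks the leader out
    }

    public static void main(final String[] args) {
        System.out.println(assignWithDeadline(1000, () -> true));  // 0
        System.out.println(assignWithDeadline(100, () -> false));  // 1
    }
}
```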
>>
>>
>>
>>> BTW as I mentioned in the previous statement, today throwing an exception
>>> that kills one thread but not the whole instance is still an issue for
>>> monitoring purposes, but I suppose this is not going to be in this KIP
>> but
>>> addressed by another KIP, right?
>>
>> Correct. This issue is out of scope.
>>
>>
>>
>>> for consumer, if it gets a
>>> TimeoutException polling records, should it start timing all tasks since that
>>> single consumer would affect all tasks?
>>
>> Consumer#poll() would never throw a `TimeoutException` and thus
>> `task.timeout.ms` does not apply.
>>
>>
>>
>>> For other blocking calls like
>>> `endOffsets()` etc, they are usually also issued on behalf of a batch of
>>> tasks, so if that gets timeout exception should we start ticking all the
>>> corresponding tasks as well? Maybe worth clarifying a bit more in the
>> wiki.
>>
>> Good point. I agree that the timer should tick for all affected tasks. I
>> clarified in the KIP.
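Ticking the timer for all tasks affected by a single blocking call could look like this (a minimal sketch; all names are hypothetical, not actual Streams internals):

```java
// Sketch: a blocking call like `endOffsets()` is issued on behalf of several
// tasks, so a TimeoutException should start (or keep) the `task.timeout.ms`
// clock for all of them. All names are hypothetical.
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskTimeoutTicker {
    private final Map<String, Long> firstTimeoutMs = new HashMap<>();

    // Record the time of the first timeout per affected task; an existing
    // entry is kept so the clock keeps ticking across repeated timeouts.
    public void onTimeout(final List<String> affectedTasks, final long nowMs) {
        for (final String task : affectedTasks) {
            firstTimeoutMs.putIfAbsent(task, nowMs);
        }
    }

    // A task has exhausted its budget once `task.timeout.ms` has elapsed
    // since its first recorded timeout.
    public boolean expired(final String task, final long nowMs, final long taskTimeoutMs) {
        final Long start = firstTimeoutMs.get(task);
        return start != null && nowMs - start >= taskTimeoutMs;
    }

    public static void main(final String[] args) {
        final TaskTimeoutTicker ticker = new TaskTimeoutTicker();
        ticker.onTimeout(List.of("0_0", "0_1"), 1000L);
        System.out.println(ticker.expired("0_0", 1500L, 300L)); // true
        System.out.println(ticker.expired("0_2", 1500L, 300L)); // false: never timed out
    }
}
```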
>>
>>
>>
>> About KAFKA-6520:
>>
>> There is already KIP-457 and I am not sure this KIP should subsume it.
>> It somewhat feels orthogonal. I am also not 100% sure if KIP-572
>> actually helps much, because a thread could be disconnected from the
>> brokers without throwing any timeout exception: if all tasks are RUNNING
>> and just polling for new data, but no progress is made because of a
>> network issue, `poll()` would just return no data but not throw.
>>
>>
>>
>> @Bruno
>>
>>> Wouldn't it be better to specify
>>> task.timeout.ms to -1 if no retry should be done
>>
>> Interesting idea. Personally I find `-1` confusing. And it seems
>> intuitive to me that `0` implies no retries (this seems to be in
>> alignment with other APIs).
>>
>>
>>
>> -Matthias
>>
>>
>> On 5/18/20 9:53 AM, Guozhang Wang wrote:
>>> Hi Matthias,
>>>
>>> Sorry for flooding the thread, but with this KIP I feel the design scope
>> of
>>> https://issues.apache.org/jira/browse/KAFKA-6520 can be simplified a lot
>>> and maybe the design can just be piggy-backed as part of this KIP, wdyt?
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, May 18, 2020 at 9:47 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>
>>>> Hi Matthias,
>>>>
>>>> Just to add one more meta comment: for consumer, if it gets a
>>>> TimeoutException polling records, should it start timing all tasks since
>> that
>>>> single consumer would affect all tasks? For other blocking calls like
>>>> `endOffsets()` etc, they are usually also issued on behalf of a batch of
>>>> tasks, so if that gets timeout exception should we start ticking all the
>>>> corresponding tasks as well? Maybe worth clarifying a bit more in the
>> wiki.
>>>>
>>>>
>>>> Guozhang
>>>>
>>>> On Mon, May 18, 2020 at 12:26 AM Bruno Cadonna <br...@confluent.io>
>> wrote:
>>>>
>>>>> Hi Matthias,
>>>>>
>>>>> I am +1 (non-binding) on the KIP.
>>>>>
>>>>> Just one final remark: Wouldn't it be better to specify
>>>>> task.timeout.ms to -1 if no retry should be done? IMO it would make
>>>>> the config more intuitive because 0 would not have two possible
>>>>> meanings (i.e. try once and never try) anymore.
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On Sat, May 16, 2020 at 7:51 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>>>>>
>>>>>> Hello Matthias,
>>>>>>
>>>>>> Thanks for the updated KIP, overall I'm +1 on this proposal. Some
>> minor
>>>>>> comments (I know gmail mixed that again for me so I'm leaving it as a
>>>>> combo
>>>>>> for both DISCUSS and VOTE :)
>>>>>>
>>>>>> 1) There are some inconsistent statements in the proposal regarding
>>>>> what to
>>>>>> deprecate: at the beginning it says "We propose to deprecate the
>>>>> retries
>>>>>> configuration parameter for the producer and admin client" but later
>> in
>>>>>> compatibility we say "Producer and admin client behavior does not
>>>>> change;
>>>>>> both still respect retries config." My understanding is that we will
>>>>> only
>>>>>> deprecate the StreamsConfig#retries while not touch on
>>>>>> ProducerConfig/AdminClientConfig#retries, AND we will always override
>>>>> the
>>>>>> embedded producer / admin retries config to infinity so that we never
>>>>> rely
>>>>>> on those configs but always bounded by the timeout config. Is that
>>>>>> right, if yes could you clarify in the doc?
>>>>>>
>>>>>> 2) We should also document the related behavior change in
>>>>> PartitionAssignor
>>>>>> that uses AdminClient. More specifically, the admin client's retries
>>>>> config
>>>>>> is piggy-backed inside InternalTopicManager as an outer-loop retry
>>>>> logic in
>>>>>> addition to AdminClient's own inner retry loop. There are some
>> existing
>>>>>> issues like KAFKA-9999 / 10006 that Sophie and Boyang have been working
>>>>> on.
>>>>>> I exchanged some ideas with them, and generally we should consider if
>>>>> the
>>>>>> outer-loop of InternalTopicManager should just be removed and if we
>> got
>>>>>> TimeoutException we should just trigger another rebalance etc.
>>>>>>
>>>>>> BTW as I mentioned in the previous statement, today throwing an
>>>>> exception
>>>>>> that kills one thread but not the whole instance is still an issue for
>>>>>> monitoring purposes, but I suppose this is not going to be in this KIP
>>>>> but
>>>>>> addressed by another KIP, right?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>>
>>>>>> On Fri, May 15, 2020 at 1:14 PM Boyang Chen <
>> reluctanthero...@gmail.com
>>>>>>
>>>>>> wrote:
>>>>>>
>>>>>>> Good, good.
>>>>>>>
>>>>>>> Read through the discussions, the KIP looks good to me, +1
>>>>> (non-binding)
>>>>>>>
>>>>>>> On Fri, May 15, 2020 at 11:51 AM Sophie Blee-Goldman <
>>>>> sop...@confluent.io>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Called out!
>>>>>>>>
>>>>>>>> Seems like gmail struggles with [...] prefixed subjects. You'd
>>>>> think they
>>>>>>>> would adapt
>>>>>>>> all their practices to conform to the Apache Kafka mailing list
>>>>>>> standards,
>>>>>>>> but no!
>>>>>>>>
>>>>>>>> +1 (non-binding) by the way
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 11:46 AM John Roesler <vvcep...@apache.org>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Boyang,
>>>>>>>>>
>>>>>>>>> It is a separate thread, and you have just revealed yourself as a
>>>>> gmail
>>>>>>>>> user ;)
>>>>>>>>>
>>>>>>>>> (Gmail sometimes conflates vote and discuss threads for no
>>>>> apparent
>>>>>>>> reason
>>>>>>>>> )
>>>>>>>>>
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020, at 13:39, Boyang Chen wrote:
>>>>>>>>>> Hey Matthias, should this be on a separate VOTE thread?
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 11:38 AM John Roesler <
>>>>> vvcep...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Matthias!
>>>>>>>>>>>
>>>>>>>>>>> I’m +1 (binding)
>>>>>>>>>>>
>>>>>>>>>>> -John
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020, at 11:55, Matthias J. Sax wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start the vote on KIP-572:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>>
>>>
>>>
>>
>>
> 
