No worries Guozhang, any feedback is always very welcome! My reply is
going to be a little longer... Sorry.



> 1) There are some inconsistent statements in the proposal regarding what to
> deprecated:

The KIP proposes to deprecate `retries` for producer, admin, and
Streams. Maybe the confusion is about the dependency of those settings
within Streams, and that we handle the deprecation somewhat differently
for plain clients vs Streams:

For plain producer/admin the default `retries` is set to MAX_VALUE. The
config will be deprecated but still be respected.

For Streams, the default `retries` is set to zero; however, this default
does _not_ affect the embedded producer/admin clients -- both clients
stay on their own default of MAX_VALUE.

Currently, this introduces the issue that if a user wants to increase
Streams retries, she might by accident reduce the embedded client
retries, too. To avoid this issue, she would need to set

retries=100
producer.retries=MAX_VALUE
admin.retries=MAX_VALUE

This KIP will fix this issue only in the long term though, i.e., when
`retries` is finally removed. Short term, using `retries` in
StreamsConfig would still affect the embedded clients, but Streams, as
well as both clients, would log a WARN message. This preserves backward
compatibility.
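
To make the workaround above concrete, here is a minimal sketch of the
three settings in Java. It uses plain string keys and `java.util.Properties`
instead of the real `StreamsConfig` constants so that it stays
self-contained; the class and method names are made up for illustration,
and `Integer.MAX_VALUE` is my assumption for what MAX_VALUE means here.

```java
import java.util.Properties;

// Sketch only: plain string keys stand in for the StreamsConfig constants.
class RetriesOverrideSketch {

    // Builds a Streams config that raises the Streams-level `retries`
    // while pinning the embedded clients to their own default.
    static Properties streamsProps(int streamsRetries) {
        Properties props = new Properties();
        props.put("retries", Integer.toString(streamsRetries));
        // The "producer." and "admin." prefixes scope a setting to one
        // embedded client. Integer.MAX_VALUE is assumed for MAX_VALUE.
        props.put("producer.retries", Integer.toString(Integer.MAX_VALUE));
        props.put("admin.retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProps(100);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Without the two prefixed entries, the plain `retries=100` would also be
picked up by the embedded clients, which is exactly the accidental
reduction described above.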

Within Streams `retries` is ignored and the new `task.timeout.ms` is
used instead. This increases the default resilience of Kafka Streams
itself. We could also achieve this by still respecting `retries` and
changing its default value. However, because we deprecate `retries` it
seems better to just ignore it and switch to the new config directly.

I updated the KIP with some more details.



> 2) We should also document the related behavior change in PartitionAssignor
> that uses AdminClient.

This is actually a good point. Originally, I looked into this only
briefly, but it raised an interesting question on how to handle it.

Note that `TimeoutException`s are currently not handled in this retry
loop. Also note that the default retries value for other errors would be
MAX_VALUE by default (inherited from `AdminClient#retries` as mentioned
already by Guozhang).

Applying the new `task.timeout.ms` config does not seem to be
appropriate because the AdminClient is used during a rebalance in the
leader. We could introduce a new config just for this case, but it seems
to be a little bit too much. Furthermore, the group-coordinator applies
a timeout on the leader anyway: if the assignment is not computed within
the timeout, the leader is removed from the group and another rebalance
is triggered.

Overall, we make multiple admin client calls and thus we should keep
some retry logic and not just rely on the admin client internal retries,
as those fall short when different calls need to be retried in an
interleaved fashion. We could just retry indefinitely and rely on the
group coordinator to remove the leader eventually. However, this does
not seem to be ideal because the removed leader might be stuck forever.

The question though is: if topic metadata cannot be obtained or internal
topics cannot be created, what should we do? We can't compute an
assignment anyway. We already have a rebalance error code to shut down
all instances for this case. Maybe we could break the retry loop before
the leader is kicked out of the group and send this error code? This way
we don't need a new config, but piggy-back on the existing timeout to
compute the assignment. To be conservative, we could use a 50% threshold?
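
To illustrate the 50% idea, here is a rough Java sketch of such a bounded
retry loop. Everything in it is hypothetical (`AssignmentRetrySketch`,
`Outcome`, `retryUntilThreshold`, and the `assignmentTimeoutMs`
parameter); it is not the actual assignor or `InternalTopicManager` code,
and the real error code is only modeled as an enum value.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical sketch of a retry loop that gives up at 50% of a time budget.
class AssignmentRetrySketch {

    enum Outcome { ASSIGNMENT_READY, SHUTDOWN_REQUESTED }

    // Retries `prepareTopics` until it succeeds or half of the assignment
    // timeout has elapsed. The clock is injected so the loop can be tested.
    static Outcome retryUntilThreshold(BooleanSupplier prepareTopics,
                                       long assignmentTimeoutMs,
                                       LongSupplier clockMs) {
        long deadlineMs = clockMs.getAsLong() + assignmentTimeoutMs / 2;
        do {
            if (prepareTopics.getAsBoolean()) {
                return Outcome.ASSIGNMENT_READY;
            }
            // A real loop would back off here before the next attempt.
        } while (clockMs.getAsLong() < deadlineMs);
        // Stop early, while the leader is still a group member, so the
        // shutdown error code can still be sent with the assignment.
        return Outcome.SHUTDOWN_REQUESTED;
    }

    public static void main(String[] args) {
        Outcome outcome =
            retryUntilThreshold(() -> true, 1000L, System::currentTimeMillis);
        System.out.println(outcome);
    }
}
```

The point of the threshold is only that the loop ends while the leader can
still respond; the exact fraction is a tuning choice.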



> BTW as I mentioned in the previous statement, today throwing an exception
> that kills one thread but not the whole instance is still an issue for
> monitoring purposes, but I suppose this is not going to be in this KIP but
> addressed by another KIP, right?

Correct. This issue is out of scope.



> for consumer, if it gets a
> TimeoutException polling records, would start timing all tasks since that
> single consumer would affect all tasks? 

Consumer#poll() would never throw a `TimeoutException` and thus
`task.timeout.ms` does not apply.



> For other blocking calls like
> `endOffsets()` etc, they are usually also issued on behalf of a batch of
> tasks, so if that gets timeout exception should we start ticking all the
> corresponding tasks as well? Maybe worth clarifying a bit more in the wiki.

Good point. I agree that the timer should tick for all affected tasks. I
clarified this in the KIP.
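
As a rough illustration of "ticking for all affected tasks", the sketch
below keeps one deadline per task. It is not the Streams implementation:
the class `TaskTimeoutSketch`, its methods, and the string task ids are
all hypothetical, and I assume that a timer starts on the first timeout
and is cleared when the task makes progress again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-task timeout tracking, driven by an explicit clock value.
class TaskTimeoutSketch {
    private final long taskTimeoutMs;
    // Deadline per task id; a task without an entry has no running timer.
    private final Map<String, Long> deadlines = new HashMap<>();

    TaskTimeoutSketch(long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called when a blocking call issued on behalf of `taskIds` timed out.
    // Starts a timer for every affected task that has none yet and returns
    // the tasks whose deadline has passed.
    List<String> onTimeout(List<String> taskIds, long nowMs) {
        List<String> expired = new ArrayList<>();
        for (String id : taskIds) {
            long deadline = deadlines.computeIfAbsent(id, k -> nowMs + taskTimeoutMs);
            if (nowMs >= deadline) {
                expired.add(id);
            }
        }
        return expired;
    }

    // Called when a task makes progress again; clears its timer.
    void onSuccess(String taskId) {
        deadlines.remove(taskId);
    }

    public static void main(String[] args) {
        TaskTimeoutSketch timers = new TaskTimeoutSketch(1000L);
        System.out.println(timers.onTimeout(List.of("0_0", "0_1"), 0L));
        System.out.println(timers.onTimeout(List.of("0_0", "0_1"), 1000L));
    }
}
```

In this sketch a timeout of `0` expires a task on its first timeout, i.e.,
no retry is attempted.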



About KAFKA-6520:

There is already KIP-457 and I am not sure this KIP should subsume it.
It somewhat feels orthogonal. I am also not 100% sure if KIP-572
actually helps much, because a thread could be disconnected from the
brokers without throwing any timeout exception: if all tasks are RUNNING
and just polling for new data, but no progress is made because of a
network issue, `poll()` would just return no data but not throw.



@Bruno

> Wouldn't it be better to specify
> task.timeout.ms to -1 if no retry should be done

Interesting idea. Personally I find `-1` confusing. And it seems
intuitive to me that `0` implies no retries (this seems to be in
alignment with other APIs).



-Matthias


On 5/18/20 9:53 AM, Guozhang Wang wrote:
> Hi Matthias,
> 
> Sorry for flooding the thread, but with this KIP I feel the design scope of
> https://issues.apache.org/jira/browse/KAFKA-6520 can be simplified a lot
> and may it the design can be just piggy-backed as part of this KIP, wdyt?
> 
> Guozhang
> 
> 
> On Mon, May 18, 2020 at 9:47 AM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hi Matthias,
>>
>> Just to add one more meta comment: for consumer, if it gets a
>> TimeoutException polling records, would start timing all tasks since that
>> single consumer would affect all tasks? For other blocking calls like
>> `endOffsets()` etc, they are usually also issued on behalf of a batch of
>> tasks, so if that gets timeout exception should we start ticking all the
>> corresponding tasks as well? Maybe worth clarifying a bit more in the wiki.
>>
>>
>> Guozhang
>>
>> On Mon, May 18, 2020 at 12:26 AM Bruno Cadonna <br...@confluent.io> wrote:
>>
>>> Hi Matthias,
>>>
>>> I am +1 (non-binding) on the KIP.
>>>
>>> Just one final remark: Wouldn't it be better to specify
>>> task.timeout.ms to -1 if no retry should be done? IMO it would make
>>> the config more intuitive because 0 would not have two possible
>>> meanings (i.e. try once and never try) anymore.
>>>
>>> Best,
>>> Bruno
>>>
>>> On Sat, May 16, 2020 at 7:51 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>> Hello Matthias,
>>>>
>>>> Thanks for the updated KIP, overall I'm +1 on this proposal. Some minor
>>>> comments (I know gmail mixed that again for me so I'm leaving it as a
>>> combo
>>>> for both DISCUSS and VOTE :)
>>>>
>>>> 1) There are some inconsistent statements in the proposal regarding
>>> what to
>>>> deprecated: at the beginning it says "We propose to deprecate the
>>> retries
>>>> configuration parameter for the producer and admin client" but later in
>>>> compatibility we say "Producer and admin client behavior does not
>>> change;
>>>> both still respect retries config." My understanding is that we will
>>> only
>>>> deprecate the StreamsConfig#retries while not touch on
>>>> ProducerConfig/AdminClientConfig#retries, AND we will always override
>>> the
>>>> embedded producer / admin retries config to infinity so that we never
>>> rely
>>>> on those configs but always bounded by the timeout config. Is that
>>>> right, if yes could you clarify in the doc?
>>>>
>>>> 2) We should also document the related behavior change in
>>> PartitionAssignor
>>>> that uses AdminClient. More specifically, the admin client's retries
>>> config
>>>> is piggy-backed inside InternalTopicManager as an outer-loop retry
>>> logic in
>>>> addition to AdminClient's own inner retry loop. There are some existing
>>>> issues like KAFKA-9999 / 10006 that Sophie and Boyang has been working
>>> on.
>>>> I exchanged some ideas with them, and generally we should consider if
>>> the
>>>> outer-loop of InternalTopicManager should just be removed and if we got
>>>> TimeoutException we should just trigger another rebalance etc.
>>>>
>>>> BTW as I mentioned in the previous statement, today throwing an
>>> exception
>>>> that kills one thread but not the whole instance is still an issue for
>>>> monitoring purposes, but I suppose this is not going to be in this KIP
>>> but
>>>> addressed by another KIP, right?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Fri, May 15, 2020 at 1:14 PM Boyang Chen <reluctanthero...@gmail.com
>>>>
>>>> wrote:
>>>>
>>>>> Good, good.
>>>>>
>>>>> Read through the discussions, the KIP looks good to me, +1
>>> (non-binding)
>>>>>
>>>>> On Fri, May 15, 2020 at 11:51 AM Sophie Blee-Goldman <
>>> sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Called out!
>>>>>>
>>>>>> Seems like gmail struggles with [...] prefixed subjects. You'd
>>> think they
>>>>>> would adapt
>>>>>> all their practices to conform to the Apache Kafka mailing list
>>>>> standards,
>>>>>> but no!
>>>>>>
>>>>>> +1 (non-binding) by the way
>>>>>>
>>>>>> On Fri, May 15, 2020 at 11:46 AM John Roesler <vvcep...@apache.org>
>>>>> wrote:
>>>>>>
>>>>>>> Hi Boyang,
>>>>>>>
>>>>>>> It is a separate thread, and you have just revealed yourself as a
>>> gmail
>>>>>>> user ;)
>>>>>>>
>>>>>>> (Gmail sometimes conflates vote and discuss threads for no
>>> apparent
>>>>>> reason
>>>>>>> )
>>>>>>>
>>>>>>> -John
>>>>>>>
>>>>>>> On Fri, May 15, 2020, at 13:39, Boyang Chen wrote:
>>>>>>>> Hey Matthias, should this be on a separate VOTE thread?
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 11:38 AM John Roesler <
>>> vvcep...@apache.org>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Matthias!
>>>>>>>>>
>>>>>>>>> I’m +1 (binding)
>>>>>>>>>
>>>>>>>>> -John
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020, at 11:55, Matthias J. Sax wrote:
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I would like to start the vote on KIP-572:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Attachments:
>>>>>>>>>> * signature.asc
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 
