[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9800:
-----------------------------
    Description: 
Design:

The main idea is to bookkeep the number of failed attempts. Currently, the retry 
backoff has two main usage patterns:
 # Synchronous retries in a blocking loop. The thread sleeps for the retry 
backoff in each iteration.
 # Asynchronous retries. On each poll, retries that have not yet satisfied the 
backoff are filtered out. The data class often maintains a 1:1 mapping to a set 
of logically associated requests (i.e. a set contains one initial request and 
only its retries).

For pattern 1, we can use a local failure counter of a plain Java numeric type.
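
A minimal sketch of pattern 1, where sendRequestBlocking and RetriableException 
are hypothetical stand-ins for the real client call and error type:

{code:java}
// Pattern 1: a synchronous retry loop that tracks failed attempts locally and
// sleeps for an exponentially growing, capped backoff. Names are illustrative.
public final class BlockingRetryLoop {
    static final long RETRY_BACKOFF_MS = 100;       // assumed retry.backoff.ms
    static final long RETRY_BACKOFF_MAX_MS = 1000;  // assumed retry.backoff.max.ms

    static class RetriableException extends RuntimeException { }

    static void sendRequestBlocking() { /* placeholder for the real blocking call */ }

    static void sendWithRetries(int maxAttempts) throws InterruptedException {
        long attempts = 0;  // local failure counter
        while (true) {
            try {
                sendRequestBlocking();
                return;  // success, stop retrying
            } catch (RetriableException e) {
                if (++attempts >= maxAttempts)
                    throw e;
                // exponential backoff, capped at the configured maximum
                long backoffMs = Math.min(
                        RETRY_BACKOFF_MS * (1L << (attempts - 1)),
                        RETRY_BACKOFF_MAX_MS);
                Thread.sleep(backoffMs);
            }
        }
    }
}
{code}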

For pattern 2, I already wrapped an exponential backoff/timeout util class in my 
KIP-601 
[implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28],
 which takes the number of attempts and returns the backoff/timeout value at the 
corresponding level. Thus, we can add a new property to each class that holds 
retriable data in order to record the number of failed attempts.
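
A sketch of the shape of that util class, assuming it is configured once and 
then queried with the attempt count (the class and method names here are 
illustrative, not the final API):

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Illustrative exponential backoff util: backoff(attempts) returns
// initial * multiplier^attempts, capped at the max and randomized by
// +/- jitter to avoid synchronized retry storms.
public final class ExponentialBackoff {
    private final long initialIntervalMs;
    private final int multiplier;
    private final long maxIntervalMs;
    private final double jitter;

    public ExponentialBackoff(long initialIntervalMs, int multiplier,
                              long maxIntervalMs, double jitter) {
        this.initialIntervalMs = initialIntervalMs;
        this.multiplier = multiplier;
        this.maxIntervalMs = maxIntervalMs;
        this.jitter = jitter;
    }

    public long backoff(long attempts) {
        // Degenerate config (max <= initial) yields a static, jitter-free
        // backoff; the existing-tests section below relies on this.
        if (maxIntervalMs <= initialIntervalMs)
            return initialIntervalMs;
        double exp = Math.min(
                initialIntervalMs * Math.pow(multiplier, attempts),
                (double) maxIntervalMs);
        double randomFactor = 1.0
                + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
        return (long) (exp * randomFactor);
    }
}
{code}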

 

Changes:

KafkaProducer:
 # Produce request (ApiKeys.PRODUCE). Currently, the backoff applies to each 
ProducerBatch in the Accumulator, which already has an `attempts` attribute 
recording the number of failed attempts. So we can let the Accumulator calculate 
the new retry backoff for each batch when it enqueues them, to avoid 
instantiating the util class multiple times (see the sketch after this list).
 # Transaction request (ApiKeys..*TXN). TxnRequestHandler will have a new class 
property of type `Long` to record the number of attempts.
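
A rough sketch of the producer side, reusing the ExponentialBackoff sketch above 
and assuming the accumulator owns a single shared instance (reEnqueue, 
backoffExpired, and the field names are illustrative):

{code:java}
// The accumulator owns one ExponentialBackoff and stamps each batch with its
// next backoff on re-enqueue, so the util class is instantiated only once.
public final class AccumulatorSketch {
    private final ExponentialBackoff retryBackoff =
            new ExponentialBackoff(100L, 2, 1000L, 0.2);

    void reEnqueue(ProducerBatchSketch batch, long nowMs) {
        batch.attempts++;  // existing failed-attempt counter on the batch
        batch.retryBackoffMs = retryBackoff.backoff(batch.attempts);
        batch.lastAttemptMs = nowMs;
        // ... put the batch back at the head of its deque ...
    }

    boolean backoffExpired(ProducerBatchSketch batch, long nowMs) {
        return nowMs - batch.lastAttemptMs >= batch.retryBackoffMs;
    }

    static final class ProducerBatchSketch {
        int attempts;
        long retryBackoffMs;
        long lastAttemptMs;
    }
}
{code}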

KafkaConsumer:
 # Some synchronous retry use cases. Record the failed attempts in the blocking 
loop, following the pattern 1 sketch above.
 # Partition request (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). 
Though the actual requests are packed per node, the current implementation 
applies the backoff to each topic partition, and the backoff value is kept by 
TopicPartitionState. Thus, TopicPartitionState will get a new property recording 
the number of attempts (see the sketch after this list).
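
A sketch of the per-partition bookkeeping, with illustrative field and method 
names (only the backoff-related state is shown):

{code:java}
// TopicPartitionState gains an attempts counter so the per-partition backoff
// can grow exponentially instead of staying fixed.
final class TopicPartitionStateSketch {
    long nextRetryTimeMs;  // existing: earliest time this partition may be retried
    long attempts;         // new: failed attempts for this partition's request

    void requestFailed(ExponentialBackoff backoff, long nowMs) {
        attempts++;
        nextRetryTimeMs = nowMs + backoff.backoff(attempts);
    }

    void requestSucceeded() {
        attempts = 0;  // reset on success
        nextRetryTimeMs = 0L;
    }

    boolean canSendRequest(long nowMs) {
        return nowMs >= nextRetryTimeMs;
    }
}
{code}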

Metadata:
 # Metadata lives as a singleton in many clients. Add a new property recording 
the number of attempts.

AdminClient:
 # AdminClient has its own request abstraction, Call, which already keeps the 
failed attempts. So this is mostly a matter of cleaning up the Call class logic 
a bit (see the sketch below).
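
A sketch of where the backoff would hook into Call, assuming its existing 
failed-attempt counter (the names tries and nextAllowedTryMs are illustrative):

{code:java}
// Call already counts failed attempts; on failure we advance the earliest
// retry time using the shared exponential backoff.
abstract class CallSketch {
    int tries;              // existing failed-attempt counter
    long nextAllowedTryMs;  // earliest time this call may be retried

    void fail(ExponentialBackoff backoff, long nowMs) {
        tries++;
        nextAllowedTryMs = nowMs + backoff.backoff(tries);
    }
}
{code}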

Existing tests:
 # If a test asserts on the retry backoff itself, add a delta to the assertion 
to account for the jitter.
 # If a test exercises other functionality, we can specify the same value for 
both `retry.backoff.ms` and `retry.backoff.max.ms` in order to make the retry 
backoff static. This trick keeps the existing tests compatible with the changes 
(both adjustments are sketched below).
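
A sketch of both test adjustments; the 20% jitter fraction and the JUnit-style 
assertion are assumptions for illustration:

{code:java}
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Properties;

public class BackoffTestTricks {
    // Trick 2: pin min == max so the effective backoff never grows or jitters
    // away from the single value the test relies on.
    static Properties staticBackoffConfig() {
        Properties props = new Properties();
        props.put("retry.backoff.ms", "100");
        props.put("retry.backoff.max.ms", "100");  // same value => static backoff
        return props;
    }

    // Trick 1: when asserting on a jittered backoff, allow a delta.
    static void assertBackoffWithJitter(double expectedMs, double actualMs) {
        double jitter = 0.2;  // assumed jitter fraction
        assertEquals(expectedMs, actualMs, expectedMs * jitter);
    }
}
{code}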

There are other common usages that look like client.poll(timeout), where the 
timeout passed in is the retry backoff value. We won't change these usages, 
since the underlying logic is nioSelector.select(timeout) and 
nioSelector.selectNow(): if no interested op is ready, the client blocks for up 
to the retry backoff in milliseconds. This is an optimization for the case where 
no request needs to be sent but the client is waiting for responses. 
Specifically, if the in-flight requests fail before the retry backoff has 
elapsed, the client still needs to wait out the remaining time, unless a new 
request needs to be sent.

 

> [KIP-580] Client Exponential Backoff Implementation
> ---------------------------------------------------
>
>                 Key: KAFKA-9800
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9800
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Cheng Tan
>            Assignee: Cheng Tan
>            Priority: Major
>              Labels: KIP-580
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
