[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Cheng Tan updated KAFKA-9800:
-----------------------------
    Description:
Design:
The main idea is to bookkeep the failed attempts. Currently, the retry backoff has two main usage patterns:
# Synchronous retries in a blocking loop. The thread sleeps for the retry backoff in each iteration.
# Asynchronous retries. On each poll, retries whose backoff has not yet elapsed are filtered out. The data class often maintains a 1:1 mapping to a set of logically associated requests (i.e., a set contains exactly one initial request and only its retries).

For type 1, we can use a local failure counter of a plain Java type (e.g., an int).
For type 2, we can make the classes that contain retriable data inherit from an abstract class, Retriable, which will implement the methods that record the number of failed attempts. I already implemented an exponential backoff/timeout utility class as part of my KIP-601 [implementation|https://github.com/apache/kafka/pull/8683/files#diff-9ca2b1294653dfa914b9277de62b52e3R28]; it takes the number of failures and returns the backoff/timeout value for the corresponding level.

Changes:
KafkaProducer:
# Produce requests (ApiKeys.PRODUCE). Currently, the backoff applies to each ProducerBatch in the accumulator, which already has an attempts attribute recording the number of failed attempts. We can probably clean up the logic a little by moving the failed-attempts field and its getter into the base class via inheritance.
# Transaction requests (ApiKeys..*TXN). TxnRequestHandler will inherit from Retriable and record each failed attempt.
KafkaConsumer:
# Some synchronous retry use cases. Record the failed attempts in the blocking loop.
# Partition state requests (ApiKeys.OFFSET_FOR_LEADER_EPOCH, ApiKeys.LIST_OFFSETS). Although the actual requests are batched per node, the current implementation applies the backoff per topic partition, and the backoff state is kept in TopicPartitionState. Thus, TopicPartitionState will inherit from Retriable.
Metadata:
# Metadata lives as a singleton in many clients.
It can inherit from Retriable.
AdminClient:
# AdminClient has its own request abstraction, Call, which already keeps track of the failed attempts. We can probably clean up the Call class logic a bit by inheritance.

There are other common usages such as client.poll(timeout), where the timeout passed in is the retry backoff value. We won't change these usages, because the underlying logic is nioSelector.select(timeout) and nioSelector.selectNow(): if no operation of interest is ready, the client blocks for the retry backoff duration. This is an optimization for the case where there is no request to send but the client is waiting for responses. Specifically, if the in-flight requests fail before the retry backoff has elapsed, the client still waits until that amount of time has passed, unless a new request needs to be sent.
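The design relies on a utility that maps a failure count to a backoff value. The class below is a minimal, hypothetical sketch of such a utility, not the code in the linked KIP-601 PR: the names ExponentialBackoffSketch and backoffMs, the "capped geometric growth with random jitter" formula, and all parameter values are assumptions for illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical exponential backoff calculator (not Kafka's actual utility class).
 * For the n-th consecutive failure (n >= 1) it returns
 *   min(maxMs, initialMs * multiplier^(n - 1) * randomFactor)
 * where randomFactor is drawn uniformly from [1 - jitter, 1 + jitter).
 */
final class ExponentialBackoffSketch {
    private final long initialMs;
    private final double multiplier;
    private final long maxMs;
    private final double jitter;

    ExponentialBackoffSketch(long initialMs, double multiplier, long maxMs, double jitter) {
        if (initialMs <= 0 || multiplier < 1.0 || maxMs < initialMs || jitter < 0.0 || jitter >= 1.0) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
        this.jitter = jitter;
    }

    /** Backoff to apply after {@code failures} consecutive failed attempts. */
    long backoffMs(int failures) {
        if (failures < 1) {
            throw new IllegalArgumentException("failures must be >= 1");
        }
        // Grow geometrically with the failure count; cap before applying jitter
        // so a huge exponent cannot overflow the conversion to long.
        double term = Math.min(initialMs * Math.pow(multiplier, failures - 1), (double) maxMs);
        double factor = jitter == 0.0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1.0 - jitter, 1.0 + jitter);
        // Jitter may push the value above the cap, so clamp again.
        return (long) Math.min(term * factor, (double) maxMs);
    }
}
```

With initialMs = 100, multiplier = 2 and maxMs = 1000 (no jitter), failures 1 through 5 yield 100, 200, 400, 800 and 1000 ms. Applying jitter (for example 0.2, an assumed value) spreads out retries from many clients that failed at the same moment.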
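For type 2, the description proposes an abstract Retriable base class that records failed attempts, with each poll skipping entries whose backoff has not elapsed. The sketch below illustrates that idea under stated assumptions: RetriableSketch, recordFailure, canRetry, PartitionStateSketch and RetryFilterSketch are hypothetical names, the fixed doubling without jitter is a simplification, and the real Kafka classes (ProducerBatch, TopicPartitionState, TxnRequestHandler, Call) are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class for asynchronously retried data (type 2). */
abstract class RetriableSketch {
    private int failedAttempts = 0;
    private long lastFailureMs = -1L;

    /** Record one failed attempt observed at {@code nowMs}. */
    void recordFailure(long nowMs) {
        failedAttempts++;
        lastFailureMs = nowMs;
    }

    /** Reset the bookkeeping once an attempt succeeds. */
    void recordSuccess() {
        failedAttempts = 0;
        lastFailureMs = -1L;
    }

    int failedAttempts() {
        return failedAttempts;
    }

    /** Backoff without jitter: baseMs * 2^(failures - 1), capped at maxMs. */
    long backoffMs(long baseMs, long maxMs) {
        if (failedAttempts == 0) {
            return 0L;
        }
        return (long) Math.min(baseMs * Math.pow(2, failedAttempts - 1), (double) maxMs);
    }

    /** True if nothing has failed yet, or the backoff since the last failure has elapsed. */
    boolean canRetry(long nowMs, long baseMs, long maxMs) {
        return failedAttempts == 0 || nowMs - lastFailureMs >= backoffMs(baseMs, maxMs);
    }
}

/** Hypothetical stand-in for per-partition state such as TopicPartitionState. */
final class PartitionStateSketch extends RetriableSketch {
    final String topicPartition;

    PartitionStateSketch(String topicPartition) {
        this.topicPartition = topicPartition;
    }
}

final class RetryFilterSketch {
    /** On each poll, keep only the entries whose backoff has elapsed. */
    static List<PartitionStateSketch> retryable(List<PartitionStateSketch> states,
                                                long nowMs, long baseMs, long maxMs) {
        List<PartitionStateSketch> ready = new ArrayList<>();
        for (PartitionStateSketch state : states) {
            if (state.canRetry(nowMs, baseMs, maxMs)) {
                ready.add(state);
            }
        }
        return ready;
    }
}
```

Because the counter and timestamp live in the base class, each subclass only needs to call recordFailure and recordSuccess at the right points, which matches the "clean up by inheritance" intent in the Changes list.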
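Type 1 needs no new class: a local counter inside the blocking loop is enough. A minimal sketch under that assumption follows; the method name retryWithBackoff, the BooleanSupplier callback standing in for a real request, and the doubling formula are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

final class BlockingRetrySketch {
    /**
     * Type 1: a synchronous, blocking retry loop. The failure count is a plain
     * local int; the sleep between attempts is baseMs * 2^(failures - 1), capped at maxMs.
     *
     * @return the number of failed attempts before success, or -1 if the
     *         timeout expired (or the thread was interrupted) first
     */
    static int retryWithBackoff(BooleanSupplier attempt, long baseMs, long maxMs, long timeoutMs) {
        long deadlineNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        int failures = 0; // local failure counter; no shared state needed
        while (true) {
            if (attempt.getAsBoolean()) {
                return failures;
            }
            failures++;
            long backoffMs = (long) Math.min(baseMs * Math.pow(2, failures - 1), (double) maxMs);
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadlineNs - System.nanoTime());
            if (remainingMs <= 0) {
                return -1;
            }
            try {
                // Never sleep past the caller's deadline.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return -1;
            }
        }
    }
}
```

For example, an attempt that succeeds on its third call returns 2 after sleeping 1 ms and then 2 ms when baseMs = 1.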
> [KIP-580] Client Exponential Backoff Implementation
> ---------------------------------------------------
>
>                 Key: KAFKA-9800
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9800
>             Project: Kafka
>          Issue Type: New Feature
>            Reporter: Cheng Tan
>            Assignee: Cheng Tan
>            Priority: Major
>              Labels: KIP-580

--
This message was sent by Atlassian Jira
(v8.3.4#803005)