[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777398999
## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ##
@@ -191,6 +192,21 @@
 private final Collection singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0));
+private SubscriptionState subscription;
+private Time time;
+
+@BeforeEach
+public void setup() {
+this.time = new MockTime();
+// default to reset to the earliest offset
+this.subscription = createSubscriptionState(OffsetResetStrategy.EARLIEST);
+}
+
+private SubscriptionState createSubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+// use static backoff time for testing
+return new SubscriptionState(new LogContext(), offsetResetStrategy, 100, 100);
+}
+
Review comment: Refactor the tests
--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777398361
## File path: clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java ##
@@ -32,7 +32,16 @@
 private final double expMax;
 private final long initialInterval;
 private final double jitter;
+private long attemptedCount = 0;
Review comment: Add `attemptedCount` to the `ExponentialBackoff` class so that callers can rely on it instead of maintaining the attempt count on their side. This works well when the `ExponentialBackoff` instance backs off in only a single place, or when the caller is inside a lambda expression.
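To illustrate the idea in this comment, here is a minimal sketch of a backoff helper that tracks its own attempt count. The class name `CountingBackoffSketch` and the methods `nextBackoff()`, `reset()` and `attemptedCount()` are hypothetical names chosen for illustration; this is not the `ExponentialBackoff` code from the PR, and the capping logic is a simplification.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: NOT Kafka's ExponentialBackoff, only an illustration
// of a backoff helper that keeps the attempt count internally.
final class CountingBackoffSketch {
    private final long initialInterval;
    private final int multiplier;
    private final long maxInterval;
    private final double jitter;
    private long attemptedCount = 0;

    CountingBackoffSketch(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.maxInterval = maxInterval;
        this.jitter = jitter;
    }

    // Returns the backoff for the current attempt, then advances the counter.
    long nextBackoff() {
        double uncapped = initialInterval * Math.pow(multiplier, attemptedCount);
        double term = Math.min(uncapped, (double) maxInterval); // cap before jitter
        attemptedCount++;
        double randomFactor = jitter <= 0.0
                ? 1.0
                : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    // Callers reset the counter after a successful attempt.
    void reset() {
        attemptedCount = 0;
    }

    long attemptedCount() {
        return attemptedCount;
    }

    public static void main(String[] args) {
        // Jitter is 0 here so the printed sequence is deterministic.
        CountingBackoffSketch backoff = new CountingBackoffSketch(100, 2, 1000, 0.0);
        for (int i = 0; i < 6; i++) {
            System.out.println("attempt " + backoff.attemptedCount() + " -> " + backoff.nextBackoff() + " ms");
        }
    }
}
```

Because the counter lives in the helper, a caller inside a lambda does not need a mutable local variable to track attempts. The trade-off is that one instance should then serve only one backoff site, as the comment notes.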
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777395790
## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ##
@@ -192,14 +214,39 @@
 * @return The new values which have been set as described in postProcessParsedConfig.
 */
 public static Map postProcessReconnectBackoffConfigs(AbstractConfig config,
-Map parsedValues) {
+ Map parsedValues) {
 HashMap rval = new HashMap<>();
 if ((!config.originals().containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) && config.originals().containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+log.info("Disabling exponential reconnect backoff because {} is set, but {} is not.",
Review comment: Since we log at `warn` level when the exponential behavior of `RETRY_BACKOFF_MS` and `SOCKET_CONNECTION_SETUP_TIMEOUT_MS` is disabled, I think we should log at least at `info` level here.
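The condition in this hunk can be sketched in isolation. The example below uses plain maps in place of Kafka's `AbstractConfig`, and the class and method names (`ReconnectBackoffPostProcessSketch`, `postProcess`) are hypothetical. The step that pins the maximum to the base value is an assumption about what "disabling exponential reconnect backoff" means, since the hunk above is cut off before that line.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch using plain maps instead of Kafka's AbstractConfig.
final class ReconnectBackoffPostProcessSketch {
    static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
    static final String RECONNECT_BACKOFF_MAX_MS_CONFIG = "reconnect.backoff.max.ms";

    // originals: keys the user set explicitly; parsedValues: values after defaults were applied.
    static Map<String, Object> postProcess(Map<String, ?> originals, Map<String, Object> parsedValues) {
        Map<String, Object> rval = new HashMap<>();
        if (!originals.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)
                && originals.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
            System.out.printf("Disabling exponential reconnect backoff because %s is set, but %s is not.%n",
                    RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
            // Assumption: with max == base, the backoff can no longer grow.
            rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
        }
        return rval;
    }

    public static void main(String[] args) {
        Map<String, Object> originals = new HashMap<>();
        originals.put(RECONNECT_BACKOFF_MS_CONFIG, 50L); // user set only the base value

        Map<String, Object> parsed = new HashMap<>();
        parsed.put(RECONNECT_BACKOFF_MS_CONFIG, 50L);
        parsed.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, 1000L); // illustrative default

        System.out.println(postProcess(originals, parsed));
    }
}
```

Since this path silently changes the effective maximum, logging it above `debug` makes the override visible in default client logs, which is the point of the review comment.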
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777395274
## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ##
@@ -102,11 +119,16 @@
 public static final String DEFAULT_SECURITY_PROTOCOL = "PLAINTEXT";
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG = "socket.connection.setup.timeout.ms";
-public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. If the connection is not built before the timeout elapses, clients will close the socket channel.";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC = "The amount of time the client will wait for the socket connection to be established. " +
+"If the connection is not built before the timeout elapses, clients will close the socket channel. " +
+"This value is the initial backoff value and will increase exponentially for each consecutive connection failure, " +
+"up to the socket.connection.setup.timeout.max.ms value.";
 public static final Long DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS = 10 * 1000L;
 public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG = "socket.connection.setup.timeout.max.ms";
-public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
+public static final String SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC = "The maximum amount of time the client will wait for the socket connection to be established. " +
+"The connection setup timeout will increase exponentially for each consecutive connection failure up to this maximum. To avoid connection storms, " +
+"a randomization factor of 0.2 will be applied to the timeout resulting in a random range between 20% below and 20% above the computed value.";
Review comment: Only split this into 3 lines, for better readability.
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777394827
## File path: clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java ##
@@ -70,17 +70,34 @@
 public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'";
 public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
-public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.";
+public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. " +
+"This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. " +
+"This value is the initial backoff value and will increase exponentially for each consecutive connection failure, up to the reconnect.backoff.max.ms value.";
Review comment: Added a final sentence to mention that this is the initial backoff value and that it increases exponentially up to `reconnect.backoff.max.ms`. The same applies to `retry.backoff.ms` and `socket.connection.setup.timeout.ms` below.
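As a small usage sketch of the two settings this doc string ties together, the snippet below builds client properties with both a base and a maximum reconnect backoff. The class name `ReconnectBackoffConfigSketch`, the broker address and the numeric values are illustrative assumptions, not recommendations from the PR.

```java
import java.util.Properties;

// Illustrative only: shows which two keys control the reconnect backoff range.
final class ReconnectBackoffConfigSketch {
    static Properties clientProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        // Initial backoff before the first reconnect attempt.
        props.setProperty("reconnect.backoff.ms", "50");
        // Upper bound that the exponentially growing backoff will not exceed (pre-jitter).
        props.setProperty("reconnect.backoff.max.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = clientProps();
        System.out.println("reconnect.backoff.ms=" + props.getProperty("reconnect.backoff.ms"));
        System.out.println("reconnect.backoff.max.ms=" + props.getProperty("reconnect.backoff.max.ms"));
    }
}
```

Setting both keys explicitly keeps the exponential behavior enabled; as the hunk in `postProcessReconnectBackoffConfigs` shows, setting only `reconnect.backoff.ms` disables it.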
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777394154
## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ##
@@ -536,7 +535,12 @@ private void clearAddresses() {
 }
 public String toString() {
-return "NodeState(" + state + ", " + lastConnectAttemptMs + ", " + failedAttempts + ", " + throttleUntilTimeMs + ")";
+return "NodeState(" +
+"state=" + state + ", " +
+"lastConnectAttemptMs=" + lastConnectAttemptMs + ", " +
+"failedAttempts=" + failedAttempts + ", " +
+"failedConnectAttempts=" + failedConnectAttempts + ", " +
+"throttleUntilTimeMs=" + throttleUntilTimeMs + ")";
Review comment: The original `toString` output is hard to read because the values have no leading field names. Updated it to label each field, and added `failedConnectAttempts`.
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777393726
## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ##
@@ -357,8 +357,7 @@ private void resetConnectionSetupTimeout(NodeConnectionState nodeState) {
 }
 /**
- * Increment the failure counter, update the node reconnect backoff exponentially,
- * and record the current timestamp.
+ * Increment the failure counter, update the node reconnect backoff exponentially.
Review comment: This method doesn't `record the current timestamp`, so I updated the Javadoc.
[GitHub] [kafka] showuon commented on a change in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
showuon commented on a change in pull request #11627:
URL: https://github.com/apache/kafka/pull/11627#discussion_r777393490
## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ##
@@ -372,7 +371,7 @@ private void updateReconnectBackoff(NodeConnectionState nodeState) {
 /**
  * Increment the failure counter and update the node connection setup timeout exponentially.
  * The delay is socket.connection.setup.timeout.ms * 2**(failures) * (+/- 20% random jitter)
- * Up to a (pre-jitter) maximum of reconnect.backoff.max.ms
+ * Up to a (pre-jitter) maximum of socket.connection.setup.timeout.max.ms
Review comment: Side fix: this Javadoc is about `socket.connection.setup.timeout.ms`, not `reconnect.backoff.ms`.
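The formula in this Javadoc can be worked through with a short sketch. The class and method names (`SetupTimeoutSketch`, `setupTimeoutMs`) are hypothetical, the jitter factor is passed in explicitly so the arithmetic is reproducible, and the 30-second maximum in `main` is an assumed value for illustration. The 10-second base matches `DEFAULT_SOCKET_CONNECTION_SETUP_TIMEOUT_MS` shown earlier in the PR.

```java
// Hypothetical sketch of the Javadoc formula:
// timeout = min(base * 2^failures, max) * randomFactor, with randomFactor in [0.8, 1.2].
final class SetupTimeoutSketch {
    static long setupTimeoutMs(long baseMs, long maxMs, int failures, double randomFactor) {
        double uncapped = baseMs * Math.pow(2, failures);
        // The cap is applied before jitter, matching "(pre-jitter) maximum".
        double preJitter = Math.min(uncapped, (double) maxMs);
        return (long) (preJitter * randomFactor);
    }

    public static void main(String[] args) {
        long baseMs = 10 * 1000L; // socket.connection.setup.timeout.ms
        long maxMs = 30 * 1000L;  // assumed socket.connection.setup.timeout.max.ms
        for (int failures = 0; failures <= 3; failures++) {
            // A factor of 1.0 means "no jitter", to show the underlying growth.
            System.out.println(failures + " failures -> " + setupTimeoutMs(baseMs, maxMs, failures, 1.0) + " ms");
        }
    }
}
```

Because the cap applies before jitter, a jittered timeout can exceed the configured maximum by up to 20%, which is why the Javadoc should name the matching `socket.connection.setup.timeout.max.ms` setting and not the reconnect one.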