junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278072179
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##########

@@ -1488,9 +1500,10 @@ public void run() {
                     } else if (!heartbeat.shouldHeartbeat(now)) {
                         // poll again after waiting for the retry backoff in case the heartbeat failed or the
                         // coordinator disconnected
-                        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+                        AbstractCoordinator.this.wait(retryBackoff.backoff(attempts++));
                     } else {
                         heartbeat.sentHeartbeat(now);
+                        attempts = 0L;

Review Comment:
I think the common case where exponential backoff could be helpful is that during a heartbeat failure, the coordinator has changed, but it takes some time to discover the new coordinator. In that case, the current code does the following in a loop:
```
sendHeartbeat
get NotCoordinator error
findCoordinator
wait for retryBackoff
```
With the new change, since `attempts` is reset on every heartbeat request, we will do the same loop as above with no exponential backoff in between.

########## clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java: ##########

@@ -203,18 +224,43 @@
      * @return The new values which have been set as described in postProcessParsedConfig.
      */
     public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
-                                                    Map<String, Object> parsedValues) {
+                                                                         Map<String, Object> parsedValues) {
         HashMap<String, Object> rval = new HashMap<>();
         Map<String, Object> originalConfig = config.originals();
         if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
             originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-            log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+            log.warn("Disabling exponential reconnect backoff because {} is set, but {} is not.",
                     RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
         }
         return rval;
     }

+    /**
+     * Log warning if the exponential backoff is disabled due to initial backoff value is greater than max backoff value.
+     *
+     * @param config The config object.
+     */
+    public static void warnDisablingExponentialBackoff(AbstractConfig config) {
+        long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG);
+        if (retryBackoffMs > retryBackoffMaxMs) {
+            log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " +
+                    "A static backoff with value '{}' will be applied.",
+                    RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
+                    RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, retryBackoffMs);

Review Comment:
Should the last param be `retryBackoffMaxMs`? Ditto for `connectionSetupTimeoutMs` below.
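To make that question concrete: which value ends up acting as the static backoff depends on how the backoff helper handles an initial interval larger than the max. The sketch below is a minimal standalone model (a hypothetical `backoff` method, not Kafka's `ExponentialBackoff` class) in which each term is capped at the max, so the effective static value is `retryBackoffMaxMs`; if the real helper instead short-circuits to the initial interval whenever the max is smaller, then logging `retryBackoffMs` would be the accurate choice.

```java
import java.util.concurrent.ThreadLocalRandom;

// Minimal standalone sketch, NOT Kafka's ExponentialBackoff: each exponentially
// growing term is jittered and then capped at the configured maximum.
public class StaticBackoffSketch {
    static long backoff(long initialMs, long maxMs, int base, double jitter, long attempts) {
        double term = initialMs * Math.pow(base, attempts);
        double jittered = term * ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) Math.min(jittered, maxMs);
    }

    public static void main(String[] args) {
        long retryBackoffMs = 1000;   // retry.backoff.ms, deliberately above the max
        long retryBackoffMaxMs = 100; // retry.backoff.max.ms
        for (long attempts = 0; attempts < 4; attempts++) {
            // Under this cap-at-max model every call returns 100, i.e. the
            // static backoff actually applied would be retryBackoffMaxMs.
            System.out.println(backoff(retryBackoffMs, retryBackoffMaxMs, 2, 0.2, attempts));
        }
    }
}
```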
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ##########

@@ -278,11 +291,13 @@ private void validatePositionsAsync(Map<TopicPartition, FetchPosition> partition
         offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);

         future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
+            private long attempts = 0L;
             @Override
             public void onSuccess(OffsetForEpochResult offsetsResult) {
                 List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                 if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                    subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                    subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(),

Review Comment:
Same question as the above. Does this really do exponential backoff since `attempts` is 0 for every new request?

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java: ##########

@@ -225,14 +231,21 @@ private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimesta
         RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);

         future.addListener(new RequestFutureListener<ListOffsetResult>() {
+            long attempts = 0L;
             @Override
             public void onSuccess(ListOffsetResult result) {
-                offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, result);
+                offsetFetcherUtils.onSuccessfulRequestForResettingPositions(
+                    resetTimestamps,
+                    result,
+                    retryBackoff.backoff(attempts++));

Review Comment:
Hmm, does this really do exponential backoff since `attempts` is 0 for every new request?

########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java: ##########

@@ -255,21 +258,34 @@ private KafkaMetric getMetric(final String name) {
     }

     @Test
-    public void testCoordinatorDiscoveryBackoff() {
-        setupCoordinator();
-
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+    public void testCoordinatorDiscoveryExponentialBackoff() {
+        // With exponential backoff, we will get retries at 10, 20, 40, 80, 100 ms (with jitter)
+        int shortRetryBackoffMs = 10;
+        int shortRetryBackoffMaxMs = 100;
+        setupCoordinator(shortRetryBackoffMs, shortRetryBackoffMaxMs);
+
+        for (int i = 0; i < 5; i++) {
+            mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        }

-        // cut out the coordinator for 10 milliseconds to simulate a disconnect.
+        // cut out the coordinator for 100 milliseconds to simulate a disconnect.
         // after backing off, we should be able to connect.
-        mockClient.backoff(coordinatorNode, 10L);
+        mockClient.backoff(coordinatorNode, 100L);

         long initialTime = mockTime.milliseconds();
         coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
         long endTime = mockTime.milliseconds();

-        assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
+        long lowerBoundBackoffMs = 0;
+        long upperBoundBackoffMs = 0;
+        for (int i = 0; i < 4; i++) {
+            lowerBoundBackoffMs += shortRetryBackoffMs * Math.pow(CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, i) * (1 - CommonClientConfigs.RETRY_BACKOFF_JITTER);
+            upperBoundBackoffMs += shortRetryBackoffMs * Math.pow(CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, i) * (1 + CommonClientConfigs.RETRY_BACKOFF_JITTER);
+        }
+
+        long timeElapsed = endTime - initialTime;
+        assertTrue(timeElapsed >= lowerBoundBackoffMs);
+        assertTrue(timeElapsed <= upperBoundBackoffMs + 10);

Review Comment:
Should we use shortRetryBackoffMs instead of 10?
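For reference, the cumulative bounds that loop computes work out to 120 ms and 180 ms. Below is a standalone re-computation; the base and jitter values are stand-ins inferred from the "10, 20, 40, 80, 100 ms" comment in the test (i.e., `CommonClientConfigs.RETRY_BACKOFF_EXP_BASE` behaving as 2 and `CommonClientConfigs.RETRY_BACKOFF_JITTER` as 0.2), not values confirmed here.

```java
// Standalone re-computation of the test's cumulative backoff bounds.
// Assumes base 2 and +/-20% jitter, mirroring the loop in the test above.
public class BackoffBoundsCheck {
    public static void main(String[] args) {
        int shortRetryBackoffMs = 10;
        int expBase = 2;      // stand-in for CommonClientConfigs.RETRY_BACKOFF_EXP_BASE
        double jitter = 0.2;  // stand-in for CommonClientConfigs.RETRY_BACKOFF_JITTER

        long lowerBoundBackoffMs = 0;
        long upperBoundBackoffMs = 0;
        // Four full backoffs (10, 20, 40, 80 ms) before the 100 ms cap is reached.
        for (int i = 0; i < 4; i++) {
            lowerBoundBackoffMs += shortRetryBackoffMs * Math.pow(expBase, i) * (1 - jitter);
            upperBoundBackoffMs += shortRetryBackoffMs * Math.pow(expBase, i) * (1 + jitter);
        }
        // Prints "bounds = [120, 180]"; the assertion's "+ 10" is extra slack on top of 180.
        System.out.println("bounds = [" + lowerBoundBackoffMs + ", " + upperBoundBackoffMs + "]");
    }
}
```

Deriving that slack from `shortRetryBackoffMs` (or a named constant), as the comment suggests, would also document whether it is meant as jitter slack or as one extra backoff interval.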
########## clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java: ##########

@@ -431,6 +438,10 @@ public void clearInflight(long nextAllowedRetryMs) {
             this.nextAllowedRetryMs = nextAllowedRetryMs;
         }

+        public void clearInflightAndBackoff(long nextAllowedRetryMs) {

Review Comment:
The input param is the current time, not `nextAllowedRetryMs`.

########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ##########

@@ -518,7 +529,7 @@
             else if (!future.isRetriable())
                 return false;
         }

-        timer.sleep(rebalanceConfig.retryBackoffMs);
+        timer.sleep(retryBackoff.backoff(attempts++));

Review Comment:
The KIP says the following:

`However other places that utilize the retry.backoff.ms configuration will not be affected, notably the classes responsible for rebalancing. Therefore, it will not affect Connect worker or Consumer group rebalancing.`

But it seems that we are changing the backoff time related to rebalancing here?
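To illustrate the concern in that last comment, here is a standalone sketch (illustrative names and formula, not Kafka's actual `ExponentialBackoff` implementation) of how the sleep between failed rejoin attempts changes once it comes from `backoff(attempts++)` instead of a fixed `rebalanceConfig.retryBackoffMs`:

```java
// Illustrative only: shows how successive sleeps grow once the rejoin loop
// switches from a fixed retry backoff to an exponentially growing one.
public class RejoinSleepSketch {
    // Simplified exponential backoff: initial * base^attempts, capped at max (jitter omitted).
    static long backoff(long initialMs, long maxMs, int base, long attempts) {
        return Math.min((long) (initialMs * Math.pow(base, attempts)), maxMs);
    }

    public static void main(String[] args) {
        long retryBackoffMs = 100;     // retry.backoff.ms
        long retryBackoffMaxMs = 1000; // retry.backoff.max.ms
        long attempts = 0;
        for (int failedJoins = 0; failedJoins < 6; failedJoins++) {
            // Old behavior: timer.sleep(retryBackoffMs)           -> 100 ms every time.
            // New behavior: timer.sleep(backoff(..., attempts++)) -> 100, 200, 400, 800, 1000, 1000 ms.
            System.out.println("sleep " + backoff(retryBackoffMs, retryBackoffMaxMs, 2, attempts++) + " ms");
        }
    }
}
```

Whether the rebalance path should keep the fixed sleep, as the quoted KIP text promises, or the KIP wording should be updated, seems worth settling explicitly.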