junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278072179


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1488,9 +1500,10 @@ public void run() {
                         } else if (!heartbeat.shouldHeartbeat(now)) {
                             // poll again after waiting for the retry backoff in case the heartbeat failed or the
                             // coordinator disconnected
-                            AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+                            AbstractCoordinator.this.wait(retryBackoff.backoff(attempts++));
                         } else {
                             heartbeat.sentHeartbeat(now);
+                            attempts = 0L;

Review Comment:
   I think the common case where exponential backoff could be helpful is that, during a heartbeat failure, the coordinator has changed but it takes some time to discover the new coordinator. In that case, the current code will do the following in a loop:
   
   ```
   sendHeartbeat
   get NotCoordinator error 
   findCoordinator
   wait for retryBackoff
   ```
   With the new change, since `attempts` is reset on every heartbeat request, we will do the same loop as above with no exponential backoff in between.
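   To illustrate with a hypothetical, self-contained sketch (the class and constants below are made up for illustration; this is not the actual `ExponentialBackoff` utility), the wait only grows if `attempts` survives across retries:
   
   ```java
   // Deterministic exponential backoff (no jitter), capped at a maximum.
   public class BackoffSketch {
       static final long INITIAL_MS = 100;
       static final long MAX_MS = 1000;
       static final int EXP_BASE = 2;
   
       // Grows as INITIAL_MS * EXP_BASE^attempts, capped at MAX_MS.
       static long backoff(long attempts) {
           return (long) Math.min(INITIAL_MS * Math.pow(EXP_BASE, attempts), MAX_MS);
       }
   
       public static void main(String[] args) {
           long attempts = 0;
           // Carried across retries, the wait grows: 100, 200, 400, 800.
           for (int i = 0; i < 4; i++) {
               System.out.println(backoff(attempts++));
           }
           // But if attempts is reset to 0 on every heartbeat sent, each pass of
           // the sendHeartbeat -> NotCoordinator -> findCoordinator -> wait loop
           // waits only INITIAL_MS, with no exponential growth in between.
       }
   }
   ```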
   



##########
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##########
@@ -203,18 +224,43 @@ public class CommonClientConfigs {
     * @return                          The new values which have been set as described in postProcessParsedConfig.
      */
    public static Map<String, Object> postProcessReconnectBackoffConfigs(AbstractConfig config,
-                                                    Map<String, Object> parsedValues) {
+                                                                         Map<String, Object> parsedValues) {
         HashMap<String, Object> rval = new HashMap<>();
         Map<String, Object> originalConfig = config.originals();
         if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
             originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-            log.debug("Disabling exponential reconnect backoff because {} is set, but {} is not.",
+            log.warn("Disabling exponential reconnect backoff because {} is set, but {} is not.",
                     RECONNECT_BACKOFF_MS_CONFIG, RECONNECT_BACKOFF_MAX_MS_CONFIG);
             rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
         }
         return rval;
     }
 
+    /**
+     * Log a warning if exponential backoff is disabled because the initial backoff value is greater than the max backoff value.
+     *
+     * @param config                    The config object.
+     */
+    public static void warnDisablingExponentialBackoff(AbstractConfig config) {
+        long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
+        long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG);
+        if (retryBackoffMs > retryBackoffMaxMs) {
+            log.warn("Configuration '{}' with value '{}' is greater than configuration '{}' with value '{}'. " +
+                    "A static backoff with value '{}' will be applied.",
+                RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
+                RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, retryBackoffMs);

Review Comment:
   Should the last param be `retryBackoffMaxMs`? Ditto for `connectionSetupTimeoutMs` below.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -278,11 +291,13 @@ private void validatePositionsAsync(Map<TopicPartition, FetchPosition> partition
                     offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
 
             future.addListener(new RequestFutureListener<OffsetForEpochResult>() {
+                private long attempts = 0L;
                 @Override
                 public void onSuccess(OffsetForEpochResult offsetsResult) {
                     List<SubscriptionState.LogTruncation> truncations = new ArrayList<>();
                     if (!offsetsResult.partitionsToRetry().isEmpty()) {
-                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(),

Review Comment:
   Same question as the above. Does this really do exponential backoff since 
attempts is 0 for every new request?
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -225,14 +231,21 @@ private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimesta
 
             RequestFuture<ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
+                long attempts = 0L;
                 @Override
                 public void onSuccess(ListOffsetResult result) {
-                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(resetTimestamps, result);
+                    offsetFetcherUtils.onSuccessfulRequestForResettingPositions(
+                            resetTimestamps,
+                            result,
+                            retryBackoff.backoff(attempts++));

Review Comment:
   Hmm, does this really do exponential backoff, since `attempts` is 0 for every new request?
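   As a hypothetical stand-alone sketch (the `backoff` helper below only mimics the shape of `retryBackoff.backoff`; the constants are made up): since each request registers a fresh listener whose `attempts` field starts at 0, every request backs off by the same initial value.
   
   ```java
   public class ListenerScopedBackoff {
       // Hypothetical stand-in for retryBackoff.backoff(attempts):
       // 100 ms initial wait, doubling per attempt.
       static long backoff(long attempts) {
           return (long) (100 * Math.pow(2, attempts));
       }
   
       public static void main(String[] args) {
           // Each request creates a new listener, so its attempts counter starts at 0.
           for (int request = 0; request < 3; request++) {
               long attempts = 0L;               // fresh per-listener field
               long waitMs = backoff(attempts++);
               System.out.println(waitMs);       // 100 on every request: no growth
           }
       }
   }
   ```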



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java:
##########
@@ -255,21 +258,34 @@ private KafkaMetric getMetric(final String name) {
     }
 
     @Test
-    public void testCoordinatorDiscoveryBackoff() {
-        setupCoordinator();
-
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+    public void testCoordinatorDiscoveryExponentialBackoff() {
+        // With exponential backoff, we will get retries at 10, 20, 40, 80, 100 ms (with jitter)
+        int shortRetryBackoffMs = 10;
+        int shortRetryBackoffMaxMs = 100;
+        setupCoordinator(shortRetryBackoffMs, shortRetryBackoffMaxMs);
+
+        for (int i = 0; i < 5; i++) {
+            mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        }
 
-        // cut out the coordinator for 10 milliseconds to simulate a disconnect.
+        // cut out the coordinator for 100 milliseconds to simulate a disconnect.
         // after backing off, we should be able to connect.
-        mockClient.backoff(coordinatorNode, 10L);
+        mockClient.backoff(coordinatorNode, 100L);
 
         long initialTime = mockTime.milliseconds();
         coordinator.ensureCoordinatorReady(mockTime.timer(Long.MAX_VALUE));
         long endTime = mockTime.milliseconds();
 
-        assertTrue(endTime - initialTime >= RETRY_BACKOFF_MS);
+        long lowerBoundBackoffMs = 0;
+        long upperBoundBackoffMs = 0;
+        for (int i = 0; i < 4; i++) {
+            lowerBoundBackoffMs += shortRetryBackoffMs * Math.pow(CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, i) * (1 - CommonClientConfigs.RETRY_BACKOFF_JITTER);
+            upperBoundBackoffMs += shortRetryBackoffMs * Math.pow(CommonClientConfigs.RETRY_BACKOFF_EXP_BASE, i) * (1 + CommonClientConfigs.RETRY_BACKOFF_JITTER);
+        }
+
+        long timeElapsed = endTime - initialTime;
+        assertTrue(timeElapsed >= lowerBoundBackoffMs);
+        assertTrue(timeElapsed <= upperBoundBackoffMs + 10);

Review Comment:
   Should we use `shortRetryBackoffMs` instead of the hard-coded 10?
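   For reference, the bound arithmetic can be checked with a small stand-alone sketch (2 and 0.2 are assumed values for `RETRY_BACKOFF_EXP_BASE` and `RETRY_BACKOFF_JITTER`):
   
   ```java
   public class BackoffBounds {
       // Sum of the first n nominal exponential waits, scaled by (1 + jitterSign).
       static double boundSum(int initialMs, int expBase, double jitterSign, int n) {
           double sum = 0;
           for (int i = 0; i < n; i++) {
               sum += initialMs * Math.pow(expBase, i) * (1 + jitterSign);
           }
           return sum;
       }
   
       public static void main(String[] args) {
           // Nominal waits 10 + 20 + 40 + 80 = 150 ms, so the bounds are
           // roughly 150 * 0.8 = 120 and 150 * 1.2 = 180.
           System.out.println(boundSum(10, 2, -0.2, 4)); // lower bound, ~120
           System.out.println(boundSum(10, 2, 0.2, 4));  // upper bound, ~180
       }
   }
   ```
   
   Using `shortRetryBackoffMs` for the slack instead of the literal `10` would keep it tied to the configured backoff if the test constants change.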



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##########
@@ -431,6 +438,10 @@ public void clearInflight(long nextAllowedRetryMs) {
             this.nextAllowedRetryMs = nextAllowedRetryMs;
         }
 
+        public void clearInflightAndBackoff(long nextAllowedRetryMs) {

Review Comment:
   The input param is the current time, not nextAllowedRetryMs.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -518,7 +529,7 @@ else if (!future.isRetriable())
                     return false;
                 }
 
-                timer.sleep(rebalanceConfig.retryBackoffMs);
+                timer.sleep(retryBackoff.backoff(attempts++));

Review Comment:
   The KIP says the following.
   
   `However other places that utilize the retry.backoff.ms configuration will not be affected, notably the classes responsible for rebalancing. Therefore, it will not affect Connect worker or Consumer group rebalancing.`
   
   But it seems that we are changing the backoff time related to rebalance here?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
