junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1291575101


##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
 
     /**
      * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata 
responses have been received.
+     * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+     * metadata responses are clean.
+     * <p>
+     * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata 
responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs +

Review Comment:
   backoffForAttempts => backoffTimeForFailure ?
   backoffForEquivalentResponseCount => backoffTimeForEquivalentResponse ?



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
 
     /**
      * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata 
responses have been received.
+     * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+     * metadata responses are clean.
+     * <p>
+     * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata 
responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs +
+                this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts 
- 1 : 0) - nowMs, 0);
+
+        // Periodic updates based on expiration are not backed off based on 
equivalent responses
+        if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - 
nowMs, 0) == 0) {
+            this.backoffOnEquivalentResponses = false;

Review Comment:
   Hmm, I am not sure if this accurately captures periodic refresh. A requested 
metadata refresh could be in progress when the periodic timer expires. In that 
case, we still want to enable backoffOnEquivalentResponses. It seems that we 
need to check `updateRequested()`.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -81,23 +85,32 @@ public class Metadata implements Closeable {
      *
      * @param refreshBackoffMs         The minimum amount of time that must 
expire between metadata refreshes to avoid busy
      *                                 polling
+     * @param refreshBackoffMaxMs      The maximum amount of time to wait 
between metadata refreshes
      * @param metadataExpireMs         The maximum amount of time that 
metadata can be retained without refresh
      * @param logContext               Log context corresponding to the 
containing client
      * @param clusterResourceListeners List of ClusterResourceListeners which 
will receive metadata updates.
      */
     public Metadata(long refreshBackoffMs,
+                    long refreshBackoffMaxMs,
                     long metadataExpireMs,
                     LogContext logContext,
                     ClusterResourceListeners clusterResourceListeners) {
         this.log = logContext.logger(Metadata.class);
-        this.refreshBackoffMs = refreshBackoffMs;
+        this.refreshBackoff = new ExponentialBackoff(
+            refreshBackoffMs,
+            CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+            refreshBackoffMaxMs,
+            CommonClientConfigs.RETRY_BACKOFF_JITTER);
         this.metadataExpireMs = metadataExpireMs;
         this.lastRefreshMs = 0L;
         this.lastSuccessfulRefreshMs = 0L;
+        this.attempts = 0L;
         this.requestVersion = 0;
         this.updateVersion = 0;
         this.needFullUpdate = false;
         this.needPartialUpdate = false;
+        this.backoffOnEquivalentResponses = true;

Review Comment:
   Why does backoffOnEquivalentResponses initialize to true? It seems that it 
should be enabled on a first metadata related error in the request response.



##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 
         this.needPartialUpdate = requestVersion < this.requestVersion;
         this.lastRefreshMs = nowMs;
+        this.attempts = 0;
         this.updateVersion += 1;
         if (!isPartialUpdate) {
             this.needFullUpdate = false;
             this.lastSuccessfulRefreshMs = nowMs;
         }
+        this.backoffOnEquivalentResponses = true;
+        this.equivalentResponseCount++;

Review Comment:
   I don't see backoffOnEquivalentResponses being reset when fresher metadata 
response is received. Only equivalentResponseCount is reset to 0.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to