junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1291575101
##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
     /**
      * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata responses have been received.
+     * The second of these allows backing off when there are errors to do with stale metadata, even though the
+     * metadata responses are clean.
+     * <p>
+     * This can be used to check whether it's worth requesting an update in the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs +

Review Comment:
   backoffForAttempts => backoffTimeForFailure ?
   backoffForEquivalentResponseCount => backoffTimeForEquivalentResponse ?

##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
     /**
      * Return the next time when the current cluster info can be updated (i.e., backoff time has elapsed).
+     * There are two calculations for backing off based on how many attempts to retrieve metadata have been made
+     * since the last successful response, and how many equivalent metadata responses have been received.
+     * The second of these allows backing off when there are errors to do with stale metadata, even though the
+     * metadata responses are clean.
+     * <p>
+     * This can be used to check whether it's worth requesting an update in the knowledge that it will
+     * not be delayed if this method returns 0.
      *
      * @param nowMs current time in ms
      * @return remaining time in ms till the cluster info can be updated again
      */
     public synchronized long timeToAllowUpdate(long nowMs) {
-        return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+        // Calculate the backoff for attempts which acts when metadata responses fail
+        long backoffForAttempts = Math.max(this.lastRefreshMs +
+                this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts - 1 : 0) - nowMs, 0);
+
+        // Periodic updates based on expiration are not backed off based on equivalent responses
+        if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0) == 0) {
+            this.backoffOnEquivalentResponses = false;

Review Comment:
   Hmm, I am not sure if this accurately captures periodic refresh. A requested metadata refresh could be in progress when the periodic timer expires. In that case, we still want to enable backoffOnEquivalentResponses. It seems that we need to check `updateRequested()`.

##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -81,23 +85,32 @@ public class Metadata implements Closeable {
      *
      * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
      *                         polling
+     * @param refreshBackoffMaxMs The maximum amount of time to wait between metadata refreshes
      * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
      * @param logContext Log context corresponding to the containing client
      * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
      */
     public Metadata(long refreshBackoffMs,
+                    long refreshBackoffMaxMs,
                     long metadataExpireMs,
                     LogContext logContext,
                     ClusterResourceListeners clusterResourceListeners) {
         this.log = logContext.logger(Metadata.class);
-        this.refreshBackoffMs = refreshBackoffMs;
+        this.refreshBackoff = new ExponentialBackoff(
+                refreshBackoffMs,
+                CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+                refreshBackoffMaxMs,
+                CommonClientConfigs.RETRY_BACKOFF_JITTER);
         this.metadataExpireMs = metadataExpireMs;
         this.lastRefreshMs = 0L;
         this.lastSuccessfulRefreshMs = 0L;
+        this.attempts = 0L;
         this.requestVersion = 0;
         this.updateVersion = 0;
         this.needFullUpdate = false;
         this.needPartialUpdate = false;
+        this.backoffOnEquivalentResponses = true;

Review Comment:
   Why is backoffOnEquivalentResponses initialized to true? It seems that it should be enabled on the first metadata-related error in a request response.

##########
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##########
@@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, MetadataResponse response, b
         this.needPartialUpdate = requestVersion < this.requestVersion;
         this.lastRefreshMs = nowMs;
+        this.attempts = 0;
         this.updateVersion += 1;
         if (!isPartialUpdate) {
             this.needFullUpdate = false;
             this.lastSuccessfulRefreshMs = nowMs;
         }
+        this.backoffOnEquivalentResponses = true;
+        this.equivalentResponseCount++;

Review Comment:
   I don't see backoffOnEquivalentResponses being reset when a fresher metadata response is received. Only equivalentResponseCount is reset to 0.

-- 
This is an automated message from the Apache Git Service.
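The reviewer's points above (renaming to backoffTimeForFailure / backoffTimeForEquivalentResponse, checking `updateRequested()` before exempting periodic refreshes, starting backoffOnEquivalentResponses as false, and resetting it when fresher metadata arrives) can be pulled together in a small model. This is a hypothetical sketch under stated assumptions, not Kafka's `Metadata` class or the PR's final code: the class `MetadataBackoffSketch` and the methods `onStaleMetadataError()` and `update(long, boolean)` are invented for illustration, the remaining method names only loosely mirror `Metadata`'s, every response is treated as a full update, and the backoff uses an assumed multiplier of 2 with no jitter (the diff instead delegates to `ExponentialBackoff` with `CommonClientConfigs.RETRY_BACKOFF_EXP_BASE` and `RETRY_BACKOFF_JITTER`).

```java
/**
 * Hypothetical, simplified model of the two metadata backoffs discussed in this review.
 * NOT Kafka's Metadata class; names and signatures are illustrative assumptions.
 * Assumes a multiplier of 2 and no jitter so that results are deterministic.
 */
final class MetadataBackoffSketch {
    private final long refreshBackoffMs;    // initial backoff
    private final long refreshBackoffMaxMs; // upper bound on any backoff
    private final long metadataExpireMs;    // periodic refresh interval

    private long lastRefreshMs = 0L;
    private long lastSuccessfulRefreshMs = 0L;
    private long attempts = 0L;                // failed attempts since the last successful response
    private long equivalentResponseCount = 0L; // responses that brought no fresher metadata
    private boolean updateRequested = false;
    // Per the review: start disabled, enable only on a stale-metadata error.
    private boolean backoffOnEquivalentResponses = false;

    MetadataBackoffSketch(long refreshBackoffMs, long refreshBackoffMaxMs, long metadataExpireMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.refreshBackoffMaxMs = refreshBackoffMaxMs;
        this.metadataExpireMs = metadataExpireMs;
    }

    /** refreshBackoffMs * 2^n, capped at refreshBackoffMaxMs (jitter omitted). */
    private long backoff(long n) {
        double term = refreshBackoffMs * Math.pow(2, n);
        return (long) Math.min(term, (double) refreshBackoffMaxMs);
    }

    synchronized void requestUpdate() {
        updateRequested = true;
    }

    synchronized boolean updateRequested() {
        return updateRequested;
    }

    /** A metadata request failed outright: grows the failure backoff. */
    synchronized void failedUpdate(long nowMs) {
        lastRefreshMs = nowMs;
        attempts++;
    }

    /** Hypothetical hook: a client response reported an error implying stale metadata. */
    synchronized void onStaleMetadataError() {
        backoffOnEquivalentResponses = true;
        updateRequested = true;
    }

    /** A metadata response arrived; {@code fresher} says whether it carried newer metadata. */
    synchronized void update(long nowMs, boolean fresher) {
        lastRefreshMs = nowMs;
        lastSuccessfulRefreshMs = nowMs;
        attempts = 0;
        updateRequested = false;
        if (fresher) {
            // Per the review: reset the flag as well as the counter.
            equivalentResponseCount = 0;
            backoffOnEquivalentResponses = false;
        } else {
            equivalentResponseCount++;
        }
    }

    synchronized long timeToAllowUpdate(long nowMs) {
        long backoffTimeForFailure = Math.max(
                lastRefreshMs + backoff(attempts > 0 ? attempts - 1 : 0) - nowMs, 0);

        // Only a purely periodic refresh (expiry elapsed AND nothing requested) skips the
        // equivalent-response backoff. The flag itself is not mutated here.
        boolean expired = lastSuccessfulRefreshMs + metadataExpireMs - nowMs <= 0;
        boolean periodicOnly = expired && !updateRequested;

        long backoffTimeForEquivalentResponse = 0;
        if (backoffOnEquivalentResponses && !periodicOnly && equivalentResponseCount > 0) {
            backoffTimeForEquivalentResponse = Math.max(
                    lastRefreshMs + backoff(equivalentResponseCount - 1) - nowMs, 0);
        }
        return Math.max(backoffTimeForFailure, backoffTimeForEquivalentResponse);
    }
}
```

One design choice worth noting: the PR's version clears backoffOnEquivalentResponses inside `timeToAllowUpdate()`, whereas this sketch evaluates the periodic-refresh exemption as a local condition, so the query has no side effects and a refresh that was explicitly requested while the expiry timer fired still backs off.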