[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294877936


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -267,11 +307,14 @@ public synchronized void update(int requestVersion, 
MetadataResponse response, b
 
 this.needPartialUpdate = requestVersion < this.requestVersion;
 this.lastRefreshMs = nowMs;
+this.attempts = 0;
 this.updateVersion += 1;
 if (!isPartialUpdate) {
 this.needFullUpdate = false;
 this.lastSuccessfulRefreshMs = nowMs;
 }
+this.backoffOnEquivalentResponses = true;
+this.equivalentResponseCount++;

Review Comment:
   1. equivalentResponseCount is only reset when a fresher metadata response is 
received. Suppose that we only have periodic metadata refreshes for some time and 
the metadata doesn't change. This will cause equivalentResponseCount to keep 
going up. When a metadata refresh is then requested, it will unexpectedly back 
off exponentially. 
   2. This is a bit unintuitive since we haven't checked whether the response 
is equivalent or not. Could we add a comment that this will be reset later if 
the metadata response causes the metadata to change?
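
A minimal sketch of the concern in point 1, not the PR's actual code. It assumes a jitter-free exponential backoff and made-up values for `BASE_MS` and `MAX_MS`; the class name and the way the count feeds the backoff are hypothetical, only `equivalentResponseCount` comes from the diff.

```java
// Hypothetical model of the counter in the diff; the real Metadata class is more involved.
class EquivalentResponseSketch {
    static final long BASE_MS = 100L;  // assumed initial backoff
    static final long MAX_MS = 1000L;  // assumed maximum backoff

    int equivalentResponseCount = 0;
    String lastMetadata = "v1";

    // Jitter-free exponential backoff: BASE_MS * 2^attempts, capped at MAX_MS.
    static long backoff(int attempts) {
        double raw = BASE_MS * Math.pow(2, Math.min(attempts, 30));
        return (long) Math.min(raw, MAX_MS);
    }

    // Mirrors the diff: count every response, reset only when the metadata changes.
    void update(String metadata) {
        equivalentResponseCount++;
        if (!metadata.equals(lastMetadata)) {
            lastMetadata = metadata;
            equivalentResponseCount = 0;
        }
    }

    // Assumption: the count is used directly as the backoff exponent.
    long backoffForEquivalentResponses() {
        return backoff(equivalentResponseCount);
    }
}
```

With this model, a handful of unchanged periodic refreshes is enough to push the next requested refresh to the maximum backoff, even though nothing was ever retried.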



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294883198


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +172,34 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+ * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+ * equivalent metadata responses, which indicates that responses did not 
make progress and may be stale.
+ * @param permitBackoffOnEquivalentResponses Whether to permit backoff 
when consecutive responses are equivalent.
+ *   This should be set to 
true in situations where the update is
+ *   being requested to retry an 
operation, such as when the leader has
+ *   changed. It should be set to 
false in situations where new
+ *   metadata is being requested, 
such as adding a topic to a subscription.
+ *   In situations where it's not 
clear, it's best to use false.
+ * @return The current updateVersion before the update
  */
-public synchronized int requestUpdate() {
+public synchronized int requestUpdate(final boolean 
permitBackoffOnEquivalentResponses) {
 this.needFullUpdate = true;
+if (!permitBackoffOnEquivalentResponses) {

Review Comment:
   Since we only take action when permitBackoffOnEquivalentResponses is false, 
would it be more intuitive to pass in the negation of that as something like 
resetEquivalentResponseCount?
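
A rough sketch of what the renamed parameter could look like. The diff excerpt does not show the body of the guarded branch, so resetting the counter there is an assumption, and the class and its initial field values are purely illustrative.

```java
// Hypothetical, stripped-down stand-in for Metadata; only the flag handling is of interest.
class RequestUpdateSketch {
    private boolean needFullUpdate = false;
    private int equivalentResponseCount = 3; // pretend some equivalent responses were seen
    private int updateVersion = 7;           // arbitrary illustrative value

    // The flag now names the action taken rather than its negation.
    synchronized int requestUpdate(final boolean resetEquivalentResponseCount) {
        this.needFullUpdate = true;
        if (resetEquivalentResponseCount) {
            // Assumed effect: new metadata is wanted, so forget earlier equivalent responses.
            this.equivalentResponseCount = 0;
        }
        return this.updateVersion;
    }

    synchronized int equivalentResponseCount() {
        return equivalentResponseCount;
    }

    synchronized boolean updateRequested() {
        return needFullUpdate;
    }
}
```

A caller retrying after a leader change would pass `false`, while a caller adding a topic to a subscription would pass `true`.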






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-15 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1294873505


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +125,39 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs +
+this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts 
- 1 : 0) - nowMs, 0);
+
+// Periodic updates based on expiration are not backed off based on 
equivalent responses
+long backoffForEquivalentResponseCount;
+if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - 
nowMs, 0) == 0) {

Review Comment:
   Instead of calculating the periodic refresh time, it's probably simpler to 
just check `updateRequested()`?






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-11 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1291575101


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs +

Review Comment:
   backoffForAttempts => backoffTimeForFailure ?
   backoffForEquivalentResponseCount => backoffTimeForEquivalentResponse ?



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +127,41 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs +
+this.refreshBackoff.backoff(this.attempts > 0 ? this.attempts 
- 1 : 0) - nowMs, 0);
+
+// Periodic updates based on expiration are not backed off based on 
equivalent responses
+if (Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - 
nowMs, 0) == 0) {
+this.backoffOnEquivalentResponses = false;

Review Comment:
   Hmm, I am not sure if this accurately captures periodic refresh. A requested 
metadata refresh could be in progress when the periodic timer expires. In that 
case, we still want to enable backoffOnEquivalentResponses. It seems that we 
need to check `updateRequested()`.
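
A simplified sketch of gating the equivalent-response backoff on `updateRequested()` instead of on the expiry timer. The two precomputed backoff fields, the way the two values are combined with `Math.max`, and the reduced `updateRequested()` are assumptions made for illustration.

```java
// Hypothetical reduction of timeToAllowUpdate(); the backoff values are taken as given.
class TimeToAllowUpdateSketch {
    long lastRefreshMs;
    long failureBackoffMs;     // assumed: backoff already computed from failed attempts
    long equivalentBackoffMs;  // assumed: backoff already computed from equivalent responses
    boolean needFullUpdate;    // set when an update has been explicitly requested

    boolean updateRequested() {
        return needFullUpdate;
    }

    long timeToAllowUpdate(long nowMs) {
        long backoffTimeForFailure = Math.max(lastRefreshMs + failureBackoffMs - nowMs, 0);
        // Only a requested update is delayed by equivalent responses. A purely periodic
        // refresh is not, even if the expiry timer fires while a request is pending.
        long backoffTimeForEquivalentResponse = updateRequested()
            ? Math.max(lastRefreshMs + equivalentBackoffMs - nowMs, 0)
            : 0;
        return Math.max(backoffTimeForFailure, backoffTimeForEquivalentResponse);
    }
}
```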



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -81,23 +85,32 @@ public class Metadata implements Closeable {
  *
  * @param refreshBackoffMs The minimum amount of time that must 
expire between metadata refreshes to avoid busy
  * polling
+ * @param refreshBackoffMaxMs  The maximum amount of time to wait 
between metadata refreshes
  * @param metadataExpireMs The maximum amount of time that 
metadata can be retained without refresh
  * @param logContext   Log context corresponding to the 
containing client
  * @param clusterResourceListeners List of ClusterResourceListeners which 
will receive metadata updates.
  */
 public Metadata(long refreshBackoffMs,
+long refreshBackoffMaxMs,
 long metadataExpireMs,
 LogContext logContext,
 ClusterResourceListeners clusterResourceListeners) {
 this.log = logContext.logger(Metadata.class);
-this.refreshBackoffMs = refreshBackoffMs;
+this.refreshBackoff = new ExponentialBackoff(
+refreshBackoffMs,
+CommonClientConfigs.RETRY_BACKOFF_EXP_BASE,
+refreshBackoffMaxMs,
+CommonClientConfigs.RETRY_BACKOFF_JITTER);
 this.metadataExpireMs = metadataExpireMs;
 this.lastRefreshMs = 0L;
 this.lastSuccessfulRefreshMs = 0L;
+this.attempts = 0L;
 this.requestVersion = 0;
 this.updateVersion = 0;
 this.needFullUpdate = false;
 

[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-10 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1290474356


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update of the current cluster metadata info, return the 
current updateVersion before the update
+ * Request an update of the current cluster metadata info, permitting 
backoff based on the number of
+ * equivalent responses, which indicate that metadata responses did not 
make progress and may be stale.

Review Comment:
   Could we add a comment on when the caller should set 
permitBackoffOnEquivalentResponses to true? Also, when should 
backoffOnEquivalentResponses be reset to false?



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -114,18 +127,32 @@ public synchronized Cluster fetch() {
 
 /**
  * Return the next time when the current cluster info can be updated 
(i.e., backoff time has elapsed).
+ * There are two calculations for backing off based on how many attempts 
to retrieve metadata have been made
+ * since the last successful response, and how many equivalent metadata 
responses have been received.
+ * The second of these allows backing off when there are errors to do with 
stale metadata, even though the
+ * metadata responses are clean.
+ * 
+ * This can be used to check whether it's worth requesting an update in 
the knowledge that it will
+ * not be delayed if this method returns 0.
  *
  * @param nowMs current time in ms
  * @return remaining time in ms till the cluster info can be updated again
  */
 public synchronized long timeToAllowUpdate(long nowMs) {
-return Math.max(this.lastRefreshMs + this.refreshBackoffMs - nowMs, 0);
+// Calculate the backoff for attempts which acts when metadata 
responses fail
+long backoffForAttempts = Math.max(this.lastRefreshMs + 
this.refreshBackoff.backoff(this.attempts) - nowMs, 0);
+
+// Calculate the backoff for equivalent responses which acts when 
metadata responses as not making progress

Review Comment:
   as not making => are not making ?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -598,22 +608,22 @@ private void insertInSequenceOrder(Deque 
deque, ProducerBatch bat
 /**
  * Add the leader to the ready nodes if the batch is ready
  *
- * @param nowMs The current time
  * @param exhausted 'true' is the buffer pool is exhausted
  * @param part The partition
  * @param leader The leader for the partition
  * @param waitedTimeMs How long batch waited
  * @param backingOff Is backing off
+ * @param backoffAttempts Number of attempts for calculating backoff delay
  * @param full Is batch full
  * @param nextReadyCheckDelayMs The delay for next check
  * @param readyNodes The set of ready nodes (to be filled in)
  * @return The delay for next check
  */
-private long batchReady(long nowMs, boolean exhausted, TopicPartition 
part, Node leader,
-long waitedTimeMs, boolean backingOff, boolean 
full,
-long nextReadyCheckDelayMs, Set readyNodes) {
+private long batchReady(boolean exhausted, TopicPartition part, Node 
leader,
+long waitedTimeMs, boolean backingOff, int 
backoffAttempts,
+boolean full, long nextReadyCheckDelayMs, 
Set readyNodes) {
 if (!readyNodes.contains(leader) && !isMuted(part)) {
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+long timeToWaitMs = backingOff ? 
retryBackoff.backoff(backoffAttempts) : lingerMs;

Review Comment:
   Here, backoffAttempts has already had one subtracted before 
retryBackoff.backoff is called. In other places, we pass in the true attempts 
and subtract one when calling retryBackoff.backoff. It would be useful to make 
that consistent.
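
One way to keep the convention consistent is to convert attempts to a retry index in a single place. This is a sketch only: `backoffForAttempts` is a hypothetical helper name, and the jitter-free `backoff` with made-up constants stands in for the real backoff object.

```java
// Hypothetical helper; callers always pass the true number of attempts made so far.
class BackoffConventionSketch {
    static final long BASE_MS = 100L;  // assumed initial backoff
    static final long MAX_MS = 1000L;  // assumed maximum backoff

    // Jitter-free exponential backoff keyed by a zero-based retry index.
    static long backoff(long retries) {
        double raw = BASE_MS * Math.pow(2, Math.min(retries, 30));
        return (long) Math.min(raw, MAX_MS);
    }

    // The only place where "attempts" is turned into a retry index.
    static long backoffForAttempts(int attempts) {
        return backoff(Math.max(attempts - 1, 0));
    }
}
```

With this shape, batchReady() would receive the batch's real attempt count and no call site would need to remember the off-by-one.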



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -550,7 +550,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
 // refresh metadata before re-joining the group as long as 
the refresh backoff time has
 // passed.
 if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) 
== 0) {
-this.metadata.requestUpdate();
+this.metadata.requestUpdate(true);

Review Comment:
   Why is permitBackoffOnEquivalentResponses set to true? We refresh the 
metadata here not because we have discovered stale metadata.



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -140,17 +167,30 @@ public long metadataExpireMs() {
 }
 
 /**
- * Request an update 

[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-09 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1288946333


##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   > My understanding of the PR is this, metadata request won't backoff, but 
produce request would backoff. So likely metadata is going to be updated next 
time around produce request is retried(post backoff).
   
   @msn-tldr : To me, the common reason why a produce request needs to backoff 
is that the metadata is stale since the latest metadata hasn't been propagated 
to the brokers yet. So, if we don't backoff the metadata request, the returned 
metadata may still be stale, which won't help the backed off produce request.






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-08 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1286480033


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -613,7 +623,7 @@ private long batchReady(long nowMs, boolean exhausted, 
TopicPartition part, Node
 long waitedTimeMs, boolean backingOff, boolean 
full,
 long nextReadyCheckDelayMs, Set readyNodes) {
 if (!readyNodes.contains(leader) && !isMuted(part)) {
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
+long timeToWaitMs = backingOff ? retryBackoff.backoff(0) : 
lingerMs;

Review Comment:
   Hmm, we should use the number of retries for the batch instead of 0 for 
calculating timeToWaitMs, right?
   
   Also, this is an existing issue. It seems that nowMs is never used.



##
clients/src/main/java/org/apache/kafka/clients/Metadata.java:
##
@@ -144,13 +159,25 @@ public long metadataExpireMs() {
  */
 public synchronized int requestUpdate() {
 this.needFullUpdate = true;
+this.backoffUpdateRequests = 0L;

Review Comment:
   Hmm, this probably needs some more thought. For example, if a produce 
request fails, we request a metadata update and set backoffUpdateRequests to 0. 
However, if we exponentially back off the produce request because of stale 
metadata, it seems that we should do the same for refreshing the metadata. 
Otherwise, we likely will still have the stale metadata when retrying the 
produce request.
   
   Ditto for fetch requests.






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-08-02 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1282312134


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -73,6 +75,7 @@ public class RecordAccumulator {
 private final CompressionType compression;
 private final int lingerMs;
 private final long retryBackoffMs;
+private final ExponentialBackoff retryBackoff;

Review Comment:
   There is still one reference to retryBackoffMs in batchReady(). Should that 
be changed to use exponential backoff?



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -384,6 +389,32 @@ int attempts() {
 return attempts.get();
 }
 
+/*
+ * Returns whether the leader Node has changed since the last attempt.
+ * @param node The Node currently thought of as the leader, which might be 
null.
+ * @return true if the leader has changed, otherwise false
+ */
+boolean hasLeaderChanged(Node latestLeader) {
+boolean leaderChanged = false;
+if (latestLeader != null) {
+// If we don't know the leader yet, we have not yet attempted to 
send to the leader
+if (currentLeader == null) {
+currentLeader = latestLeader;
+} else {
+// If the leader's node id has changed, this counts as a 
leader change
+if (currentLeader.id() != latestLeader.id()) {

Review Comment:
   We probably need to take leaderEpoch into consideration. For example, if the 
leader epoch has changed, even if the leader node remains the same, it's still 
worth retrying the batch now.
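
A sketch of a leader-change check that also looks at the epoch. It uses plain ints with `-1` meaning unknown rather than Kafka's `Node` type, and treating only a strictly larger epoch as a change is an assumption.

```java
// Hypothetical model: leader identity is (node id, leader epoch), with -1 meaning unknown.
class LeaderChangeSketch {
    private static final int UNKNOWN = -1;
    private int currentLeaderId = UNKNOWN;
    private int currentLeaderEpoch = UNKNOWN;

    boolean hasLeaderChanged(int latestLeaderId, int latestLeaderEpoch) {
        if (latestLeaderId == UNKNOWN) {
            return false; // no leader known, nothing to compare against
        }
        // The first leader we see is not a change: nothing has been sent yet.
        boolean firstSeen = currentLeaderId == UNKNOWN;
        boolean idChanged = !firstSeen && currentLeaderId != latestLeaderId;
        // Same node but a newer epoch still counts as a change worth retrying for.
        boolean epochAdvanced = !firstSeen
            && currentLeaderEpoch != UNKNOWN
            && latestLeaderEpoch > currentLeaderEpoch;
        currentLeaderId = latestLeaderId;
        if (latestLeaderEpoch != UNKNOWN) {
            currentLeaderEpoch = latestLeaderEpoch;
        }
        return idChanged || epochAdvanced;
    }
}
```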



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java:
##
@@ -701,7 +713,9 @@ private long partitionReady(Cluster cluster, long nowMs, 
String topic,
 }
 
 waitedTimeMs = batch.waitedTimeMs(nowMs);
-backingOff = batch.attempts() > 0 && waitedTimeMs < 
retryBackoffMs;
+backingOff = !batch.hasLeaderChanged(leader) &&
+batch.attempts() > 0 &&
+waitedTimeMs < retryBackoff.backoff(batch.attempts() - 
1);

Review Comment:
   This mostly duplicates the code in line 716. Could we pull that into a util 
method and reuse?






[GitHub] [kafka] junrao commented on a diff in pull request #14111: KAFKA-9800: Exponential backoff for Kafka clients - KIP-580

2023-07-28 Thread via GitHub


junrao commented on code in PR #14111:
URL: https://github.com/apache/kafka/pull/14111#discussion_r1278072179


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1488,9 +1500,10 @@ public void run() {
 } else if (!heartbeat.shouldHeartbeat(now)) {
 // poll again after waiting for the retry backoff 
in case the heartbeat failed or the
 // coordinator disconnected
-
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
+
AbstractCoordinator.this.wait(retryBackoff.backoff(attempts++));
 } else {
 heartbeat.sentHeartbeat(now);
+attempts = 0L;

Review Comment:
   I think the common case where exponential backoff could be helpful is that 
during a heartbeat failure, the coordinator has changed, but it takes some time 
to discover the coordinator. The current code will do the following in a loop 
in that case.
   
   ```
   sendHeartbeat
   get NotCoordinator error 
   findCoordinator
   wait for retryBackoff
   ```
   With the new change, since attempts is reset on every Heartbeat request, we 
will do the same loop as the above with no exponential backoff in between.
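
The effect can be seen in a small simulation of the loop above. This is a sketch with a jitter-free backoff and made-up constants; `resetOnSend` models the diff, and the alternative of not resetting while heartbeats keep failing is an assumption about the intended fix.

```java
// Hypothetical simulation of: sendHeartbeat -> NotCoordinator -> findCoordinator -> wait.
class HeartbeatBackoffSketch {
    static final long BASE_MS = 100L;  // assumed initial backoff
    static final long MAX_MS = 1000L;  // assumed maximum backoff

    // Jitter-free exponential backoff: BASE_MS * 2^attempts, capped at MAX_MS.
    static long backoff(long attempts) {
        double raw = BASE_MS * Math.pow(2, Math.min(attempts, 30));
        return (long) Math.min(raw, MAX_MS);
    }

    // Returns the wait applied in each of `rounds` consecutive failed heartbeat rounds.
    static long[] simulate(int rounds, boolean resetOnSend) {
        long[] waits = new long[rounds];
        long attempts = 0L;
        for (int i = 0; i < rounds; i++) {
            if (resetOnSend) {
                attempts = 0L; // as in the diff: reset whenever a heartbeat is sent
            }
            // The heartbeat fails with NotCoordinator, so no success-based reset happens.
            waits[i] = backoff(attempts++);
        }
        return waits;
    }
}
```

With `resetOnSend` set to true every wait equals the base value; with it set to false the waits double each round until they reach the cap.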
   



##
clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java:
##
@@ -203,18 +224,43 @@ public class CommonClientConfigs {
  * @return  The new values which have been set as 
described in postProcessParsedConfig.
  */
 public static Map 
postProcessReconnectBackoffConfigs(AbstractConfig config,
-Map 
parsedValues) {
+ 
Map parsedValues) {
 HashMap rval = new HashMap<>();
 Map originalConfig = config.originals();
 if ((!originalConfig.containsKey(RECONNECT_BACKOFF_MAX_MS_CONFIG)) &&
 originalConfig.containsKey(RECONNECT_BACKOFF_MS_CONFIG)) {
-log.debug("Disabling exponential reconnect backoff because {} is 
set, but {} is not.",
+log.warn("Disabling exponential reconnect backoff because {} is 
set, but {} is not.",
 RECONNECT_BACKOFF_MS_CONFIG, 
RECONNECT_BACKOFF_MAX_MS_CONFIG);
 rval.put(RECONNECT_BACKOFF_MAX_MS_CONFIG, 
parsedValues.get(RECONNECT_BACKOFF_MS_CONFIG));
 }
 return rval;
 }
 
+/**
+ * Log warning if the exponential backoff is disabled due to initial 
backoff value is greater than max backoff value.
+ *
+ * @param configThe config object.
+ */
+public static void warnDisablingExponentialBackoff(AbstractConfig config) {
+long retryBackoffMs = config.getLong(RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = config.getLong(RETRY_BACKOFF_MAX_MS_CONFIG);
+if (retryBackoffMs > retryBackoffMaxMs) {
+log.warn("Configuration '{}' with value '{}' is greater than 
configuration '{}' with value '{}'. " +
+"A static backoff with value '{}' will be applied.",
+RETRY_BACKOFF_MS_CONFIG, retryBackoffMs,
+RETRY_BACKOFF_MAX_MS_CONFIG, retryBackoffMaxMs, 
retryBackoffMs);

Review Comment:
   Should the last param be `retryBackoffMaxMs`? Ditto for 
`connectionSetupTimeoutMs` below.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -278,11 +291,13 @@ private void validatePositionsAsync(Map partition
 offsetsForLeaderEpochClient.sendAsyncRequest(node, 
fetchPositions);
 
 future.addListener(new 
RequestFutureListener() {
+private long attempts = 0L;
 @Override
 public void onSuccess(OffsetForEpochResult offsetsResult) {
 List truncations = new 
ArrayList<>();
 if (!offsetsResult.partitionsToRetry().isEmpty()) {
-
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), 
time.milliseconds() + retryBackoffMs);
+
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(),

Review Comment:
   Same question as the above. Does this really do exponential backoff since 
attempts is 0 for every new request?
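
A sketch of why a counter declared inside a per-request listener never grows, next to one that lives outside the listener. `RetryListener`, `sharedAttempts`, and the jitter-free `backoff` are hypothetical stand-ins, and keeping the counter on the enclosing object is only one possible fix.

```java
// Hypothetical model of a listener that computes the next retry delay.
class ListenerAttemptsSketch {
    static final long BASE_MS = 100L;  // assumed initial backoff
    static final long MAX_MS = 1000L;  // assumed maximum backoff

    interface RetryListener {
        long onRetry();
    }

    private long sharedAttempts = 0L;  // state that outlives a single request

    // Jitter-free exponential backoff: BASE_MS * 2^attempts, capped at MAX_MS.
    static long backoff(long attempts) {
        double raw = BASE_MS * Math.pow(2, Math.min(attempts, 30));
        return (long) Math.min(raw, MAX_MS);
    }

    // As in the diff: each request gets a new listener whose counter starts at 0.
    RetryListener perRequestListener() {
        return new RetryListener() {
            private long attempts = 0L;

            @Override
            public long onRetry() {
                return backoff(attempts++);
            }
        };
    }

    // Alternative: the counter is held by the enclosing object and survives across requests.
    RetryListener sharedCounterListener() {
        return () -> backoff(sharedAttempts++);
    }
}
```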
   
   



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##
@@ -225,14 +231,21 @@ private void resetPositionsAsync(Map partitionResetTimesta
 
 RequestFuture future = 
sendListOffsetRequest(node, resetTimestamps, false);
 future.addListener(new RequestFutureListener() {
+long attempts = 0L;
 @Override
 public void onSuccess(ListOffsetResult result) {
-