This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new fc267f4eb8c KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout (#16031)
fc267f4eb8c is described below

commit fc267f4eb8ca1c3ca13117e2534f9d8b54d924fa
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Fri Jun 7 00:53:27 2024 -0700

    KAFKA-16200: Enforce that RequestManager implementations respect user-provided timeout (#16031)
    
    Improve consistency and correctness for user-provided timeouts at the
    Consumer network request layer, per the Java client Consumer timeouts design
    (https://cwiki.apache.org/confluence/display/KAFKA/Java+client+Consumer+timeouts).
    While the changes introduced in KAFKA-15974 enforce timeouts at the Consumer's
    event layer, this change enforces timeouts at the network request layer.
    
    The changes mostly fit into the following areas:
    
    1. Create shared code and idioms so timeout handling logic is consistent
       across current and future RequestManager implementations
    2. Use deadlineMs instead of expirationMs, expirationTimeoutMs,
       retryExpirationTimeMs, timeoutMs, etc.
    3. Update "preemptive pruning" to remove expired requests that have had at
       least one attempt
    
    Reviewers: Lianet Magrans <liane...@gmail.com>, Bruno Cadonna <cado...@apache.org>
---
 .../consumer/internals/CommitRequestManager.java   | 133 ++++++++----------
 .../consumer/internals/MembershipManagerImpl.java  |   4 +-
 .../consumer/internals/NetworkClientDelegate.java  |  11 +-
 .../consumer/internals/RequestManagers.java        |   1 +
 .../consumer/internals/TimedRequestState.java      |  71 ++++++++++
 .../internals/TopicMetadataRequestManager.java     |  53 +++----
 .../events/CompletableApplicationEvent.java        |   1 +
 .../internals/CommitRequestManagerTest.java        | 153 +++++++++++++--------
 .../consumer/internals/ConsumerTestBuilder.java    |   2 +-
 .../consumer/internals/TimedRequestStateTest.java  |  96 +++++++++++++
 .../internals/TopicMetadataRequestManagerTest.java |   1 +
 .../kafka/api/AbstractConsumerTest.scala           |   7 +-
 .../kafka/api/PlaintextConsumerPollTest.scala      |  13 ++
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   5 +-
 .../tiered/storage/TieredStorageTestContext.java   |   3 +-
 15 files changed, 382 insertions(+), 172 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 577cf7dee6b..000797dba09 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -69,6 +69,7 @@ import static 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
 import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_LOAD_IN_PROGRESS;
 
 public class CommitRequestManager implements RequestManager, 
MemberStateListener {
+    private final Time time;
     private final SubscriptionState subscriptions;
     private final LogContext logContext;
     private final Logger log;
@@ -133,6 +134,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         final OptionalDouble jitter,
         final Metrics metrics) {
         Objects.requireNonNull(coordinatorRequestManager, "Coordinator is 
needed upon committing offsets");
+        this.time = time;
         this.logContext = logContext;
         this.log = logContext.logger(getClass());
         this.pendingRequests = new PendingRequests();
@@ -205,6 +207,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             .orElse(Long.MAX_VALUE);
     }
 
+    private KafkaException maybeWrapAsTimeoutException(Throwable t) {
+        if (t instanceof TimeoutException)
+            return (TimeoutException) t;
+        else
+            return new TimeoutException(t);
+    }
+
     /**
      * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
      * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
@@ -245,7 +254,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
             OffsetCommitRequestState requestState = createOffsetCommitRequest(
                 subscriptions.allConsumed(),
-                Optional.empty());
+                Long.MAX_VALUE);
             CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
requestAutoCommit(requestState);
             // Reset timer to the interval (even if no request was generated), 
but ensure that if
             // the request completes with a retriable error, the timer is 
reset to send the next
@@ -294,14 +303,14 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * complete exceptionally if the commit fails with a non-retriable error, 
or if the retry
      * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final 
long retryExpirationTimeMs) {
+    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(final 
long deadlineMs) {
         if (!autoCommitEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
 
         CompletableFuture<Void> result = new CompletableFuture<>();
         OffsetCommitRequestState requestState =
-            createOffsetCommitRequest(subscriptions.allConsumed(), 
Optional.of(retryExpirationTimeMs));
+            createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
         autoCommitSyncBeforeRevocationWithRetries(requestState, result);
         return result;
     }
@@ -314,9 +323,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 result.complete(null);
             } else {
                 if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-                    if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+                    if (requestAttempt.isExpired()) {
                         log.debug("Auto-commit sync before revocation timed 
out and won't be retried anymore");
-                        result.completeExceptionally(error);
+                        
result.completeExceptionally(maybeWrapAsTimeoutException(error));
                     } else if (error instanceof 
UnknownTopicOrPartitionException) {
                         log.debug("Auto-commit sync before revocation failed 
because topic or partition were deleted");
                         result.completeExceptionally(error);
@@ -367,7 +376,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(offsets, Optional.empty());
+        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(offsets, Long.MAX_VALUE);
         pendingRequests.addOffsetCommitRequest(commitRequest);
 
         CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
@@ -385,28 +394,26 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * Commit offsets, retrying on expected retriable errors while the retry 
timeout hasn't expired.
      *
      * @param offsets               Offsets to commit
-     * @param retryExpirationTimeMs Time until which the request will be 
retried if it fails with
+     * @param deadlineMs            Time until which the request will be 
retried if it fails with
      *                              an expected retriable error.
      * @return Future that will complete when a successful response
      */
     public CompletableFuture<Void> commitSync(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                              final long 
retryExpirationTimeMs) {
+                                              final long deadlineMs) {
         CompletableFuture<Void> result = new CompletableFuture<>();
-        OffsetCommitRequestState requestState = createOffsetCommitRequest(
-            offsets,
-            Optional.of(retryExpirationTimeMs));
+        OffsetCommitRequestState requestState = 
createOffsetCommitRequest(offsets, deadlineMs);
         commitSyncWithRetries(requestState, result);
         return result;
     }
 
     private OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                               final 
Optional<Long> expirationTimeMs) {
+                                                               final long 
deadlineMs) {
         return jitter.isPresent() ?
             new OffsetCommitRequestState(
                 offsets,
                 groupId,
                 groupInstanceId,
-                expirationTimeMs,
+                deadlineMs,
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 jitter.getAsDouble(),
@@ -415,7 +422,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 offsets,
                 groupId,
                 groupInstanceId,
-                expirationTimeMs,
+                deadlineMs,
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 memberInfo);
@@ -432,9 +439,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 result.complete(null);
             } else {
                 if (error instanceof RetriableException) {
-                    if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+                    if (requestAttempt.isExpired()) {
                         log.info("OffsetCommit timeout expired so it won't be 
retried anymore");
-                        result.completeExceptionally(error);
+                        
result.completeExceptionally(maybeWrapAsTimeoutException(error));
                     } else {
                         requestAttempt.resetFuture();
                         commitSyncWithRetries(requestAttempt, result);
@@ -465,7 +472,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
      *
      * @param partitions       Partitions to fetch offsets for.
-     * @param expirationTimeMs Time until which the request should be retried 
if it fails
+     * @param deadlineMs       Time until which the request should be retried 
if it fails
      *                         with expected retriable errors.
      * @return Future that will complete when a successful response is 
received, or the request
      * fails and cannot be retried. Note that the request is retried whenever 
it fails with
@@ -473,31 +480,31 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      */
     public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchOffsets(
         final Set<TopicPartition> partitions,
-        final long expirationTimeMs) {
+        final long deadlineMs) {
         if (partitions.isEmpty()) {
             return CompletableFuture.completedFuture(Collections.emptyMap());
         }
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new 
CompletableFuture<>();
-        OffsetFetchRequestState request = createOffsetFetchRequest(partitions, 
expirationTimeMs);
+        OffsetFetchRequestState request = createOffsetFetchRequest(partitions, 
deadlineMs);
         fetchOffsetsWithRetries(request, result);
         return result;
     }
 
     private OffsetFetchRequestState createOffsetFetchRequest(final 
Set<TopicPartition> partitions,
-                                                             final long 
expirationTimeMs) {
+                                                             final long 
deadlineMs) {
         return jitter.isPresent() ?
             new OffsetFetchRequestState(
                 partitions,
                 retryBackoffMs,
                 retryBackoffMaxMs,
-                expirationTimeMs,
+                deadlineMs,
                 jitter.getAsDouble(),
                 memberInfo) :
             new OffsetFetchRequestState(
                 partitions,
                 retryBackoffMs,
                 retryBackoffMaxMs,
-                expirationTimeMs,
+                deadlineMs,
                 memberInfo);
     }
 
@@ -516,8 +523,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 result.complete(res);
             } else {
                 if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
-                    if (error instanceof TimeoutException && 
fetchRequest.isExpired) {
-                        result.completeExceptionally(error);
+                    if (fetchRequest.isExpired()) {
+                        log.debug("OffsetFetch request for {} timed out and 
won't be retried anymore", fetchRequest.requestedPartitions);
+                        
result.completeExceptionally(maybeWrapAsTimeoutException(error));
                     } else {
                         fetchRequest.resetFuture();
                         fetchOffsetsWithRetries(fetchRequest, result);
@@ -612,12 +620,12 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                  final String groupId,
                                  final Optional<String> groupInstanceId,
-                                 final Optional<Long> expirationTimeMs,
+                                 final long deadlineMs,
                                  final long retryBackoffMs,
                                  final long retryBackoffMaxMs,
                                  final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs,
-                retryBackoffMaxMs, memberInfo, expirationTimeMs);
+                retryBackoffMaxMs, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.offsets = offsets;
             this.groupId = groupId;
             this.groupInstanceId = groupInstanceId;
@@ -628,13 +636,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                  final String groupId,
                                  final Optional<String> groupInstanceId,
-                                 final Optional<Long> expirationTimeMs,
+                                 final long deadlineMs,
                                  final long retryBackoffMs,
                                  final long retryBackoffMaxMs,
                                  final double jitter,
                                  final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2,
-                retryBackoffMaxMs, jitter, memberInfo, expirationTimeMs);
+                retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.offsets = offsets;
             this.groupId = groupId;
             this.groupInstanceId = groupInstanceId;
@@ -780,40 +788,24 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
      * Represents a request that can be retried or aborted, based on member ID 
and epoch
      * information.
      */
-    abstract class RetriableRequestState extends RequestState {
+    abstract class RetriableRequestState extends TimedRequestState {
 
         /**
          * Member info (ID and epoch) to be included in the request if present.
          */
         final MemberInfo memberInfo;
 
-        /**
-         * Time until which the request should be retried if it fails with 
retriable
-         * errors. If not present, the request is triggered without waiting 
for a response or
-         * retrying.
-         */
-        private final Optional<Long> expirationTimeMs;
-
-        /**
-         * True if the request expiration time has been reached. This is set 
when validating the
-         * request expiration on {@link #poll(long)} before sending it. It is 
used to know if a
-         * request should be retried on TimeoutException.
-         */
-        boolean isExpired;
-
         RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs,
-                              long retryBackoffMaxMs, MemberInfo memberInfo, 
Optional<Long> expirationTimeMs) {
-            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+                              long retryBackoffMaxMs, MemberInfo memberInfo, 
Timer timer) {
+            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, timer);
             this.memberInfo = memberInfo;
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         // Visible for testing
         RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, int retryBackoffExpBase,
-                              long retryBackoffMaxMs, double jitter, 
MemberInfo memberInfo, Optional<Long> expirationTimeMs) {
-            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+                              long retryBackoffMaxMs, double jitter, 
MemberInfo memberInfo, Timer timer) {
+            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter, timer);
             this.memberInfo = memberInfo;
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         /**
@@ -828,13 +820,12 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         abstract CompletableFuture<?> future();
 
         /**
-         * Complete the request future with a TimeoutException if the request 
timeout has been
-         * reached, based on the provided current time.
+         * Complete the request future with a TimeoutException if the request 
has been sent out
+         * at least once and the timeout has been reached.
          */
-        void maybeExpire(long currentTimeMs) {
-            if (retryTimeoutExpired(currentTimeMs)) {
+        void maybeExpire() {
+            if (numAttempts > 0 && isExpired()) {
                 removeRequest();
-                isExpired = true;
                 future().completeExceptionally(new 
TimeoutException(requestDescription() +
                     " could not complete before timeout expired."));
             }
@@ -846,11 +837,12 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         NetworkClientDelegate.UnsentRequest 
buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) {
             NetworkClientDelegate.UnsentRequest request = new 
NetworkClientDelegate.UnsentRequest(
                 builder,
-                coordinatorRequestManager.coordinator());
+                coordinatorRequestManager.coordinator()
+            );
             request.whenComplete(
                 (response, throwable) -> {
-                    long currentTimeMs = request.handler().completionTimeMs();
-                    handleClientResponse(response, throwable, currentTimeMs);
+                    long completionTimeMs = 
request.handler().completionTimeMs();
+                    handleClientResponse(response, throwable, 
completionTimeMs);
                 });
             return request;
         }
@@ -875,10 +867,6 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
         abstract void onResponse(final ClientResponse response);
 
-        boolean retryTimeoutExpired(long currentTimeMs) {
-            return expirationTimeMs.isPresent() && expirationTimeMs.get() <= 
currentTimeMs;
-        }
-
         abstract void removeRequest();
     }
 
@@ -898,10 +886,10 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
                                        final long retryBackoffMs,
                                        final long retryBackoffMaxMs,
-                                       final long expirationTimeMs,
+                                       final long deadlineMs,
                                        final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs,
-                retryBackoffMaxMs, memberInfo, Optional.of(expirationTimeMs));
+                retryBackoffMaxMs, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
         }
@@ -909,11 +897,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
                                        final long retryBackoffMs,
                                        final long retryBackoffMaxMs,
-                                       final long expirationTimeMs,
+                                       final long deadlineMs,
                                        final double jitter,
                                        final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2,
-                retryBackoffMaxMs, jitter, memberInfo, 
Optional.of(expirationTimeMs));
+                retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(time, 
deadlineMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
         }
@@ -1145,9 +1133,10 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                     inflightOffsetFetches.stream().filter(r -> 
r.sameRequest(request)).findAny();
 
             if (dupe.isPresent() || inflight.isPresent()) {
-                log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
+                log.debug("Duplicated unsent offset fetch request found for 
partitions: {}", request.requestedPartitions);
                 dupe.orElseGet(inflight::get).chainFuture(request.future);
             } else {
+                log.debug("Enqueuing offset fetch request for partitions: {}", 
request.requestedPartitions);
                 this.unsentOffsetFetches.add(request);
             }
             return request.future;
@@ -1165,7 +1154,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 .filter(request -> !request.canSendRequest(currentTimeMs))
                 .collect(Collectors.toList());
 
-            failAndRemoveExpiredCommitRequests(currentTimeMs);
+            failAndRemoveExpiredCommitRequests();
 
             // Add all unsent offset commit requests to the unsentRequests list
             List<NetworkClientDelegate.UnsentRequest> unsentRequests = 
unsentOffsetCommits.stream()
@@ -1179,7 +1168,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                     unsentOffsetFetches.stream()
                             .collect(Collectors.partitioningBy(request -> 
request.canSendRequest(currentTimeMs)));
 
-            failAndRemoveExpiredFetchRequests(currentTimeMs);
+            failAndRemoveExpiredFetchRequests();
 
             // Add all sendable offset fetch requests to the unsentRequests 
list and to the inflightOffsetFetches list
             for (OffsetFetchRequestState request : 
partitionedBySendability.get(true)) {
@@ -1200,18 +1189,18 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          * Find the unsent commit requests that have expired, remove them and 
complete their
          * futures with a TimeoutException.
          */
-        private void failAndRemoveExpiredCommitRequests(final long 
currentTimeMs) {
+        private void failAndRemoveExpiredCommitRequests() {
             Queue<OffsetCommitRequestState> requestsToPurge = new 
LinkedList<>(unsentOffsetCommits);
-            requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs));
+            requestsToPurge.forEach(RetriableRequestState::maybeExpire);
         }
 
         /**
          * Find the unsent fetch requests that have expired, remove them and 
complete their
          * futures with a TimeoutException.
          */
-        private void failAndRemoveExpiredFetchRequests(final long 
currentTimeMs) {
+        private void failAndRemoveExpiredFetchRequests() {
             Queue<OffsetFetchRequestState> requestsToPurge = new 
LinkedList<>(unsentOffsetFetches);
-            requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs));
+            requestsToPurge.forEach(RetriableRequestState::maybeExpire);
         }
 
         private void clearAll() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 76a550ad719..2aabf4ae130 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -960,7 +960,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
         // best effort to commit the offsets in the case where the epoch might 
have changed while
         // the current reconciliation is in process. Note this is using the 
rebalance timeout as
         // it is the limit enforced by the broker to complete the 
reconciliation process.
-        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getExpirationTimeForTimeout(rebalanceTimeoutMs));
+        commitResult = 
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(getDeadlineMsForTimeout(rebalanceTimeoutMs));
 
         // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
         commitResult.whenComplete((__, commitReqError) -> {
@@ -986,7 +986,7 @@ public class MembershipManagerImpl implements 
MembershipManager {
         });
     }
 
-    long getExpirationTimeForTimeout(final long timeoutMs) {
+    long getDeadlineMsForTimeout(final long timeoutMs) {
         long expiration = time.milliseconds() + timeoutMs;
         if (expiration < 0) {
             return Long.MAX_VALUE;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 2cd6f6d8530..e2e4d529c00 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -316,11 +316,20 @@ public class NetworkClientDelegate implements 
AutoCloseable {
 
         @Override
         public String toString() {
+            String remainingMs;
+
+            if (timer != null) {
+                timer.update();
+                remainingMs = String.valueOf(timer.remainingMs());
+            } else {
+                remainingMs = "<not set>";
+            }
+
             return "UnsentRequest{" +
                     "requestBuilder=" + requestBuilder +
                     ", handler=" + handler +
                     ", node=" + node +
-                    ", timer=" + timer +
+                    ", remainingMs=" + remainingMs +
                     '}';
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index 75d87432db6..edd8ca97215 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -155,6 +155,7 @@ public class RequestManagers implements Closeable {
                         apiVersions);
                 final TopicMetadataRequestManager topic = new 
TopicMetadataRequestManager(
                         logContext,
+                        time,
                         config);
                 HeartbeatRequestManager heartbeatRequestManager = null;
                 MembershipManager membershipManager = null;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java
new file mode 100644
index 00000000000..c61032cea72
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} 
with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+    private final Timer timer;
+
+    public TimedRequestState(final LogContext logContext,
+                             final String owner,
+                             final long retryBackoffMs,
+                             final long retryBackoffMaxMs,
+                             final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+        this.timer = timer;
+    }
+
+    public TimedRequestState(final LogContext logContext,
+                             final String owner,
+                             final long retryBackoffMs,
+                             final int retryBackoffExpBase,
+                             final long retryBackoffMaxMs,
+                             final double jitter,
+                             final Timer timer) {
+        super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
+        this.timer = timer;
+    }
+
+    public boolean isExpired() {
+        timer.update();
+        return timer.isExpired();
+    }
+
+    public long remainingMs() {
+        timer.update();
+        return timer.remainingMs();
+    }
+
+    public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+        long diff = Math.max(0, deadlineMs - time.milliseconds());
+        return time.timer(diff);
+    }
+
+
+    @Override
+    protected String toStringBase() {
+        return super.toStringBase() + ", remainingMs=" + remainingMs();
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
index 75a5ed08d15..a555d6ce7f3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManager.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 
 import java.util.Collections;
@@ -61,6 +62,7 @@ import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.
  */
 
 public class TopicMetadataRequestManager implements RequestManager {
+    private final Time time;
     private final boolean allowAutoTopicCreation;
     private final List<TopicMetadataRequestState> inflightRequests;
     private final long retryBackoffMs;
@@ -68,9 +70,10 @@ public class TopicMetadataRequestManager implements RequestManager {
     private final Logger log;
     private final LogContext logContext;
 
-    public TopicMetadataRequestManager(final LogContext context, final 
ConsumerConfig config) {
+    public TopicMetadataRequestManager(final LogContext context, final Time 
time, final ConsumerConfig config) {
         logContext = context;
         log = logContext.logger(getClass());
+        this.time = time;
         inflightRequests = new LinkedList<>();
         retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
@@ -81,7 +84,7 @@ public class TopicMetadataRequestManager implements RequestManager {
     public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         // Prune any requests which have timed out
         List<TopicMetadataRequestState> expiredRequests = 
inflightRequests.stream()
-                .filter(req -> req.isExpired(currentTimeMs))
+                .filter(TimedRequestState::isExpired)
                 .collect(Collectors.toList());
         expiredRequests.forEach(TopicMetadataRequestState::expire);
 
@@ -99,10 +102,10 @@ public class TopicMetadataRequestManager implements RequestManager {
      *
      * @return the future of the metadata request.
      */
-    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestAllTopicsMetadata(final long expirationTimeMs) {
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestAllTopicsMetadata(final long deadlineMs) {
         TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
                 logContext,
-                expirationTimeMs,
+                deadlineMs,
                 retryBackoffMs,
                 retryBackoffMaxMs);
         inflightRequests.add(newRequest);
@@ -115,11 +118,11 @@ public class TopicMetadataRequestManager implements RequestManager {
      * @param topic to be requested.
      * @return the future of the metadata request.
      */
-    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final String topic, final long expirationTimeMs) {
+    public CompletableFuture<Map<String, List<PartitionInfo>>> 
requestTopicMetadata(final String topic, final long deadlineMs) {
         TopicMetadataRequestState newRequest = new TopicMetadataRequestState(
                 logContext,
                 topic,
-                expirationTimeMs,
+                deadlineMs,
                 retryBackoffMs,
                 retryBackoffMaxMs);
         inflightRequests.add(newRequest);
@@ -131,35 +134,32 @@ public class TopicMetadataRequestManager implements RequestManager {
         return inflightRequests;
     }
 
-    class TopicMetadataRequestState extends RequestState {
+    class TopicMetadataRequestState extends TimedRequestState {
         private final String topic;
         private final boolean allTopics;
-        private final long expirationTimeMs;
         CompletableFuture<Map<String, List<PartitionInfo>>> future;
 
         public TopicMetadataRequestState(final LogContext logContext,
-                                         final long expirationTimeMs,
+                                         final long deadlineMs,
                                          final long retryBackoffMs,
                                          final long retryBackoffMaxMs) {
             super(logContext, TopicMetadataRequestState.class.getSimpleName(), 
retryBackoffMs,
-                    retryBackoffMaxMs);
+                    retryBackoffMaxMs, deadlineTimer(time, deadlineMs));
             future = new CompletableFuture<>();
             this.topic = null;
             this.allTopics = true;
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         public TopicMetadataRequestState(final LogContext logContext,
                                          final String topic,
-                                         final long expirationTimeMs,
+                                         final long deadlineMs,
                                          final long retryBackoffMs,
                                          final long retryBackoffMaxMs) {
             super(logContext, TopicMetadataRequestState.class.getSimpleName(), 
retryBackoffMs,
-                retryBackoffMaxMs);
+                retryBackoffMaxMs, deadlineTimer(time, deadlineMs));
             future = new CompletableFuture<>();
             this.topic = topic;
             this.allTopics = false;
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         /**
@@ -167,10 +167,6 @@ public class TopicMetadataRequestManager implements RequestManager {
          * {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest}
 if needed.
          */
         private Optional<NetworkClientDelegate.UnsentRequest> send(final long 
currentTimeMs) {
-            if (currentTimeMs >= expirationTimeMs) {
-                return Optional.empty();
-            }
-
             if (!canSendRequest(currentTimeMs)) {
                 return Optional.empty();
             }
@@ -183,10 +179,6 @@ public class TopicMetadataRequestManager implements 
RequestManager {
             return Optional.of(createUnsentRequest(request));
         }
 
-        private boolean isExpired(final long currentTimeMs) {
-            return currentTimeMs >= expirationTimeMs;
-        }
-
         private void expire() {
             completeFutureAndRemoveRequest(
                     new TimeoutException("Timeout expired while fetching topic 
metadata"));
@@ -210,9 +202,8 @@ public class TopicMetadataRequestManager implements RequestManager {
         private void handleError(final Throwable exception,
                                  final long completionTimeMs) {
             if (exception instanceof RetriableException) {
-                if (completionTimeMs >= expirationTimeMs) {
-                    completeFutureAndRemoveRequest(
-                        new TimeoutException("Timeout expired while fetching 
topic metadata"));
+                if (isExpired()) {
+                    completeFutureAndRemoveRequest(new 
TimeoutException("Timeout expired while fetching topic metadata"));
                 } else {
                     onFailedAttempt(completionTimeMs);
                 }
@@ -222,20 +213,12 @@ public class TopicMetadataRequestManager implements RequestManager {
         }
 
         private void handleResponse(final ClientResponse response) {
-            long responseTimeMs = response.receivedTimeMs();
             try {
                 Map<String, List<PartitionInfo>> res = 
handleTopicMetadataResponse((MetadataResponse) response.responseBody());
                 future.complete(res);
                 inflightRequests.remove(this);
-            } catch (RetriableException e) {
-                if (responseTimeMs >= expirationTimeMs) {
-                    completeFutureAndRemoveRequest(
-                        new TimeoutException("Timeout expired while fetching 
topic metadata"));
-                } else {
-                    onFailedAttempt(responseTimeMs);
-                }
-            } catch (Exception t) {
-                completeFutureAndRemoveRequest(t);
+            } catch (Exception e) {
+                handleError(e, response.receivedTimeMs());
             }
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
index dffac129021..8cd17d19feb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableApplicationEvent.java
@@ -43,6 +43,7 @@ public abstract class CompletableApplicationEvent<T> extends ApplicationEvent im
         return future;
     }
 
+    @Override
     public long deadlineMs() {
         return deadlineMs;
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index b1db0297a12..8e61d61cbc5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -193,11 +193,11 @@ public class CommitRequestManagerTest {
         offsets2.put(new TopicPartition("test", 4), new 
OffsetAndMetadata(20L));
 
         // Add the requests to the CommitRequestManager and store their futures
-        long expirationTimeMs  = time.milliseconds() + defaultApiTimeoutMs;
-        commitManager.commitSync(offsets1, expirationTimeMs);
-        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)), expirationTimeMs);
-        commitManager.commitSync(offsets2, expirationTimeMs);
-        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 1)), expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        commitManager.commitSync(offsets1, deadlineMs);
+        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)), deadlineMs);
+        commitManager.commitSync(offsets2, deadlineMs);
+        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 1)), deadlineMs);
 
         // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct
         NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
@@ -287,8 +287,8 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
         
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManager, 
error, commitResult);
 
         // We expect that request should have been retried on this sync commit.
@@ -307,8 +307,8 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send sync offset commit that fails and verify it propagates the 
expected exception.
-        long expirationTimeMs = time.milliseconds() + retryBackoffMs;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + retryBackoffMs;
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, 
commitError);
         assertFutureThrows(commitResult, expectedException);
     }
@@ -332,8 +332,8 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
 
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.UNKNOWN_MEMBER_ID);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
@@ -594,7 +594,7 @@ public class CommitRequestManagerTest {
 
     @ParameterizedTest
     @MethodSource("offsetFetchExceptionSupplier")
-    public void testOffsetFetchRequestErroredRequests(final Errors error, 
final boolean isRetriable) {
+    public void testOffsetFetchRequestErroredRequests(final Errors error) {
         CommitRequestManager commitRequestManager = create(true, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
@@ -606,7 +606,7 @@ public class CommitRequestManagerTest {
             1,
             error);
         // we only want to make sure to purge the outbound buffer for 
non-retriables, so retriable will be re-queued.
-        if (isRetriable)
+        if (isRetriableOnOffsetFetch(error))
             testRetriable(commitRequestManager, futures);
         else {
             testNonRetriable(futures);
@@ -614,15 +614,49 @@ public class CommitRequestManagerTest {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("offsetFetchExceptionSupplier")
+    public void testOffsetFetchRequestTimeoutRequests(final Errors error) {
+        CommitRequestManager commitRequestManager = create(true, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        Set<TopicPartition> partitions = new HashSet<>();
+        partitions.add(new TopicPartition("t1", 0));
+        List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
+                commitRequestManager,
+                partitions,
+                1,
+                error);
+
+        if (isRetriableOnOffsetFetch(error)) {
+            futures.forEach(f -> assertFalse(f.isDone()));
+
+            // Insert a long enough sleep to force a timeout of the operation. 
Invoke poll() again so that each
+            // OffsetFetchRequestState is evaluated via isExpired().
+            time.sleep(defaultApiTimeoutMs);
+            
assertFalse(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
+            commitRequestManager.poll(time.milliseconds());
+            futures.forEach(f -> assertFutureThrows(f, 
TimeoutException.class));
+            
assertTrue(commitRequestManager.pendingRequests.unsentOffsetFetches.isEmpty());
+        } else {
+            futures.forEach(f -> assertFutureThrows(f, KafkaException.class));
+            assertEmptyPendingRequests(commitRequestManager);
+        }
+    }
+
+    private boolean isRetriableOnOffsetFetch(Errors error) {
+        return error == Errors.NOT_COORDINATOR || error == 
Errors.COORDINATOR_LOAD_IN_PROGRESS || error == 
Errors.COORDINATOR_NOT_AVAILABLE;
+    }
+
     @Test
     public void testSuccessfulOffsetFetch() {
         CommitRequestManager commitManager = create(false, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
             commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)),
-                expirationTimeMs);
+                deadlineMs);
 
         // Send fetch request
         NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
@@ -667,8 +701,8 @@ public class CommitRequestManagerTest {
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
 
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         completeOffsetFetchRequestWithError(commitRequestManager, partitions, 
error);
 
@@ -694,8 +728,8 @@ public class CommitRequestManagerTest {
         Set<TopicPartition> partitions = new HashSet<>();
         partitions.add(new TopicPartition("t1", 0));
 
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -748,8 +782,8 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send sync offset commit request that fails with retriable error.
-        long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
         completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.REQUEST_TIMED_OUT);
 
         // Request retried after backoff, and fails with retriable again. 
Should not complete yet
@@ -770,8 +804,9 @@ public class CommitRequestManagerTest {
      * Sync commit requests that fail with an expected retriable error should 
be retried
      * while there is time. When time expires, they should fail with a 
TimeoutException.
      */
-    @Test
-    public void 
testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires() {
+    @ParameterizedTest
+    @MethodSource("offsetCommitExceptionSupplier")
+    public void 
testOffsetCommitSyncFailedWithRetriableThrowsTimeoutWhenRetryTimeExpires(final 
Errors error) {
         CommitRequestManager commitRequestManager = create(false, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
@@ -780,17 +815,21 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send offset commit request that fails with retriable error.
-        long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, expirationTimeMs);
-        completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.COORDINATOR_NOT_AVAILABLE);
+        long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitSync(offsets, deadlineMs);
+        completeOffsetCommitRequestWithError(commitRequestManager, error);
 
         // Sleep to expire the request timeout. Request should fail on the 
next poll with a
         // TimeoutException.
-        time.sleep(expirationTimeMs);
+        time.sleep(deadlineMs);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
         assertTrue(commitResult.isDone());
-        assertFutureThrows(commitResult, TimeoutException.class);
+
+        if (error.exception() instanceof RetriableException)
+            assertFutureThrows(commitResult, TimeoutException.class);
+        else
+            assertFutureThrows(commitResult, KafkaException.class);
     }
 
     /**
@@ -829,8 +868,8 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.commitSync(offsets, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        commitRequestManager.commitSync(offsets, deadlineMs);
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
         res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new 
TimeoutException());
@@ -911,8 +950,8 @@ public class CommitRequestManagerTest {
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
         // Send request that is expected to fail with invalid epoch.
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
+        commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         // Mock member has new a valid epoch.
         int newEpoch = 8;
@@ -950,9 +989,9 @@ public class CommitRequestManagerTest {
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
         // Send request that is expected to fail with invalid epoch.
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
requestResult =
-            commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
+            commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         // Mock member not having a valid epoch anymore (left/failed/fenced).
         commitRequestManager.onMemberEpochUpdated(Optional.empty(), 
Optional.empty());
@@ -983,10 +1022,10 @@ public class CommitRequestManagerTest {
         TopicPartition tp = new TopicPartition("topic", 1);
         subscriptionState.assignFromUser(singleton(tp));
         subscriptionState.seek(tp, 5);
-        long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
+        long deadlineMs = time.milliseconds() + retryBackoffMs * 2;
 
         // Send commit request expected to be retried on STALE_MEMBER_EPOCH 
error while it does not expire
-        
commitRequestManager.maybeAutoCommitSyncBeforeRevocation(expirationTimeMs);
+        commitRequestManager.maybeAutoCommitSyncBeforeRevocation(deadlineMs);
 
         int newEpoch = 8;
         String memberId = "member1";
@@ -1094,7 +1133,7 @@ public class CommitRequestManagerTest {
     }
 
     /**
-     * @return {@link Errors} that could be received in OffsetCommit responses.
+     * @return {@link Errors} that could be received in {@link 
ApiKeys#OFFSET_COMMIT} responses.
      */
     private static Stream<Arguments> offsetCommitExceptionSupplier() {
         return Stream.of(
@@ -1113,25 +1152,27 @@ public class CommitRequestManagerTest {
             Arguments.of(Errors.UNKNOWN_MEMBER_ID));
     }
 
-    // Supplies (error, isRetriable)
+    /**
+     * @return {@link Errors} that could be received in {@link 
ApiKeys#OFFSET_FETCH} responses.
+     */
     private static Stream<Arguments> offsetFetchExceptionSupplier() {
-        // fetchCommit is only retrying on a subset of RetriableErrors
         return Stream.of(
-            Arguments.of(Errors.NOT_COORDINATOR, true),
-            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, true),
-            Arguments.of(Errors.UNKNOWN_SERVER_ERROR, false),
-            Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, false),
-            Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE, false),
-            Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE, false),
-            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false),
-            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, true),
-            Arguments.of(Errors.REQUEST_TIMED_OUT, false),
-            Arguments.of(Errors.FENCED_INSTANCE_ID, false),
-            Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false),
-            Arguments.of(Errors.UNKNOWN_MEMBER_ID, false),
+            Arguments.of(Errors.NOT_COORDINATOR),
+            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS),
+            Arguments.of(Errors.UNKNOWN_SERVER_ERROR),
+            Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED),
+            Arguments.of(Errors.OFFSET_METADATA_TOO_LARGE),
+            Arguments.of(Errors.INVALID_COMMIT_OFFSET_SIZE),
+            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION),
+            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE),
+            Arguments.of(Errors.REQUEST_TIMED_OUT),
+            Arguments.of(Errors.FENCED_INSTANCE_ID),
+            Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED),
+            Arguments.of(Errors.UNKNOWN_MEMBER_ID),
             // Adding STALE_MEMBER_EPOCH as non-retriable here because it is 
only retried if a new
             // member epoch is received. Tested separately.
-            Arguments.of(Errors.STALE_MEMBER_EPOCH, false));
+            Arguments.of(Errors.STALE_MEMBER_EPOCH),
+            Arguments.of(Errors.UNSTABLE_OFFSET_COMMIT));
     }
 
     /**
@@ -1155,9 +1196,9 @@ public class CommitRequestManagerTest {
         TopicPartition tp2 = new TopicPartition("t2", 3);
         partitions.add(tp1);
         partitions.add(tp2);
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
-                commitRequestManager.fetchOffsets(partitions, 
expirationTimeMs);
+                commitRequestManager.fetchOffsets(partitions, deadlineMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1215,9 +1256,9 @@ public class CommitRequestManagerTest {
             int numRequest,
             final Errors error) {
         List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = new ArrayList<>();
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        long deadlineMs = time.milliseconds() + defaultApiTimeoutMs;
         for (int i = 0; i < numRequest; i++) {
-            futures.add(commitRequestManager.fetchOffsets(partitions, 
expirationTimeMs));
+            futures.add(commitRequestManager.fetchOffsets(partitions, 
deadlineMs));
         }
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
index 9f6fd4a764b..dabd697b896 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
@@ -174,7 +174,7 @@ public class ConsumerTestBuilder implements Closeable {
                 backgroundEventHandler,
                 logContext));
 
-        this.topicMetadataRequestManager = spy(new 
TopicMetadataRequestManager(logContext, config));
+        this.topicMetadataRequestManager = spy(new 
TopicMetadataRequestManager(logContext, time, config));
 
         if (groupInfo.isPresent()) {
             GroupInformation gi = groupInfo.get();
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java
new file mode 100644
index 00000000000..ddde3ae84d4
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TimedRequestStateTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TimedRequestStateTest {
+
+    private final static long DEFAULT_TIMEOUT_MS = 30000;
+    private final Time time = new MockTime();
+
+    @Test
+    public void testIsExpired() {
+        TimedRequestState state = new TimedRequestState(
+            new LogContext(),
+            this.getClass().getSimpleName(),
+            100,
+            1000,
+            time.timer(DEFAULT_TIMEOUT_MS)
+        );
+        assertFalse(state.isExpired());
+        time.sleep(DEFAULT_TIMEOUT_MS);
+        assertTrue(state.isExpired());
+    }
+
+    @Test
+    public void testRemainingMs() {
+        TimedRequestState state = new TimedRequestState(
+            new LogContext(),
+            this.getClass().getSimpleName(),
+            100,
+            1000,
+            time.timer(DEFAULT_TIMEOUT_MS)
+        );
+        assertEquals(DEFAULT_TIMEOUT_MS, state.remainingMs());
+        time.sleep(DEFAULT_TIMEOUT_MS);
+        assertEquals(0, state.remainingMs());
+    }
+
+    @Test
+    public void testDeadlineTimer() {
+        long deadlineMs = time.milliseconds() + DEFAULT_TIMEOUT_MS;
+        Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs);
+        assertEquals(DEFAULT_TIMEOUT_MS, timer.remainingMs());
+        timer.sleep(DEFAULT_TIMEOUT_MS);
+        assertEquals(0, timer.remainingMs());
+    }
+
+    @Test
+    public void testAllowOverdueDeadlineTimer() {
+        long deadlineMs = time.milliseconds() - DEFAULT_TIMEOUT_MS;
+        Timer timer = TimedRequestState.deadlineTimer(time, deadlineMs);
+        assertEquals(0, timer.remainingMs());
+    }
+
+    @Test
+    public void testToStringUpdatesTimer() {
+        TimedRequestState state = new TimedRequestState(
+            new LogContext(),
+            this.getClass().getSimpleName(),
+            100,
+            1000,
+            time.timer(DEFAULT_TIMEOUT_MS)
+        );
+
+        assertToString(state, DEFAULT_TIMEOUT_MS);
+        time.sleep(DEFAULT_TIMEOUT_MS);
+        assertToString(state, 0);
+    }
+
+    private void assertToString(TimedRequestState state, long timerMs) {
+        assertTrue(state.toString().contains("remainingMs=" + timerMs + "}"));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
index 3f2b2c3d983..56eff5b4f4b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/TopicMetadataRequestManagerTest.java
@@ -74,6 +74,7 @@ public class TopicMetadataRequestManagerTest {
         props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(
             new LogContext(),
+            time,
             new ConsumerConfig(props)));
     }
 
diff --git a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
index d242ea105e6..d29f05b36b3 100644
--- a/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AbstractConsumerTest.scala
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import kafka.utils.TestUtils
 import kafka.server.BaseRequestTest
 import org.junit.jupiter.api.Assertions._
@@ -90,12 +91,14 @@ abstract class AbstractConsumerTest extends BaseRequestTest {
         s"The current assignment is ${consumer.assignment()}")
   }
 
-  def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V], partition: TopicPartition): ConsumerRecords[K, V] = {
+  def awaitNonEmptyRecords[K, V](consumer: Consumer[K, V],
+                                 partition: TopicPartition,
+                                 pollTimeoutMs: Long = 100): ConsumerRecords[K, V] = {
     TestUtils.pollRecordsUntilTrue(consumer, (polledRecords: ConsumerRecords[K, V]) => {
       if (polledRecords.records(partition).asScala.nonEmpty)
         return polledRecords
       false
-    }, s"Consumer did not consume any messages for partition $partition before timeout.")
+    }, s"Consumer did not consume any messages for partition $partition before timeout.", JTestUtils.DEFAULT_MAX_WAIT_MS, pollTimeoutMs)
     throw new IllegalStateException("Should have timed out before reaching here")
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
index daed397e43f..0184a6eea67 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
@@ -238,6 +238,19 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
     runMultiConsumerSessionTimeoutTest(true)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPollEventuallyReturnsRecordsWithZeroTimeout(quorum: String, groupProtocol: String): Unit = {
+    val numMessages = 100
+    val producer = createProducer()
+    sendRecords(producer, numMessages, tp)
+
+    val consumer = createConsumer()
+    consumer.subscribe(Set(topic).asJava)
+    val records = awaitNonEmptyRecords(consumer, tp, 0L)
+    assertEquals(numMessages, records.count())
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 31f671e3a0f..35b7ce418d1 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -903,9 +903,10 @@ object TestUtils extends Logging {
   def pollRecordsUntilTrue[K, V](consumer: Consumer[K, V],
                                  action: ConsumerRecords[K, V] => Boolean,
                                  msg: => String,
-                                 waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
+                                 waitTimeMs: Long = JTestUtils.DEFAULT_MAX_WAIT_MS,
+                                 pollTimeoutMs: Long = 100): Unit = {
     waitUntilTrue(() => {
-      val records = consumer.poll(Duration.ofMillis(100))
+      val records = consumer.poll(Duration.ofMillis(pollTimeoutMs))
       action(records)
     }, msg = msg, pause = 0L, waitTimeMs = waitTimeMs)
   }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 059e33f3525..91455f86232 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -208,6 +208,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
         consumer.seek(topicPartition, fetchOffset);
 
         long timeoutMs = 60_000L;
+        long pollTimeoutMs = 100L;
         String sep = System.lineSeparator();
         List<ConsumerRecord<String, String>> records = new ArrayList<>();
         Function1<ConsumerRecords<String, String>, Object> pollAction = polledRecords -> {
@@ -218,7 +219,7 @@ public final class TieredStorageTestContext implements AutoCloseable {
                 String.format("Could not consume %d records of %s from offset %d in %d ms. %d message(s) consumed:%s%s",
                         expectedTotalCount, topicPartition, fetchOffset, timeoutMs, records.size(), sep,
                         records.stream().map(Object::toString).collect(Collectors.joining(sep)));
-        TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs);
+        TestUtils.pollRecordsUntilTrue(consumer, pollAction, messageSupplier, timeoutMs, pollTimeoutMs);
         return records;
     }
 

Reply via email to