This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 77ba06fa620 KAFKA-16033: Commit retry logic fixes (#15357)
77ba06fa620 is described below

commit 77ba06fa620ad9cc42af92f42354661308676135
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Wed Feb 21 05:08:37 2024 -0500

    KAFKA-16033: Commit retry logic fixes (#15357)
    
    This change modifies the commit manager for improved retry logic & fixing 
bugs:
    
    - defines high level functions for each of the different types of commit: 
commitSync, commitAsync, autoCommitSync (used from consumer close), 
autoCommitAsync (on interval), autoCommitNow (before revocation).
     - moves retry logic to these caller functions, keeping a common response 
error handling that propagates errors that each caller functions retry as it 
needs.
    
    Fixes the following issues:
    
    - auto-commit before revocation should retry with latest consumed offsets
    - auto-commit before revocation should only reset the timer once, when the 
rebalance completes
    - StaleMemberEpoch error (fatal) is considered retriable only when 
committing offsets before revocation, where it is retried with backoff if the 
member has a valid epoch. All other commits will fail fatally on stale epoch. 
Note that auto commit on the interval (autoCommitAsync) does not have any 
specific retry logic for the stale epoch, but will effectively retry on the 
next interval (as it does for any other fatal error)
    - fix duplicated and noisy logs for auto-commit
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  24 +-
 .../consumer/internals/CommitRequestManager.java   | 669 +++++++++++----------
 .../consumer/internals/MembershipManagerImpl.java  |  12 +-
 .../internals/events/ApplicationEvent.java         |   2 +-
 .../events/ApplicationEventProcessor.java          |  32 +-
 .../events/AsyncCommitApplicationEvent.java        |  39 ++
 .../internals/events/CommitApplicationEvent.java   |  34 +-
 .../events/SyncCommitApplicationEvent.java         |  52 ++
 .../consumer/internals/AsyncKafkaConsumerTest.java |  71 ++-
 .../internals/CommitRequestManagerTest.java        | 291 +++++----
 .../internals/ConsumerNetworkThreadTest.java       |  26 +-
 .../internals/MembershipManagerImplTest.java       |   4 +-
 12 files changed, 719 insertions(+), 537 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 6781e0b73cb..d2e4788a1bc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -40,6 +40,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
@@ -57,6 +58,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdat
 import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
@@ -755,9 +757,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, 
OffsetCommitCallback callback) {
         acquireAndEnsureOpen();
         try {
-            // Commit without timer to indicate that the commit should be 
triggered without
-            // waiting for a response.
-            CompletableFuture<Void> future = commit(offsets, false, 
Optional.empty());
+            AsyncCommitApplicationEvent asyncCommitEvent = new 
AsyncCommitApplicationEvent(offsets);
+            CompletableFuture<Void> future = commit(asyncCommitEvent);
             future.whenComplete((r, t) -> {
 
                 if (t == null) {
@@ -778,14 +779,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    // Visible for testing
-    CompletableFuture<Void> commit(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                   final boolean isWakeupable,
-                                   final Optional<Long> retryTimeoutMs) {
+    private CompletableFuture<Void> commit(final CommitApplicationEvent 
commitEvent) {
         maybeInvokeCommitCallbacks();
         maybeThrowFencedInstanceException();
         maybeThrowInvalidGroupIdException();
 
+        Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
         log.debug("Committing offsets: {}", offsets);
         offsets.forEach(this::updateLastSeenEpochIfNewer);
 
@@ -793,11 +792,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             return CompletableFuture.completedFuture(null);
         }
 
-        final CommitApplicationEvent commitEvent = new 
CommitApplicationEvent(offsets, retryTimeoutMs);
-        if (isWakeupable) {
-            // the task can only be woken up if the top level API call is 
commitSync
-            wakeupTrigger.setActiveTask(commitEvent.future());
-        }
         applicationEventHandler.add(commitEvent);
         return commitEvent.future();
     }
@@ -1344,9 +1338,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         long commitStart = time.nanoseconds();
         try {
             Timer requestTimer = time.timer(timeout.toMillis());
-            // Commit with a timer to control how long the request should be 
retried until it
-            // gets a successful response or non-retriable error.
-            CompletableFuture<Void> commitFuture = commit(offsets, true, 
Optional.of(timeout.toMillis()));
+            SyncCommitApplicationEvent syncCommitEvent = new 
SyncCommitApplicationEvent(offsets, timeout.toMillis());
+            CompletableFuture<Void> commitFuture = commit(syncCommitEvent);
+            wakeupTrigger.setActiveTask(commitFuture);
             ConsumerUtils.getResult(commitFuture, requestTimer);
             interceptors.onCommit(offsets);
         } finally {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 704da62178c..9206783d561 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.StaleMemberEpochException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnstableOffsetCommitException;
@@ -168,7 +169,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             return drainPendingOffsetCommitRequests();
         }
 
-        maybeAutoCommitAllConsumedAsync();
+        maybeAutoCommitAsync();
         if (!pendingRequests.hasUnsentRequests())
             return EMPTY;
 
@@ -204,126 +205,313 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
-     * request if there is no other commit request already in-flight, and if 
the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
+     * offsets to commit, no request will be generated and a completed future 
will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to 
be retried if it
-     *                          fails with a retriable error. If not present, 
the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration 
should be checked for
-     *                          sending a request. If true, the request will 
be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the 
interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case 
it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the 
request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error 
if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                                    final Optional<Long> 
expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) 
{
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = 
pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
-     * partitions and their current positions. Note on auto-commit timers: 
this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
-     * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the 
request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, 
this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the 
auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link 
#poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the 
auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no 
action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), 
but ensure that if
+            // the request completes with a retriable error, the timer is 
reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, 
Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with 
a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final 
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed 
due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", 
offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the 
timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the 
auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This 
will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal 
error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as 
a retriable error,
+     * and will retry it including the latest member ID and epoch received 
from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully 
committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, 
or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, 
false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long 
retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), 
Optional.of(retryExpirationTimeMs));
+        autoCommitSyncNowWithRetries(requestState, result);
+        return result;
     }
 
-    private BiConsumer<? super Void, ? super Throwable> 
autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> 
allConsumedOffsets) {
+    private void autoCommitSyncNowWithRetries(OffsetCommitRequestState 
requestAttempt,
+                                              CompletableFuture<Void> result) {
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be 
retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retries with the 
latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        requestAttempt.resetFuture();
+                        autoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable 
error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
+    }
+
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion 
status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super 
Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> 
allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> 
autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 
offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", 
allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", 
allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due 
to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable 
error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", 
allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", 
allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It 
creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails 
with a retriable
+     * error. The generated request will be added to the queue to be sent on 
the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, 
successfully or
+     * exceptionally depending on the response. If the request fails with a 
retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> 
expirationTimeMs,
-                                                          final boolean 
retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = 
createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                
asyncCommitResult.completeExceptionally(commitAsyncExceptionForError(error));
+            } else {
+                asyncCommitResult.complete(null);
+            }
+        });
+        return asyncCommitResult;
     }
 
     /**
-     * Handles {@link 
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsApplicationEvent}.
 It creates an
-     * {@link OffsetFetchRequestState} and enqueue it to send later.
+     * Commit offsets, retrying on expected retriable errors while the retry 
timeout hasn't expired.
+     *
+     * @param offsets               Offsets to commit
+     * @param retryExpirationTimeMs Time until which the request will be 
retried if it fails with
+     *                              an expected retriable error.
+     * @return Future that will complete when a successful response
      */
-    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(
+    public CompletableFuture<Void> commitSync(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
+                                              final long 
retryExpirationTimeMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState = createOffsetCommitRequest(
+            offsets,
+            Optional.of(retryExpirationTimeMs));
+        commitSyncWithRetries(requestState, result);
+        return result;
+    }
+
+    private OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                               final 
Optional<Long> expirationTimeMs) {
+        return jitter.isPresent() ?
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                jitter.getAsDouble(),
+                memberInfo) :
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                memberInfo);
+    }
+
+    private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
+                                       CompletableFuture<Void> result) {
+        pendingRequests.addOffsetCommitRequest(requestAttempt);
+
+        // Retry the same commit request while it fails with 
RetriableException and the retry
+        // timeout hasn't expired.
+        requestAttempt.future.whenComplete((res, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException) {
+                    if (error instanceof TimeoutException && 
requestAttempt.isExpired) {
+                        log.info("OffsetCommit timeout expired so it won't be 
retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        requestAttempt.resetFuture();
+                        commitSyncWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    
result.completeExceptionally(commitSyncExceptionForError(error));
+                }
+            }
+        });
+    }
+
+    private Throwable commitSyncExceptionForError(Throwable error) {
+        if (error instanceof StaleMemberEpochException) {
+            return new CommitFailedException("OffsetCommit failed with stale 
member epoch."
+                + Errors.STALE_MEMBER_EPOCH.message());
+        }
+        return error;
+    }
+
+    private Throwable commitAsyncExceptionForError(Throwable error) {
+        if (error instanceof RetriableException) {
+            return new RetriableCommitFailedException(error.getMessage());
+        }
+        return error;
+    }
+
+    /**
+     * Enqueue a request to fetch committed offsets, that will be sent on the 
next call to {@link #poll(long)}.
+     *
+     * @param partitions       Partitions to fetch offsets for.
+     * @param expirationTimeMs Time until which the request should be retried 
if it fails
+     *                         with expected retriable errors.
+     * @return Future that will complete when a successful response is 
received, or the request
+     * fails and cannot be retried. Note that the request is retried whenever 
it fails with
+     * retriable expected error and the retry time hasn't expired.
+     */
+    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
fetchOffsets(
         final Set<TopicPartition> partitions,
         final long expirationTimeMs) {
-        return pendingRequests.addOffsetFetchRequest(partitions, 
expirationTimeMs);
+        if (partitions.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyMap());
+        }
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new 
CompletableFuture<>();
+        OffsetFetchRequestState request = createOffsetFetchRequest(partitions, 
expirationTimeMs);
+        fetchOffsetsWithRetries(request, result);
+        return result;
+    }
+
+    private OffsetFetchRequestState createOffsetFetchRequest(final 
Set<TopicPartition> partitions,
+                                                             final long 
expirationTimeMs) {
+        return jitter.isPresent() ?
+            new OffsetFetchRequestState(
+                partitions,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                expirationTimeMs,
+                jitter.getAsDouble(),
+                memberInfo) :
+            new OffsetFetchRequestState(
+                partitions,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                expirationTimeMs,
+                memberInfo);
+    }
+
+    private void fetchOffsetsWithRetries(final OffsetFetchRequestState 
fetchRequest,
+                                         final 
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
currentResult = pendingRequests.addOffsetFetchRequest(fetchRequest);
+
+        // Retry the same fetch request while it fails with RetriableException 
and the retry timeout hasn't expired.
+        currentResult.whenComplete((res, error) -> {
+            boolean inflightRemoved = 
pendingRequests.inflightOffsetFetches.remove(fetchRequest);
+            if (!inflightRemoved) {
+                log.warn("A duplicated, inflight, request was identified, but 
unable to find it in the " +
+                    "outbound buffer:" + fetchRequest);
+            }
+            if (error == null) {
+                result.complete(res);
+            } else {
+                if (error instanceof RetriableException || 
isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && 
fetchRequest.isExpired) {
+                        result.completeExceptionally(error);
+                    } else {
+                        fetchRequest.resetFuture();
+                        fetchOffsetsWithRetries(fetchRequest, result);
+                    }
+                } else
+                    result.completeExceptionally(error);
+            }
+        });
+    }
+
+    private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
+        return error instanceof StaleMemberEpochException && 
memberInfo.memberEpoch.isPresent();
     }
 
     public void updateAutoCommitTimer(final long currentTimeMs) {
@@ -393,18 +581,15 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
     }
 
     private class OffsetCommitRequestState extends RetriableRequestState {
-        private final Map<TopicPartition, OffsetAndMetadata> offsets;
+        private Map<TopicPartition, OffsetAndMetadata> offsets;
         private final String groupId;
         private final Optional<String> groupInstanceId;
 
-        private final CompletableFuture<Void> future;
-
         /**
-         * Time until which the request should be retried if it fails with 
retriable
-         * errors. If not present, the request is triggered without waiting 
for a response or
-         * retrying.
+         * Future containing the offsets that were committed. It completes 
when a response is
+         * received for the commit request.
          */
-        private final Optional<Long> expirationTimeMs;
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
 
         OffsetCommitRequestState(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
                                  final String groupId,
@@ -412,15 +597,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                                  final Optional<Long> expirationTimeMs,
                                  final long retryBackoffMs,
                                  final long retryBackoffMaxMs,
-                                 final MemberInfo memberInfo,
-                                 final boolean retryOnStaleEpoch) {
+                                 final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs,
-                retryBackoffMaxMs, memberInfo, retryOnStaleEpoch);
+                retryBackoffMaxMs, memberInfo, expirationTimeMs);
             this.offsets = offsets;
             this.groupId = groupId;
             this.groupInstanceId = groupInstanceId;
             this.future = new CompletableFuture<>();
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         // Visible for testing
@@ -431,15 +614,13 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                                  final long retryBackoffMs,
                                  final long retryBackoffMaxMs,
                                  final double jitter,
-                                 final MemberInfo memberInfo,
-                                 final boolean retryOnStaleEpoch) {
+                                 final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2,
-                retryBackoffMaxMs, jitter, memberInfo, retryOnStaleEpoch);
+                retryBackoffMaxMs, jitter, memberInfo, expirationTimeMs);
             this.offsets = offsets;
             this.groupId = groupId;
             this.groupInstanceId = groupInstanceId;
             this.future = new CompletableFuture<>();
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
@@ -494,14 +675,16 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : 
commitResponse.data().topics()) {
                 for (OffsetCommitResponseData.OffsetCommitResponsePartition 
partition : topic.partitions()) {
                     TopicPartition tp = new TopicPartition(topic.name(), 
partition.partitionIndex());
-                    OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
-                    long offset = offsetAndMetadata.offset();
+
                     Errors error = Errors.forCode(partition.errorCode());
                     if (error == Errors.NONE) {
+                        OffsetAndMetadata offsetAndMetadata = offsets.get(tp);
+                        long offset = offsetAndMetadata.offset();
                         log.debug("OffsetCommit completed successfully for 
offset {} partition {}", offset, tp);
                         continue;
                     }
 
+                    onFailedAttempt(currentTimeMs);
                     if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                         
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
                         return;
@@ -509,11 +692,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         error == Errors.NOT_COORDINATOR ||
                         error == Errors.REQUEST_TIMED_OUT) {
                         
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
-                        maybeRetry(currentTimeMs, error.exception());
+                        future.completeExceptionally(error.exception());
                         return;
                     } else if (error == Errors.FENCED_INSTANCE_ID) {
                         String fencedError = "OffsetCommit failed due to group 
instance id fenced: " + groupInstanceId;
-                        log.error(fencedError, error.message());
+                        log.error(fencedError);
                         future.completeExceptionally(new 
CommitFailedException(fencedError));
                         return;
                     } else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
@@ -523,21 +706,15 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                     } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
                         error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                         // just retry
-                        maybeRetry(currentTimeMs, error.exception());
+                        future.completeExceptionally(error.exception());
                         return;
                     } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                        log.error("OffsetCommit failed with {} on partition {} 
for offset {}",
-                            error, tp, offset);
+                        log.error("OffsetCommit failed with {}", error);
                         future.completeExceptionally(new 
CommitFailedException("OffsetCommit " +
                             "failed with unknown member ID. " + 
error.message()));
                         return;
                     } else if (error == Errors.STALE_MEMBER_EPOCH) {
-                        if (maybeRetryWithNewMemberEpoch(currentTimeMs, 
error)) {
-                            log.debug("OffsetCommit failed with {} and will be 
retried with the " +
-                                "latest member ID and epoch.", error);
-                            return;
-                        }
-                        
future.completeExceptionally(commitExceptionForStaleMemberEpoch());
+                        future.completeExceptionally(error.exception());
                         return;
                     } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                         // Collect all unauthorized topics before failing
@@ -559,31 +736,6 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             }
         }
 
-        /**
-         * Enqueue the request to be retried with exponential backoff, if the 
request allows
-         * retries and the timer has not expired. Complete the request future 
exceptionally if
-         * the request won't be retried.
-         */
-        @Override
-        void maybeRetry(long currentTimeMs, Throwable throwable) {
-            if (!allowsRetries()) {
-                // Fail requests that do not allow retries (async requests), 
making sure to
-                // propagate a RetriableCommitException if the failure is 
retriable.
-                
future.completeExceptionally(commitExceptionForRetriableError(throwable));
-                return;
-            }
-            if (isExpired(currentTimeMs)) {
-                // Fail requests that allowed retries (sync requests), but 
expired.
-                future.completeExceptionally(throwable);
-                return;
-            }
-
-            // Enqueue request to be retried with backoff. Note that this 
maintains the same
-            // timer of the initial request, so all the retries are 
time-bounded.
-            onFailedAttempt(currentTimeMs);
-            pendingRequests.addOffsetCommitRequest(this);
-        }
-
         @Override
         String requestDescription() {
             return "OffsetCommit request for offsets " + offsets;
@@ -594,49 +746,15 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             return future;
         }
 
-        private boolean isExpired(final long currentTimeMs) {
-            return expirationTimeMs.isPresent() && expirationTimeMs.get() <= 
currentTimeMs;
+        void resetFuture() {
+            future = new CompletableFuture<>();
         }
 
-        /**
-         * @return True if the requests allows to be retried (sync requests 
that provide an
-         * expiration time to bound the retries). False if the request does 
not allow to be
-         * retried on RetriableErrors (async requests that does not provide an 
expiration time
-         * for retries)
-         */
-        private boolean allowsRetries() {
-            return expirationTimeMs.isPresent();
-        }
-
-        /**
-         * Complete the request future with a TimeoutException if the request 
expired. No action
-         * taken if the request is still active.
-         *
-         * @return True if the request expired.
-         */
-        private boolean maybeExpire(final long currentTimeMs) {
-            if (isExpired(currentTimeMs)) {
-                future.completeExceptionally(new 
TimeoutException("OffsetCommit could not complete " +
-                    "before timeout expired."));
-                return true;
+        @Override
+        void removeRequest() {
+            if (!unsentOffsetCommitRequests().remove(this)) {
+                log.warn("OffsetCommit request to remove not found in the 
outbound buffer: {}", this);
             }
-            return false;
-        }
-
-        /**
-         * @return A RetriableCommitFailedException for async commit requests 
if the original
-         * Exception was a RetriableException. Return the original one in any 
other case.
-         */
-        private Throwable commitExceptionForRetriableError(Throwable 
throwable) {
-            if (!allowsRetries() && throwable instanceof RetriableException)
-                return new RetriableCommitFailedException(throwable);
-            return throwable;
-        }
-
-        private Throwable commitExceptionForStaleMemberEpoch() {
-            if (retryOnStaleEpoch)
-                return new 
RetriableCommitFailedException(Errors.STALE_MEMBER_EPOCH.exception());
-            return new CommitFailedException("OffsetCommit failed with stale 
member epoch." + Errors.STALE_MEMBER_EPOCH.message());
         }
     }
 
@@ -652,45 +770,34 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         final MemberInfo memberInfo;
 
         /**
-         * True if the request should be retried if it fails with {@link 
Errors#STALE_MEMBER_EPOCH}.
+         * Time until which the request should be retried if it fails with 
retriable
+         * errors. If not present, the request is triggered without waiting 
for a response or
+         * retrying.
          */
-        boolean retryOnStaleEpoch;
+        private final Optional<Long> expirationTimeMs;
+
+        /**
+         * True if the request expiration time has been reached. This is set 
when validating the
+         * request expiration on {@link #poll(long)} before sending it. It is 
used to know if a
+         * request should be retried on TimeoutException.
+         */
+        boolean isExpired;
 
         RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs,
-                              long retryBackoffMaxMs, MemberInfo memberInfo, 
boolean retryOnStaleEpoch) {
+                              long retryBackoffMaxMs, MemberInfo memberInfo, 
Optional<Long> expirationTimeMs) {
             super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
             this.memberInfo = memberInfo;
-            this.retryOnStaleEpoch = retryOnStaleEpoch;
+            this.expirationTimeMs = expirationTimeMs;
         }
 
         // Visible for testing
         RetriableRequestState(LogContext logContext, String owner, long 
retryBackoffMs, int retryBackoffExpBase,
-                              long retryBackoffMaxMs, double jitter, 
MemberInfo memberInfo,
-                              boolean retryOnStaleEpoch) {
+                              long retryBackoffMaxMs, double jitter, 
MemberInfo memberInfo, Optional<Long> expirationTimeMs) {
             super(logContext, owner, retryBackoffMs, retryBackoffExpBase, 
retryBackoffMaxMs, jitter);
             this.memberInfo = memberInfo;
-            this.retryOnStaleEpoch = retryOnStaleEpoch;
-        }
-
-        /**
-         * Retry with backoff if the request failed with {@link 
Errors#STALE_MEMBER_EPOCH} and
-         * the member has valid epoch.
-         *
-         * @return True if the request has been enqueued to be retried with 
the latest member ID
-         * and epoch.
-         */
-        boolean maybeRetryWithNewMemberEpoch(long currentTimeMs, Errors 
responseError) {
-            if (retryOnStaleEpoch && memberInfo.memberEpoch.isPresent()) {
-                // Request failed with invalid epoch, but the member has a 
valid one, so
-                // retry the request with the latest ID/epoch.
-                maybeRetry(currentTimeMs, responseError.exception());
-                return true;
-            }
-            return false;
+            this.expirationTimeMs = expirationTimeMs;
         }
 
-        abstract void maybeRetry(long currentTimeMs, Throwable throwable);
-
         /**
          * @return String containing the request name and arguments, to be 
used for logging
          * purposes.
@@ -702,6 +809,19 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          */
         abstract CompletableFuture<?> future();
 
+        /**
+         * Complete the request future with a TimeoutException if the request 
timeout has been
+         * reached, based on the provided current time.
+         */
+        void maybeExpire(long currentTimeMs) {
+            if (retryTimeoutExpired(currentTimeMs)) {
+                removeRequest();
+                isExpired = true;
+                future().completeExceptionally(new 
TimeoutException(requestDescription() +
+                    " could not complete before timeout expired."));
+            }
+        }
+
         /**
          * Build request with the given builder, including response handling 
logic.
          */
@@ -719,26 +839,29 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
 
         private void handleClientResponse(final ClientResponse response,
                                           final Throwable error,
-                                          final long currentTimeMs) {
+                                          final long requestCompletionTimeMs) {
             try {
                 if (error == null) {
                     onResponse(response);
                 } else {
                     log.debug("{} completed with error", requestDescription(), 
error);
-                    handleCoordinatorDisconnect(error, currentTimeMs);
-                    if (error instanceof RetriableException) {
-                        maybeRetry(currentTimeMs, error);
-                    } else {
-                        future().completeExceptionally(error);
-                    }
+                    onFailedAttempt(requestCompletionTimeMs);
+                    handleCoordinatorDisconnect(error, 
requestCompletionTimeMs);
+                    future().completeExceptionally(error);
                 }
             } catch (Throwable t) {
-                log.error("Unexpected error handling response for ", 
requestDescription(), t);
+                log.error("Unexpected error handling response for {}", 
requestDescription(), t);
                 future().completeExceptionally(t);
             }
         }
 
         abstract void onResponse(final ClientResponse response);
+
+        boolean retryTimeoutExpired(long currentTimeMs) {
+            return expirationTimeMs.isPresent() && expirationTimeMs.get() <= 
currentTimeMs;
+        }
+
+        abstract void removeRequest();
     }
 
     class OffsetFetchRequestState extends RetriableRequestState {
@@ -748,12 +871,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          */
         public final Set<TopicPartition> requestedPartitions;
 
-        private final CompletableFuture<Map<TopicPartition, 
OffsetAndMetadata>> future;
-
         /**
-         * Time until which the request should be retried if it fails with 
retriable errors.
+         * Future with the result of the request. This can be reset using 
{@link #resetFuture()}
+         * to get a new result when the request is retried.
          */
-        private final long expirationTimeMs;
+        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
future;
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
                                        final long retryBackoffMs,
@@ -761,10 +883,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                                        final long expirationTimeMs,
                                        final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs,
-                retryBackoffMaxMs, memberInfo, true);
+                retryBackoffMaxMs, memberInfo, Optional.of(expirationTimeMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         public OffsetFetchRequestState(final Set<TopicPartition> partitions,
@@ -774,10 +895,9 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                                        final double jitter,
                                        final MemberInfo memberInfo) {
             super(logContext, CommitRequestManager.class.getSimpleName(), 
retryBackoffMs, 2,
-                retryBackoffMaxMs, jitter, memberInfo, true);
+                retryBackoffMaxMs, jitter, memberInfo, 
Optional.of(expirationTimeMs));
             this.requestedPartitions = partitions;
             this.future = new CompletableFuture<>();
-            this.expirationTimeMs = expirationTimeMs;
         }
 
         public boolean sameRequest(final OffsetFetchRequestState request) {
@@ -829,28 +949,22 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
         private void onFailure(final long currentTimeMs,
                                final Errors responseError) {
             log.debug("Offset fetch failed: {}", responseError.message());
+            onFailedAttempt(currentTimeMs);
             if (responseError == COORDINATOR_LOAD_IN_PROGRESS) {
-                maybeRetry(currentTimeMs, responseError.exception());
+                future.completeExceptionally(responseError.exception());
             } else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
                 log.error("OffsetFetch failed with {} because the member is 
not part of the group" +
                     " anymore.", responseError);
                 future.completeExceptionally(responseError.exception());
             } else if (responseError == Errors.STALE_MEMBER_EPOCH) {
-                if (maybeRetryWithNewMemberEpoch(currentTimeMs, 
responseError)) {
-                    log.debug("OffsetFetch failed with {} but the consumer is 
still part" +
-                        " of the group, so the request will be retried with 
the latest " +
-                        "member ID and epoch.", responseError);
-                    return;
-                }
                 log.error("OffsetFetch failed with {} and the consumer is not 
part " +
                     "of the group anymore (it probably left the group, got 
fenced" +
                     " or failed). The request cannot be retried and will 
fail.", responseError);
                 future.completeExceptionally(responseError.exception());
-            } else if (responseError == Errors.NOT_COORDINATOR
-                || responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
+            } else if (responseError == Errors.NOT_COORDINATOR || 
responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
                 // Re-discover the coordinator and retry
                 coordinatorRequestManager.markCoordinatorUnknown("error 
response " + responseError.name(), currentTimeMs);
-                maybeRetry(currentTimeMs, responseError.exception());
+                future.completeExceptionally(responseError.exception());
             } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                 
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
             } else {
@@ -860,21 +974,6 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             }
         }
 
-        /**
-         * Enqueue the request to be retried with exponential backoff if the 
time has not expired.
-         * This will fail the request future if the request is not retried.
-         */
-        @Override
-        void maybeRetry(long currentTimeMs, Throwable throwable) {
-            if (isExpired(currentTimeMs)) {
-                future.completeExceptionally(throwable);
-                return;
-            }
-            onFailedAttempt(currentTimeMs);
-            pendingRequests.inflightOffsetFetches.remove(this);
-            pendingRequests.addOffsetFetchRequest(this);
-        }
-
         @Override
         String requestDescription() {
             return "OffsetFetch request for partitions " + requestedPartitions;
@@ -885,25 +984,23 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             return future;
         }
 
-        private boolean isExpired(final long currentTimeMs) {
-            return expirationTimeMs <= currentTimeMs;
+        void resetFuture() {
+            future = new CompletableFuture<>();
         }
 
-        /**
-         * Complete the request future with a TimeoutException if the request 
expired. No action
-         * taken if the request is still active.
-         *
-         * @return True if the request expired.
-         */
-        private boolean maybeExpire(final long currentTimeMs) {
-            if (isExpired(currentTimeMs)) {
-                future.completeExceptionally(new TimeoutException("OffsetFetch 
request could not " +
-                    "complete before timeout expired."));
-                return true;
+        @Override
+        void removeRequest() {
+            if (!unsentOffsetFetchRequests().remove(this)) {
+                log.warn("OffsetFetch request to remove not found in the 
outbound buffer: {}", this);
             }
-            return false;
         }
 
+        /**
+         * Handle OffsetFetch response that has no group level errors. This 
will look for
+         * partition level errors and fail the future accordingly, also 
recording a failed request
+         * attempt. If no partition level errors are found, this will complete 
the future with the
+         * offsets contained in the response, and record a successful request 
attempt.
+         */
         private void onSuccess(final long currentTimeMs,
                                final OffsetFetchResponse response) {
             Set<String> unauthorizedTopics = null;
@@ -915,6 +1012,7 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 TopicPartition tp = entry.getKey();
                 OffsetFetchResponse.PartitionData partitionData = 
entry.getValue();
                 if (partitionData.hasError()) {
+                    onFailedAttempt(currentTimeMs);
                     Errors error = partitionData.error;
                     log.debug("Failed to fetch offset for partition {}: {}", 
tp, error.message());
 
@@ -954,8 +1052,10 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                         ", this could be either " +
                         "transactional offsets waiting for completion, or " +
                         "normal offsets waiting for replication after 
appending to local log", unstableTxnOffsetTopicPartitions);
-                maybeRetry(currentTimeMs, new 
UnstableOffsetCommitException("There are unstable offsets for the requested 
topic partitions"));
+                future.completeExceptionally(new 
UnstableOffsetCommitException("There are " +
+                    "unstable offsets for the requested topic partitions"));
             } else {
+                onSuccessfulAttempt(currentTimeMs);
                 future.complete(offsets);
             }
         }
@@ -1003,51 +1103,16 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
             return !unsentOffsetCommits.isEmpty() || 
!unsentOffsetFetches.isEmpty();
         }
 
-        OffsetCommitRequestState addOffsetCommitRequest(
-            final Map<TopicPartition, OffsetAndMetadata> offsets,
-            final Optional<Long> expirationTimeMs,
-            final boolean retryOnStaleEpoch) {
-            // TODO: Dedupe committing the same offsets to the same partitions
-            OffsetCommitRequestState requestState = createOffsetCommitRequest(
-                offsets,
-                jitter,
-                expirationTimeMs,
-                retryOnStaleEpoch);
-            return addOffsetCommitRequest(requestState);
-        }
-
+        /**
+         * Add a commit request to the queue, so that it's sent out on the 
next call to
+         * {@link #poll(long)}. This is used from all commits (sync, async, 
auto-commit).
+         */
         OffsetCommitRequestState 
addOffsetCommitRequest(OffsetCommitRequestState request) {
             log.debug("Enqueuing OffsetCommit request for offsets: {}", 
request.offsets);
             unsentOffsetCommits.add(request);
             return request;
         }
 
-        OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                           final 
OptionalDouble jitter,
-                                                           final 
Optional<Long> expirationTimeMs,
-                                                           final boolean 
retryOnStaleEpoch) {
-            return jitter.isPresent() ?
-                new OffsetCommitRequestState(
-                    offsets,
-                    groupId,
-                    groupInstanceId,
-                    expirationTimeMs,
-                    retryBackoffMs,
-                    retryBackoffMaxMs,
-                    jitter.getAsDouble(),
-                    memberInfo,
-                    retryOnStaleEpoch) :
-                new OffsetCommitRequestState(
-                    offsets,
-                    groupId,
-                    groupInstanceId,
-                    expirationTimeMs,
-                    retryBackoffMs,
-                    retryBackoffMaxMs,
-                    memberInfo,
-                    retryOnStaleEpoch);
-        }
-
         /**
          * <p>Adding an offset fetch request to the outgoing buffer.  If the 
same request was made, we chain the future
          * to the existing one.
@@ -1065,37 +1130,11 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
                 log.info("Duplicated OffsetFetchRequest: " + 
request.requestedPartitions);
                 dupe.orElseGet(() -> 
inflight.get()).chainFuture(request.future);
             } else {
-                // remove the request from the outbound buffer: 
inflightOffsetFetches
-                request.future.whenComplete((r, t) -> {
-                    if (!inflightOffsetFetches.remove(request)) {
-                        log.warn("A duplicated, inflight, request was 
identified, but unable to find it in the " +
-                                "outbound buffer:" + request);
-                    }
-                });
                 this.unsentOffsetFetches.add(request);
             }
             return request.future;
         }
 
-        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
addOffsetFetchRequest(final Set<TopicPartition> partitions,
-                                                                               
                 final long expirationTimeMs) {
-            OffsetFetchRequestState request = jitter.isPresent() ?
-                    new OffsetFetchRequestState(
-                            partitions,
-                            retryBackoffMs,
-                            retryBackoffMaxMs,
-                            expirationTimeMs,
-                            jitter.getAsDouble(),
-                            memberInfo) :
-                    new OffsetFetchRequestState(
-                            partitions,
-                            retryBackoffMs,
-                            retryBackoffMaxMs,
-                            expirationTimeMs,
-                            memberInfo);
-            return addOffsetFetchRequest(request);
-        }
-
         /**
          * Clear {@code unsentOffsetCommits} and moves all the sendable 
request in {@code
          * unsentOffsetFetches} to the {@code inflightOffsetFetches} to 
bookkeep all the inflight
@@ -1147,7 +1186,8 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          * futures with a TimeoutException.
          */
         private void failAndRemoveExpiredCommitRequests(final long 
currentTimeMs) {
-            unsentOffsetCommits.removeIf(req -> 
req.maybeExpire(currentTimeMs));
+            Queue<OffsetCommitRequestState> requestsToPurge = new 
LinkedList<>(unsentOffsetCommits);
+            requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs));
         }
 
         /**
@@ -1155,7 +1195,8 @@ public class CommitRequestManager implements 
RequestManager, MemberStateListener
          * futures with a TimeoutException.
          */
         private void failAndRemoveExpiredFetchRequests(final long 
currentTimeMs) {
-            unsentOffsetFetches.removeIf(req -> 
req.maybeExpire(currentTimeMs));
+            Queue<OffsetFetchRequestState> requestsToPurge = new 
LinkedList<>(unsentOffsetFetches);
+            requestsToPurge.forEach(req -> req.maybeExpire(currentTimeMs));
         }
 
         private void clearAll() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
index 6556495e55b..dd035506d4b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java
@@ -837,12 +837,10 @@ public class MembershipManagerImpl implements 
MembershipManager {
         // best effort to commit the offsets in the case where the epoch might 
have changed while
         // the current reconciliation is in process. Note this is using the 
rebalance timeout as
         // it is the limit enforced by the broker to complete the 
reconciliation process.
-        commitResult = commitRequestManager.maybeAutoCommitAllConsumedNow(
-            Optional.of(getExpirationTimeForTimeout(rebalanceTimeoutMs)),
-            true);
+        commitResult = 
commitRequestManager.maybeAutoCommitSyncNow(getExpirationTimeForTimeout(rebalanceTimeoutMs));
 
         // Execute commit -> onPartitionsRevoked -> onPartitionsAssigned.
-        commitResult.whenComplete((commitReqResult, commitReqError) -> {
+        commitResult.whenComplete((__, commitReqError) -> {
             if (commitReqError != null) {
                 // The call to commit, that includes retry logic for retriable 
errors, failed to
                 // complete within the time boundaries (fatal error or 
retriable that did not
@@ -887,9 +885,6 @@ public class MembershipManagerImpl implements 
MembershipManager {
             revocationResult.thenCompose(__ -> {
                 boolean memberHasRejoined = memberEpochOnReconciliationStart 
!= memberEpoch;
                 if (state == MemberState.RECONCILING && !memberHasRejoined) {
-                    // Reschedule the auto commit starting from now that 
member has a new assignment.
-                    commitRequestManager.resetAutoCommitTimer();
-
                     // Apply assignment
                     return assignPartitions(assignedTopicIdPartitions, 
addedPartitions);
                 } else {
@@ -916,6 +911,9 @@ public class MembershipManagerImpl implements 
MembershipManager {
             } else {
                 boolean memberHasRejoined = memberEpochOnReconciliationStart 
!= memberEpoch;
                 if (state == MemberState.RECONCILING && !memberHasRejoined) {
+                    // Reschedule the auto commit starting from now that the 
member has a new assignment.
+                    commitRequestManager.resetAutoCommitTimer();
+
                     // Make assignment effective on the broker by 
transitioning to send acknowledge.
                     transitionTo(MemberState.ACKNOWLEDGING);
                 } else {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index 4396df27853..ac7ccc56c55 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -24,7 +24,7 @@ import java.util.Objects;
 public abstract class ApplicationEvent {
 
     public enum Type {
-        COMMIT, POLL, FETCH_COMMITTED_OFFSETS, NEW_TOPICS_METADATA_UPDATE, 
ASSIGNMENT_CHANGE,
+        COMMIT_ASYNC, COMMIT_SYNC, POLL, FETCH_COMMITTED_OFFSETS, 
NEW_TOPICS_METADATA_UPDATE, ASSIGNMENT_CHANGE,
         LIST_OFFSETS, RESET_POSITIONS, VALIDATE_POSITIONS, TOPIC_METADATA, 
SUBSCRIPTION_CHANGE,
         UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
         COMMIT_ON_CLOSE, LEAVE_ON_CLOSE
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 24a7acf39a9..9e48b4de6da 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
@@ -69,8 +68,12 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
     @Override
     public void process(ApplicationEvent event) {
         switch (event.type()) {
-            case COMMIT:
-                process((CommitApplicationEvent) event);
+            case COMMIT_ASYNC:
+                process((AsyncCommitApplicationEvent) event);
+                return;
+
+            case COMMIT_SYNC:
+                process((SyncCommitApplicationEvent) event);
                 return;
 
             case POLL:
@@ -139,18 +142,17 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         requestManagers.heartbeatRequestManager.ifPresent(hrm -> 
hrm.resetPollTimer(event.pollTimeMs()));
     }
 
-    private void process(final CommitApplicationEvent event) {
-        if (!requestManagers.commitRequestManager.isPresent()) {
-            // Leaving this error handling here, but it is a bit strange as 
the commit API should enforce the group.id
-            // upfront, so we should never get to this block.
-            Exception exception = new KafkaException("Unable to commit offset. 
Most likely because the group.id wasn't set");
-            event.future().completeExceptionally(exception);
-            return;
-        }
+    private void process(final AsyncCommitApplicationEvent event) {
+        CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
+        CompletableFuture<Void> commitResult = 
manager.commitAsync(event.offsets());
+        event.chain(commitResult);
+    }
 
+    private void process(final SyncCommitApplicationEvent event) {
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-        Optional<Long> expirationTimeMs = 
event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-        event.chain(manager.addOffsetCommitRequest(event.offsets(), 
expirationTimeMs, false));
+        long expirationTimeoutMs = 
getExpirationTimeForTimeout(event.retryTimeoutMs());
+        CompletableFuture<Void> commitResult = 
manager.commitSync(event.offsets(), expirationTimeoutMs);
+        event.chain(commitResult);
     }
 
     private void process(final FetchCommittedOffsetsApplicationEvent event) {
@@ -161,7 +163,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
         long expirationTimeMs = getExpirationTimeForTimeout(event.timeout());
-        event.chain(manager.addOffsetFetchRequest(event.partitions(), 
expirationTimeMs));
+        event.chain(manager.fetchOffsets(event.partitions(), 
expirationTimeMs));
     }
 
     private void process(final NewTopicsMetadataUpdateRequestEvent ignored) {
@@ -179,7 +181,7 @@ public class ApplicationEventProcessor extends 
EventProcessor<ApplicationEvent>
         }
         CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
         manager.updateAutoCommitTimer(event.currentTimeMs());
-        manager.maybeAutoCommitAllConsumedAsync();
+        manager.maybeAutoCommitAsync();
     }
 
     private void process(final ListOffsetsApplicationEvent event) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
new file mode 100644
index 00000000000..7a939ce3cfd
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncCommitApplicationEvent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import java.util.Map;
+
+/**
+ * Event to commit offsets without waiting for a response, so the request 
won't be retried.
+ */
+public class AsyncCommitApplicationEvent extends CommitApplicationEvent {
+
+    public AsyncCommitApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
+        super(offsets, Type.COMMIT_ASYNC);
+    }
+
+    @Override
+    public String toString() {
+        return "AsyncCommitApplicationEvent{" +
+            toStringBase() +
+            ", offsets=" + offsets() +
+            '}';
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
index d2205227c49..69d969d7b0f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CommitApplicationEvent.java
@@ -21,32 +21,17 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
 
-public class CommitApplicationEvent extends CompletableApplicationEvent<Void> {
+public abstract class CommitApplicationEvent extends 
CompletableApplicationEvent<Void> {
 
     /**
      * Offsets to commit per partition.
      */
     private final Map<TopicPartition, OffsetAndMetadata> offsets;
 
-    /**
-     * Time to wait for a response, retrying on retriable errors. If not 
present, the request is
-     * triggered without waiting for a response or being retried.
-     */
-    private final Optional<Long> retryTimeoutMs;
-
-    /**
-     * Create new event to commit offsets. If timer is present, the request 
will be retried on
-     * retriable errors until the timer expires (sync commit offsets request). 
If the timer is
-     * not present, the request will be sent without waiting for a response of 
retrying (async
-     * commit offsets request).
-     */
-    public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets,
-                                  final Optional<Long> retryTimeoutMs) {
-        super(Type.COMMIT);
+    public CommitApplicationEvent(final Map<TopicPartition, OffsetAndMetadata> 
offsets, Type type) {
+        super(type);
         this.offsets = Collections.unmodifiableMap(offsets);
-        this.retryTimeoutMs = retryTimeoutMs;
 
         for (OffsetAndMetadata offsetAndMetadata : offsets.values()) {
             if (offsetAndMetadata.offset() < 0) {
@@ -59,10 +44,6 @@ public class CommitApplicationEvent extends 
CompletableApplicationEvent<Void> {
         return offsets;
     }
 
-    public Optional<Long> retryTimeoutMs() {
-        return retryTimeoutMs;
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -80,13 +61,4 @@ public class CommitApplicationEvent extends 
CompletableApplicationEvent<Void> {
         result = 31 * result + offsets.hashCode();
         return result;
     }
-
-    @Override
-    public String toString() {
-        return "CommitApplicationEvent{" +
-                toStringBase() +
-                ", offsets=" + offsets +
-                ", retryTimeout=" + (retryTimeoutMs.map(t -> t + 
"ms").orElse("none")) +
-                '}';
-    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
new file mode 100644
index 00000000000..43dfee6ab18
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SyncCommitApplicationEvent.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import java.util.Map;
+
+/**
+ * Event to commit offsets waiting for a response and retrying on expected 
retriable errors until
+ * the timer expires.
+ */
+public class SyncCommitApplicationEvent extends CommitApplicationEvent {
+
+    /**
+     * Time to wait for a response, retrying on retriable errors.
+     */
+    private final long retryTimeoutMs;
+
+    public SyncCommitApplicationEvent(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
+                                      final long retryTimeoutMs) {
+        super(offsets, Type.COMMIT_SYNC);
+        this.retryTimeoutMs = retryTimeoutMs;
+    }
+
+    public Long retryTimeoutMs() {
+        return retryTimeoutMs;
+    }
+
+    @Override
+    public String toString() {
+        return "SyncCommitApplicationEvent{" +
+            toStringBase() +
+            ", offsets=" + offsets() +
+            ", retryTimeout=" + retryTimeoutMs + "ms" +
+            '}';
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 57546951982..2db666e95e9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -32,8 +32,8 @@ import 
org.apache.kafka.clients.consumer.RetriableCommitFailedException;
 import org.apache.kafka.clients.consumer.RoundRobinAssignor;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CommitOnCloseApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableBackgroundEvent;
@@ -48,6 +48,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdat
 import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.UnsubscribeApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.KafkaException;
@@ -252,9 +253,9 @@ public class AsyncKafkaConsumerTest {
 
         consumer.commitAsync(offsets, null);
 
-        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
         verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final CommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        final AsyncCommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
         assertEquals(offsets, commitEvent.offsets());
         assertDoesNotThrow(() -> commitEvent.future().complete(null));
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));
@@ -266,7 +267,7 @@ public class AsyncKafkaConsumerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
-        completeCommitApplicationEventSuccessfully();
+        completeCommitAsyncApplicationEventSuccessfully();
 
         MockCommitCallback callback = new MockCommitCallback();
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
@@ -283,7 +284,7 @@ public class AsyncKafkaConsumerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("my-topic", 1), new 
OffsetAndMetadata(200L));
-        completeCommitApplicationEventExceptionally(exception);
+        completeCommitAsyncApplicationEventExceptionally(exception);
 
         MockCommitCallback callback = new MockCommitCallback();
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
@@ -306,9 +307,9 @@ public class AsyncKafkaConsumerTest {
 
         assertDoesNotThrow(() -> consumer.commitAsync(offsets, callback));
 
-        final ArgumentCaptor<CommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(CommitApplicationEvent.class);
+        final ArgumentCaptor<AsyncCommitApplicationEvent> commitEventCaptor = 
ArgumentCaptor.forClass(AsyncCommitApplicationEvent.class);
         verify(applicationEventHandler).add(commitEventCaptor.capture());
-        final CommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
+        final AsyncCommitApplicationEvent commitEvent = 
commitEventCaptor.getValue();
         
commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());
 
         assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> 
consumer.commitAsync());
@@ -433,7 +434,7 @@ public class AsyncKafkaConsumerTest {
         sortedPartitions.add(tp);
         CompletableBackgroundEvent<Void> e = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
sortedPartitions);
         backgroundEventQueue.add(e);
-        completeCommitApplicationEventSuccessfully();
+        completeCommitSyncApplicationEventSuccessfully();
 
         ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
             @Override
@@ -478,7 +479,7 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer();
         final String currentThread = Thread.currentThread().getName();
         MockCommitCallback callback = new MockCommitCallback();
-        completeCommitApplicationEventSuccessfully();
+        completeCommitAsyncApplicationEventSuccessfully();
 
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
         forceCommitCallbackInvocation();
@@ -515,7 +516,7 @@ public class AsyncKafkaConsumerTest {
         HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new 
HashMap<>();
         topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, 
Optional.of(2), ""));
         topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L, 
Optional.of(1), ""));
-        completeCommitApplicationEventSuccessfully();
+        completeCommitSyncApplicationEventSuccessfully();
 
         consumer.assign(Arrays.asList(t0, t1));
 
@@ -523,7 +524,7 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
     }
 
     @Test
@@ -557,14 +558,14 @@ public class AsyncKafkaConsumerTest {
 
         verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
         verify(metadata).updateLastSeenEpochIfNewer(t1, 1);
-        
verify(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
+        
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
     }
 
     @Test
     public void testEnsurePollExecutedCommitAsyncCallbacks() {
         consumer = newConsumer();
         MockCommitCallback callback = new MockCommitCallback();
-        completeCommitApplicationEventSuccessfully();
+        completeCommitAsyncApplicationEventSuccessfully();
         
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
         completeFetchedCommittedOffsetApplicationEventSuccessfully(mkMap());
 
@@ -579,7 +580,7 @@ public class AsyncKafkaConsumerTest {
     public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
         consumer = newConsumer();
         MockCommitCallback callback = new MockCommitCallback();
-        completeCommitApplicationEventSuccessfully();
+        completeCommitAsyncApplicationEventSuccessfully();
         assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), 
callback));
         assertMockCommitCallbackInvoked(() -> consumer.close(),
             callback,
@@ -670,7 +671,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.maybeAutoCommitSync(true, time.timer(100), null);
-        verify(applicationEventHandler).add(any(CommitApplicationEvent.class));
+        
verify(applicationEventHandler).add(any(SyncCommitApplicationEvent.class));
     }
 
     @Test
@@ -688,7 +689,7 @@ public class AsyncKafkaConsumerTest {
         subscriptions.assignFromSubscribed(singleton(new 
TopicPartition("topic", 0)));
         subscriptions.seek(new TopicPartition("topic", 0), 100);
         consumer.maybeAutoCommitSync(false, time.timer(100), null);
-        verify(applicationEventHandler, 
never()).add(any(CommitApplicationEvent.class));
+        verify(applicationEventHandler, 
never()).add(any(SyncCommitApplicationEvent.class));
     }
 
     private void assertMockCommitCallbackInvoked(final Executable task,
@@ -893,7 +894,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer = newConsumer(props);
         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
-        completeCommitApplicationEventSuccessfully();
+        completeCommitSyncApplicationEventSuccessfully();
 
         consumer.close(Duration.ZERO);
 
@@ -909,7 +910,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer = newConsumer(props);
         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
-        completeCommitApplicationEventSuccessfully();
+        completeCommitSyncApplicationEventSuccessfully();
 
         consumer.commitSync(mockTopicPartitionOffset());
 
@@ -925,7 +926,7 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer(props);
         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
         KafkaException expected = new KafkaException("Test exception");
-        completeCommitApplicationEventExceptionally(expected);
+        completeCommitSyncApplicationEventExceptionally(expected);
 
         KafkaException actual = assertThrows(KafkaException.class, () -> 
consumer.commitSync(mockTopicPartitionOffset()));
         assertEquals(expected, actual);
@@ -941,7 +942,7 @@ public class AsyncKafkaConsumerTest {
         consumer = newConsumer(props);
         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
 
-        completeCommitApplicationEventSuccessfully();
+        completeCommitAsyncApplicationEventSuccessfully();
         consumer.commitAsync(mockTopicPartitionOffset(), new 
MockCommitCallback());
         assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
 
@@ -957,7 +958,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer = newConsumer(props);
         assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
-        completeCommitApplicationEventExceptionally(new KafkaException("Test 
exception"));
+        completeCommitAsyncApplicationEventExceptionally(new 
KafkaException("Test exception"));
 
         consumer.commitAsync(mockTopicPartitionOffset(), new 
MockCommitCallback());
         assertEquals(0, MockConsumerInterceptor.ON_COMMIT_COUNT.get());
@@ -1542,20 +1543,36 @@ public class AsyncKafkaConsumerTest {
         return timestampToSearch;
     }
 
-    private void completeCommitApplicationEventExceptionally(Exception ex) {
+    private void completeCommitAsyncApplicationEventExceptionally(Exception 
ex) {
         doAnswer(invocation -> {
-            CommitApplicationEvent event = invocation.getArgument(0);
+            AsyncCommitApplicationEvent event = invocation.getArgument(0);
             event.future().completeExceptionally(ex);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
     }
 
-    private void completeCommitApplicationEventSuccessfully() {
+    private void completeCommitSyncApplicationEventExceptionally(Exception ex) 
{
         doAnswer(invocation -> {
-            CommitApplicationEvent event = invocation.getArgument(0);
+            SyncCommitApplicationEvent event = invocation.getArgument(0);
+            event.future().completeExceptionally(ex);
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
+    }
+
+    private void completeCommitAsyncApplicationEventSuccessfully() {
+        doAnswer(invocation -> {
+            AsyncCommitApplicationEvent event = invocation.getArgument(0);
+            event.future().complete(null);
+            return null;
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitApplicationEvent.class));
+    }
+
+    private void completeCommitSyncApplicationEventSuccessfully() {
+        doAnswer(invocation -> {
+            SyncCommitApplicationEvent event = invocation.getArgument(0);
             event.future().complete(null);
             return null;
-        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitApplicationEvent.class));
+        
}).when(applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitApplicationEvent.class));
     }
 
     private void 
completeFetchedCommittedOffsetApplicationEventSuccessfully(final 
Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index 17745fa8660..c27494d69a0 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -63,6 +63,7 @@ import java.util.OptionalDouble;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -78,13 +79,16 @@ import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER
 import static org.apache.kafka.test.TestUtils.assertFutureThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -126,7 +130,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), 
false);
+        commitRequestManger.commitAsync(offsets);
         assertPoll(false, 0, commitRequestManger);
     }
 
@@ -137,7 +141,7 @@ public class CommitRequestManagerTest {
 
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
         offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
-        commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), 
false);
+        commitRequestManger.commitAsync(offsets);
         assertPoll(1, commitRequestManger);
     }
 
@@ -177,13 +181,11 @@ public class CommitRequestManagerTest {
         offsets2.put(new TopicPartition("test", 4), new 
OffsetAndMetadata(20L));
 
         // Add the requests to the CommitRequestManager and store their futures
-        ArrayList<CompletableFuture<Void>> commitFutures = new ArrayList<>();
-        ArrayList<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
fetchFutures = new ArrayList<>();
         long expirationTimeMs  = time.milliseconds() + defaultApiTimeoutMs;
-        commitFutures.add(commitManager.addOffsetCommitRequest(offsets1, 
Optional.of(expirationTimeMs), false));
-        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 0)), expirationTimeMs));
-        commitFutures.add(commitManager.addOffsetCommitRequest(offsets2, 
Optional.of(expirationTimeMs), false));
-        
fetchFutures.add(commitManager.addOffsetFetchRequest(Collections.singleton(new 
TopicPartition("test", 1)), expirationTimeMs));
+        commitManager.commitSync(offsets1, expirationTimeMs);
+        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)), expirationTimeMs);
+        commitManager.commitSync(offsets2, expirationTimeMs);
+        commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 1)), expirationTimeMs);
 
         // Poll the CommitRequestManager and verify that the 
inflightOffsetFetches size is correct
         NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
@@ -195,9 +197,16 @@ public class CommitRequestManagerTest {
         assertFalse(commitManager.pendingRequests.hasUnsentRequests());
         assertEquals(2, 
commitManager.pendingRequests.inflightOffsetFetches.size());
 
+        // Complete requests with a response
+        result.unsentRequests.forEach(req -> {
+            if (req.requestBuilder() instanceof OffsetFetchRequest.Builder) {
+                req.handler().onComplete(buildOffsetFetchClientResponse(req, 
Collections.emptySet(), Errors.NONE));
+            } else {
+                req.handler().onComplete(buildOffsetCommitClientResponse(new 
OffsetCommitResponse(0, new HashMap<>())));
+            }
+        });
+
         // Verify that the inflight offset fetch requests have been removed 
from the pending request buffer
-        commitFutures.forEach(f -> f.complete(null));
-        fetchFutures.forEach(f -> f.complete(null));
         assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size());
     }
 
@@ -208,7 +217,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
             new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
-        commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), 
false);
+        commitRequestManger.commitAsync(offsets);
         assertEquals(1, 
commitRequestManger.unsentOffsetCommitRequests().size());
         assertEquals(1, 
commitRequestManger.poll(time.milliseconds()).unsentRequests.size());
         assertTrue(commitRequestManger.unsentOffsetCommitRequests().isEmpty());
@@ -255,24 +264,19 @@ public class CommitRequestManagerTest {
                 Errors.NONE));
     }
 
-    // This is the case of the sync auto commit sent when the consumer is 
being closed (sync commit
-    // that should be retried until it succeeds, fails, or timer expires).
+    // This is the case of the sync commit triggered from an API call to 
commitSync or when the
+    // consumer is being closed. It should be retried until it succeeds, 
fails, or timer expires.
     @ParameterizedTest
     @MethodSource("offsetCommitExceptionSupplier")
-    public void 
testAutoCommitSyncRetriedAfterExpectedRetriableException(Errors error) {
-        long commitInterval = retryBackoffMs * 2;
-        CommitRequestManager commitRequestManger = create(true, 
commitInterval);
-        TopicPartition tp = new TopicPartition("topic", 1);
-        subscriptionState.assignFromUser(Collections.singleton(tp));
-        subscriptionState.seek(tp, 100);
+    public void testCommitSyncRetriedAfterExpectedRetriableException(Errors 
error) {
+        CommitRequestManager commitRequestManger = create(false, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        time.sleep(commitInterval);
-        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
 
-        // Auto-commit all consume sync (ex. triggered when the consumer is 
closed).
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
+            new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult =
-            
commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs),
 false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitSync(offsets, expirationTimeMs);
         
sendAndVerifyOffsetCommitRequestFailedAndMaybeRetried(commitRequestManger, 
error, commitResult);
 
         // We expect that request should have been retried on this sync commit.
@@ -292,7 +296,7 @@ public class CommitRequestManagerTest {
 
         // Send sync offset commit that fails and verify it propagates the 
expected exception.
         Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitSync(offsets, expirationTimeMs);
         completeOffsetCommitRequestWithError(commitRequestManger, commitError);
         assertFutureThrows(commitResult, expectedException);
     }
@@ -309,36 +313,25 @@ public class CommitRequestManagerTest {
     }
 
     @Test
-    public void 
testOffsetCommitFailsWithCommitFailedExceptionIfUnknownMemberId() {
-        long commitInterval = retryBackoffMs * 2;
-        CommitRequestManager commitRequestManger = create(true, 
commitInterval);
-        TopicPartition tp = new TopicPartition("topic", 1);
-        subscriptionState.assignFromUser(Collections.singleton(tp));
-        subscriptionState.seek(tp, 100);
+    public void 
testCommitSyncFailsWithCommitFailedExceptionIfUnknownMemberId() {
+        CommitRequestManager commitRequestManger = create(false, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
-        time.sleep(commitInterval);
-        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
 
-        // Auto-commit all consume sync (ex. triggered when the consumer is 
closed).
+        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
+            new TopicPartition("topic", 1),
+            new OffsetAndMetadata(0));
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Void> commitResult =
-            
commitRequestManger.maybeAutoCommitAllConsumedNow(Optional.of(expirationTimeMs),
 false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitSync(offsets, expirationTimeMs);
 
         completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.UNKNOWN_MEMBER_ID);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
-        // Commit should fail with CommitFailedException
         assertTrue(commitResult.isDone());
         assertFutureThrows(commitResult, CommitFailedException.class);
     }
 
-    /**
-     * This is the case where a request to commit offsets is performed without 
retrying on
-     * STALE_MEMBER_EPOCH (ex. commits triggered from the consumer API, 
auto-commits triggered on
-     * the interval). The expectation is that the request should fail with 
CommitFailedException.
-     */
     @Test
-    public void 
testOffsetCommitFailsWithCommitFailedExceptionIfStaleMemberEpochNotRetried() {
+    public void 
testCommitSyncFailsWithCommitFailedExceptionOnStaleMemberEpoch() {
         CommitRequestManager commitRequestManger = create(true, 100);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
@@ -347,8 +340,8 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Send commit request expected to be retried on retriable errors
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(
-            offsets, Optional.of(time.milliseconds() + defaultApiTimeoutMs), 
false);
+        CompletableFuture<Void> commitResult = commitRequestManger.commitSync(
+            offsets, time.milliseconds() + defaultApiTimeoutMs);
         completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.STALE_MEMBER_EPOCH);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -359,29 +352,35 @@ public class CommitRequestManagerTest {
     }
 
     /**
-     * This is the case of the async auto commit request triggered on the 
interval. The
-     * expectation is that the request should fail with 
RetriableCommitFailedException, so that
-     * the timer is reset and the request is retried on the next interval.
+     * This is the case of the async auto commit request triggered on the 
interval. The request
+     * internally fails with the fatal stale epoch error, and the expectation 
is that it just
+     * resets the commit timer to the interval, to attempt again when the 
interval expires.
      */
     @Test
-    public void testCommitAsyncFailsWithRetriableOnStaleMemberEpoch() {
-        CommitRequestManager commitRequestManger = create(false, 100);
+    public void 
testAutoCommitAsyncFailsWithStaleMemberEpochContinuesToCommitOnTheInterval() {
+        CommitRequestManager commitRequestManger = create(true, 100);
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+        TopicPartition t1p = new TopicPartition("topic1", 0);
+        subscriptionState.assignFromUser(singleton(t1p));
+        subscriptionState.seek(t1p, 10);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(
-            new TopicPartition("topic", 1),
-            new OffsetAndMetadata(0));
-
-        // Async commit that won't be retried.
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(
-            offsets, Optional.empty(), true);
+        // Async commit on the interval fails with fatal stale epoch and just 
resets the timer to
+        // the interval
+        commitRequestManger.maybeAutoCommitAsync();
         completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.STALE_MEMBER_EPOCH);
+        verify(commitRequestManger).resetAutoCommitTimer();
+
+        // Async commit retried, only when the interval expires
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
+        assertEquals(0, res.unsentRequests.size(), "No request should be 
generated until the " +
+            "interval expires");
+        time.sleep(100);
+        commitRequestManger.updateAutoCommitTimer(time.milliseconds());
+        res = commitRequestManger.poll(time.milliseconds());
+        assertEquals(1, res.unsentRequests.size());
 
-        // Commit should fail with RetriableCommitFailedException.
-        assertTrue(commitResult.isDone());
-        assertFutureThrows(commitResult, RetriableCommitFailedException.class);
     }
 
     @Test
@@ -394,8 +393,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         // Async commit that won't be retried.
-        CompletableFuture<Void> commitResult = 
commitRequestManager.addOffsetCommitRequest(
-            offsets, Optional.empty(), true);
+        CompletableFuture<Void> commitResult = 
commitRequestManager.commitAsync(offsets);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -473,11 +471,10 @@ public class CommitRequestManagerTest {
         CommitRequestManager commitRequestManger = create(true, 100);
         time.sleep(100);
         commitRequestManger.updateAutoCommitTimer(time.milliseconds());
-        CompletableFuture<Void> result = 
commitRequestManger.maybeAutoCommitAllConsumedAsync();
-
+        // CompletableFuture<Void> result = 
commitRequestManger.maybeAutoCommitAllConsumedAsync();
+        commitRequestManger.maybeAutoCommitAsync();
         
assertTrue(commitRequestManger.pendingRequests.unsentOffsetCommits.isEmpty());
-        assertTrue(result.isDone());
-        assertFalse(result.isCompletedExceptionally());
+        verify(commitRequestManger).resetAutoCommitTimer();
     }
 
     @Test
@@ -489,18 +486,17 @@ public class CommitRequestManagerTest {
         // Auto-commit of empty offsets
         time.sleep(100);
         commitRequestManger.updateAutoCommitTimer(time.milliseconds());
-        CompletableFuture<Void> result = 
commitRequestManger.maybeAutoCommitAllConsumedAsync();
-        assertTrue(result.isDone());
-        assertFalse(result.isCompletedExceptionally());
+        commitRequestManger.maybeAutoCommitAsync();
 
         // Next auto-commit consumed offsets (not empty). Should generate a 
request, ensuring
         // that the previous auto-commit of empty did not leave the inflight 
request flag on
         subscriptionState.seek(t1p, 100);
         time.sleep(100);
         commitRequestManger.updateAutoCommitTimer(time.milliseconds());
-        result = commitRequestManger.maybeAutoCommitAllConsumedAsync();
-        
assertFalse(commitRequestManger.pendingRequests.unsentOffsetCommits.isEmpty());
-        assertFalse(result.isDone());
+        commitRequestManger.maybeAutoCommitAsync();
+        assertEquals(1, 
commitRequestManger.pendingRequests.unsentOffsetCommits.size());
+
+        verify(commitRequestManger, times(2)).resetAutoCommitTimer();
     }
 
     @Test
@@ -534,7 +530,7 @@ public class CommitRequestManagerTest {
         List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = sendAndVerifyDuplicatedOffsetFetchRequests(
             commitRequestManger,
             partitions,
-            5,
+            1,
             error);
         // we only want to make sure to purge the outbound buffer for 
non-retriables, so retriable will be re-queued.
         if (isRetriable)
@@ -545,6 +541,49 @@ public class CommitRequestManagerTest {
         }
     }
 
+    @Test
+    public void testSuccessfulOffsetFetch() {
+        CommitRequestManager commitManager = create(false, 100);
+        
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchResult =
+            commitManager.fetchOffsets(Collections.singleton(new 
TopicPartition("test", 0)),
+                expirationTimeMs);
+
+        // Send fetch request
+        NetworkClientDelegate.PollResult result = 
commitManager.poll(time.milliseconds());
+        assertEquals(1, result.unsentRequests.size());
+        assertEquals(1, 
commitManager.pendingRequests.inflightOffsetFetches.size());
+        assertFalse(fetchResult.isDone());
+
+        // Complete request with a response
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        long expectedOffset = 100;
+        NetworkClientDelegate.UnsentRequest req = result.unsentRequests.get(0);
+        Map<TopicPartition, OffsetFetchResponse.PartitionData> 
topicPartitionData =
+            Collections.singletonMap(
+                tp,
+                new OffsetFetchResponse.PartitionData(expectedOffset, 
Optional.of(1), "", Errors.NONE));
+        req.handler().onComplete(buildOffsetFetchClientResponse(req, 
topicPartitionData, Errors.NONE, false));
+
+        // Validate request future completes with the response received
+        assertTrue(fetchResult.isDone());
+        assertFalse(fetchResult.isCompletedExceptionally());
+        Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = null;
+        try {
+            offsetsAndMetadata = fetchResult.get();
+        } catch (InterruptedException | ExecutionException e) {
+            fail(e);
+        }
+        assertNotNull(offsetsAndMetadata);
+        assertEquals(1, offsetsAndMetadata.size());
+        assertTrue(offsetsAndMetadata.containsKey(tp));
+        assertEquals(expectedOffset, offsetsAndMetadata.get(tp).offset());
+        assertEquals(0, 
commitManager.pendingRequests.inflightOffsetFetches.size(), "Inflight " +
+            "request should be removed from the queue when a response is 
received.");
+    }
+
     @ParameterizedTest
     @MethodSource("offsetFetchRetriableCoordinatorErrors")
     public void 
testOffsetFetchMarksCoordinatorUnknownOnRetriableCoordinatorErrors(Errors error,
@@ -556,7 +595,7 @@ public class CommitRequestManagerTest {
         partitions.add(new TopicPartition("t1", 0));
 
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.addOffsetFetchRequest(partitions, expirationTimeMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
 
         completeOffsetFetchRequestWithError(commitRequestManager, partitions, 
error);
 
@@ -583,7 +622,7 @@ public class CommitRequestManagerTest {
         partitions.add(new TopicPartition("t1", 0));
 
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManger.addOffsetFetchRequest(partitions, expirationTimeMs);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
commitRequestManger.fetchOffsets(partitions, expirationTimeMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -595,8 +634,7 @@ public class CommitRequestManagerTest {
         assertFalse(result.isDone());
         assertCoordinatorDisconnect();
 
-        // Request should be retried after backoff expires
-        time.sleep(100);
+        time.sleep(retryBackoffMs);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
         res = commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -612,8 +650,8 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        // Send commit request without expiration (async commit not expected 
to be retried).
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
+        // Send async commit (not expected to be retried).
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitAsync(offsets);
         completeOffsetCommitRequestWithError(commitRequestManger, error);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -638,7 +676,7 @@ public class CommitRequestManagerTest {
 
         // Send sync offset commit request that fails with retriable error.
         Long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitSync(offsets, expirationTimeMs);
         completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.REQUEST_TIMED_OUT);
 
         // Request retried after backoff, and fails with retriable again. 
Should not complete yet
@@ -670,7 +708,7 @@ public class CommitRequestManagerTest {
 
         // Send offset commit request that fails with retriable error.
         long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitSync(offsets, expirationTimeMs);
         completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.COORDINATOR_NOT_AVAILABLE);
 
         // Sleep to expire the request timeout. Request should fail on the 
next poll with a
@@ -696,7 +734,7 @@ public class CommitRequestManagerTest {
 
         // Send async commit request that fails with retriable error (not 
expected to be retried).
         Errors retriableError = Errors.COORDINATOR_NOT_AVAILABLE;
-        CompletableFuture<Void> commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
+        CompletableFuture<Void> commitResult = 
commitRequestManger.commitAsync(offsets);
         completeOffsetCommitRequestWithError(commitRequestManger, 
retriableError);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(0, res.unsentRequests.size());
@@ -710,7 +748,6 @@ public class CommitRequestManagerTest {
         assertFutureThrows(commitResult, RetriableCommitFailedException.class);
     }
 
-
     @Test
     public void testEnsureBackoffRetryOnOffsetCommitRequestTimeout() {
         CommitRequestManager commitRequestManger = create(true, 100);
@@ -720,7 +757,7 @@ public class CommitRequestManagerTest {
             new OffsetAndMetadata(0));
 
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+        commitRequestManger.commitSync(offsets, expirationTimeMs);
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
         res.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new 
TimeoutException());
@@ -802,7 +839,7 @@ public class CommitRequestManagerTest {
 
         // Send request that is expected to fail with invalid epoch.
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.addOffsetFetchRequest(partitions, 
expirationTimeMs);
+        commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
 
         // Mock member has new a valid epoch.
         int newEpoch = 8;
@@ -842,7 +879,7 @@ public class CommitRequestManagerTest {
         // Send request that is expected to fail with invalid epoch.
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
requestResult =
-            commitRequestManager.addOffsetFetchRequest(partitions, 
expirationTimeMs);
+            commitRequestManager.fetchOffsets(partitions, expirationTimeMs);
 
         // Mock member not having a valid epoch anymore (left/failed/fenced).
         commitRequestManager.onMemberEpochUpdated(Optional.empty(), 
Optional.empty());
@@ -862,40 +899,58 @@ public class CommitRequestManagerTest {
     // STALE_MEMBER_EPOCH (ex. triggered from the reconciliation process), 
fails with invalid
     // member epoch. The expectation is that if the member already has a new 
epoch, the request
     // should be retried with the new epoch.
-    @Test
-    public void testOffsetCommitFailsWithStaleEpochAndRetriesWithNewEpoch() {
-        CommitRequestManager commitRequestManager = create(true, 100);
+    @ParameterizedTest
+    @MethodSource("offsetCommitExceptionSupplier")
+    public void 
testAutoCommitSyncBeforeRevocationRetriesOnRetriableAndStaleEpoch(Errors error) 
{
+        // Enable auto-commit but with very long interval to avoid triggering 
auto-commits on the
+        // interval and just test the auto-commits triggered before revocation
+        CommitRequestManager commitRequestManager = create(true, 
Integer.MAX_VALUE);
         
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
-            new OffsetAndMetadata(0));
+        TopicPartition tp = new TopicPartition("topic", 1);
+        subscriptionState.assignFromUser(singleton(tp));
+        subscriptionState.seek(tp, 5);
+        long expirationTimeMs = time.milliseconds() + retryBackoffMs * 2;
 
-        // Send commit request expected to be retried on STALE_MEMBER_EPOCH 
error while it does not expire.
-        long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
-        commitRequestManager.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), true);
+        // Send commit request expected to be retried on STALE_MEMBER_EPOCH 
error while it does not expire
+        commitRequestManager.maybeAutoCommitSyncNow(expirationTimeMs);
 
-        // Mock member has new a valid epoch.
         int newEpoch = 8;
         String memberId = "member1";
-        commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), 
Optional.of(memberId));
-
-        completeOffsetCommitRequestWithError(commitRequestManager, 
Errors.STALE_MEMBER_EPOCH);
-
-        // Check that the request that failed was removed from the inflight 
requests buffer.
-        assertEquals(0, 
commitRequestManager.pendingRequests.inflightOffsetFetches.size());
-        assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetCommits.size());
-
-        // Request should be retried with backoff.
-        NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
-        assertEquals(0, res.unsentRequests.size());
-        time.sleep(retryBackoffMs);
-        res = commitRequestManager.poll(time.milliseconds());
-        assertEquals(1, res.unsentRequests.size());
+        if (error == Errors.STALE_MEMBER_EPOCH) {
+            // Mock member has new a valid epoch
+            commitRequestManager.onMemberEpochUpdated(Optional.of(newEpoch), 
Optional.of(memberId));
+        }
 
-        // The retried request should include the latest member ID and epoch.
-        OffsetCommitRequestData reqData = (OffsetCommitRequestData) 
res.unsentRequests.get(0).requestBuilder().build().data();
-        assertEquals(newEpoch, reqData.generationIdOrMemberEpoch());
-        assertEquals(memberId, reqData.memberId());
+        completeOffsetCommitRequestWithError(commitRequestManager, error);
+
+        if (error.exception() instanceof RetriableException || error == 
Errors.STALE_MEMBER_EPOCH) {
+            assertEquals(1, 
commitRequestManager.pendingRequests.unsentOffsetCommits.size(),
+                "Request to be retried should be added to the outbound queue");
+
+            // Request should be retried with backoff
+            NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
+            assertEquals(0, res.unsentRequests.size());
+            time.sleep(retryBackoffMs);
+            res = commitRequestManager.poll(time.milliseconds());
+            assertEquals(1, res.unsentRequests.size());
+            if (error == Errors.STALE_MEMBER_EPOCH) {
+                // The retried request should include the latest member ID and 
epoch
+                OffsetCommitRequestData reqData = (OffsetCommitRequestData) 
res.unsentRequests.get(0).requestBuilder().build().data();
+                assertEquals(newEpoch, reqData.generationIdOrMemberEpoch());
+                assertEquals(memberId, reqData.memberId());
+            }
+        } else {
+            assertEquals(0, 
commitRequestManager.pendingRequests.unsentOffsetCommits.size(),
+                "Non-retriable failed request should be removed from the 
outbound queue");
+
+            // Request should not be retried, even after the backoff expires
+            NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
+            assertEquals(0, res.unsentRequests.size());
+            time.sleep(retryBackoffMs);
+            res = commitRequestManager.poll(time.milliseconds());
+            assertEquals(0, res.unsentRequests.size());
+        }
     }
 
     @Test
@@ -920,7 +975,7 @@ public class CommitRequestManagerTest {
                 new OffsetAndMetadata(0));
 
         long commitCreationTimeMs = time.milliseconds();
-        commitRequestManager.addOffsetCommitRequest(offsets, Optional.empty(), 
true);
+        commitRequestManager.commitAsync(offsets);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManager.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1029,7 +1084,7 @@ public class CommitRequestManagerTest {
         partitions.add(tp2);
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
         CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future =
-                commitRequestManger.addOffsetFetchRequest(partitions, 
expirationTimeMs);
+                commitRequestManger.fetchOffsets(partitions, expirationTimeMs);
 
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1058,7 +1113,7 @@ public class CommitRequestManagerTest {
         Map<TopicPartition, OffsetAndMetadata> offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
             new OffsetAndMetadata(0));
 
-        commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), 
false);
+        commitRequestManger.commitAsync(offsets);
         commitRequestManger.signalClose();
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
         assertEquals(1, res.unsentRequests.size());
@@ -1089,7 +1144,7 @@ public class CommitRequestManagerTest {
         List<CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>> 
futures = new ArrayList<>();
         long expirationTimeMs = time.milliseconds() + defaultApiTimeoutMs;
         for (int i = 0; i < numRequest; i++) {
-            futures.add(commitRequestManger.addOffsetFetchRequest(partitions, 
expirationTimeMs));
+            futures.add(commitRequestManger.fetchOffsets(partitions, 
expirationTimeMs));
         }
 
         NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 4ff652ff493..a491df417de 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -21,11 +21,13 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
-import 
org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.AsyncCommitApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.PollApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
 import org.apache.kafka.common.Node;
@@ -135,7 +137,7 @@ public class ConsumerNetworkThreadTest {
 
     @Test
     public void testApplicationEvent() {
-        ApplicationEvent e = new CommitApplicationEvent(new HashMap<>(), 
Optional.empty());
+        ApplicationEvent e = new PollApplicationEvent(100);
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
         verify(applicationEventProcessor, times(1)).process(e);
@@ -150,11 +152,19 @@ public class ConsumerNetworkThreadTest {
     }
 
     @Test
-    public void testCommitEvent() {
-        ApplicationEvent e = new CommitApplicationEvent(new HashMap<>(), 
Optional.empty());
+    public void testAsyncCommitEvent() {
+        ApplicationEvent e = new AsyncCommitApplicationEvent(new HashMap<>());
         applicationEventsQueue.add(e);
         consumerNetworkThread.runOnce();
-        
verify(applicationEventProcessor).process(any(CommitApplicationEvent.class));
+        
verify(applicationEventProcessor).process(any(AsyncCommitApplicationEvent.class));
+    }
+
+    @Test
+    public void testSyncCommitEvent() {
+        ApplicationEvent e = new SyncCommitApplicationEvent(new HashMap<>(), 
100L);
+        applicationEventsQueue.add(e);
+        consumerNetworkThread.runOnce();
+        
verify(applicationEventProcessor).process(any(SyncCommitApplicationEvent.class));
     }
 
     @Test
@@ -209,7 +219,7 @@ public class ConsumerNetworkThreadTest {
         verify(networkClient, times(1)).poll(anyLong(), anyLong());
         verify(commitRequestManager, 
times(1)).updateAutoCommitTimer(currentTimeMs);
         // Assignment change should generate an async commit (not retried).
-        verify(commitRequestManager, 
times(1)).maybeAutoCommitAllConsumedAsync();
+        verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
     }
 
     @Test
@@ -273,8 +283,8 @@ public class ConsumerNetworkThreadTest {
         coordinatorRequestManager.markCoordinatorUnknown("test", 
time.milliseconds());
         
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"group-id", node));
         prepareOffsetCommitRequest(new HashMap<>(), Errors.NONE, false);
-        CompletableApplicationEvent<Void> event1 = spy(new 
CommitApplicationEvent(Collections.emptyMap(), Optional.empty()));
-        ApplicationEvent event2 = new 
CommitApplicationEvent(Collections.emptyMap(), Optional.empty());
+        CompletableApplicationEvent<Void> event1 = spy(new 
AsyncCommitApplicationEvent(Collections.emptyMap()));
+        ApplicationEvent event2 = new 
AsyncCommitApplicationEvent(Collections.emptyMap());
         CompletableFuture<Void> future = new CompletableFuture<>();
         when(event1.future()).thenReturn(future);
         applicationEventsQueue.add(event1);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
index 2d8ef9397f3..f5c65d58bfc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.java
@@ -69,6 +69,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
@@ -1922,6 +1923,7 @@ public class MembershipManagerImplTest {
         Map<Uuid, SortedSet<Integer>> assignmentByTopicId = 
assignmentByTopicId(expectedAssignment);
         assertEquals(assignmentByTopicId, 
membershipManager.currentAssignment());
 
+        // The auto-commit interval should be reset (only once), when the 
reconciliation completes
         verify(commitRequestManager).resetAutoCommitTimer();
     }
 
@@ -1947,7 +1949,7 @@ public class MembershipManagerImplTest {
         if (withAutoCommit) {
             when(commitRequestManager.autoCommitEnabled()).thenReturn(true);
             CompletableFuture<Void> commitResult = new CompletableFuture<>();
-            when(commitRequestManager.maybeAutoCommitAllConsumedNow(any(), 
anyBoolean())).thenReturn(commitResult);
+            
when(commitRequestManager.maybeAutoCommitSyncNow(anyLong())).thenReturn(commitResult);
             return commitResult;
         } else {
             return CompletableFuture.completedFuture(null);

Reply via email to