lucasbru commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1344,8 +1343,8 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration
         long commitStart = time.nanoseconds();
         try {
             Timer requestTimer = time.timer(timeout.toMillis());
-            // Commit with a timer to control how long the request should be retried until it
-            // gets a successful response or non-retriable error.
+            // Commit with a retry timeout (the commit request will be retried until it gets a
+            // successful response, non-retriable error, or the timeout expires)
             CompletableFuture<Void> commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));

Review Comment:
   While you are at it, I think we can remove the `isWakeupable` parameter and just set the `wakeUpTrigger` in the calling context.
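To make the suggestion concrete, here is a minimal sketch assuming a simplified trigger. `WakeupTrigger`, `setActiveTask`, `clearTask`, `wakeup`, the local `WakeupException`, and the `commit()` / `commitSyncWakeupable()` helpers are hypothetical stand-ins, not the actual `AsyncKafkaConsumer` code. `commit(...)` no longer takes an `isWakeupable` flag; the blocking caller that should be interruptible registers the returned future with the trigger itself, and other callers simply don't.

```java
import java.util.concurrent.CompletableFuture;

class WakeupDemo {
    /** Local stand-in for Kafka's WakeupException. */
    static class WakeupException extends RuntimeException {
        WakeupException() { super("woken up"); }
    }

    /** Hypothetical, simplified wakeup trigger holding at most one interruptible future. */
    static class WakeupTrigger {
        private CompletableFuture<?> activeTask;
        private boolean pendingWakeup;

        /** Register a future that wakeup() may fail; fail it at once if a wakeup is pending. */
        synchronized <T> CompletableFuture<T> setActiveTask(CompletableFuture<T> task) {
            if (pendingWakeup) {
                pendingWakeup = false;
                task.completeExceptionally(new WakeupException());
            } else {
                activeTask = task;
            }
            return task;
        }

        synchronized void clearTask() {
            activeTask = null;
        }

        /** Fail the registered future, or remember the wakeup if nothing is registered. */
        synchronized void wakeup() {
            if (activeTask != null) {
                activeTask.completeExceptionally(new WakeupException());
                activeTask = null;
            } else {
                pendingWakeup = true;
            }
        }
    }

    static final WakeupTrigger wakeupTrigger = new WakeupTrigger();

    /** commit(...) without an isWakeupable flag: it only creates and returns the request future. */
    static CompletableFuture<Void> commit() {
        return new CompletableFuture<>(); // stands in for the enqueued commit request
    }

    /** The blocking caller opts in to wakeups by registering the future itself. */
    static CompletableFuture<Void> commitSyncWakeupable() {
        return wakeupTrigger.setActiveTask(commit());
    }
}
```

With this shape, `commit(...)` only builds the request, and the wakeup policy lives at the call site that actually blocks.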



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
         }
 
         CommitRequestManager manager = requestManagers.commitRequestManager.get();
-        Optional<Long> expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-        event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false));
+        CompletableFuture<Void> commitResult;
+        if (event.retryTimeoutMs().isPresent()) {

Review Comment:
   It's a bit weird that we use `retryTimeoutMs` to carry the information that this is a sync commit vs. an async commit.
   
   How about going all-in here and just having an `AsyncCommitApplicationEvent` and a `SyncCommitApplicationEvent`?
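A minimal sketch of that split, for illustration only. The two event class names come from the comment above; the `CommitEvent` base class, the simplified `Map<String, Long>` offsets, and the string-returning `process` method are hypothetical. The sync/async distinction is carried by the type, and only the sync event has a retry timeout, which is mandatory rather than an `Optional`.

```java
import java.util.Map;

class CommitEventsDemo {
    /** Hypothetical common base: both commit flavours carry the offsets to commit. */
    abstract static class CommitEvent {
        final Map<String, Long> offsets; // simplified: "topic-partition" -> offset

        CommitEvent(Map<String, Long> offsets) {
            this.offsets = offsets;
        }
    }

    /** Sent once and never retried, so it has no timeout at all. */
    static final class AsyncCommitApplicationEvent extends CommitEvent {
        AsyncCommitApplicationEvent(Map<String, Long> offsets) {
            super(offsets);
        }
    }

    /** Retried until its timeout, so the timeout is a plain field rather than an Optional. */
    static final class SyncCommitApplicationEvent extends CommitEvent {
        final long retryTimeoutMs;

        SyncCommitApplicationEvent(Map<String, Long> offsets, long retryTimeoutMs) {
            super(offsets);
            this.retryTimeoutMs = retryTimeoutMs;
        }
    }

    /** The processor dispatches on the event type, not on retryTimeoutMs().isPresent(). */
    static String process(CommitEvent event) {
        if (event instanceof SyncCommitApplicationEvent) {
            SyncCommitApplicationEvent sync = (SyncCommitApplicationEvent) event;
            return "sync commit of " + sync.offsets.size() + " offset(s), retried for up to "
                + sync.retryTimeoutMs + " ms";
        }
        if (event instanceof AsyncCommitApplicationEvent) {
            return "async commit of " + event.offsets.size() + " offset(s), not retried";
        }
        throw new IllegalArgumentException("Unknown commit event: " + event);
    }
}
```

A visitor or a `switch` over the event type would work just as well; the `instanceof` chain only keeps the sketch short.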



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
-     * request if there is no other commit request already in-flight, and if the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+     * offsets to commit, no request will be generated and a completed future will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to be retried if it
-     *                          fails with a retriable error. If not present, the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration should be checked for
-     *                          sending a request. If true, the request will be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                    final Optional<Long> expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) {
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request for all assigned
-     * partitions and their current positions. Note on auto-commit timers: this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and when the async commit
-     * completes, it will reset the auto-commit timer with the exponential backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link #poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), but ensure that if
+            // the request completes with a retriable error, the timer is reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retries with the latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        maybeAutoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
     }
 
-    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails with a retriable
+     * error. The generated request will be added to the queue to be sent on the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, successfully or
+     * exceptionally depending on the response. If the request fails with a retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> expirationTimeMs,
-                                                          final boolean retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                Throwable asyncCommitException;

Review Comment:
   For `commitSync` you have `commitSyncExceptionForError`, so maybe we want to extract this into `commitAsyncExceptionForError`? Symmetry ftw.
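A possible shape for that helper, as a hedged sketch. The name `commitAsyncExceptionForError` is the one proposed in the comment, but its signature is an assumption, and the local exception classes stand in for Kafka's `RetriableException` and `RetriableCommitFailedException` so the example compiles on its own. The mapping follows the `commitAsync` Javadoc in the diff: a retriable error is surfaced as a `RetriableCommitFailedException`, and anything else is propagated unchanged.

```java
import java.util.concurrent.CompletableFuture;

class CommitAsyncErrors {
    // Local stand-ins for Kafka's RetriableException and RetriableCommitFailedException,
    // so the sketch compiles without the Kafka client on the classpath.
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
        RetriableException(Throwable cause) { super(cause); }
    }

    static class RetriableCommitFailedException extends RetriableException {
        RetriableCommitFailedException(Throwable cause) { super(cause); }
    }

    /** Hypothetical helper: map a commit-request error to what the commitAsync future fails with. */
    static Throwable commitAsyncExceptionForError(Throwable error) {
        if (error instanceof RetriableCommitFailedException) {
            return error; // already in the shape callers expect
        }
        if (error instanceof RetriableException) {
            // Async commits are not retried internally, so surface retriable errors as
            // RetriableCommitFailedException and let the caller decide whether to commit again.
            return new RetriableCommitFailedException(error);
        }
        return error; // non-retriable errors are propagated unchanged
    }

    /** Adapt the request future into the future returned to commitAsync callers. */
    static CompletableFuture<Void> asyncCommitResult(CompletableFuture<?> requestFuture) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        requestFuture.whenComplete((committedOffsets, error) -> {
            if (error != null) {
                result.completeExceptionally(commitAsyncExceptionForError(error));
            } else {
                result.complete(null);
            }
        });
        return result;
    }
}
```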



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();

Review Comment:
   Could be moved down, as in `commitSyncWithRetries`
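One reading of this suggestion, sketched under assumptions: call `resetFuture()` only on the retry path, right before the next attempt is made, instead of at the top of every attempt. `RequestState`, `send`, `commitWithRetries`, the scripted outcomes, and the clock-based deadline check are hypothetical simplifications (the code in the diff detects expiry through a `TimeoutException` plus `isExpired`, and also retries stale-epoch errors); this does not reproduce `commitSyncWithRetries` itself.

```java
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;

class RetryWithResetDemo {
    /** Local stand-in for a retriable commit error. */
    static class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    /** Hypothetical request state: the future of the current attempt plus a retry deadline. */
    static class RequestState {
        final long expirationTimeMs;
        CompletableFuture<Void> future = new CompletableFuture<>();
        int attempts;

        RequestState(long expirationTimeMs) { this.expirationTimeMs = expirationTimeMs; }

        void resetFuture() { future = new CompletableFuture<>(); }
    }

    /** Simulated send: completes the current attempt with the next scripted outcome (empty = success). */
    static void send(RequestState request, Deque<Optional<RuntimeException>> outcomes) {
        request.attempts++;
        RuntimeException failure = outcomes.isEmpty() ? null : outcomes.removeFirst().orElse(null);
        if (failure != null) {
            request.future.completeExceptionally(failure);
        } else {
            request.future.complete(null);
        }
    }

    static void commitWithRetries(RequestState request, CompletableFuture<Void> result,
                                  LongSupplier clockMs, Deque<Optional<RuntimeException>> outcomes) {
        request.future.whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
            } else if (error instanceof RetriableException && clockMs.getAsLong() < request.expirationTimeMs) {
                // Reset the future here, only when another attempt is actually going to be made.
                request.resetFuture();
                commitWithRetries(request, result, clockMs, outcomes);
            } else {
                // Non-retriable error, or the retry deadline has passed.
                result.completeExceptionally(error);
            }
        });
        send(request, outcomes);
    }
}
```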



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The 
request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This 
will only generate a
-     * request if there is no other commit request already in-flight, and if 
the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the 
queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If 
there are empty
+     * offsets to commit, no request will be generated and a completed future 
will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to 
be retried if it
-     *                          fails with a retriable error. If not present, 
the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration 
should be checked for
-     *                          sending a request. If true, the request will 
be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the 
interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case 
it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the 
request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error 
if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, 
OffsetAndMetadata> offsets,
-                                                    final Optional<Long> 
expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) 
{
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not 
enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> 
requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = 
pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned
-     * partitions and their current positions. Note on auto-commit timers: 
this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and 
when the async commit
-     * completes, it will reset the auto-commit timer with the exponential 
backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the 
request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, 
this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the 
auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link 
#poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the 
auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no 
action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = 
requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), 
but ensure that if
+            // the request completes with a retriable error, the timer is 
reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = 
subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, 
Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with 
a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final 
CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed 
due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: 
{}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retried with the latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        maybeAutoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
     }
 
-    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails with a retriable
+     * error. The generated request will be added to the queue to be sent on the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, successfully or
+     * exceptionally depending on the response. If the request fails with a retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> expirationTimeMs,
-                                                          final boolean retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                Throwable asyncCommitException;
+                if (error instanceof RetriableException) {
+                    asyncCommitException = new RetriableCommitFailedException(error.getMessage());
+                } else {
+                    asyncCommitException = error;
+                }
+                asyncCommitResult.completeExceptionally(asyncCommitException);
+            } else {
+                asyncCommitResult.complete(null);
+            }
+        });
+        return asyncCommitResult;
+    }
+
+    /**
+     * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.

Review Comment:
   ```suggestion
        * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
        *
   ```
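
   For reference, the `commitAsync` hunk above maps errors in a way that can be tested on its own. The sketch below is illustrative only. `RetriableException` and `RetriableCommitFailedException` are hypothetical stand-ins for the Kafka classes of the same name. `translate` and `observedError` are made-up helpers and are not part of `CommitRequestManager`.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;

   public class AsyncCommitErrorTranslation {

       // Hypothetical stand-ins for the Kafka exception classes of the same name.
       static class RetriableException extends RuntimeException {
           RetriableException(String message) { super(message); }
       }

       static class RetriableCommitFailedException extends RuntimeException {
           RetriableCommitFailedException(String message) { super(message); }
       }

       // Mirrors the shape of commitAsync in the diff: the caller gets a separate future, and
       // retriable failures of the request surface as RetriableCommitFailedException.
       static CompletableFuture<Void> translate(CompletableFuture<Object> requestFuture) {
           CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
           requestFuture.whenComplete((ignored, error) -> {
               if (error == null) {
                   asyncCommitResult.complete(null);
               } else if (error instanceof RetriableException) {
                   asyncCommitResult.completeExceptionally(
                       new RetriableCommitFailedException(error.getMessage()));
               } else {
                   // Non-retriable errors are passed through unchanged.
                   asyncCommitResult.completeExceptionally(error);
               }
           });
           return asyncCommitResult;
       }

       // Completes a request future with the given error (successfully when null) and returns
       // the exception class a caller of the translated future observes, or null on success.
       static Class<?> observedError(Throwable requestError) {
           CompletableFuture<Object> request = new CompletableFuture<>();
           CompletableFuture<Void> result = translate(request);
           if (requestError == null) {
               request.complete(null);
           } else {
               request.completeExceptionally(requestError);
           }
           try {
               result.get();
               return null;
           } catch (ExecutionException e) {
               return e.getCause().getClass();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           System.out.println(observedError(new RetriableException("coordinator unavailable")));
           System.out.println(observedError(new IllegalStateException("fatal")));
           System.out.println(observedError(null));
       }
   }
   ```

   The caller-facing future is kept separate from the request future. As a result, the mapping from retriable errors to `RetriableCommitFailedException` only affects what the `commitAsync` caller observes.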



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -1147,15 +1202,15 @@ List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) {
          * futures with a TimeoutException.
          */
         private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) {
-            unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs));
+            unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs));

Review Comment:
   The `forEach` Javadoc says:
   
   > The behavior of this method is unspecified if the action performs
   > side-effects that modify the underlying source of elements, unless an
   > overriding class has specified a concurrent modification policy.
   
   You are removing from `unsentOffsetCommits` inside `maybeExpire`. Are we sure we won't run into a `ConcurrentModificationException` here?
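
   Here is a small stand-alone reproduction of the concern. It assumes that `unsentOffsetCommits` is an `ArrayList` and that `maybeExpire` removes the request from that list. Neither assumption can be confirmed from this hunk. `Request`, `maybeExpireAndRemove` and `isExpired` are hypothetical names used only for illustration:

   ```java
   import java.util.ArrayList;
   import java.util.ConcurrentModificationException;
   import java.util.List;

   public class ExpireWhileIterating {

       // Hypothetical stand-in for an unsent commit request that knows the list it is queued in.
       static final class Request {
           private final long deadlineMs;
           private final List<Request> queue;

           Request(long deadlineMs, List<Request> queue) {
               this.deadlineMs = deadlineMs;
               this.queue = queue;
           }

           boolean isExpired(long nowMs) {
               return nowMs >= deadlineMs;
           }

           // Side-effecting variant: the request removes itself from its queue when expired,
           // which is the pattern the review comment is worried about.
           void maybeExpireAndRemove(long nowMs) {
               if (isExpired(nowMs)) {
                   queue.remove(this);
               }
           }
       }

       static List<Request> queueWithDeadlines(long... deadlinesMs) {
           List<Request> queue = new ArrayList<>();
           for (long deadline : deadlinesMs) {
               queue.add(new Request(deadline, queue));
           }
           return queue;
       }

       // ArrayList.forEach notices the structural modification and throws after the loop stops.
       static boolean forEachWithRemovalThrows(long nowMs, long... deadlinesMs) {
           List<Request> queue = queueWithDeadlines(deadlinesMs);
           try {
               queue.forEach(request -> request.maybeExpireAndRemove(nowMs));
               return false;
           } catch (ConcurrentModificationException e) {
               return true;
           }
       }

       // removeIf with a side-effect-free predicate lets the list perform the removal itself.
       static int remainingAfterRemoveIf(long nowMs, long... deadlinesMs) {
           List<Request> queue = queueWithDeadlines(deadlinesMs);
           queue.removeIf(request -> request.isExpired(nowMs));
           return queue.size();
       }

       public static void main(String[] args) {
           System.out.println("forEach with removal throws: " + forEachWithRemovalThrows(10, 5, 50));
           System.out.println("remaining after removeIf: " + remainingAfterRemoveIf(10, 5, 50, 8));
       }
   }
   ```

   With an `ArrayList`, the `forEach` variant throws once a request removes itself during the iteration. Keeping the predicate side-effect free and letting `removeIf` perform the removal avoids this. Other collection types may behave differently, which is what the "unspecified" wording in the Javadoc allows for.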
   
   



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final Abstr
 
         private void handleClientResponse(final ClientResponse response,
                                           final Throwable error,
-                                          final long currentTimeMs) {
+                                          final long requestCompletionTimeMs) {
             try {
                 if (error == null) {
                     onResponse(response);
                 } else {
                     log.debug("{} completed with error", requestDescription(), error);
-                    handleCoordinatorDisconnect(error, currentTimeMs);
-                    if (error instanceof RetriableException) {
-                        maybeRetry(currentTimeMs, error);
-                    } else {
-                        future().completeExceptionally(error);
-                    }
+                    onFailedAttempt(requestCompletionTimeMs);
+                    handleCoordinatorDisconnect(error, requestCompletionTimeMs);
+                    future().completeExceptionally(error);
                 }
             } catch (Throwable t) {
-                log.error("Unexpected error handling response for ", requestDescription(), t);
+                log.error("Unexpected error handling response for {}", requestDescription(), t);
                 future().completeExceptionally(t);
             }
         }
 
         abstract void onResponse(final ClientResponse response);
+
+        abstract boolean retryTimeoutExpired(long currentTimeMs);

Review Comment:
   Have you considered moving `expirationTimeout` to this superclass and inlining this method? After all, the expiration timeout is functionality that really belongs with the retry logic.
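
   Here is a minimal sketch of what that could look like, with hypothetical class names. The deadline lives in the base class, and `retryTimeoutExpired` becomes a concrete `final` method. The sketch treats a request with no deadline as already expired (that is, never retried). This is an assumption, chosen to match the non-retrying async commits in this PR.

   ```java
   import java.util.Optional;

   public class RetryExpirationSketch {

       // Hypothetical base class: the retry deadline is owned by the class that implements the
       // retry logic, so subclasses no longer override the expiration check.
       abstract static class RetriableRequestState {
           private final Optional<Long> expirationTimeMs;

           RetriableRequestState(Optional<Long> expirationTimeMs) {
               this.expirationTimeMs = expirationTimeMs;
           }

           // Assumption: a request created without a deadline is never retried, so it is
           // reported as expired; otherwise it expires once the deadline is reached.
           final boolean retryTimeoutExpired(long currentTimeMs) {
               return expirationTimeMs.map(deadlineMs -> currentTimeMs >= deadlineMs).orElse(true);
           }

           abstract String requestDescription();
       }

       // Hypothetical subclasses only provide request-specific behaviour.
       static final class CommitRequestState extends RetriableRequestState {
           CommitRequestState(Optional<Long> expirationTimeMs) { super(expirationTimeMs); }
           @Override String requestDescription() { return "OffsetCommit request"; }
       }

       static final class FetchRequestState extends RetriableRequestState {
           FetchRequestState(Optional<Long> expirationTimeMs) { super(expirationTimeMs); }
           @Override String requestDescription() { return "OffsetFetch request"; }
       }

       public static void main(String[] args) {
           RetriableRequestState sync = new CommitRequestState(Optional.of(1_000L));
           RetriableRequestState async = new CommitRequestState(Optional.empty());
           RetriableRequestState fetch = new FetchRequestState(Optional.of(500L));
           System.out.println(sync.requestDescription() + " expired at 999: " + sync.retryTimeoutExpired(999));
           System.out.println(async.requestDescription() + " expired at 0: " + async.retryTimeoutExpired(0));
           System.out.println(fetch.requestDescription() + " expired at 500: " + fetch.retryTimeoutExpired(500));
       }
   }
   ```

   Subclasses then only deal with building and handling their specific request, and the retry bookkeeping stays in one place.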



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
-     * request if there is no other commit request already in-flight, and if the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+     * offsets to commit, no request will be generated and a completed future will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to be retried if it
-     *                          fails with a retriable error. If not present, the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration should be checked for
-     *                          sending a request. If true, the request will be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                    final Optional<Long> expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) {
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request for all assigned
-     * partitions and their current positions. Note on auto-commit timers: this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and when the async commit
-     * completes, it will reset the auto-commit timer with the exponential backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link #poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), but ensure that if
+            // the request completes with a retriable error, the timer is reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retried with the latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        maybeAutoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
     }
 
-    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails with a retriable
+     * error. The generated request will be added to the queue to be sent on the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, successfully or
+     * exceptionally depending on the response. If the request fails with a retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> expirationTimeMs,
-                                                          final boolean retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                Throwable asyncCommitException;
+                if (error instanceof RetriableException) {
+                    asyncCommitException = new RetriableCommitFailedException(error.getMessage());
+                } else {
+                    asyncCommitException = error;
+                }
+                asyncCommitResult.completeExceptionally(asyncCommitException);
+            } else {
+                asyncCommitResult.complete(null);
+            }
+        });
+        return asyncCommitResult;
+    }
+
+    /**
+     * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
+     * @param offsets Offsets to commit
+     * @param retryExpirationTimeMs Time until which the request will be retried if it fails with
+     *                             an expected retriable error.
+     * @return Future that will complete when a successful response is received.
+     */
+    public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                              final long retryExpirationTimeMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState = createOffsetCommitRequest(
+            offsets,
+            Optional.of(retryExpirationTimeMs));
+        commitSyncWithRetries(requestState, result);
+        return result;
+    }
+
+    private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                               final Optional<Long> expirationTimeMs) {
+        return jitter.isPresent() ?
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                jitter.getAsDouble(),
+                memberInfo) :
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                memberInfo);
+    }
+
+    private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
+                                       CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();

Review Comment:
   In the first call, we have just created a fresh future. Maybe move the `resetFuture` to the `commitSyncWithRetries` call below?
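
   Here is a minimal sketch of the suggested shape. It uses hypothetical names (`Attempt`, `send`, `commitWithRetries`), and a simple attempt cap stands in for the retry timeout. The future that the request was created with serves the first attempt, and `resetFuture()` is only called on the retry path.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class RetryResetSketch {

       // Hypothetical stand-in for a retriable commit error.
       static final class RetriableError extends RuntimeException {
           RetriableError(String message) { super(message); }
       }

       // Hypothetical request state: one future per attempt, recreated by resetFuture().
       static final class Attempt {
           CompletableFuture<Void> future = new CompletableFuture<>();
           int attemptsMade = 0;
           int resets = 0;

           void resetFuture() {
               future = new CompletableFuture<>();
               resets++;
           }
       }

       // Simulated send: the first `failures` attempts fail with a retriable error.
       static void send(Attempt attempt, int failures) {
           attempt.attemptsMade++;
           if (attempt.attemptsMade <= failures) {
               attempt.future.completeExceptionally(new RetriableError("attempt " + attempt.attemptsMade));
           } else {
               attempt.future.complete(null);
           }
       }

       // The first call reuses the future the request was created with; a fresh future is only
       // created right before a retry. maxAttempts stands in for the retry timeout.
       static void commitWithRetries(Attempt attempt, int failures, int maxAttempts,
                                     CompletableFuture<Void> result) {
           attempt.future.whenComplete((ignored, error) -> {
               if (error == null) {
                   result.complete(null);
               } else if (error instanceof RetriableError && attempt.attemptsMade < maxAttempts) {
                   attempt.resetFuture();
                   commitWithRetries(attempt, failures, maxAttempts, result);
               } else {
                   result.completeExceptionally(error);
               }
           });
           send(attempt, failures);
       }

       // Runs the loop to completion and returns the attempt for inspection.
       static Attempt run(int failures, int maxAttempts, CompletableFuture<Void> result) {
           Attempt attempt = new Attempt();
           commitWithRetries(attempt, failures, maxAttempts, result);
           return attempt;
       }

       public static void main(String[] args) {
           CompletableFuture<Void> result = new CompletableFuture<>();
           Attempt attempt = run(2, 5, result);
           System.out.println("succeeded=" + !result.isCompletedExceptionally()
               + " attempts=" + attempt.attemptsMade + " resets=" + attempt.resets);
       }
   }
   ```

   With two simulated retriable failures, the sketch makes three attempts and resets the future twice. A request that succeeds on the first try never resets it.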



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
-     * request if there is no other commit request already in-flight, and if the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+     * offsets to commit, no request will be generated and a completed future will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to be retried if it
-     *                          fails with a retriable error. If not present, the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration should be checked for
-     *                          sending a request. If true, the request will be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                    final Optional<Long> expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) {
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request for all assigned
-     * partitions and their current positions. Note on auto-commit timers: this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and when the async commit
-     * completes, it will reset the auto-commit timer with the exponential backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link #poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), but ensure that if
+            // the request completes with a retriable error, the timer is reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,

Review Comment:
   Why `maybe`? Pretty sure we are doing it.
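   For reference, a minimal Java sketch of the retry pattern `maybeAutoCommitSyncNowWithRetries` implements in the hunk above: every attempt re-reads the latest state (as the hunk does with `subscriptions.allConsumed()`), retriable failures are retried while a deadline has not passed, and any other failure completes the overall future exceptionally. It is Kafka-independent: `RetryWithLatestState`, `RetriableError`, and `commitWithRetries` are hypothetical names, checking an injected clock against a deadline is an assumption standing in for however `OffsetCommitRequestState` tracks expiration, and the `STALE_MEMBER_EPOCH` case is not modeled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch (not Kafka code): retry an asynchronous commit until it
// succeeds, fails with a non-retriable error, or the retry deadline passes.
final class RetryWithLatestState {

    /** Stand-in for a retriable error such as Kafka's RetriableException (assumption). */
    static final class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    static <S> CompletableFuture<Void> commitWithRetries(Supplier<S> latestState,
                                                         Function<S, CompletableFuture<Void>> sendAttempt,
                                                         LongSupplier clockMs,
                                                         long deadlineMs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt(latestState, sendAttempt, clockMs, deadlineMs, result);
        return result;
    }

    private static <S> void attempt(Supplier<S> latestState,
                                    Function<S, CompletableFuture<Void>> sendAttempt,
                                    LongSupplier clockMs,
                                    long deadlineMs,
                                    CompletableFuture<Void> result) {
        // Re-read the state on every attempt so a retry never resends a stale snapshot.
        sendAttempt.apply(latestState.get()).whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if (cause instanceof RetriableError && clockMs.getAsLong() < deadlineMs) {
                // Retriable and still within the deadline: try again with fresh state.
                attempt(latestState, sendAttempt, clockMs, deadlineMs, result);
            } else {
                // Non-retriable error, or the deadline has passed: fail with the last error.
                result.completeExceptionally(cause);
            }
        });
    }
}
```

   Because each retry is started from inside `whenComplete`, attempts that complete synchronously (as in a unit test) add one stack frame per retry; when attempts are completed later, for example from a poll loop, each retry runs on the completing thread's stack instead.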



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
-     * request if there is no other commit request already in-flight, and if the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If there are no
+     * offsets to commit, no request will be generated and a completed future will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to be retried if it
-     *                          fails with a retriable error. If not present, the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration should be checked for
-     *                          sending a request. If true, the request will be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                    final Optional<Long> expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) {
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request for all assigned
-     * partitions and their current positions. Note on auto-commit timers: this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and when the async commit
-     * completes, it will reset the auto-commit timer with the exponential backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link #poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), but ensure that if
+            // the request completes with a retriable error, the timer is reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retried with the latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        maybeAutoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
     }
 
-    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails with a retriable
+     * error. The generated request will be added to the queue to be sent on the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, successfully or
+     * exceptionally depending on the response. If the request fails with a retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> expirationTimeMs,
-                                                          final boolean retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                Throwable asyncCommitException;
+                if (error instanceof RetriableException) {
+                    asyncCommitException = new RetriableCommitFailedException(error.getMessage());
+                } else {
+                    asyncCommitException = error;
+                }
+                asyncCommitResult.completeExceptionally(asyncCommitException);
+            } else {
+                asyncCommitResult.complete(null);
+            }
+        });
+        return asyncCommitResult;
+    }
+
+    /**
+     * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
+     * @param offsets Offsets to commit
+     * @param retryExpirationTimeMs Time until which the request will be retried if it fails with
+     *                             an expected retriable error.
+     * @return Future that will complete when a successful response is received
+     */
+    public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                              final long retryExpirationTimeMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState = createOffsetCommitRequest(
+            offsets,
+            Optional.of(retryExpirationTimeMs));
+        commitSyncWithRetries(requestState, result);
+        return result;
+    }
+
+    private OffsetCommitRequestState createOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                               final 
Optional<Long> expirationTimeMs) {
+        return jitter.isPresent() ?
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                jitter.getAsDouble(),
+                memberInfo) :
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                memberInfo);
+    }
+
+    private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
+                                       CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        pendingRequests.addOffsetCommitRequest(requestAttempt);
+
+        // Retry the same commit request while it fails with RetriableException and the retry
+        // timeout hasn't expired.
+        requestAttempt.future.whenComplete((res, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException) {
+                    // if (error instanceof TimeoutException && requestAttempt.retryTimeoutExpired

Review Comment:
   Do we want to commit that commented-out code?
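   By contrast, `commitAsync` in the hunk above sends once and never retries; it only translates a `RetriableException` into a `RetriableCommitFailedException` for the caller. A minimal, Kafka-independent sketch of that translation step follows. `AsyncCommitErrorMapping`, `RetriableError`, and `RetriableCommitFailed` are hypothetical stand-ins for the Kafka types, not Kafka APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch (not Kafka code): complete the caller's future from a single
// commit attempt, translating retriable failures into a dedicated exception type.
final class AsyncCommitErrorMapping {

    /** Stand-in for Kafka's RetriableException (assumption). */
    static final class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    /** Stand-in for Kafka's RetriableCommitFailedException (assumption). */
    static final class RetriableCommitFailed extends RuntimeException {
        RetriableCommitFailed(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> CompletableFuture<Void> mapAsyncCommitResult(CompletableFuture<T> attempt) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt.whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause() : error;
            if (cause instanceof RetriableError) {
                // No retry here: tell the caller the commit failed but may succeed if resent.
                result.completeExceptionally(new RetriableCommitFailed(cause.getMessage(), cause));
            } else {
                // Non-retriable errors are propagated unchanged.
                result.completeExceptionally(cause);
            }
        });
        return result;
    }
}
```

   One deliberate difference: the hunk builds the new exception from `error.getMessage()` alone, while this sketch also keeps the original error as the cause so it still shows up in stack traces.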



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request
     }
 
     /**
-     * Generate a request to commit offsets if auto-commit is enabled. The request will be
-     * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
-     * request if there is no other commit request already in-flight, and if the commit interval
-     * has elapsed.
+     * Generate a request to commit consumed offsets. Add the request to the queue of pending
+     * requests to be sent out on the next call to {@link #poll(long)}. If there are no
+     * offsets to commit, no request will be generated and a completed future will be returned.
      *
-     * @param offsets           Offsets to commit
-     * @param expirationTimeMs  Time until which the request will continue to be retried if it
-     *                          fails with a retriable error. If not present, the request will be
-     *                          sent but not retried.
-     * @param checkInterval     True if the auto-commit interval expiration should be checked for
-     *                          sending a request. If true, the request will be sent only if the
-     *                          auto-commit interval has expired. Pass false to
-     *                          send the auto-commit request regardless of the interval (ex.
-     *                          auto-commit before rebalance).
-     * @param retryOnStaleEpoch True if the request should be retried in case it fails with
-     *                          {@link Errors#STALE_MEMBER_EPOCH}.
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * @param requestState Commit request
+     * @return Future containing the offsets that were committed, or an error if the request
+     * failed.
      */
-    private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                    final Optional<Long> expirationTimeMs,
-                                                    boolean checkInterval,
-                                                    boolean retryOnStaleEpoch) {
-        if (!autoCommitEnabled()) {
-            log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-            return CompletableFuture.completedFuture(null);
-        }
-
+    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
         AutoCommitState autocommit = autoCommitState.get();
-        if (checkInterval && !autocommit.shouldAutoCommit()) {
-            return CompletableFuture.completedFuture(null);
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+        if (requestState.offsets.isEmpty()) {
+            result = CompletableFuture.completedFuture(Collections.emptyMap());
+        } else {
+            autocommit.setInflightCommitStatus(true);
+            OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+            result = request.future;
+            result.whenComplete(autoCommitCallback(request.offsets));
         }
-
-        autocommit.resetTimer();
-        autocommit.setInflightCommitStatus(true);
-        CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-            .whenComplete(autoCommitCallback(offsets));
         return result;
     }
 
     /**
-     * If auto-commit is enabled, this will generate a commit offsets request for all assigned
-     * partitions and their current positions. Note on auto-commit timers: this will reset the
-     * auto-commit timer to the interval before issuing the async commit, and when the async commit
-     * completes, it will reset the auto-commit timer with the exponential backoff if the request
-     * failed with a retriable error.
-     *
-     * @return Future that will complete when a response is received for the request, or a
-     * completed future if no request is generated.
+     * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+     * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+     * interval. The request will be sent on the next call to {@link #poll(long)}.
+     * <p/>
+     * If the request completes with a retriable error, this will reset the auto-commit timer with
+     * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+     * the next commit will be generated when the interval expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
-        if (!autoCommitEnabled()) {
-            // Early return to ensure that no action/logging is performed.
-            return CompletableFuture.completedFuture(null);
+    public void maybeAutoCommitAsync() {
+        if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) {
+            OffsetCommitRequestState requestState = createOffsetCommitRequest(
+                subscriptions.allConsumed(),
+                Optional.empty());
+            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState);
+            // Reset timer to the interval (even if no request was generated), but ensure that if
+            // the request completes with a retriable error, the timer is reset to send the next
+            // auto-commit after the backoff expires.
+            resetAutoCommitTimer();
+            maybeResetTimerWithBackoff(result);
         }
-        Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed();
-        CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true);
-        result.whenComplete((__, error) -> {
+    }
+
+    /**
+     * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException.
+     */
+    private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
+        result.whenComplete((offsets, error) -> {
             if (error != null) {
                 if (error instanceof RetriableCommitFailedException) {
                     log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                     resetAutoCommitTimer(retryBackoffMs);
                 } else {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
+                    log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
                 }
             } else {
                 log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
             }
         });
-
-        return result;
     }
 
     /**
-     * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired,
-     * until the request succeeds or fails with a fatal error.
+     * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval.
+     * This is used for committing offsets before revoking partitions. This will retry committing
+     * the latest offsets until the request succeeds, fails with a fatal error, or the timeout
+     * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error,
+     * and will retry it including the latest member ID and epoch received from the broker.
+     *
+     * @return Future that will complete when the offsets are successfully committed. It will
+     * complete exceptionally if the commit fails with a non-retriable error, or if the retry
+     * timeout expires.
      */
-    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(
-        final Optional<Long> expirationTimeMs,
-        final boolean retryOnStaleEpoch) {
-        return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
+    public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) {
+        if (!autoCommitEnabled()) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState =
+            createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs));
+        maybeAutoCommitSyncNowWithRetries(requestState, result);
+        return result;
+    }
+
+    private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt,
+                                                   CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt);
+        commitAttempt.whenComplete((committedOffsets, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("Auto-commit sync timed out and won't be retried anymore");
+                        result.completeExceptionally(error);
+                    } else {
+                        // Make sure the auto-commit is retried with the latest offsets
+                        requestAttempt.offsets = subscriptions.allConsumed();
+                        maybeAutoCommitSyncNowWithRetries(requestAttempt, result);
+                    }
+                } else {
+                    log.debug("Auto-commit sync failed with non-retriable error", error);
+                    result.completeExceptionally(error);
+                }
+            }
+        });
     }
 
-    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
+    /**
+     * Clear the inflight auto-commit flag and log auto-commit completion status.
+     */
+    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
         return (response, throwable) -> {
             autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
             if (throwable == null) {
                 offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
-                log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets);
+                log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
             } else if (throwable instanceof RetriableCommitFailedException) {
-                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}",
+                log.debug("Auto-commit of offsets {} failed due to retriable error: {}",
                         allConsumedOffsets, throwable.getMessage());
             } else {
-                log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable);
+                log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
             }
         };
     }
 
     /**
-     * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an
-     * {@link OffsetCommitRequestState} and enqueue it to send later.
+     * Generate a request to commit offsets without retrying, even if it fails with a retriable
+     * error. The generated request will be added to the queue to be sent on the next call to
+     * {@link #poll(long)}.
+     *
+     * @param offsets Offsets to commit per partition.
+     * @return Future that will complete when a response is received, 
successfully or
+     * exceptionally depending on the response. If the request fails with a 
retriable error, the
+     * future will be completed with a {@link RetriableCommitFailedException}.
      */
-    public CompletableFuture<Void> addOffsetCommitRequest(final 
Map<TopicPartition, OffsetAndMetadata> offsets,
-                                                          final Optional<Long> 
expirationTimeMs,
-                                                          final boolean 
retryOnStaleEpoch) {
+    public CompletableFuture<Void> commitAsync(final Map<TopicPartition, 
OffsetAndMetadata> offsets) {
         if (offsets.isEmpty()) {
             log.debug("Skipping commit of empty offsets");
             return CompletableFuture.completedFuture(null);
         }
-        return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
+        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty());
+        pendingRequests.addOffsetCommitRequest(commitRequest);
+
+        CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>();
+        commitRequest.future.whenComplete((committedOffsets, error) -> {
+            if (error != null) {
+                Throwable asyncCommitException;
+                if (error instanceof RetriableException) {
+                    asyncCommitException = new RetriableCommitFailedException(error.getMessage());
+                } else {
+                    asyncCommitException = error;
+                }
+                asyncCommitResult.completeExceptionally(asyncCommitException);
+            } else {
+                asyncCommitResult.complete(null);
+            }
+        });
+        return asyncCommitResult;
+    }
+
+    /**
+     * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired.
+     * @param offsets Offsets to commit
+     * @param retryExpirationTimeMs Time until which the request will be retried if it fails with
+     *                             an expected retriable error.
+     * @return Future that will complete when a successful response
+     */
+    public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                              final long retryExpirationTimeMs) {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+        OffsetCommitRequestState requestState = createOffsetCommitRequest(
+            offsets,
+            Optional.of(retryExpirationTimeMs));
+        commitSyncWithRetries(requestState, result);
+        return result;
+    }
+
+    private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets,
+                                                               final Optional<Long> expirationTimeMs) {
+        return jitter.isPresent() ?
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                jitter.getAsDouble(),
+                memberInfo) :
+            new OffsetCommitRequestState(
+                offsets,
+                groupId,
+                groupInstanceId,
+                expirationTimeMs,
+                retryBackoffMs,
+                retryBackoffMaxMs,
+                memberInfo);
+    }
+
+    private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt,
+                                       CompletableFuture<Void> result) {
+        requestAttempt.resetFuture();
+        pendingRequests.addOffsetCommitRequest(requestAttempt);
+
+        // Retry the same commit request while it fails with RetriableException and the retry
+        // timeout hasn't expired.
+        requestAttempt.future.whenComplete((res, error) -> {
+            if (error == null) {
+                result.complete(null);
+            } else {
+                if (error instanceof RetriableException) {
+                    // if (error instanceof TimeoutException && requestAttempt.retryTimeoutExpired
+                    // (requestAttempt.lastReceivedMs)) {
+                    if (error instanceof TimeoutException && requestAttempt.isExpired) {
+                        log.debug("OffsetCommit timeout expired so it won't be retried anymore");

Review Comment:
   Maybe that's even worth an `info` log?
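   For reference, here is a minimal sketch of the two completion policies in this diff, reduced to plain `CompletableFuture` code, with the give-up message logged at INFO. `CommitCompletionSketch`, `RetriableError`, `RetriableCommitFailed`, `wrapAsync`, `retryUntil`, and the injected `LongSupplier` clock are hypothetical stand-ins, not Kafka classes or APIs. The sketch also leaves out the retry backoff and request queueing that `OffsetCommitRequestState` and `pendingRequests` handle in the real code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.logging.Logger;

/**
 * Hypothetical sketch, not Kafka's CommitRequestManager: the async and sync
 * commit completion policies from the diff, using stand-in exception types.
 */
final class CommitCompletionSketch {
    private static final Logger LOG = Logger.getLogger(CommitCompletionSketch.class.getName());

    /** Stand-in for Kafka's RetriableException (assumption, not the real class). */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    /** Stand-in for RetriableCommitFailedException (assumption, not the real class). */
    static class RetriableCommitFailed extends RuntimeException {
        RetriableCommitFailed(String message, Throwable cause) { super(message, cause); }
    }

    private CommitCompletionSketch() { }

    /** Async path: one attempt, no retry; a retriable failure surfaces as RetriableCommitFailed. */
    static CompletableFuture<Void> wrapAsync(CompletableFuture<?> attempt) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt.whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            Throwable cause = unwrap(error);
            result.completeExceptionally(cause instanceof RetriableError
                    ? new RetriableCommitFailed(cause.getMessage(), cause)
                    : cause);
        });
        return result;
    }

    /** Sync path: re-issue the attempt on retriable errors until the deadline has passed. */
    static CompletableFuture<Void> retryUntil(Supplier<CompletableFuture<Void>> attempt,
                                              long deadlineMs,
                                              LongSupplier clockMs) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        retry(attempt, deadlineMs, clockMs, result);
        return result;
    }

    private static void retry(Supplier<CompletableFuture<Void>> attempt,
                              long deadlineMs,
                              LongSupplier clockMs,
                              CompletableFuture<Void> result) {
        attempt.get().whenComplete((ignored, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            Throwable cause = unwrap(error);
            if (!(cause instanceof RetriableError)) {
                // Non-retriable: fail the caller's future immediately.
                result.completeExceptionally(cause);
            } else if (clockMs.getAsLong() >= deadlineMs) {
                // Logged at INFO so users can see why the commit stopped retrying.
                LOG.info("Commit retry timeout expired, not retrying: " + cause.getMessage());
                result.completeExceptionally(new TimeoutException("commit retry timeout expired"));
            } else {
                // Retriable and time remains: issue another attempt.
                retry(attempt, deadlineMs, clockMs, result);
            }
        });
    }

    /** Dependent stages may wrap the original failure in a CompletionException. */
    private static Throwable unwrap(Throwable error) {
        return error instanceof CompletionException && error.getCause() != null
                ? error.getCause()
                : error;
    }
}
```

   The caller holds a single `result` future across every re-issued attempt rather than the future of any one attempt, which matches the shape of `commitSyncWithRetries` in the diff.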



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.