lucasbru commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1344,8 +1343,8 @@ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); - // Commit with a timer to control how long the request should be retried until it - // gets a successful response or non-retriable error. + // Commit with a retry timeout (the commit request will be retried until it gets a + // successful response, non-retriable error, or the timeout expires) CompletableFuture<Void> commitFuture = commit(offsets, true, Optional.of(timeout.toMillis())); Review Comment: While you are at it, I think we can remove the "isWakeupable" parameter and just set the `wakeUpTrigger` in the calling context. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ########## @@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) { } CommitRequestManager manager = requestManagers.commitRequestManager.get(); - Optional<Long> expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout); - event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false)); + CompletableFuture<Void> commitResult; + if (event.retryTimeoutMs().isPresent()) { Review Comment: It's a bit weird that we use `retryTimeoutMs` to carry the information that this is a sync-commit vs async-commit. How about going all-in here and just having an `AsyncCommitApplicationEvent` and a `SyncCommitApplicationEvent`? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. 
The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. 
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. + resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. 
+ */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + maybeAutoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? 
super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + Throwable asyncCommitException; Review Comment: For `commitSync` you have `commitSyncExceptionForError`, so maybe we want to extract this into `commitAsyncExceptionForError`? Symmetry ftw ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. 
If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. */ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, 
expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. 
+ resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. + */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); Review Comment: Could be moved down, as in `commitSyncWithRetries` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. 
- * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. */ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - 
.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. 
+ resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. + */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + maybeAutoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? 
super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + Throwable asyncCommitException; + if (error instanceof RetriableException) { + asyncCommitException = new RetriableCommitFailedException(error.getMessage()); + } else { + asyncCommitException = error; + } + asyncCommitResult.completeExceptionally(asyncCommitException); + } else { + asyncCommitResult.complete(null); + } + }); + return asyncCommitResult; + } + + /** + * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. Review Comment: ```suggestion * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. * ``` ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -1147,15 +1202,15 @@ List<NetworkClientDelegate.UnsentRequest> drain(final long currentTimeMs) { * futures with a TimeoutException. 
*/ private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) { - unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs)); + unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs)); Review Comment: The `Iterable.forEach` Javadoc says: > * The behavior of this method is unspecified if the action performs > * side-effects that modify the underlying source of elements, unless an > * overriding class has specified a concurrent modification policy. You are removing from `unsentOffsetCommits` inside `maybeExpire`. Are we sure we won't run into a `ConcurrentModificationException` here? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final Abstr private void handleClientResponse(final ClientResponse response, final Throwable error, - final long currentTimeMs) { + final long requestCompletionTimeMs) { try { if (error == null) { onResponse(response); } else { log.debug("{} completed with error", requestDescription(), error); - handleCoordinatorDisconnect(error, currentTimeMs); - if (error instanceof RetriableException) { - maybeRetry(currentTimeMs, error); - } else { - future().completeExceptionally(error); - } + onFailedAttempt(requestCompletionTimeMs); + handleCoordinatorDisconnect(error, requestCompletionTimeMs); + future().completeExceptionally(error); } } catch (Throwable t) { - log.error("Unexpected error handling response for ", requestDescription(), t); + log.error("Unexpected error handling response for {}", requestDescription(), t); future().completeExceptionally(t); } } abstract void onResponse(final ClientResponse response); + + abstract boolean retryTimeoutExpired(long currentTimeMs); Review Comment: Have you considered moving `expirationTimeout` to this super-class and inlining this? 
After all, the expiration timeout is functionality that really belongs with the retry logic. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. 
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. + resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. 
+ */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + maybeAutoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? 
super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + Throwable asyncCommitException; + if (error instanceof RetriableException) { + asyncCommitException = new RetriableCommitFailedException(error.getMessage()); + } else { + asyncCommitException = error; + } + asyncCommitResult.completeExceptionally(asyncCommitException); + } else { + asyncCommitResult.complete(null); + } + }); + return asyncCommitResult; + } + + /** + * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. + * @param offsets Offsets to commit + * @param retryExpirationTimeMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * @return Future that will complete when a successful response + */ + public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, + final long retryExpirationTimeMs) { + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = createOffsetCommitRequest( + offsets, + Optional.of(retryExpirationTimeMs)); + commitSyncWithRetries(requestState, result); + return result; + } + + private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Long> expirationTimeMs) { + return jitter.isPresent() ? + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + jitter.getAsDouble(), + memberInfo) : + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + memberInfo); + } + + private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); Review Comment: In the first call we just created a fresh future. Maybe move the `resetFuture` to the `commitSyncWithRetries` call below? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. 
If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. 
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. + resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. 
+ */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, Review Comment: Why `maybe`? I'm pretty sure we always perform the commit here. ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. 
If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. */ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request 
for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. 
+ resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. + */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + maybeAutoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? 
super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + Throwable asyncCommitException; + if (error instanceof RetriableException) { + asyncCommitException = new RetriableCommitFailedException(error.getMessage()); + } else { + asyncCommitException = error; + } + asyncCommitResult.completeExceptionally(asyncCommitException); + } else { + asyncCommitResult.complete(null); + } + }); + return asyncCommitResult; + } + + /** + * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. + * @param offsets Offsets to commit + * @param retryExpirationTimeMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * @return Future that will complete when a successful response + */ + public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, + final long retryExpirationTimeMs) { + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = createOffsetCommitRequest( + offsets, + Optional.of(retryExpirationTimeMs)); + commitSyncWithRetries(requestState, result); + return result; + } + + private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Long> expirationTimeMs) { + return jitter.isPresent() ? + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + jitter.getAsDouble(), + memberInfo) : + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + memberInfo); + } + + private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + pendingRequests.addOffsetCommitRequest(requestAttempt); + + // Retry the same commit request while it fails with RetriableException and the retry + // timeout hasn't expired. + requestAttempt.future.whenComplete((res, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException) { + // if (error instanceof TimeoutException && requestAttempt.retryTimeoutExpired Review Comment: Do we want to commit that commented out code? ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ########## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection<? extends RequestState> request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. 
This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ - private CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - boolean checkInterval, - boolean retryOnStaleEpoch) { - if (!autoCommitEnabled()) { - log.debug("Skipping auto-commit because auto-commit config is not enabled."); - return CompletableFuture.completedFuture(null); - } - + private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); - if (checkInterval && !autocommit.shouldAutoCommit()) { - return CompletableFuture.completedFuture(null); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result; + if (requestState.offsets.isEmpty()) { + result = CompletableFuture.completedFuture(Collections.emptyMap()); + } else { + autocommit.setInflightCommitStatus(true); + OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); + result = request.future; + result.whenComplete(autoCommitCallback(request.offsets)); } - - autocommit.resetTimer(); - autocommit.setInflightCommitStatus(true); - CompletableFuture<Void> result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) - .whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. 
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. + * <p/> + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ - public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() { - if (!autoCommitEnabled()) { - // Early return to ensure that no action/logging is performed. - return CompletableFuture.completedFuture(null); + public void maybeAutoCommitAsync() { + if (autoCommitEnabled() && autoCommitState.get().shouldAutoCommit()) { + OffsetCommitRequestState requestState = createOffsetCommitRequest( + subscriptions.allConsumed(), + Optional.empty()); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(requestState); + // Reset timer to the interval (even if no request was generated), but ensure that if + // the request completes with a retriable error, the timer is reset to send the next + // auto-commit after the backoff expires. + resetAutoCommitTimer(); + maybeResetTimerWithBackoff(result); } - Map<TopicPartition, OffsetAndMetadata> offsets = subscriptions.allConsumed(); - CompletableFuture<Void> result = maybeAutoCommit(offsets, Optional.empty(), true, true); - result.whenComplete((__, error) -> { + } + + /** + * Reset auto-commit timer to retry with backoff if the future failed with a RetriableCommitFailedException. 
+ */ + private void maybeResetTimerWithBackoff(final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) { + result.whenComplete((offsets, error) -> { if (error != null) { if (error instanceof RetriableCommitFailedException) { log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error); resetAutoCommitTimer(retryBackoffMs); } else { - log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); + log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage()); } } else { log.debug("Completed asynchronous auto-commit of offsets {}", offsets); } }); - - return result; } /** - * Commit consumed offsets if auto-commit is enabled. Retry while the timer is not expired, - * until the request succeeds or fails with a fatal error. + * Commit consumed offsets if auto-commit is enabled, regardless of the auto-commit interval. + * This is used for committing offsets before revoking partitions. This will retry committing + * the latest offsets until the request succeeds, fails with a fatal error, or the timeout + * expires. Note that this considers {@link Errors#STALE_MEMBER_EPOCH} as a retriable error, + * and will retry it including the latest member ID and epoch received from the broker. + * + * @return Future that will complete when the offsets are successfully committed. It will + * complete exceptionally if the commit fails with a non-retriable error, or if the retry + * timeout expires. 
*/ - public CompletableFuture<Void> maybeAutoCommitAllConsumedNow( - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { - return maybeAutoCommit(subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch); + public CompletableFuture<Void> maybeAutoCommitSyncNow(final long retryExpirationTimeMs) { + if (!autoCommitEnabled()) { + return CompletableFuture.completedFuture(null); + } + + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = + createOffsetCommitRequest(subscriptions.allConsumed(), Optional.of(retryExpirationTimeMs)); + maybeAutoCommitSyncNowWithRetries(requestState, result); + return result; + } + + private void maybeAutoCommitSyncNowWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitAttempt = requestAutoCommit(requestAttempt); + commitAttempt.whenComplete((committedOffsets, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("Auto-commit sync timed out and won't be retried anymore"); + result.completeExceptionally(error); + } else { + // Make sure the auto-commit is retries with the latest offsets + requestAttempt.offsets = subscriptions.allConsumed(); + maybeAutoCommitSyncNowWithRetries(requestAttempt, result); + } + } else { + log.debug("Auto-commit sync failed with non-retriable error", error); + result.completeExceptionally(error); + } + } + }); } - private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { + /** + * Clear the inflight auto-commit flag and log auto-commit completion status. + */ + private BiConsumer<? 
super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(final Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) { return (response, throwable) -> { autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false)); if (throwable == null) { offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets); - log.debug("Completed asynchronous auto-commit of offsets {}", allConsumedOffsets); + log.debug("Completed auto-commit of offsets {}", allConsumedOffsets); } else if (throwable instanceof RetriableCommitFailedException) { - log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", + log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage()); } else { - log.warn("Asynchronous auto-commit of offsets {} failed", allConsumedOffsets, throwable); + log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable); } }; } /** - * Handles {@link org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent}. It creates an - * {@link OffsetCommitRequestState} and enqueue it to send later. + * Generate a request to commit offsets without retrying, even if it fails with a retriable + * error. The generated request will be added to the queue to be sent on the next call to + * {@link #poll(long)}. + * + * @param offsets Offsets to commit per partition. + * @return Future that will complete when a response is received, successfully or + * exceptionally depending on the response. If the request fails with a retriable error, the + * future will be completed with a {@link RetriableCommitFailedException}. 
*/ - public CompletableFuture<Void> addOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, - final Optional<Long> expirationTimeMs, - final boolean retryOnStaleEpoch) { + public CompletableFuture<Void> commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets) { if (offsets.isEmpty()) { log.debug("Skipping commit of empty offsets"); return CompletableFuture.completedFuture(null); } - return pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future; + OffsetCommitRequestState commitRequest = createOffsetCommitRequest(offsets, Optional.empty()); + pendingRequests.addOffsetCommitRequest(commitRequest); + + CompletableFuture<Void> asyncCommitResult = new CompletableFuture<>(); + commitRequest.future.whenComplete((committedOffsets, error) -> { + if (error != null) { + Throwable asyncCommitException; + if (error instanceof RetriableException) { + asyncCommitException = new RetriableCommitFailedException(error.getMessage()); + } else { + asyncCommitException = error; + } + asyncCommitResult.completeExceptionally(asyncCommitException); + } else { + asyncCommitResult.complete(null); + } + }); + return asyncCommitResult; + } + + /** + * Commit offsets, retrying on expected retriable errors while the retry timeout hasn't expired. + * @param offsets Offsets to commit + * @param retryExpirationTimeMs Time until which the request will be retried if it fails with + * an expected retriable error. 
+ * @return Future that will complete when a successful response + */ + public CompletableFuture<Void> commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, + final long retryExpirationTimeMs) { + CompletableFuture<Void> result = new CompletableFuture<>(); + OffsetCommitRequestState requestState = createOffsetCommitRequest( + offsets, + Optional.of(retryExpirationTimeMs)); + commitSyncWithRetries(requestState, result); + return result; + } + + private OffsetCommitRequestState createOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets, + final Optional<Long> expirationTimeMs) { + return jitter.isPresent() ? + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + jitter.getAsDouble(), + memberInfo) : + new OffsetCommitRequestState( + offsets, + groupId, + groupInstanceId, + expirationTimeMs, + retryBackoffMs, + retryBackoffMaxMs, + memberInfo); + } + + private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, + CompletableFuture<Void> result) { + requestAttempt.resetFuture(); + pendingRequests.addOffsetCommitRequest(requestAttempt); + + // Retry the same commit request while it fails with RetriableException and the retry + // timeout hasn't expired. + requestAttempt.future.whenComplete((res, error) -> { + if (error == null) { + result.complete(null); + } else { + if (error instanceof RetriableException) { + // if (error instanceof TimeoutException && requestAttempt.retryTimeoutExpired + // (requestAttempt.lastReceivedMs)) { + if (error instanceof TimeoutException && requestAttempt.isExpired) { + log.debug("OffsetCommit timeout expired so it won't be retried anymore"); Review Comment: Maybe that's even worth an `info` log? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org