Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lucasbru merged PR #15357: URL: https://github.com/apache/kafka/pull/15357 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on PR #15357: URL: https://github.com/apache/kafka/pull/15357#issuecomment-1954655740 Thanks for the comments @lucasbru , all addressed.
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496179477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -1147,15 +1202,15 @@ List drain(final long currentTimeMs) { * futures with a TimeoutException. */ private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) { -unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs)); +unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs)); Review Comment: Definitely, good catch, fixed.
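The diff swaps `removeIf` for `forEach` in a method whose name promises to "fail and remove" expired requests. The reviewer's exact remark is not quoted here, so the following is only a minimal sketch of the behavioral difference. It uses a hypothetical `PendingCommit` class in place of the real unsent-request type. `removeIf` runs `maybeExpire` and also drops every request for which it returns true. `forEach` only runs it, so expired requests, whose futures have already failed, stay in the queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an unsent commit request (not the Kafka class).
class PendingCommit {
    final String name;
    final long expirationTimeMs;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    PendingCommit(String name, long expirationTimeMs) {
        this.name = name;
        this.expirationTimeMs = expirationTimeMs;
    }

    // Fails the future with a TimeoutException and returns true once the deadline has passed.
    boolean maybeExpire(long currentTimeMs) {
        if (currentTimeMs >= expirationTimeMs) {
            future.completeExceptionally(new TimeoutException(name + " expired"));
            return true;
        }
        return false;
    }
}

class ExpireSketch {
    // removeIf: expires the request AND drops it from the queue when maybeExpire returns true.
    static List<PendingCommit> expireWithRemoveIf(List<PendingCommit> queue, long nowMs) {
        List<PendingCommit> remaining = new ArrayList<>(queue);
        remaining.removeIf(req -> req.maybeExpire(nowMs));
        return remaining;
    }

    // forEach: expires the request (fails its future) but leaves it in the queue.
    static List<PendingCommit> expireWithForEach(List<PendingCommit> queue, long nowMs) {
        List<PendingCommit> remaining = new ArrayList<>(queue);
        remaining.forEach(req -> req.maybeExpire(nowMs));
        return remaining;
    }
}
```

With a request expiring at 100 ms and another at 500 ms, expiring at 200 ms leaves one request after `removeIf` but two after `forEach`.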
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496101641 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
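The Javadoc quoted above describes the auto-commit timer in three cases. When the interval expires, a request is enqueued and the timer is reset to the interval. A retriable failure resets the timer with exponential backoff. A non-retriable failure leaves the timer alone, so the next commit waits for the interval. The following is a minimal, self-contained sketch of that timer logic. The class, its fields, the in-flight check and the backoff formula are all assumptions for illustration, not the `CommitRequestManager` code. Only the "empty offsets yield a completed future with an empty map" rule is taken directly from `requestAutoCommit`.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the auto-commit timer behaviour described in the Javadoc above.
// Field names, the in-flight check and the backoff formula are assumptions for illustration.
class AutoCommitSketch {
    // Stand-in for a retriable error such as the coordinator being unavailable.
    static class RetriableCommitException extends RuntimeException {
        RetriableCommitException(String message) {
            super(message);
        }
    }

    final long intervalMs;
    final long retryBackoffMs;
    final long maxRetryBackoffMs;
    long nextCommitTimeMs;
    int failedAttempts = 0;
    boolean inflight = false;
    final Queue<CompletableFuture<Map<String, Long>>> pendingRequests = new ArrayDeque<>();

    AutoCommitSketch(long intervalMs, long retryBackoffMs, long maxRetryBackoffMs, long nowMs) {
        this.intervalMs = intervalMs;
        this.retryBackoffMs = retryBackoffMs;
        this.maxRetryBackoffMs = maxRetryBackoffMs;
        this.nextCommitTimeMs = nowMs + intervalMs;
    }

    // Enqueues a commit only when the interval has expired, resetting the timer to the interval.
    // Empty offsets produce an already-completed future with an empty map.
    CompletableFuture<Map<String, Long>> maybeAutoCommit(long nowMs, Map<String, Long> offsets) {
        if (inflight || nowMs < nextCommitTimeMs) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        nextCommitTimeMs = nowMs + intervalMs;
        if (offsets.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        inflight = true;
        CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
        pendingRequests.add(future);
        return future;
    }

    // Simulates the response for the oldest pending request arriving at nowMs.
    void onResponse(long nowMs, Map<String, Long> committed, Throwable error) {
        CompletableFuture<Map<String, Long>> future = pendingRequests.remove();
        inflight = false;
        if (error == null) {
            failedAttempts = 0;
            future.complete(committed);
            return;
        }
        if (error instanceof RetriableCommitException) {
            // Retriable: bring the next attempt forward to an exponential backoff.
            failedAttempts++;
            nextCommitTimeMs = nowMs + backoffMs(failedAttempts);
        }
        // Non-retriable: the timer is untouched, so the next commit waits for the interval.
        future.completeExceptionally(error);
    }

    // Doubles the base backoff per failed attempt, capped at maxRetryBackoffMs.
    long backoffMs(int attempts) {
        long backoff = retryBackoffMs << Math.min(attempts - 1, 20);
        return Math.min(backoff, maxRetryBackoffMs);
    }
}
```

With a 5000 ms interval and 100 ms base backoff, a retriable failure at 5050 ms schedules the next attempt at 5150 ms instead of 10000 ms.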
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496091256 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final Abstr private void handleClientResponse(final ClientResponse response, final Throwable error, - final long currentTimeMs) { + final long requestCompletionTimeMs) { try { if (error == null) { onResponse(response); } else { log.debug("{} completed with error", requestDescription(), error); -handleCoordinatorDisconnect(error, currentTimeMs); -if (error instanceof RetriableException) { -maybeRetry(currentTimeMs, error); -} else { -future().completeExceptionally(error); -} +onFailedAttempt(requestCompletionTimeMs); +handleCoordinatorDisconnect(error, requestCompletionTimeMs); +future().completeExceptionally(error); } } catch (Throwable t) { -log.error("Unexpected error handling response for ", requestDescription(), t); +log.error("Unexpected error handling response for {}", requestDescription(), t); future().completeExceptionally(t); } } abstract void onResponse(final ClientResponse response); + +abstract boolean retryTimeoutExpired(long currentTimeMs); Review Comment: Good point, done. I missed that, probably influenced by the fact that, in the case of commits, the expirationTimeout does not apply to all requests. But I do agree with you that the expiration belongs with the retry logic, so it fits well at the RetriableRequest level (just as an Optional).
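After this change the response handler no longer retries by itself. It records the failed attempt and completes the future exceptionally, and the retry deadline moves to the request as an `Optional`. The following is a minimal sketch of that split under stated assumptions. `RetriableRequestSketch` and `TransientCommitError` are hypothetical names, and treating a missing deadline as "never retry" follows the earlier Javadoc ("If not present, the request will be sent but not retried"), not the final Kafka code.

```java
import java.util.Optional;

// Hypothetical sketch (not the Kafka class): the retry deadline lives on the request as an
// Optional, and the caller decides whether to retry after each failed attempt.
class RetriableRequestSketch {
    // Stand-in for a retriable error type.
    static class TransientCommitError extends RuntimeException {
        TransientCommitError(String message) {
            super(message);
        }
    }

    final Optional<Long> expirationTimeMs;
    int attempts = 0;
    long lastFailedAttemptMs = -1;

    RetriableRequestSketch(Optional<Long> expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    // Only records the failure; the decision to retry is left to the caller.
    void onFailedAttempt(long requestCompletionTimeMs) {
        attempts++;
        lastFailedAttemptMs = requestCompletionTimeMs;
    }

    // A request without a deadline is treated as expired, i.e. sent but never retried.
    boolean retryTimeoutExpired(long currentTimeMs) {
        return expirationTimeMs.map(deadlineMs -> currentTimeMs >= deadlineMs).orElse(true);
    }

    // Caller-side policy: retry only retriable errors while the deadline has not passed.
    static boolean shouldRetry(RetriableRequestSketch request, Throwable error, long currentTimeMs) {
        return error instanceof TransientCommitError && !request.retryTimeoutExpired(currentTimeMs);
    }
}
```

Keeping `shouldRetry` outside the response handler mirrors the PR's intent: each commit type can apply its own policy on top of the shared error propagation.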
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496065426 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496060942 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState); +result = request.future; +result.whenComplete(autoCommitCallback(request.offsets)); } - -autocommit.resetTimer(); -autocommit.setInflightCommitStatus(true); -CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch) -.whenComplete(autoCommitCallback(offsets)); return result; } /** - * If auto-commit is enabled, this will generate a commit offsets request for all assigned - * partitions and their current positions. Note on auto-commit timers: this will reset the - * auto-commit timer to the interval before issuing the async commit, and when the async commit - * completes, it will reset the auto-commit timer with the exponential backoff if the request - * failed with a retriable error. - * - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and + * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the + * interval. The request will be sent on the next call to {@link #poll(long)}. 
+ * + * If the request completes with a retriable error, this will reset the auto-commit timer with + * the exponential backoff. If it fails with a non-retriable error, no action is taken, so + * the next commit will be generated when the interval expires. */ -public CompletableFuture maybeAutoCommitAllConsumedAsync() { -if (!autoCommitEnabled()) { -// Early return to
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1496049319 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) { } CommitRequestManager manager = requestManagers.commitRequestManager.get(); -Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout); -event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false)); +CompletableFuture commitResult; +if (event.retryTimeoutMs().isPresent()) { Review Comment: Say no more. I was leaning towards that too (just a bit hesitant to add more to this already big PR), but I totally agree that it's conceptually clearer, and the result is nice. Done. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); -// Commit with a timer to control how long the request should be retried until it -// gets a successful response or non-retriable error. +// Commit with a retry timeout (the commit request will be retried until it gets a +// successful response, non-retriable error, or the timeout expires) CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis())); Review Comment: Makes sense, done.
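The updated `commitSync` comment states the stopping rule: the commit is retried until it gets a successful response or a non-retriable error, or until the timeout expires. The following is a minimal sketch of that loop. `CommitAttempt`, the two error classes and the simulated clock are hypothetical. The clock only advances by the backoff between attempts, which keeps the example deterministic. A real client would read the actual time and would not block a thread like this.

```java
// Hypothetical sketch of a sync commit bounded by a retry timeout. The simulated clock only
// advances by the backoff between attempts; a real client would read actual time.
class SyncCommitRetrySketch {
    // Stand-in for a retriable error.
    static class RetriableCommitError extends RuntimeException {
        RetriableCommitError(String message) {
            super(message);
        }
    }

    // Stand-in for the error raised when the retry timeout expires.
    static class CommitTimeoutError extends RuntimeException {
        CommitTimeoutError(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // One commit attempt: returns normally on success, throws on failure.
    interface CommitAttempt {
        void send(int attemptNumber);
    }

    // Retries retriable errors until success or timeout; any other error propagates at once.
    // Returns the number of attempts it took to succeed.
    static int commitSync(CommitAttempt attempt, long startMs, long timeoutMs, long backoffMs) {
        long deadlineMs = startMs + timeoutMs;
        long nowMs = startMs;
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.send(attempts);
                return attempts;
            } catch (RetriableCommitError e) {
                nowMs += backoffMs;
                if (nowMs >= deadlineMs) {
                    throw new CommitTimeoutError("Commit not completed within " + timeoutMs + " ms", e);
                }
            }
        }
    }
}
```

For example, with a 1000 ms timeout and 100 ms backoff, an attempt that fails twice with a retriable error and then succeeds returns 3. A non-retriable error surfaces after the first attempt.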
Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]
lucasbru commented on code in PR #15357: URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration long commitStart = time.nanoseconds(); try { Timer requestTimer = time.timer(timeout.toMillis()); -// Commit with a timer to control how long the request should be retried until it -// gets a successful response or non-retriable error. +// Commit with a retry timeout (the commit request will be retried until it gets a +// successful response, non-retriable error, or the timeout expires) CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis())); Review Comment: While you are at it, I think we can remove the "isWakeupable" parameter and just set the `wakeUpTrigger` in the calling context. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java: ## @@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) { } CommitRequestManager manager = requestManagers.commitRequestManager.get(); -Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout); -event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false)); +CompletableFuture commitResult; +if (event.retryTimeoutMs().isPresent()) { Review Comment: It's a bit weird that we use `retryTimeoutMs` to carry the information that this is a sync commit vs. an async commit. How about going all-in here and just having an `AsyncCommitApplicationEvent` and a `SyncCommitApplicationEvent`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -204,126 +205,315 @@ private static long findMinTime(final Collection request } /** - * Generate a request to commit offsets if auto-commit is enabled. The request will be
The request will be - * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a - * request if there is no other commit request already in-flight, and if the commit interval - * has elapsed. + * Generate a request to commit consumed offsets. Add the request to the queue of pending + * requests to be sent out on the next call to {@link #poll(long)}. If there are empty + * offsets to commit, no request will be generated and a completed future will be returned. * - * @param offsets Offsets to commit - * @param expirationTimeMs Time until which the request will continue to be retried if it - * fails with a retriable error. If not present, the request will be - * sent but not retried. - * @param checkInterval True if the auto-commit interval expiration should be checked for - * sending a request. If true, the request will be sent only if the - * auto-commit interval has expired. Pass false to - * send the auto-commit request regardless of the interval (ex. - * auto-commit before rebalance). - * @param retryOnStaleEpoch True if the request should be retried in case it fails with - * {@link Errors#STALE_MEMBER_EPOCH}. - * @return Future that will complete when a response is received for the request, or a - * completed future if no request is generated. + * @param requestState Commit request + * @return Future containing the offsets that were committed, or an error if the request + * failed. 
*/ -private CompletableFuture maybeAutoCommit(final Map offsets, -final Optional expirationTimeMs, -boolean checkInterval, -boolean retryOnStaleEpoch) { -if (!autoCommitEnabled()) { -log.debug("Skipping auto-commit because auto-commit config is not enabled."); -return CompletableFuture.completedFuture(null); -} - +private CompletableFuture> requestAutoCommit(final OffsetCommitRequestState requestState) { AutoCommitState autocommit = autoCommitState.get(); -if (checkInterval && !autocommit.shouldAutoCommit()) { -return CompletableFuture.completedFuture(null); +CompletableFuture> result; +if (requestState.offsets.isEmpty()) { +result = CompletableFuture.completedFuture(Collections.emptyMap()); +} else { +autocommit.setInflightCommitStatus(true); +OffsetCommitRequestState request = pen
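The suggestion above is to stop inferring "sync vs. async" from whether `retryTimeoutMs` is present and to use two event types instead. The following is a minimal sketch of that idea. The class names are illustrative stand-ins, not the `AsyncCommitApplicationEvent` and `SyncCommitApplicationEvent` classes from the PR, and their fields and constructors are assumptions. The point is that the sync event always carries a timeout, so the `Optional` and the `isPresent()` branch disappear.

```java
import java.util.Map;

// Hypothetical sketch of the suggested split: the event type, not the presence of a retry
// timeout, says whether a commit is sync or async. Names are illustrative only.
abstract class CommitEventSketch {
    final Map<String, Long> offsets;

    CommitEventSketch(Map<String, Long> offsets) {
        this.offsets = offsets;
    }
}

final class AsyncCommitEventSketch extends CommitEventSketch {
    AsyncCommitEventSketch(Map<String, Long> offsets) {
        super(offsets);
    }
}

final class SyncCommitEventSketch extends CommitEventSketch {
    final long retryTimeoutMs; // always present for a sync commit, so no Optional is needed

    SyncCommitEventSketch(Map<String, Long> offsets, long retryTimeoutMs) {
        super(offsets);
        this.retryTimeoutMs = retryTimeoutMs;
    }
}

class EventProcessorSketch {
    // Dispatches on the event type and describes how it would be handled.
    static String process(CommitEventSketch event) {
        if (event instanceof SyncCommitEventSketch) {
            SyncCommitEventSketch sync = (SyncCommitEventSketch) event;
            return "commitSync, retrying for up to " + sync.retryTimeoutMs + " ms";
        }
        if (event instanceof AsyncCommitEventSketch) {
            return "commitAsync, no retry timeout";
        }
        throw new IllegalArgumentException("Unsupported commit event: " + event);
    }
}
```

A processor could equally register one handler per event type instead of using `instanceof`. Either way, the distinction is carried by the type rather than by an optional field.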
[PR] KAFKA-16033: Commit retry logic fixes [kafka]
lianetm opened a new pull request, #15357: URL: https://github.com/apache/kafka/pull/15357

This PR modifies the commit manager to improve the retry logic and fix bugs:
- Defines high-level functions for each type of commit: commitSync, commitAsync, autoCommitSync (used from consumer close), autoCommitAsync (on the interval), and autoCommitNow (before revocation).
- Moves the retry logic into these caller functions, keeping common response error handling that propagates errors so that each caller function can retry as it needs.

Fixes the following issues:
- Auto-commit before revocation should retry with the latest consumed offsets.
- Auto-commit before revocation should reset the timer only once, when the rebalance completes.
- The StaleMemberEpoch error (fatal) is considered retriable only when committing offsets before revocation, where it is retried with backoff if the member has a valid epoch. All other commits fail fatally on a stale epoch. Note that auto-commit on the interval (autoCommitAsync) has no specific retry logic for the stale epoch, but it will effectively retry on the next interval (as it does for any other fatal error).
- Duplicate and noisy logs for auto-commit.
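The stale-epoch rules in the PR description can be summarized as a small decision table. The following is a minimal sketch of that table. The enum constants mirror the five commit types the PR lists, but the enums, the method and the action names are hypothetical and not part of the Kafka API. The sketch covers only the `STALE_MEMBER_EPOCH` case described above, not the handling of other errors.

```java
// Hypothetical sketch of how each commit type reacts to STALE_MEMBER_EPOCH, following the
// PR description above. The enum and method names are illustrative, not Kafka APIs.
class StaleEpochPolicySketch {
    enum CommitType { COMMIT_SYNC, COMMIT_ASYNC, AUTO_COMMIT_SYNC, AUTO_COMMIT_ASYNC, AUTO_COMMIT_NOW }

    enum Action { RETRY_WITH_BACKOFF, FAIL, FAIL_AND_RETRY_ON_NEXT_INTERVAL }

    static Action onStaleMemberEpoch(CommitType type, boolean memberHasValidEpoch) {
        switch (type) {
            case AUTO_COMMIT_NOW:
                // Commit before revocation: the only case where a stale epoch is retried.
                return memberHasValidEpoch ? Action.RETRY_WITH_BACKOFF : Action.FAIL;
            case AUTO_COMMIT_ASYNC:
                // No special handling: the request fails, and the next interval commits again.
                return Action.FAIL_AND_RETRY_ON_NEXT_INTERVAL;
            default:
                // commitSync, commitAsync and autoCommitSync fail fatally.
                return Action.FAIL;
        }
    }
}
```

Putting this decision in the caller rather than in the shared response handler matches the PR's stated design: the handler only propagates the error, and each commit type decides what to do with it.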