Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-21 Thread via GitHub


lucasbru merged PR #15357:
URL: https://github.com/apache/kafka/pull/15357


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on PR #15357:
URL: https://github.com/apache/kafka/pull/15357#issuecomment-1954655740

   Thanks for the comments @lucasbru , all addressed.





Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496179477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -1147,15 +1202,15 @@ List drain(final long currentTimeMs) {
  * futures with a TimeoutException.
  */
 private void failAndRemoveExpiredCommitRequests(final long currentTimeMs) {
-unsentOffsetCommits.removeIf(req -> req.maybeExpire(currentTimeMs));
+unsentOffsetCommits.forEach(req -> req.maybeExpire(currentTimeMs));

Review Comment:
   Definitely, good catch, fixed.
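
For context on the hunk above: `Collection.removeIf` drops every element for which the predicate returns true, whereas `forEach` only runs the side effect and leaves the collection untouched. The following is a minimal sketch of that difference, not the actual CommitRequestManager code. `PendingRequest` and `ExpirationSketch` are hypothetical names, and the assumption that `maybeExpire` fails the future and reports whether the request expired is taken from the javadoc fragment in the hunk.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for an unsent commit request; not the Kafka class.
class PendingRequest {
    final long expirationTimeMs;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    PendingRequest(long expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    // Fails the future with a TimeoutException and returns true if expired.
    boolean maybeExpire(long currentTimeMs) {
        if (currentTimeMs < expirationTimeMs) {
            return false;
        }
        future.completeExceptionally(new TimeoutException("request expired"));
        return true;
    }
}

class ExpirationSketch {
    // forEach: expired futures are failed, but the requests stay in the list.
    static void expireWithForEach(List<PendingRequest> unsent, long nowMs) {
        unsent.forEach(req -> req.maybeExpire(nowMs));
    }

    // removeIf: expired futures are failed and the requests are removed.
    static void expireWithRemoveIf(List<PendingRequest> unsent, long nowMs) {
        unsent.removeIf(req -> req.maybeExpire(nowMs));
    }
}
```

With `forEach`, an expired request would remain in the unsent list and could still be picked up later, which is why the choice between the two calls matters here.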






Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496101641


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
- * request if there is no other commit request already in-flight, and if the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+ * offsets to commit, no request will be generated and a completed future will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to be retried if it
- *  fails with a retriable error. If not present, the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration should be checked for
- *  sending a request. If true, the request will be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error if the request
+ * failed.
  */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) {
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = pendingRequests.addOffsetCommitRequest(requestState);
+result = request.future;
+result.whenComplete(autoCommitCallback(request.offsets));
 }
-
-autocommit.resetTimer();
-autocommit.setInflightCommitStatus(true);
-CompletableFuture result = addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch)
-.whenComplete(autoCommitCallback(offsets));
 return result;
 }
 
 /**
- * If auto-commit is enabled, this will generate a commit offsets request for all assigned
- * partitions and their current positions. Note on auto-commit timers: this will reset the
- * auto-commit timer to the interval before issuing the async commit, and when the async commit
- * completes, it will reset the auto-commit timer with the exponential backoff if the request
- * failed with a retriable error.
- *
- * @return Future that will complete when a response is received for the request, or a
- * completed future if no request is generated.
+ * If auto-commit is enabled, and the auto-commit interval has expired, this will generate and
+ * enqueue a request to commit all consumed offsets, and will reset the auto-commit timer to the
+ * interval. The request will be sent on the next call to {@link #poll(long)}.
+ * 
+ * If the request completes with a retriable error, this will reset the auto-commit timer with
+ * the exponential backoff. If it fails with a non-retriable error, no action is taken, so
+ * the next commit will be generated when the interval expires.
  */
-public CompletableFuture maybeAutoCommitAllConsumedAsync() {
-if (!autoCommitEnabled()) {
-// Early return to 

Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496091256


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -719,26 +846,27 @@ NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final Abstr
 
 private void handleClientResponse(final ClientResponse response,
   final Throwable error,
-  final long currentTimeMs) {
+  final long requestCompletionTimeMs) {
 try {
 if (error == null) {
 onResponse(response);
 } else {
 log.debug("{} completed with error", requestDescription(), error);
-handleCoordinatorDisconnect(error, currentTimeMs);
-if (error instanceof RetriableException) {
-maybeRetry(currentTimeMs, error);
-} else {
-future().completeExceptionally(error);
-}
+onFailedAttempt(requestCompletionTimeMs);
+handleCoordinatorDisconnect(error, requestCompletionTimeMs);
+future().completeExceptionally(error);
 }
 } catch (Throwable t) {
-log.error("Unexpected error handling response for ", requestDescription(), t);
+log.error("Unexpected error handling response for {}", requestDescription(), t);
 future().completeExceptionally(t);
 }
 }
 
 abstract void onResponse(final ClientResponse response);
+
+abstract boolean retryTimeoutExpired(long currentTimeMs);

Review Comment:
   Good point, done. I missed that, probably influenced by the fact that, in the
   case of commits, the expirationTimeout does not apply to all requests. But I
   do agree with you that the expiration belongs with the retry logic, so it
   fits well at the RetriableRequest level (just Optional).
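
One way to read "at the RetriableRequest level (just Optional)" is sketched below, purely as an illustration. `RetriableRequestSketch` and `CommitRequestSketch` are hypothetical names, not the classes in the PR. The only detail taken from the hunk is the `retryTimeoutExpired(long currentTimeMs)` signature. Treating a request without a deadline as never expiring is an assumption.

```java
import java.util.Optional;

// Hypothetical base class for a request that may be retried; not the Kafka class.
abstract class RetriableRequestSketch {
    // Empty means the request has no retry deadline.
    private final Optional<Long> expirationTimeMs;

    RetriableRequestSketch(Optional<Long> expirationTimeMs) {
        this.expirationTimeMs = expirationTimeMs;
    }

    // True only if a deadline was set and it has been reached.
    boolean retryTimeoutExpired(long currentTimeMs) {
        return expirationTimeMs.isPresent() && expirationTimeMs.get() <= currentTimeMs;
    }
}

// Hypothetical subclass: commits may or may not carry a deadline.
class CommitRequestSketch extends RetriableRequestSketch {
    CommitRequestSketch(Optional<Long> expirationTimeMs) {
        super(expirationTimeMs);
    }
}
```

Keeping the deadline in the base class means subclasses no longer need to implement the check themselves. Requests that are not bound to a timeout pass `Optional.empty()`.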






Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496065426



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496060942



Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-20 Thread via GitHub


lianetm commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1496049319


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
 }
 
 CommitRequestManager manager = requestManagers.commitRequestManager.get();
-Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false));
+CompletableFuture commitResult;
+if (event.retryTimeoutMs().isPresent()) {

Review Comment:
   Say no more, I was leaning towards that too (just a bit hesitant to 
introduce more on this already big PR), but totally agree that it's 
conceptually clearer, the result is nice, done.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration
 long commitStart = time.nanoseconds();
 try {
 Timer requestTimer = time.timer(timeout.toMillis());
-// Commit with a timer to control how long the request should be retried until it
-// gets a successful response or non-retriable error.
+// Commit with a retry timeout (the commit request will be retried until it gets a
+// successful response, non-retriable error, or the timeout expires)
 CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));

Review Comment:
   Makes sense, done. 






Re: [PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-19 Thread via GitHub


lucasbru commented on code in PR #15357:
URL: https://github.com/apache/kafka/pull/15357#discussion_r1494961008


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1344,8 +1343,8 @@ public void commitSync(Map offsets, Duration
 long commitStart = time.nanoseconds();
 try {
 Timer requestTimer = time.timer(timeout.toMillis());
-// Commit with a timer to control how long the request should be retried until it
-// gets a successful response or non-retriable error.
+// Commit with a retry timeout (the commit request will be retried until it gets a
+// successful response, non-retriable error, or the timeout expires)
 CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));

Review Comment:
   While you are at it, I think we can remove the `isWakeupable` parameter and
   just set the `wakeUpTrigger` in the calling context.



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -149,8 +148,14 @@ private void process(final CommitApplicationEvent event) {
 }
 
 CommitRequestManager manager = requestManagers.commitRequestManager.get();
-Optional expirationTimeMs = event.retryTimeoutMs().map(this::getExpirationTimeForTimeout);
-event.chain(manager.addOffsetCommitRequest(event.offsets(), expirationTimeMs, false));
+CompletableFuture commitResult;
+if (event.retryTimeoutMs().isPresent()) {

Review Comment:
   It's a bit weird that we use `retryTimeoutMs` to carry the information that
   this is a sync commit vs. an async commit.
   
   How about going all-in here and just having an `AsyncCommitApplicationEvent`
   and a `SyncCommitApplicationEvent`?
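
A rough sketch of what splitting the event could look like, under simplifying assumptions. The class names ending in `Sketch` are hypothetical, offsets are reduced to a `Map<String, Long>`, and the processor returns a description string instead of calling a commit manager. It is meant only to show how separate types can replace the `isPresent()` check.

```java
import java.util.Map;

// Hypothetical, simplified event types; the real events carry more state.
abstract class CommitEventSketch {
    final Map<String, Long> offsets; // partition name -> offset, simplified

    CommitEventSketch(Map<String, Long> offsets) {
        this.offsets = offsets;
    }
}

// Async commit: there is no retry timeout to carry.
class AsyncCommitEventSketch extends CommitEventSketch {
    AsyncCommitEventSketch(Map<String, Long> offsets) {
        super(offsets);
    }
}

// Sync commit: the retry timeout is a required field, not an Optional.
class SyncCommitEventSketch extends CommitEventSketch {
    final long retryTimeoutMs;

    SyncCommitEventSketch(Map<String, Long> offsets, long retryTimeoutMs) {
        super(offsets);
        this.retryTimeoutMs = retryTimeoutMs;
    }
}

class EventProcessorSketch {
    // One overload per event type replaces the isPresent() branch.
    static String process(AsyncCommitEventSketch event) {
        return "async commit of " + event.offsets.size() + " partition(s)";
    }

    static String process(SyncCommitEventSketch event) {
        return "sync commit of " + event.offsets.size()
            + " partition(s), retried for up to " + event.retryTimeoutMs + " ms";
    }
}
```

The design point is that the event type itself says which kind of commit was requested, so the presence or absence of a timeout no longer doubles as a flag.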



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -204,126 +205,315 @@ private static long findMinTime(final Collection request
 }
 
 /**
- * Generate a request to commit offsets if auto-commit is enabled. The request will be
- * returned to be sent out on the next call to {@link #poll(long)}. This will only generate a
- * request if there is no other commit request already in-flight, and if the commit interval
- * has elapsed.
+ * Generate a request to commit consumed offsets. Add the request to the queue of pending
+ * requests to be sent out on the next call to {@link #poll(long)}. If there are empty
+ * offsets to commit, no request will be generated and a completed future will be returned.
  *
- * @param offsets   Offsets to commit
- * @param expirationTimeMs  Time until which the request will continue to be retried if it
- *  fails with a retriable error. If not present, the request will be
- *  sent but not retried.
- * @param checkInterval True if the auto-commit interval expiration should be checked for
- *  sending a request. If true, the request will be sent only if the
- *  auto-commit interval has expired. Pass false to
- *  send the auto-commit request regardless of the interval (ex.
- *  auto-commit before rebalance).
- * @param retryOnStaleEpoch True if the request should be retried in case it fails with
- *  {@link Errors#STALE_MEMBER_EPOCH}.
- * @return Future that will complete when a response is received for the request, or a
- * completed future if no request is generated.
+ * @param requestState Commit request
+ * @return Future containing the offsets that were committed, or an error if the request
+ * failed.
  */
-private CompletableFuture maybeAutoCommit(final Map offsets,
-final Optional expirationTimeMs,
-boolean checkInterval,
-boolean retryOnStaleEpoch) {
-if (!autoCommitEnabled()) {
-log.debug("Skipping auto-commit because auto-commit config is not enabled.");
-return CompletableFuture.completedFuture(null);
-}
-
+private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(final OffsetCommitRequestState requestState) {
 AutoCommitState autocommit = autoCommitState.get();
-if (checkInterval && !autocommit.shouldAutoCommit()) {
-return CompletableFuture.completedFuture(null);
+CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result;
+if (requestState.offsets.isEmpty()) {
+result = CompletableFuture.completedFuture(Collections.emptyMap());
+} else {
+autocommit.setInflightCommitStatus(true);
+OffsetCommitRequestState request = 

[PR] KAFKA-16033: Commit retry logic fixes [kafka]

2024-02-12 Thread via GitHub


lianetm opened a new pull request, #15357:
URL: https://github.com/apache/kafka/pull/15357

   This PR modifies the commit manager to improve the retry logic and fix bugs:
   - defines high-level functions for each of the different types of commit: commitSync, commitAsync, autoCommitSync (used from consumer close), autoCommitAsync (on interval), autoCommitNow (before revocation).
   - moves retry logic to these caller functions, keeping common response error handling that propagates errors, so that each caller function retries as it needs.
   
   Fixes the following issues:
   - auto-commit before revocation should retry with latest consumed offsets
   - auto-commit before revocation should only reset the timer once, when the rebalance completes
   - StaleMemberEpoch error (fatal) is considered retriable only when committing offsets before revocation, where it is retried with backoff if the member has a valid epoch. All other commits will fail fatally on stale epoch. Note that auto-commit on the interval (autoCommitAsync) does not have any specific retry logic for the stale epoch, but will effectively retry on the next interval (as it does for any other fatal error).
   - fix duplicate and noisy logs for auto-commit
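
The stale-epoch rule in the list above can be restated as a small decision function. This is a sketch only: `CommitType` and `StaleEpochPolicySketch` are hypothetical names that mirror the five commit functions listed in the description, and the actual PR implements this inside the caller functions rather than in a central policy class.

```java
// Hypothetical enum mirroring the commit functions named in the description.
enum CommitType {
    COMMIT_SYNC,
    COMMIT_ASYNC,
    AUTO_COMMIT_SYNC,   // used from consumer close
    AUTO_COMMIT_ASYNC,  // on interval
    AUTO_COMMIT_NOW     // before revocation
}

class StaleEpochPolicySketch {
    // A stale member epoch is retried only for the commit before revocation,
    // and only while the member still has a valid epoch. Every other commit
    // type treats the error as fatal.
    static boolean retryOnStaleEpoch(CommitType type, boolean memberHasValidEpoch) {
        return type == CommitType.AUTO_COMMIT_NOW && memberHasValidEpoch;
    }
}
```

Note that `AUTO_COMMIT_ASYNC` returns false here even though, per the description, a new auto-commit is generated on the next interval anyway. That later attempt is a fresh request, not a retry of the failed one.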

