Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-05 Thread via GitHub


lucasbru merged PR #15437:
URL: https://github.com/apache/kafka/pull/15437


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-04 Thread via GitHub


lucasbru commented on PR #15437:
URL: https://github.com/apache/kafka/pull/15437#issuecomment-1976699674

   Hi @philipnee 
   
   > Hi @lucasbru - Thanks for the PR. I think it would be good to test whether all 
commit APIs throw the fenced exception. I believe async commit has already been 
tested, but I don't see one for commit sync. Would you mind checking the test 
cases for these three APIs and making sure they throw this exception?
   
   I did not implement it here because the ticket was about commitAsync. But 
why not, done.
   
   > Aside from that: I also see `updateAssignmentMetadataIfNeeded` throws 
fenced exception. Can we also check that?
   
   This doesn't seem to be used publicly, so I wouldn't consider it interface 
surface that we need to cover.





Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-04 Thread via GitHub


lucasbru commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1511245829


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -792,8 +801,8 @@ public void commitAsync(Map offsets, OffsetCo
 }
 
 private CompletableFuture commit(final CommitEvent commitEvent) {
-maybeInvokeCommitCallbacks();
 maybeThrowFencedInstanceException();
+maybeInvokeCommitCallbacks();

Review Comment:
   Yes






Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-04 Thread via GitHub


lucasbru commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1511245201


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() {
 
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
 }
 
+@Test
+public void testCommitAsyncPropagatesFencedException() {

Review Comment:
   Done






Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-01 Thread via GitHub


philipnee commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1509285840


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() {
 
verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
 }
 
+@Test
+public void testCommitAsyncPropagatesFencedException() {

Review Comment:
   `poll`, `commitSync`, and `commitAsync` should all throw 
`FencedInstanceIdException`. Can you test that each of these APIs throws the 
correct exception after the instance has been fenced?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -792,8 +801,8 @@ public void commitAsync(Map offsets, OffsetCo
 }
 
 private CompletableFuture commit(final CommitEvent commitEvent) {
-maybeInvokeCommitCallbacks();
 maybeThrowFencedInstanceException();
+maybeInvokeCommitCallbacks();

Review Comment:
   The change in order perhaps mirrors this snippet in the `ConsumerCoordinator`:
   ```
   if (asyncCommitFenced.get()) {
       throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
           + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
           + ", current member.id is " + memberId());
   }
   while (true) {
       OffsetCommitCompletion completion = completedOffsetCommits.poll();
       if (completion == null) {
           break;
       }
       completion.invoke();
   }
   ```






Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-03-01 Thread via GitHub


philipnee commented on PR #15437:
URL: https://github.com/apache/kafka/pull/15437#issuecomment-1973537076

   Hey, thanks for the PR. I noticed a subtle thing here: it seems we never 
null-check `interceptors` in the async consumer. Can `interceptors` ever be 
null?
   
   ```
   try {
       Timer requestTimer = time.timer(timeout.toMillis());
       // Commit with a timer to control how long the request should be retried until it
       // gets a successful response or non-retriable error.
       CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));
       ConsumerUtils.getResult(commitFuture, requestTimer);
   ->  interceptors.onCommit(offsets);
   }
   ```





Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-29 Thread via GitHub


lucasbru commented on PR #15437:
URL: https://github.com/apache/kafka/pull/15437#issuecomment-1972669980

   @philipnee Could you please act as a second pair of eyes here?





Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on PR #15437:
URL: https://github.com/apache/kafka/pull/15437#issuecomment-1966915060

   @mjsax Could you have a look?





Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1504535154


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -438,7 +438,7 @@ private Throwable commitSyncExceptionForError(Throwable error) {
 
 private Throwable commitAsyncExceptionForError(Throwable error) {
 if (error instanceof RetriableException) {
-return new RetriableCommitFailedException(error.getMessage());
+return new RetriableCommitFailedException(error);

Review Comment:
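   I noticed a minor behavioral difference between the old and new consumer 
here: passing only `error.getMessage()` drops the original exception as the 
cause, so the new consumer now passes `error` itself.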
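
   To illustrate why the constructor choice matters, here is a minimal, 
dependency-free sketch. `RetriableSketchException` and both `wrap...` helpers 
are hypothetical stand-ins invented for this example, not the real 
`RetriableCommitFailedException` or `CommitRequestManager` code. The sketch only 
shows the general Java behavior: an exception built from a message has no 
cause, while one built from the throwable keeps it.

```java
public class CauseSketch {

    /** Hypothetical stand-in for a retriable commit exception; not a Kafka class. */
    static class RetriableSketchException extends RuntimeException {
        RetriableSketchException(String message) {
            super(message);
        }

        RetriableSketchException(Throwable cause) {
            super(cause);
        }
    }

    /** Mirrors the removed line: only the message text survives. */
    static RuntimeException wrapMessageOnly(Throwable error) {
        return new RetriableSketchException(error.getMessage());
    }

    /** Mirrors the added line: the original error is kept as the cause. */
    static RuntimeException wrapCause(Throwable error) {
        return new RetriableSketchException(error);
    }

    public static void main(String[] args) {
        Throwable original = new IllegalStateException("request timed out");

        // No cause is attached when only the message is passed.
        System.out.println(wrapMessageOnly(original).getCause() == null);

        // The original throwable is reachable through getCause().
        System.out.println(wrapCause(original).getCause() == original);
    }
}
```

   With the cause preserved, callers and log output can still see the 
underlying error type and stack trace.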






Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1504533449


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java:
##
@@ -33,7 +32,6 @@
  */
 public class OffsetCommitCallbackInvoker {
 private final ConsumerInterceptors interceptors;
-private boolean hasFencedException = false;

Review Comment:
   Since we are setting it now from `AsyncKafkaConsumer`, it seemed cleaner to 
move it from `OffsetCommitCallbackInvoker` to `AsyncKafkaConsumer`. 






Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-27 Thread via GitHub


lucasbru commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1504531700


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -96,7 +95,6 @@ public class CommitRequestManagerTest {
 
 private long retryBackoffMs = 100;
 private long retryBackoffMaxMs = 1000;
-private String consumerMetricGroupPrefix = CONSUMER_METRIC_GROUP_PREFIX;

Review Comment:
   unused






[PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]

2024-02-27 Thread via GitHub


lucasbru opened a new pull request, #15437:
URL: https://github.com/apache/kafka/pull/15437

   The javadocs for `commitAsync()` (w/o callback) say:
   ```
   @throws org.apache.kafka.common.errors.FencedInstanceIdException 
   if this consumer instance gets fenced by broker.
   ```

   If no callback is passed into `commitAsync()`, no offset commit callback 
invocation is submitted. However, we only check for a 
`FencedInstanceIdException` when we execute a callback. When the consumer gets 
fenced by another consumer with the same `group.instance.id`, and we do not use 
a callback, we miss the exception.
   
   This change modifies the behavior to propagate the 
`FencedInstanceIdException` even if no callback is used. The code is kept very 
similar to the original consumer.
   
   We also change the order - first try to throw the fenced exception, then 
execute callbacks. That is the order in the original consumer so it's safer to 
keep it this way.
   
   For testing, we add a unit test that verifies that the 
`FencedInstanceIdException` is thrown in that case.
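
   The idea described above can be sketched roughly as follows. This is a 
single-threaded simplification under stated assumptions, not the actual 
`AsyncKafkaConsumer` code: `SketchConsumer`, `onCommitFenced`, and 
`FencedSketchException` are hypothetical names made up for the example. It 
shows the two points of the change: the fenced state is recorded even when no 
callback was supplied, and it is checked before any pending callbacks run.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FencedCommitSketch {

    /** Stand-in for Kafka's FencedInstanceIdException, so the sketch has no dependencies. */
    static class FencedSketchException extends RuntimeException {
        FencedSketchException(String message) {
            super(message);
        }
    }

    /** Hypothetical, single-threaded simplification; not the real AsyncKafkaConsumer. */
    static class SketchConsumer {
        private boolean fenced = false;
        private final Queue<Runnable> completedCallbacks = new ArrayDeque<>();

        /** Simulates a commit response reporting that this instance was fenced. */
        void onCommitFenced(Runnable callbackOrNull) {
            // Record the fenced state whether or not the caller supplied a callback.
            fenced = true;
            if (callbackOrNull != null) {
                completedCallbacks.add(callbackOrNull);
            }
        }

        /** Simplified commit entry point. */
        void commitAsync() {
            // Order matters: surface the fenced state first, then run pending callbacks.
            maybeThrowFenced();
            maybeInvokeCallbacks();
            // A real consumer would enqueue the commit request here.
        }

        private void maybeThrowFenced() {
            if (fenced) {
                throw new FencedSketchException("instance was fenced");
            }
        }

        private void maybeInvokeCallbacks() {
            Runnable callback;
            while ((callback = completedCallbacks.poll()) != null) {
                callback.run();
            }
        }
    }

    public static void main(String[] args) {
        SketchConsumer consumer = new SketchConsumer();
        consumer.commitAsync();         // not fenced yet, returns normally
        consumer.onCommitFenced(null);  // fenced response, no callback registered
        try {
            consumer.commitAsync();
            System.out.println("no exception");
        } catch (FencedSketchException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

   In this sketch the second `commitAsync()` call throws even though no 
callback was ever registered, which is the behavior the javadoc promises.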
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   

