Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru merged PR #15437: URL: https://github.com/apache/kafka/pull/15437 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1976699674

Hi @philipnee

> Hi @lucasbru - Thanks for the PR. I think it would be good to test if all commit API throws the fenced exception. I believe async commit has already been tested, but I don't see one for commit sync. Would you mind checking the test cases for these three APIs and make sure they throw this exception?

I did not implement it here because the ticket was about commitAsync. But why not, done.

> Aside from that: I also see `updateAssignmentMetadataIfNeeded` throws fenced exception. Can we also check that?

This doesn't seem to be used publicly, so I wouldn't consider it interface surface that we need to cover.
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1511245829

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##

@@ -792,8 +801,8 @@ public void commitAsync(Map offsets, OffsetCo
     }

     private CompletableFuture commit(final CommitEvent commitEvent) {
-        maybeInvokeCommitCallbacks();
         maybeThrowFencedInstanceException();
+        maybeInvokeCommitCallbacks();

Review Comment: Yes
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1511245201

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##

@@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() {
         verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }

+    @Test
+    public void testCommitAsyncPropagatesFencedException() {

Review Comment: Done
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
philipnee commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1509285840

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java: ##

@@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() {
         verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }

+    @Test
+    public void testCommitAsyncPropagatesFencedException() {

Review Comment: `poll`, `commitSync`, and `commitAsync` should all throw `FencedInstanceIdException`. Can you test that each of these APIs throws the correct exception after the instance has been fenced?

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ##

@@ -792,8 +801,8 @@ public void commitAsync(Map offsets, OffsetCo
     }

     private CompletableFuture commit(final CommitEvent commitEvent) {
-        maybeInvokeCommitCallbacks();
         maybeThrowFencedInstanceException();
+        maybeInvokeCommitCallbacks();

Review Comment: The change in order perhaps mirrors this snippet in the ConsumerCoordinator:

```
if (asyncCommitFenced.get()) {
    throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
        + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
        + ", current member.id is " + memberId());
}
while (true) {
    OffsetCommitCompletion completion = completedOffsetCommits.poll();
    if (completion == null) {
        break;
    }
    completion.invoke();
}
```
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
philipnee commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1973537076

Hey, thanks for the PR - I noticed a subtle thing here: it seems we never null-check `interceptors` in the async consumer. Can `interceptors` ever be null?

```
try {
    Timer requestTimer = time.timer(timeout.toMillis());
    // Commit with a timer to control how long the request should be retried until it
    // gets a successful response or non-retriable error.
    CompletableFuture commitFuture = commit(offsets, true, Optional.of(timeout.toMillis()));
    ConsumerUtils.getResult(commitFuture, requestTimer);
->  interceptors.onCommit(offsets);
}
```
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1972669980

@philipnee Could you please act as a second pair of eyes here?
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on PR #15437: URL: https://github.com/apache/kafka/pull/15437#issuecomment-1966915060

@mjsax Could you have a look?
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1504535154

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ##

@@ -438,7 +438,7 @@ private Throwable commitSyncExceptionForError(Throwable error) {
     private Throwable commitAsyncExceptionForError(Throwable error) {
         if (error instanceof RetriableException) {
-            return new RetriableCommitFailedException(error.getMessage());
+            return new RetriableCommitFailedException(error);

Review Comment: Noticed a minor behavioral difference between the old and the new consumer.
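
The one-line change above swaps a message-only constructor call for one that receives the throwable itself. A minimal plain-JDK sketch of what that can change for a caller follows. `CauseSketch` and its two methods are hypothetical names, `RuntimeException` stands in for `RetriableCommitFailedException`, and the sketch assumes the throwable is forwarded as the cause, which the thread does not spell out.

```java
// Plain-JDK sketch; RuntimeException stands in for RetriableCommitFailedException.
// CauseSketch, wrapMessageOnly and wrapWithCause are hypothetical names.
class CauseSketch {
    // Style removed by the diff: only the message string is carried over,
    // so the original exception type and stack trace are lost.
    static RuntimeException wrapMessageOnly(Throwable error) {
        return new RuntimeException(error.getMessage());
    }

    // Style added by the diff: the original throwable is passed along
    // and stays reachable through getCause().
    static RuntimeException wrapWithCause(Throwable error) {
        return new RuntimeException(error);
    }
}
```

With the first variant `getCause()` returns `null`. With the second it returns the original throwable, so a caller can still inspect the underlying error type.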
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1504533449

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetCommitCallbackInvoker.java: ##

@@ -33,7 +32,6 @@
  */
 public class OffsetCommitCallbackInvoker {
     private final ConsumerInterceptors interceptors;
-    private boolean hasFencedException = false;

Review Comment: Since we are setting it now from `AsyncKafkaConsumer`, it seemed cleaner to move it from `OffsetCommitCallbackInvoker` to `AsyncKafkaConsumer`.
Re: [PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru commented on code in PR #15437: URL: https://github.com/apache/kafka/pull/15437#discussion_r1504531700

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ##

@@ -96,7 +95,6 @@ public class CommitRequestManagerTest {
     private long retryBackoffMs = 100;
     private long retryBackoffMaxMs = 1000;
-    private String consumerMetricGroupPrefix = CONSUMER_METRIC_GROUP_PREFIX;

Review Comment: Unused.
[PR] KAFKA-16169: FencedException in commitAsync not propagated without callback [kafka]
lucasbru opened a new pull request, #15437: URL: https://github.com/apache/kafka/pull/15437

The javadocs for `commitAsync()` (w/o callback) say:

```
@throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
```

If no callback is passed into `commitAsync()`, no offset commit callback invocation is submitted. However, we only check for a `FencedInstanceIdException` when we execute a callback. When the consumer gets fenced by another consumer with the same `group.instance.id`, and we do not use a callback, we miss the exception.

This change modifies the behavior to propagate the `FencedInstanceIdException` even if no callback is used. The code is kept very similar to the original consumer.

We also change the order - first try to throw the fenced exception, then execute callbacks. That is the order in the original consumer so it's safer to keep it this way.

For testing, we add a unit test that verifies that the `FencedInstanceIdException` is thrown in that case.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
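
The behavior described in this PR can be illustrated with a small, single-threaded toy model. This is a sketch under stated assumptions, not the actual `AsyncKafkaConsumer` code: `SketchConsumer`, `SketchFencedException`, `onCommitResponse` and `commit` are hypothetical names, and `SketchFencedException` stands in for `FencedInstanceIdException` so that no Kafka jars are needed. It shows the two points made above: the fenced state is recorded even when no callback was supplied, and the fenced check runs before queued callbacks are invoked.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for FencedInstanceIdException.
class SketchFencedException extends RuntimeException {
    SketchFencedException(String message) {
        super(message);
    }
}

// Toy single-threaded model of the consumer; NOT the real AsyncKafkaConsumer.
class SketchConsumer {
    private final AtomicBoolean fenced = new AtomicBoolean(false);
    private final Queue<Runnable> completedCallbacks = new ArrayDeque<>();

    // Simulates a commit response arriving. callback may be null,
    // which models commitAsync() without a callback.
    void onCommitResponse(Exception error, Runnable callback) {
        if (error instanceof SketchFencedException) {
            // Record fencing unconditionally, not only when a callback exists.
            fenced.set(true);
        }
        if (callback != null) {
            completedCallbacks.add(callback);
        }
    }

    // Entry point shared by the commit calls in this sketch.
    void commit() {
        maybeThrowFenced();      // first: surface the fencing error
        maybeInvokeCallbacks();  // then: run callbacks of completed commits
    }

    private void maybeThrowFenced() {
        if (fenced.get()) {
            throw new SketchFencedException("consumer instance was fenced");
        }
    }

    private void maybeInvokeCallbacks() {
        Runnable callback;
        while ((callback = completedCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

In this model, calling `onCommitResponse(new SketchFencedException("..."), null)` and then `commit()` throws, even though no callback was ever registered.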