philipnee commented on code in PR #15437:
URL: https://github.com/apache/kafka/pull/15437#discussion_r1509285840

##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -567,6 +568,28 @@ public void testCommitAsyncLeaderEpochUpdate() {
         verify(applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
     }
 
+    @Test
+    public void testCommitAsyncPropagatesFencedException() {

Review Comment:
   `poll`, `commitSync`, and `commitAsync` should all throw `FencedInstanceIdException`. Can you test that each of these APIs throws the correct exception after the instance has been fenced?

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -792,8 +801,8 @@ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCo
     }
 
     private CompletableFuture<Void> commit(final CommitEvent commitEvent) {
-        maybeInvokeCommitCallbacks();
         maybeThrowFencedInstanceException();
+        maybeInvokeCommitCallbacks();

Review Comment:
   The change in order perhaps mirrors this snippet in the `ConsumerCoordinator`:
   
   ```
   if (asyncCommitFenced.get()) {
       throw new FencedInstanceIdException("Get fenced exception for group.instance.id "
           + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
           + ", current member.id is " + memberId());
   }
   while (true) {
       OffsetCommitCompletion completion = completedOffsetCommits.poll();
       if (completion == null) {
           break;
       }
       completion.invoke();
   }
   ```
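
To make the ordering discussed above concrete, here is a minimal, self-contained sketch of the "check the fenced flag first, then drain completed callbacks" pattern. It is not Kafka code: the class `FencedCommitOrderingSketch`, its nested `FencedException`, and all method names are hypothetical stand-ins, and the real `AsyncKafkaConsumer` and `ConsumerCoordinator` do considerably more.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a consumer; illustrates ordering only, not Kafka's implementation.
final class FencedCommitOrderingSketch {

    // Hypothetical exception standing in for FencedInstanceIdException.
    static final class FencedException extends RuntimeException {
        FencedException(String message) {
            super(message);
        }
    }

    private final AtomicBoolean fenced = new AtomicBoolean(false);
    private final Queue<Runnable> completedCallbacks = new ConcurrentLinkedQueue<>();

    // Simulates an earlier async commit having been rejected as fenced.
    void markFenced() {
        fenced.set(true);
    }

    // Simulates a completed async commit whose callback is waiting to run.
    void enqueueCallback(Runnable callback) {
        completedCallbacks.add(callback);
    }

    int pendingCallbacks() {
        return completedCallbacks.size();
    }

    // Same order as the coordinator snippet: fenced check first, callbacks second.
    void commit() {
        maybeThrowFenced();
        maybeInvokeCallbacks();
    }

    private void maybeThrowFenced() {
        if (fenced.get()) {
            throw new FencedException("instance was fenced");
        }
    }

    private void maybeInvokeCallbacks() {
        Runnable callback;
        // Drain every callback that has completed so far.
        while ((callback = completedCallbacks.poll()) != null) {
            callback.run();
        }
    }
}
```

In this sketch, once `markFenced()` has been called, `commit()` throws before any queued callback runs, so the callbacks stay in the queue. Whether that is the desired behaviour for the real consumer is the question the review comment raises; the sketch only shows what the ordering implies.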