philipnee commented on code in PR #14680: URL: https://github.com/apache/kafka/pull/14680#discussion_r1388398462
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ########## @@ -147,6 +151,10 @@ public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> { // to keep from repeatedly scanning subscriptions in poll(), cache the result during metadata updates private boolean cachedSubscriptionHasAllFetchPositions; private final WakeupTrigger wakeupTrigger = new WakeupTrigger(); + private volatile boolean isFenced = false; Review Comment: Thanks, we don't need this to be volatile. In terms of exception throwing, I think once the instance is fenced, the we want to keep throwing if the user tries to commit or poll, because the expectation is that the user should close and reconfigure the instance. See the original implementation ``` void invokeCompletedOffsetCommitCallbacks() { if (asyncCommitFenced.get()) { throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + rebalanceConfig.groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId()); } ... } ``` Here asyncCommitFenced is set during the callback failure in the original implementation. ``` if (commitException instanceof FencedInstanceIdException) { asyncCommitFenced.set(true); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org