philipnee commented on code in PR #14680:
URL: https://github.com/apache/kafka/pull/14680#discussion_r1388398462


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##########
@@ -147,6 +151,10 @@ public class PrototypeAsyncConsumer<K, V> implements 
Consumer<K, V> {
     // to keep from repeatedly scanning subscriptions in poll(), cache the 
result during metadata updates
     private boolean cachedSubscriptionHasAllFetchPositions;
     private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
+    private volatile boolean isFenced = false;

Review Comment:
   Thanks, we don't need this to be volatile.  In terms of exception throwing, 
I think once the instance is fenced, the we want to keep throwing if the user 
tries to commit or poll, because the expectation is that the user should close 
and reconfigure the instance.  See the original implementation
   
   ```
   void invokeCompletedOffsetCommitCallbacks() {
           if (asyncCommitFenced.get()) {
               throw new FencedInstanceIdException("Get fenced exception for 
group.instance.id "
                   + rebalanceConfig.groupInstanceId.orElse("unset_instance_id")
                   + ", current member.id is " + memberId());
           }
            ...
   }
   ```
   
   Here asyncCommitFenced is set during the callback failure in the original 
implementation.
   ```
   if (commitException instanceof FencedInstanceIdException) {
                       asyncCommitFenced.set(true);
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to