frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1887015632


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
      */
     @Override
     public final void close() {
-        close(time.timer(0));
+        close(time.timer(0), GroupMembershipOperation.DEFAULT);
     }
 
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    protected void close(Timer timer) {
+    protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
         try {
             closeHeartbeatThread();
         } finally {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (rebalanceConfig.leaveGroupOnClose) {
+                // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
+                // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
+                // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
+                if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
+                    (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {

Review Comment:
   I will initiate a discussion on our KIP thread.
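
For reference in that discussion, the condition in the hunk above can be read as a small decision table. The following is only a sketch, not code from the PR: the class `LeaveOnCloseSketch`, the helper `shouldLeaveGroup`, and the locally declared enum are hypothetical stand-ins, and the plain `boolean` parameter stands in for `rebalanceConfig.leaveGroupOnClose`. Only the boolean expression itself is copied from the diff.

```java
public class LeaveOnCloseSketch {

    // Local stand-in for the enum referenced in the diff; declared here only so
    // that the sketch compiles on its own.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Hypothetical helper wrapping the condition from the hunk.
    // leaveGroupOnClose stands in for rebalanceConfig.leaveGroupOnClose.
    static boolean shouldLeaveGroup(GroupMembershipOperation membershipOperation,
                                    boolean leaveGroupOnClose) {
        return GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation
            && (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || leaveGroupOnClose);
    }

    public static void main(String[] args) {
        // Print every combination of operation and config flag.
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean flag : new boolean[] {false, true}) {
                System.out.println(op + ", leaveGroupOnClose=" + flag
                    + " -> leave=" + shouldLeaveGroup(op, flag));
            }
        }
    }
}
```

Under these assumptions, `REMAIN_IN_GROUP` never leaves, `LEAVE_GROUP` always leaves, and `DEFAULT` follows the config flag, which matches the three comment lines in the diff.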


