frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1844979923


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
      */
     @Override
     public final void close() {
-        close(time.timer(0));
+        close(time.timer(0), GroupMembershipOperation.DEFAULT);
     }
 
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    protected void close(Timer timer) {
+    protected void close(Timer timer, GroupMembershipOperation membershipOperation) {
         try {
             closeHeartbeatThread();
         } finally {
             // Synchronize after closing the heartbeat thread since heartbeat thread
             // needs this lock to complete and terminate after close flag is set.
             synchronized (this) {
-                if (rebalanceConfig.leaveGroupOnClose) {
+                // If membershipOperation is REMAIN_IN_GROUP, never send leave group request.
+                // If membershipOperation is DEFAULT, leave group based on rebalanceConfig.leaveGroupOnClose.
+                // Otherwise, leave group only if membershipOperation is LEAVE_GROUP.
+                if (GroupMembershipOperation.REMAIN_IN_GROUP != membershipOperation &&
+                    (GroupMembershipOperation.LEAVE_GROUP == membershipOperation || rebalanceConfig.leaveGroupOnClose)) {

Review Comment:
   I'm wondering whether we could move the `if` statement into `maybeLeaveGroup`.
   The `if` statement currently sits inside a `synchronized (this)` block, and
   `maybeLeaveGroup` is itself declared `synchronized`, so the call re-acquires
   a lock the thread already holds.
   
   I tried moving the `if` statement into `maybeLeaveGroup`, but that change
   causes the test case `testPrepareJoinAndRejoinAfterFailedRebalance` to fail.
   
   
   
https://github.com/apache/kafka/blob/a8f84cab958761434bcafca2c0fd90f53b52aacf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1129-L1133
   
   
https://github.com/apache/kafka/blob/a8f84cab958761434bcafca2c0fd90f53b52aacf/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1164
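
For reference, the condition in the diff can be read as a small truth table over the operation and `rebalanceConfig.leaveGroupOnClose`. Below is a minimal standalone sketch of that predicate. The class `LeaveOnCloseSketch`, the helper `shouldLeaveOnClose`, and the locally redeclared enum are illustrative assumptions for this comment, not code from the PR, and the sketch says nothing about why the test fails when the check is moved.

```java
public class LeaveOnCloseSketch {

    // Redeclared locally so the sketch compiles on its own; only the three
    // constants that appear in the diff are assumed here.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Hypothetical helper: the same boolean expression as the `if` in close(),
    // with rebalanceConfig.leaveGroupOnClose passed in as a plain flag.
    static boolean shouldLeaveOnClose(GroupMembershipOperation op, boolean leaveGroupOnClose) {
        return GroupMembershipOperation.REMAIN_IN_GROUP != op
            && (GroupMembershipOperation.LEAVE_GROUP == op || leaveGroupOnClose);
    }

    public static void main(String[] args) {
        // Print every combination of operation and config flag.
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean flag : new boolean[] {true, false}) {
                System.out.println(op + ", leaveGroupOnClose=" + flag
                    + " -> leave=" + shouldLeaveOnClose(op, flag));
            }
        }
    }
}
```

Read this way, `REMAIN_IN_GROUP` and `LEAVE_GROUP` override the config flag, and only `DEFAULT` defers to it.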
   




To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to