chia7712 commented on code in PR #16449:
URL: https://github.com/apache/kafka/pull/16449#discussion_r1666993636


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -687,7 +687,6 @@ public CompletableFuture<Void> leaveGroup() {
                     "to clear its assignment and send a leave group heartbeat", memberId);
             }
             // Clear the subscription, no matter if the callback execution failed or succeeded.
-            subscriptions.unsubscribe();

Review Comment:
   This change means `leaveGroup` does NOT clean up any subscribed/assigned 
partitions from `SubscriptionState`. That seems a bit odd to me, since it is 
`leaveGroup`'s responsibility to ensure the cleanup of subscribed/assigned 
partitions. Maybe another solution is to add `subscriptions.unsubscribe()` to 
all paths of `leaveGroup`. For example:
   ```java
           if (isNotInGroup()) {
               if (state == MemberState.FENCED) {
                   clearAssignment();
                   transitionTo(MemberState.UNSUBSCRIBED);
               }
               subscriptions.unsubscribe();
               return CompletableFuture.completedFuture(null);
           }
   ```
   
   Also, in `process(UnsubscribeEvent)` we can call 
`subscriptions.unsubscribe()` directly if the membership manager does not 
exist. WDYT?
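
To make the suggestion concrete, here is a minimal, self-contained sketch of the "unsubscribe on every path" idea. It does not use the real Kafka classes: `Subscriptions`, `Membership`, and `processUnsubscribe` are hypothetical stand-ins for `SubscriptionState`, the membership manager, and `process(UnsubscribeEvent)`, and the group-leave logic is reduced to a flag.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class UnsubscribeSketch {

    // Hypothetical stand-in for SubscriptionState.
    static final class Subscriptions {
        final Set<String> topics = new HashSet<>();

        void unsubscribe() {
            topics.clear();
        }
    }

    // Hypothetical stand-in for the membership manager.
    static final class Membership {
        final Subscriptions subscriptions;
        boolean inGroup;

        Membership(Subscriptions subscriptions, boolean inGroup) {
            this.subscriptions = subscriptions;
            this.inGroup = inGroup;
        }

        CompletableFuture<Void> leaveGroup() {
            if (!inGroup) {
                // Early-return path: still clear the subscription.
                subscriptions.unsubscribe();
                return CompletableFuture.completedFuture(null);
            }
            // Regular path: leave the group, then clear the subscription.
            inGroup = false;
            subscriptions.unsubscribe();
            return CompletableFuture.completedFuture(null);
        }
    }

    // Hypothetical stand-in for process(UnsubscribeEvent).
    static CompletableFuture<Void> processUnsubscribe(Optional<Membership> membership,
                                                      Subscriptions subscriptions) {
        if (membership.isPresent()) {
            return membership.get().leaveGroup();
        }
        // No membership manager: clear the subscription directly.
        subscriptions.unsubscribe();
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        Subscriptions subscriptions = new Subscriptions();
        subscriptions.topics.add("topic-a");
        processUnsubscribe(Optional.empty(), subscriptions).join();
        System.out.println(subscriptions.topics.isEmpty());
    }
}
```

With this shape, every caller of `leaveGroup` gets the cleanup, and the only place that has to unsubscribe on its own is the branch where no membership manager exists.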



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -213,14 +217,14 @@ private void process(final SubscriptionChangeEvent ignored) {
      *              the group is sent out.
      */
     private void process(final UnsubscribeEvent event) {
-        if (!requestManagers.heartbeatRequestManager.isPresent()) {
-            KafkaException error = new KafkaException("Group membership manager not present when processing an unsubscribe event");
-            event.future().completeExceptionally(error);
-            return;
+        event.future().whenComplete((__, ___) -> subscriptions.unsubscribe());
+        if (requestManagers.heartbeatRequestManager.isPresent()) {
+            MembershipManager membershipManager = requestManagers.heartbeatRequestManager.get().membershipManager();
+            CompletableFuture<Void> future = membershipManager.leaveGroup();
+            future.whenComplete(complete(event.future()));
+        } else {
+            event.future().complete(null);

Review Comment:
   This is another behavior change: `UnsubscribeEvent` can be sent even though 
`membershipManager` is not running.
   
   Hence, please add comments to explain it.
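
For reference, the control flow in the new `process(UnsubscribeEvent)` can be modelled in isolation as below. This is only a sketch under assumptions: `process` here is a hypothetical simplification that takes the event future, an optional `leaveGroup` supplier, and an `unsubscribe` action as plain parameters, and the inline `whenComplete` lambda stands in for the `complete(...)` helper used in the diff.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class UnsubscribeCallbackSketch {

    // Hypothetical model of process(UnsubscribeEvent). leaveGroup is present
    // only when a membership manager exists.
    static void process(CompletableFuture<Void> eventFuture,
                        Optional<Supplier<CompletableFuture<Void>>> leaveGroup,
                        Runnable unsubscribe) {
        // Registered first, so it runs whenever the event future completes,
        // whether normally or exceptionally.
        eventFuture.whenComplete((ignored, error) -> unsubscribe.run());

        if (leaveGroup.isPresent()) {
            // Propagate the outcome of leaveGroup to the event future.
            leaveGroup.get().get().whenComplete((value, error) -> {
                if (error != null) {
                    eventFuture.completeExceptionally(error);
                } else {
                    eventFuture.complete(value);
                }
            });
        } else {
            // No membership manager: nothing to leave, so the event completes
            // successfully instead of failing as it did before the change.
            eventFuture.complete(null);
        }
    }

    public static void main(String[] args) {
        AtomicInteger unsubscribeCalls = new AtomicInteger();
        CompletableFuture<Void> eventFuture = new CompletableFuture<>();
        process(eventFuture, Optional.empty(), unsubscribeCalls::incrementAndGet);
        System.out.println(eventFuture.isDone() + " " + unsubscribeCalls.get());
    }
}
```

In this model the cleanup runs exactly once per event in all three cases (no manager, successful leave, failed leave), which is the behavior a code comment on the new branch would need to spell out.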


