lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1829951349


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
-    /**
-     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
-     * 1. autocommit offsets
-     * 2. release assignment. This is done via a background unsubscribe event that will
-     * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
-     */
-    private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+    private void autoCommitOnClose(final Timer timer) {
         if (!groupMetadata.get().isPresent())
             return;
 
         if (autoCommitEnabled)
             commitSyncAllConsumed(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
+    }
+
+    private void releaseAssignmentOnClose(final Timer timer) {

Review Comment:
   nit: maybe just a matter of perception, but what about renaming this to make it clear that it runs the rebalance callbacks (something like `runRebalanceCallbacksOnClose`)? That seems important given the challenges that callbacks bring here.
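For reference, the ordering that the removed Javadoc describes (auto-commit first, then release the assignment by triggering the callbacks and clearing it, then leave the group) can be modelled roughly as below. This is a hypothetical sketch, not the `AsyncKafkaConsumer` code: `CloseSequenceSketch`, its `steps` log and the `Optional<String>` group id are illustrative stand-ins, and `runRebalanceCallbacksOnClose` is only the name proposed in this comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model of the close ordering; every name here is illustrative.
class CloseSequenceSketch {
    final List<String> steps = new ArrayList<>();
    private final Optional<String> groupId;
    private final boolean autoCommitEnabled;

    CloseSequenceSketch(Optional<String> groupId, boolean autoCommitEnabled) {
        this.groupId = groupId;
        this.autoCommitEnabled = autoCommitEnabled;
    }

    void close() {
        // Mirrors the early return in the diff: no group metadata, nothing to do.
        if (!groupId.isPresent())
            return;
        autoCommitOnClose();
        runRebalanceCallbacksOnClose();
        leaveGroupOnClose();
    }

    private void autoCommitOnClose() {
        if (autoCommitEnabled)
            steps.add("commitSyncAllConsumed");
        steps.add("CommitOnCloseEvent");
    }

    private void runRebalanceCallbacksOnClose() {
        // As in the removed Javadoc: trigger the callbacks, then clear the assignment.
        steps.add("rebalanceCallbacks");
        steps.add("clearAssignment");
    }

    private void leaveGroupOnClose() {
        steps.add("leaveGroupHeartbeat");
    }
}
```

Making the callback step explicit in the name keeps it visible that user code runs between the commit and the leave request.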



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
-    /**
-     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
-     * 1. autocommit offsets
-     * 2. release assignment. This is done via a background unsubscribe event that will
-     * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
-     */
-    private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+    private void autoCommitOnClose(final Timer timer) {
         if (!groupMetadata.get().isPresent())
             return;
 
         if (autoCommitEnabled)
             commitSyncAllConsumed(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
+    }
+
+    private void releaseAssignmentOnClose(final Timer timer) {
+        ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);
+
+        if (cgm == null)
+            return;
+
+        Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();
+
+        if (assignedPartitions.isEmpty())
+            // Nothing to revoke.
+            return;
+
+        SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+        droppedPartitions.addAll(assignedPartitions);
+
+        boolean isThreadInterrupted = Thread.currentThread().isInterrupted();

Review Comment:
   This is not needed anymore, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -239,10 +243,13 @@ private void process(final AssignmentChangeEvent event) {
             manager.maybeAutoCommitAsync();
         }
 
-        log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+        Collection<TopicPartition> partitions = event.partitions();
+        log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
         try {
-            if (subscriptions.assignFromUser(new HashSet<>(event.partitions())))
+            if (subscriptions.assignFromUser(new HashSet<>(partitions))) {
                 metadata.requestUpdateForNewTopics();
+                requestManagers.consumerMembershipManager.ifPresent(cmm -> cmm.notifyAssignmentChange(new HashSet<>(partitions)));

Review Comment:
   Uhm, this is tricky, and I wonder if it's dangerous. Here we are saying that the membership manager will notify about an assignment that is not really a group assignment (it's a manual assign). That notification is then used on consumer close to trigger callbacks and send a request to the coordinator (I expect it will all be a no-op, or throw errors at some point?). I know there is a group check on those operations on close, but we could have a groupId without being in a group if subscribe was never called.

   If what we really need in the consumer is a snapshot of the group assignment (needed to trigger callbacks), why don't we keep just that (with an explicit name: it's not the consumer assignment, it's the consumer group assignment), and leave it out of this call, which is about manual assign? Limiting the scope to what we really need seems to avoid trouble in this case.
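A minimal sketch of that alternative, assuming a dedicated, explicitly named snapshot of the *group* assignment that only the group reconciliation path writes, so a manual `assign()` never feeds the close-time callbacks. `GroupAssignmentSketch`, `groupAssignmentSnapshot`, `onGroupReconciliationCompleted` and the `String` partition names (standing in for `TopicPartition`) are all hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; Strings stand in for TopicPartition.
class GroupAssignmentSketch {
    // Group assignment only. Held in an AtomicReference so that an immutable snapshot
    // could be published safely to another thread (e.g. the application thread on close).
    private final AtomicReference<Set<String>> groupAssignmentSnapshot =
            new AtomicReference<>(Collections.emptySet());

    // The consumer's current assignment, from the group or from assign().
    // Not thread-safe; kept simple for the sketch.
    private final Set<String> consumerAssignment = new HashSet<>();

    // Group path: update both the consumer assignment and the group snapshot.
    void onGroupReconciliationCompleted(Set<String> fullGroupAssignment) {
        consumerAssignment.clear();
        consumerAssignment.addAll(fullGroupAssignment);
        groupAssignmentSnapshot.set(Collections.unmodifiableSet(new HashSet<>(fullGroupAssignment)));
    }

    // Manual assign path: updates the consumer assignment only.
    void assignFromUser(Set<String> partitions) {
        consumerAssignment.clear();
        consumerAssignment.addAll(partitions);
    }

    Set<String> consumerAssignment() {
        return Collections.unmodifiableSet(consumerAssignment);
    }

    // What close() would hand to the rebalance callbacks.
    Set<String> partitionsForCallbacksOnClose() {
        return groupAssignmentSnapshot.get();
    }
}
```

With this split, a consumer that only ever called `assign()` reaches close with an empty group snapshot, so there is nothing to pass to the callbacks.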



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
         }
     }
 
-    /**
-     * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
-     * 1. autocommit offsets
-     * 2. release assignment. This is done via a background unsubscribe event that will
-     * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
-     */
-    private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+    private void autoCommitOnClose(final Timer timer) {
         if (!groupMetadata.get().isPresent())
             return;
 
         if (autoCommitEnabled)
             commitSyncAllConsumed(timer);
 
         applicationEventHandler.add(new CommitOnCloseEvent());
+    }
+
+    private void releaseAssignmentOnClose(final Timer timer) {
+        ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);
+
+        if (cgm == null)
+            return;

Review Comment:
   up to you, but what about:
   
   ```suggestion
       if (!groupMetadata.get().isPresent())
           return;
       int memberEpoch = groupMetadata.get().get().generationId();
   ```
   
   I find it comes across more clearly later on to refer to the member epoch instead of the generation id to determine which callback to run.
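To illustrate how the member epoch could drive the choice of callback, here is a minimal sketch. It assumes (this diff does not confirm it) that a positive member epoch means the member still owns its partitions, so they are revoked, while a zero or negative epoch means they were lost. `EpochCallbackSketch` and its `Listener` interface are hypothetical stand-ins for `ConsumerRebalanceListener`, and `Optional<Integer>` models the presence check in the suggestion.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch; Strings stand in for TopicPartition.
class EpochCallbackSketch {
    // Stand-in for ConsumerRebalanceListener so the sketch has no Kafka dependency.
    interface Listener {
        void onPartitionsRevoked(Collection<String> partitions);
        void onPartitionsLost(Collection<String> partitions);
    }

    // memberEpoch is empty when there is no group metadata (the early return in the suggestion).
    static void runCallbacksOnClose(Optional<Integer> memberEpoch,
                                    Collection<String> assignedPartitions,
                                    Listener listener) {
        if (!memberEpoch.isPresent() || assignedPartitions.isEmpty())
            return; // nothing to revoke
        // Sorted, like droppedPartitions in the diff.
        SortedSet<String> droppedPartitions = new TreeSet<>(assignedPartitions);
        // Assumption: epoch > 0 means the member still owns the partitions.
        if (memberEpoch.get() > 0)
            listener.onPartitionsRevoked(droppedPartitions);
        else
            listener.onPartitionsLost(droppedPartitions);
    }
}
```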



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1169,7 @@ private CompletableFuture<Void> assignPartitions(
             if (exception == null) {
                 // Enable newly added partitions to start fetching and updating positions for them.
                 subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+                notifyAssignmentChange(addedPartitions);

Review Comment:
   We should notify with the full assignment here, I would say (`assignedPartitions`), not just the added partitions.
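The difference shows up in a small hypothetical model. Suppose the receiver treats each notification as the complete current assignment (an assumption of this sketch, not something the diff states). Then notifying only the newly added partitions drops partitions that were kept across the reconciliation. `AssignmentNotificationSketch`, `addedPartitions()` and the `String` partitions are illustrative.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model; Strings stand in for TopicPartition.
class AssignmentNotificationSketch {
    private Set<String> lastNotified = Collections.emptySet();

    // Assumption: each notification replaces the previous snapshot.
    void notifyAssignmentChange(Set<String> partitions) {
        lastNotified = Collections.unmodifiableSet(new HashSet<>(partitions));
    }

    Set<String> lastNotified() {
        return lastNotified;
    }

    // Partitions in the new target assignment that were not owned before.
    static Set<String> addedPartitions(Set<String> owned, Set<String> assigned) {
        Set<String> added = new HashSet<>(assigned);
        added.removeAll(owned);
        return added;
    }
}
```

With `owned = {t-0, t-1}` and `assigned = {t-1, t-2}`, `addedPartitions` is `{t-2}`, so a delta-only notification would leave `t-1` out of the snapshot even though the member still owns it.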



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,12 +525,39 @@ public void transitionToJoining() {
     /**
      * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
      * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
-     * This is expected to be invoked when the user calls the unsubscribe API.
+     * This is expected to be invoked when the user calls the {@link Consumer#close()} API.
+     *
+     * @return Future that will complete when the heartbeat to leave the group has been sent out.
+     */
+    public CompletableFuture<Void> leaveGroupOnClose() {
+        // We pass in an already completed Future because the callback was already executed.
+        return leaveGroup(() -> CompletableFuture.completedFuture(null));
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+     * This is expected to be invoked when the user calls the {@link Consumer#unsubscribe()} API.
      *
      * @return Future that will complete when the callback execution completes and the heartbeat
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
+        // We pass in the member leaving group Future because the callback may still need to be executed.
+        return leaveGroup(this::signalMemberLeavingGroup);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+     * This is expected to be invoked when the user calls the unsubscribe API.
+     *
+     * @param callbackFutureSupplier Used if the consumer needs to insert the step to execute the
+     *                               {@link ConsumerRebalanceListener} before completing unsubscribe
+     * @return Future that will complete when the callback execution completes and the heartbeat
+     * to leave the group has been sent out.
+     */
+    protected CompletableFuture<Void> leaveGroup(Supplier<CompletableFuture<Void>> callbackFutureSupplier) {

Review Comment:
   Why pass a Supplier of CompletableFuture instead of simply passing the CompletableFuture itself? It seems more verbose/obfuscated, but I could be missing an upside.

   In any case, maybe an Optional suits better, given that we want to represent that the leave may (or may not) receive a future to wait on before sending the request?
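For comparison, here is a hypothetical sketch of the `Optional` variant next to the `Supplier` one. One general Java difference worth weighing: with a `Supplier`, the work that produces the future (here, whatever `signalMemberLeavingGroup` kicks off) only starts if and when `leaveGroup` calls `get()`, whereas passing a `CompletableFuture`, optional or not, means the caller has already started that work. Whether this matters depends on whether `leaveGroup` can return early before using the future, which the diff does not show; the `alreadyLeaving` flag and all names below are made-up stand-ins.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch comparing the two signatures; not membership manager code.
class LeaveGroupSketch {
    final StringBuilder log = new StringBuilder();

    // Optional variant: empty means "nothing to wait for before leaving".
    CompletableFuture<Void> leaveGroup(Optional<CompletableFuture<Void>> beforeLeave) {
        return beforeLeave
                .orElseGet(() -> CompletableFuture.completedFuture(null))
                .thenRun(this::sendLeaveHeartbeat);
    }

    // Supplier variant: the future is only created if we get past the early return.
    CompletableFuture<Void> leaveGroupLazily(boolean alreadyLeaving,
                                             Supplier<CompletableFuture<Void>> beforeLeave) {
        if (alreadyLeaving)
            return CompletableFuture.completedFuture(null); // supplier never invoked
        return beforeLeave.get().thenRun(this::sendLeaveHeartbeat);
    }

    private void sendLeaveHeartbeat() {
        log.append("leave-heartbeat;");
    }
}
```

In both variants the leave heartbeat is only sent once the "before leave" future completes; the difference is only in *when* that future gets created.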



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+
+/**
+ * When the user calls {@link Consumer#close()}, this event is sent to signal the {@link ConsumerMembershipManager}
+ * to perform the necessary steps to leave the consumer group cleanly, if possible. The event's timeout is based on
+ * either the user-provided value to {@link Consumer#close(Duration)} or
+ * {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} if {@link Consumer#close()} was called. The event is considered
+ * complete when the membership manager sends the heartbeat message to leave the group. The event does not wait on a

Review Comment:
   This event does wait for the response from the coordinator, right? It's called with `addAndGet`, and the event future completes when `leaveGroupOnClose` in the membership manager completes, which only happens in `maybeCompleteLeaveInProgress`, when we receive a response to the leave HB (or skip the HB, or get an error).

   We did that intentionally to keep the same behaviour as the classic consumer (the coordinator does awaitPendingRequests on close), and to avoid the responses to disconnected clients that we used to get when we would send the leave and carry on with the consumer close/shutdown without waiting for a response. Makes sense?
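The completion semantics described above can be modelled with a plain `CompletableFuture`: sending the leave heartbeat does not complete the future, only the response (or a skipped heartbeat, or an error) does, and the application thread blocks on it for up to the close timeout, much as an `addAndGet`-style call would. This is a hypothetical sketch; `LeaveOnCloseSketch`, `onLeaveResponseOrError` and `awaitLeave` are illustrative names, not the real event classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model: the close event's future completes on the leave response, not on send.
class LeaveOnCloseSketch {
    final CompletableFuture<Void> leaveFuture = new CompletableFuture<>();
    volatile boolean heartbeatSent;

    // Sending the leave heartbeat alone does NOT complete the future.
    void sendLeaveHeartbeat() {
        heartbeatSent = true;
    }

    // Stand-in for maybeCompleteLeaveInProgress: response received, HB skipped, or error.
    void onLeaveResponseOrError() {
        leaveFuture.complete(null);
    }

    // Stand-in for the blocking addAndGet on the application thread.
    boolean awaitLeave(long timeoutMs) {
        try {
            leaveFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // close timeout expired before the coordinator answered
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return true; // the leave completed exceptionally; close can still proceed
        }
    }
}
```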


