kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1840505903


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -549,31 +578,40 @@ public CompletableFuture<Void> leaveGroup() {
         CompletableFuture<Void> leaveResult = new CompletableFuture<>();
         leaveGroupInProgress = Optional.of(leaveResult);
 
-        CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
-        callbackResult.whenComplete((result, error) -> {
-            if (error != null) {
-                log.error("Member {} callback to release assignment failed. It 
will proceed " +
-                    "to clear its assignment and send a leave group 
heartbeat", memberId, error);
-            } else {
-                log.info("Member {} completed callback to release assignment. 
It will proceed " +
-                    "to clear its assignment and send a leave group 
heartbeat", memberId);
-            }
-
-            // Clear the subscription, no matter if the callback execution 
failed or succeeded.
-            subscriptions.unsubscribe();
-            clearAssignment();
+        if (runCallbacks) {
+            CompletableFuture<Void> callbackResult = 
signalMemberLeavingGroup();
+            callbackResult.whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Member {} callback to release assignment 
failed. It will proceed " +
+                        "to clear its assignment and send a leave group 
heartbeat", memberId, error);
+                } else {
+                    log.info("Member {} completed callback to release 
assignment. It will proceed " +
+                        "to clear its assignment and send a leave group 
heartbeat", memberId);
+                }
 
-            // Transition to ensure that a heartbeat request is sent out to 
effectively leave the
-            // group (even in the case where the member had no assignment to 
release or when the
-            // callback execution failed.)
-            transitionToSendingLeaveGroup(false);
-        });
+                // Clear the assignment, no matter if the callback execution 
failed or succeeded.
+                clearAssignmentAndLeaveGroup();
+            });
+        } else {
+            clearAssignmentAndLeaveGroup();
+        }
 
         // Return future to indicate that the leave group is done when the 
callbacks
         // complete, and the transition to send the heartbeat has been made.
         return leaveResult;
     }
 
+    private void clearAssignmentAndLeaveGroup() {
+        subscriptions.unsubscribe();
+        clearAssignment();
+        notifyAssignmentChange(Collections.emptySet());

Review Comment:
   Correct. I removed the extra call since `clearAssignment()` handles it where 
appropriate anyway.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1216,6 +1235,69 @@ public void close(Duration timeout) {
         }
     }
 
+    /**
+     * Please keep these tenets in mind for the implementation of the {@link 
AsyncKafkaConsumer}’s
+     * {@link #close(Duration)} method. In the future, these tenets may be 
made officially part of the top-level
+     * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+     *
+     * <ol>
+     *     <li>
+     *         The execution of the {@link ConsumerRebalanceListener} callback 
(if applicable) must be performed on
+     *         the application thread to ensure it does not interfere with the 
network I/O on the background thread.
+     *     </li>
+     *     <li>
+     *         The {@link ConsumerRebalanceListener} callback execution must 
complete before an attempt to leave
+     *         the consumer group is performed. In this context, “complete” 
does not necessarily imply
+     *         <em>success</em>; execution is “complete” even if the execution 
<em>fails</em> with an error.
+     *     </li>
+     *     <li>
+     *         Any error thrown during the {@link ConsumerRebalanceListener} 
callback execution will be caught to
+     *         ensure it does not prevent execution of the remaining {@link 
#close()} logic.
+     *     </li>
+     *     <li>
+     *         The application thread will be blocked during the entire 
duration of the execution of the
+     *         {@link ConsumerRebalanceListener}. The consumer does not employ 
a mechanism to short-circuit the
+     *         callback execution, so execution is not bound by the timeout in 
{@link #close(Duration)}.
+     *     </li>
+     *     <li>
+     *         A given {@link ConsumerRebalanceListener} implementation may be 
affected by the application thread's
+     *         interrupt state. If the callback implementation performs any 
blocking operations, it may result in
+     *         an error. An implementation may choose to preemptively check 
the thread's interrupt flag via
+     *         {@link Thread#isInterrupted()} or {@link 
Thread#isInterrupted()} and alter its behavior.
+     *     </li>
+     *     <li>
+     *         If the application thread was interrupted <em>prior</em> to the 
execution of the
+     *         {@link ConsumerRebalanceListener} callback, the thread's 
interrupt state will be preserved for the
+     *         {@link ConsumerRebalanceListener} execution.
+     *     </li>
+     *     <li>
+     *         If the application thread was interrupted <em>prior</em> to the 
execution of the
+     *         {@link ConsumerRebalanceListener} callback <em>but</em> the 
callback cleared out the interrupt state,
+     *         the {@link #close()} method will not make any effort to restore 
the application thread's interrupt
+     *         state for the remainder of the execution of {@link #close()}.
+     *     </li>
+     *     <li>
+     *         Leaving the consumer group is achieved by issuing a ‘leave 
group‘ network request. The consumer will
+     *         attempt to leave the group on a “best-case” basis. There is no 
stated guarantee that the consumer will
+     *         have successfully left the group before the {@link #close()} 
method completes processing.
+     *     </li>
+     *     <li>
+     *         The consumer will attempt to leave the group regardless of the 
timeout elapsing or the application
+     *         thread receiving an {@link InterruptException} or {@link 
InterruptedException}.
+     *     </li>
+     *     <li>
+     *         The application thread will wait for confirmation that the 
consumer left the group until one of the
+     *         following occurs:
+     *
+     *         <ol>
+     *             <li>Confirmation that the ’leave group‘ response was 
received from the group coordinator</li>
+     *             <li>The timeout provided by the user elapses</li>
+     *             <li>An {@link InterruptException} or {@link 
InterruptedException} is thrown</li>
+     *         </ol>
+     *     </li>
+     * </ol>
+     */
+

Review Comment:
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to