kirktrue commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1833350572


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,12 +525,39 @@ public void transitionToJoining() {
     /**
      * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
      * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
-     * This is expected to be invoked when the user calls the unsubscribe API.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#close()} API.
+     *
+     * @return Future that will complete when the heartbeat to leave the group 
has been sent out.
+     */
+    public CompletableFuture<Void> leaveGroupOnClose() {
+        // We pass in an already completed Future because the callback was 
already executed.
+        return leaveGroup(() -> CompletableFuture.completedFuture(null));
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#unsubscribe()} API.
      *
      * @return Future that will complete when the callback execution completes 
and the heartbeat
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
+        // We pass in the member leaving group Future because the callback may 
still need to be executed.
+        return leaveGroup(this::signalMemberLeavingGroup);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the unsubscribe API.
+     *
+     * @param callbackFutureSupplier Used if the consumer needs to insert the 
step to execute the
+     *                               {@link ConsumerRebalanceListener} before 
completing unsubscribe
+     * @return Future that will complete when the callback execution completes 
and the heartbeat
+     * to leave the group has been sent out.
+     */
+    protected CompletableFuture<Void> 
leaveGroup(Supplier<CompletableFuture<Void>> callbackFutureSupplier) {

Review Comment:
   It's an ordering issue 😢
   
   To pass the `CompletableFuture` into the (internal) `leaveGroup()` method, 
the code would have to execute `signalMemberLeavingGroup()` _first_. For the 
`ConsumerMembershipManager`, `signalMemberLeavingGroup()` is overridden and 
calls `invokeOnPartitionsRevokedOrLostToReleaseAssignment()`. I assumed that 
invoking `invokeOnPartitionsRevokedOrLostToReleaseAssignment()` before calling 
`leaveGroup()` would throw off the existing logic.
   
   That said, I don't like the use of `Supplier` here, either. I also wanted to 
leave the existing code as is, where possible.
   
   Let me know if you have ideas to simplify it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to