lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1834474446


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -394,6 +398,15 @@ private void process(@SuppressWarnings("unused") final 
CommitOnCloseEvent event)
         requestManagers.commitRequestManager.get().signalClose();
     }
 
+    private void process(@SuppressWarnings("unused") final 
LeaveGroupOnCloseEvent event) {

Review Comment:
   is this suppressWarning needed? (the param is used to complete the future)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -549,7 +576,7 @@ public CompletableFuture<Void> leaveGroup() {
         CompletableFuture<Void> leaveResult = new CompletableFuture<>();
         leaveGroupInProgress = Optional.of(leaveResult);
 
-        CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
+        CompletableFuture<Void> callbackResult = 
signalMemberLeavingGroup(isClosing);

Review Comment:
   I wonder if we could avoid propagating the isClosing/runCallbacks to this 
func that is really about "invokeCallbacks" (we're effectively calling an 
"invokeCallbacks" func, passing a param "runCallback" that can be false). This 
seems confusing and could lead to misleading logs I would say (ie. we would end 
up with the log line "Member completed callbacks..."  when it really didn't do 
that here).
   
   So one idea that comes to mind is to shortcircuit before the 
signalMemberLeavingGroup:
   
   ```
           if (isClosing) {
               CompletableFuture<Void> callbackResult = 
signalMemberLeavingGroup();
               callbackResult.whenComplete((result, error) -> {
                   // log callback completion
                   ...
                   clearAssignmentAndSendLeave(); // encapsulate what we do 
after callbacks
               });
           } else {
               clearAssignmentAndSendLeave();
           }
   ```
    What do you think? 



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -653,27 +655,67 @@ public void testUnsubscribeOnClose() {
             false));
         completeUnsubscribeApplicationEventSuccessfully();
         consumer.close(Duration.ZERO);
-        verifyUnsubscribeEvent(subscriptions);
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
     }
 
     @Test
-    public void testFailedPartitionRevocationOnClose() {
+    public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
         // If rebalance listener failed to execute during close, we still send 
the leave group,
         // and proceed with closing the consumer.
+        Throwable rootError = new KafkaException("Intentional error");
+        Set<TopicPartition> partitions = singleton(new 
TopicPartition("topic1", 0));
+
         SubscriptionState subscriptions = mock(SubscriptionState.class);
+        when(subscriptions.assignedPartitions()).thenReturn(partitions);
+        ConsumerRebalanceListenerInvoker invoker = 
mock(ConsumerRebalanceListenerInvoker.class);
+        doAnswer(invocation -> 
rootError).when(invoker).invokePartitionsLost(any(SortedSet.class));
+
         consumer = spy(newConsumer(
             mock(FetchBuffer.class),
             new ConsumerInterceptors<>(Collections.emptyList()),
+            invoker,
+            subscriptions,
+            "group-id",
+            "client-id",
+            false));
+        consumer.setGroupAssignmentSnapshot(partitions);
+
+        Throwable t = assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
+        assertNotNull(t.getCause());
+        assertEquals(rootError, t.getCause());
+
+        verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+        
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
+    }
+
+    @ParameterizedTest
+    @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+    public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
+        Set<TopicPartition> partitions = singleton(new 
TopicPartition("topic1", 0));
+
+        SubscriptionState subscriptions = mock(SubscriptionState.class);
+        when(subscriptions.assignedPartitions()).thenReturn(partitions);
+        
when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
+        consumer = spy(newConsumer(
+            mock(FetchBuffer.class),
+            mock(ConsumerInterceptors.class),
             mock(ConsumerRebalanceListenerInvoker.class),
             subscriptions,
             "group-id",
             "client-id",
             false));
-        doThrow(new 
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
-        assertThrows(KafkaException.class, () -> 
consumer.close(Duration.ZERO));
-        verifyUnsubscribeEvent(subscriptions);
-        // Close operation should carry on even if the unsubscribe fails
-        verify(applicationEventHandler).close(any(Duration.class));
+
+        Duration timeout = Duration.ofMillis(timeoutMs);
+
+        try {
+            Thread.currentThread().interrupt();

Review Comment:
   do we need to interrupt if we're already mocking the interrupt exception 
above? ln 698



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -824,4 +824,49 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     assertThrows(classOf[WakeupException], () => 
consumer.position(topicPartition, Duration.ofSeconds(100)))
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): 
Unit = {

Review Comment:
   should we add a very similar test for close(0)? 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1168,12 @@ private CompletableFuture<Void> assignPartitions(
             if (exception == null) {
                 // Enable newly added partitions to start fetching and 
updating positions for them.
                 
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+
+                Set<TopicPartition> allAssignedPartitions = 
assignedPartitions.stream()
+                    .map(tip -> new TopicPartition(tip.topic(), 
tip.partition()))
+                    .collect(Collectors.toSet());

Review Comment:
   we can reuse the `toTopicPartitionSet` already defined in this same class 



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,18 +524,44 @@ public void transitionToJoining() {
     /**
      * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
      * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
-     * This is expected to be invoked when the user calls the unsubscribe API.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#close()} API.
+     *
+     * @return Future that will complete when the heartbeat to leave the group 
has been sent out.
+     */
+    public CompletableFuture<Void> leaveGroupOnClose() {
+        return leaveGroup(true);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the {@link 
Consumer#unsubscribe()} API.
      *
      * @return Future that will complete when the callback execution completes 
and the heartbeat
      * to leave the group has been sent out.
      */
     public CompletableFuture<Void> leaveGroup() {
+        return leaveGroup(false);
+    }
+
+    /**
+     * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
+     * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * This is expected to be invoked when the user calls the unsubscribe API.
+     *
+     * @param isClosing {@code true} if the Consumer is closing, {@code false} 
otherwise
+     *
+     * @return Future that will complete when the callback execution completes 
and the heartbeat
+     * to leave the group has been sent out.
+     */
+    protected CompletableFuture<Void> leaveGroup(boolean isClosing) {

Review Comment:
   would it be clearer to have this param explicitly naming what it does vs. 
when it's used? (ie. runCallbacks). Mainly because this manager knows nothing 
about closing really, it knows about leaving a group with or without callbacks)



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1168,12 @@ private CompletableFuture<Void> assignPartitions(
             if (exception == null) {
                 // Enable newly added partitions to start fetching and 
updating positions for them.
                 
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+
+                Set<TopicPartition> allAssignedPartitions = 
assignedPartitions.stream()
+                    .map(tip -> new TopicPartition(tip.topic(), 
tip.partition()))
+                    .collect(Collectors.toSet());
+
+                notifyAssignmentChange(allAssignedPartitions);

Review Comment:
   uhm this should probably go right after the subscription is updated, not 
after the callback complete. Having it here, I guess it would allow for a race 
if there is a call in the app thread to consumer.close, while a reconciliation 
is running this code. If the call to consumer.close happens after callbacks 
complete on ln 1167 above, but before making it to this line, the app thread 
could potentially close revoking partitions without including the ones just 
added here right? 
   
   So we could just call this notify right after 
`updateSubscriptionAwaitingCallback` (which is the one that does the actual 
update of the subscription state). From that moment on the partitions are 
effectively assigned even though the onPartitionsAssigned may have not 
completed.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -616,6 +644,15 @@ void notifyEpochChange(Optional<Integer> epoch) {
         stateUpdatesListeners.forEach(stateListener -> 
stateListener.onMemberEpochUpdated(epoch, memberId));
     }
 
+    /**
+     * Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} 
callback for each listener when the
+     * set of assigned partitions changes. This includes on assignment 
changes, unsubscribing, and when leaving

Review Comment:
   typo unsubscribing



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1875,7 +1907,9 @@ private void 
completeUnsubscribeApplicationEventSuccessfully() {
     private void completeAssignmentChangeEventSuccessfully() {
         doAnswer(invocation -> {
             AssignmentChangeEvent event = invocation.getArgument(0);
-            consumer.subscriptions().assignFromUser(new 
HashSet<>(event.partitions()));
+            HashSet<TopicPartition> partitions = new 
HashSet<>(event.partitions());
+            consumer.subscriptions().assignFromUser(partitions);
+            consumer.setGroupAssignmentSnapshot(partitions);

Review Comment:
   this func if about manual assignment so I would expect we don't need to set 
the group assignment snapshot right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to