lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1834474446
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -394,6 +398,15 @@ private void process(@SuppressWarnings("unused") final CommitOnCloseEvent event)
requestManagers.commitRequestManager.get().signalClose();
}
+ private void process(@SuppressWarnings("unused") final LeaveGroupOnCloseEvent event) {
Review Comment:
Is this `@SuppressWarnings("unused")` needed? (The param is used to complete the future.)
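For illustration, here is a minimal sketch of why the annotation looks unnecessary. Once the handler reads the event to complete its future, the parameter is used and there is nothing to suppress. `LeaveGroupOnCloseEvent` and `leaveGroupOnClose()` below are hypothetical stand-ins, not the real Kafka classes.

```java
import java.util.concurrent.CompletableFuture;

final class SuppressWarningsSketch {

    // Hypothetical stand-in for LeaveGroupOnCloseEvent: it only carries a future.
    static final class LeaveGroupOnCloseEvent {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        CompletableFuture<Void> future() {
            return future;
        }
    }

    // Hypothetical stand-in for membershipManager.leaveGroupOnClose().
    static CompletableFuture<Void> leaveGroupOnClose() {
        return CompletableFuture.completedFuture(null);
    }

    // No @SuppressWarnings("unused"): the event is read to complete its future.
    static void process(final LeaveGroupOnCloseEvent event) {
        leaveGroupOnClose().whenComplete((result, error) -> {
            if (error != null) {
                event.future().completeExceptionally(error);
            } else {
                event.future().complete(null);
            }
        });
    }

    public static void main(String[] args) {
        LeaveGroupOnCloseEvent event = new LeaveGroupOnCloseEvent();
        process(event);
        System.out.println("event future done: " + event.future().isDone());
    }
}
```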
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -549,7 +576,7 @@ public CompletableFuture<Void> leaveGroup() {
CompletableFuture<Void> leaveResult = new CompletableFuture<>();
leaveGroupInProgress = Optional.of(leaveResult);
- CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
+ CompletableFuture<Void> callbackResult = signalMemberLeavingGroup(isClosing);
Review Comment:
I wonder if we could avoid propagating the isClosing/runCallbacks flag to this function, which is really about invoking callbacks. We're effectively calling an "invokeCallbacks" function and passing it a "runCallbacks" param that can be false. This seems confusing, and I would say it could lead to misleading logs. For example, we would end up with the log line "Member completed callbacks..." when the member didn't actually run any callbacks here.
So one idea that comes to mind is to short-circuit before calling signalMemberLeavingGroup:
```
if (!isClosing) {
    // Not closing (e.g. unsubscribe): run the callbacks, then leave.
    CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
    callbackResult.whenComplete((result, error) -> {
        // log callback completion
        // ...
        clearAssignmentAndSendLeave(); // encapsulate what we do after callbacks
    });
} else {
    // Closing: skip the callbacks and leave right away.
    clearAssignmentAndSendLeave();
}
```
What do you think?
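Here is a minimal, runnable sketch of the short-circuit idea. It assumes the param is renamed to `runCallbacks`. It also assumes that on close the consumer has already run the rebalance callbacks itself, as the close tests in this PR suggest. `signalMemberLeavingGroup` and `clearAssignmentAndSendLeave` are simplified, hypothetical stand-ins that only record what ran. The returned future completes immediately instead of when the leave heartbeat is sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class LeaveGroupSketch {

    // Records what ran, in order, so the two paths can be compared.
    final List<String> steps = new ArrayList<>();

    // Hypothetical stand-in for signalMemberLeavingGroup(): runs the rebalance callbacks.
    CompletableFuture<Void> signalMemberLeavingGroup() {
        steps.add("callbacks");
        return CompletableFuture.completedFuture(null);
    }

    // Hypothetical helper: everything that happens once callbacks are done or skipped.
    void clearAssignmentAndSendLeave() {
        steps.add("clear-and-leave");
    }

    // Simplified: in the real manager the future completes when the leave heartbeat is sent.
    CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
        CompletableFuture<Void> leaveResult = new CompletableFuture<>();
        if (runCallbacks) {
            signalMemberLeavingGroup().whenComplete((result, error) -> {
                // Only this path actually ran callbacks, so only it logs their completion.
                steps.add(error == null ? "log: callbacks completed" : "log: callbacks failed");
                clearAssignmentAndSendLeave();
                leaveResult.complete(null);
            });
        } else {
            // Assumed close path: the consumer already ran the callbacks, so skip them here.
            clearAssignmentAndSendLeave();
            leaveResult.complete(null);
        }
        return leaveResult;
    }

    public static void main(String[] args) {
        LeaveGroupSketch unsubscribe = new LeaveGroupSketch();
        unsubscribe.leaveGroup(true).join();
        System.out.println("unsubscribe: " + unsubscribe.steps);

        LeaveGroupSketch close = new LeaveGroupSketch();
        close.leaveGroup(false).join();
        System.out.println("close: " + close.steps);
    }
}
```

With this shape, the "callbacks completed" log can only be emitted on the path that actually ran them.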
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -653,27 +655,67 @@ public void testUnsubscribeOnClose() {
false));
completeUnsubscribeApplicationEventSuccessfully();
consumer.close(Duration.ZERO);
- verifyUnsubscribeEvent(subscriptions);
+ verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}
@Test
- public void testFailedPartitionRevocationOnClose() {
+ public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
// If rebalance listener failed to execute during close, we still send the leave group,
// and proceed with closing the consumer.
+ Throwable rootError = new KafkaException("Intentional error");
+ Set<TopicPartition> partitions = singleton(new TopicPartition("topic1", 0));
+
SubscriptionState subscriptions = mock(SubscriptionState.class);
+ when(subscriptions.assignedPartitions()).thenReturn(partitions);
+ ConsumerRebalanceListenerInvoker invoker = mock(ConsumerRebalanceListenerInvoker.class);
+ doAnswer(invocation -> rootError).when(invoker).invokePartitionsLost(any(SortedSet.class));
+
consumer = spy(newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
+ invoker,
+ subscriptions,
+ "group-id",
+ "client-id",
+ false));
+ consumer.setGroupAssignmentSnapshot(partitions);
+
+ Throwable t = assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
+ assertNotNull(t.getCause());
+ assertEquals(rootError, t.getCause());
+
+ verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+ verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
+ Set<TopicPartition> partitions = singleton(new TopicPartition("topic1", 0));
+
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ when(subscriptions.assignedPartitions()).thenReturn(partitions);
+ when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
+ consumer = spy(newConsumer(
+ mock(FetchBuffer.class),
+ mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id",
false));
- doThrow(new KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
- assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
- verifyUnsubscribeEvent(subscriptions);
- // Close operation should carry on even if the unsubscribe fails
- verify(applicationEventHandler).close(any(Duration.class));
+
+ Duration timeout = Duration.ofMillis(timeoutMs);
+
+ try {
+ Thread.currentThread().interrupt();
Review Comment:
Do we need to interrupt if we're already mocking the interrupt exception above (ln 698)?
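For reference, here is a plain-JDK sketch of the difference. It assumes the stubbed `addAndGet` simply throws and does not touch the thread's interrupt flag. A stub fails regardless of the flag, whereas a real blocking call (here `Thread.sleep`) observes it. So the explicit `interrupt()` seems to matter only if some real blocking call on the close path is meant to see it. If it stays, it should be cleared in a `finally` so it doesn't leak into later tests. `stubbedCall` is hypothetical.

```java
final class InterruptFlagSketch {

    // Like a real blocking wait: Thread.sleep throws at once if the flag is set
    // and clears the flag as it does so.
    static boolean blockingCallSeesInterrupt() {
        try {
            Thread.sleep(10);
            return false;
        } catch (InterruptedException e) {
            return true;
        }
    }

    // Hypothetical stand-in for a mock stubbed with thenThrow(...): it throws
    // whether or not the flag is set, and never touches the flag.
    static void stubbedCall() {
        throw new IllegalStateException("stubbed interrupt");
    }

    public static void main(String[] args) {
        boolean flagAfterStub;
        try {
            Thread.currentThread().interrupt();
            try {
                stubbedCall();
            } catch (IllegalStateException expected) {
                // ignored: the stub always throws
            }
            flagAfterStub = Thread.currentThread().isInterrupted();
        } finally {
            Thread.interrupted(); // clears the flag so it cannot leak into later tests
        }
        System.out.println("flag still set after stub: " + flagAfterStub);
        System.out.println("flag after finally: " + Thread.currentThread().isInterrupted());

        Thread.currentThread().interrupt();
        System.out.println("blocking call sees interrupt: " + blockingCallSeesInterrupt());
    }
}
```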
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -824,4 +824,49 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[WakeupException], () =>
consumer.position(topicPartition, Duration.ofSeconds(100)))
}
+
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String): Unit = {
Review Comment:
Should we add a very similar test for close(0)?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1168,12 @@ private CompletableFuture<Void> assignPartitions(
if (exception == null) {
// Enable newly added partitions to start fetching and updating positions for them.
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+
+ Set<TopicPartition> allAssignedPartitions = assignedPartitions.stream()
+ .map(tip -> new TopicPartition(tip.topic(), tip.partition()))
+ .collect(Collectors.toSet());
Review Comment:
We can reuse the `toTopicPartitionSet` helper already defined in this same class.
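Here is a sketch of what reusing a shared helper looks like at the call site. The exact signature of the existing `toTopicPartitionSet` isn't shown in this diff. The one below is hypothetical, and so are the `TopicPartition`/`TopicIdPartition` records, which stand in for the Kafka types.

```java
import java.util.Comparator;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;

final class TopicPartitionSetSketch {

    // Hypothetical stand-ins for Kafka's TopicPartition and TopicIdPartition.
    record TopicPartition(String topic, int partition) { }
    record TopicIdPartition(String topicId, String topic, int partition) { }

    static final Comparator<TopicPartition> TP_ORDER =
        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

    // One shared conversion helper, so call sites don't repeat stream/map/collect.
    static SortedSet<TopicPartition> toTopicPartitionSet(SortedSet<TopicIdPartition> topicIdPartitions) {
        return topicIdPartitions.stream()
            .map(tip -> new TopicPartition(tip.topic(), tip.partition()))
            .collect(Collectors.toCollection(() -> new TreeSet<>(TP_ORDER)));
    }

    public static void main(String[] args) {
        SortedSet<TopicIdPartition> assignedPartitions = new TreeSet<>(
            Comparator.comparing(TopicIdPartition::topic).thenComparingInt(TopicIdPartition::partition));
        assignedPartitions.add(new TopicIdPartition("id-1", "topic1", 1));
        assignedPartitions.add(new TopicIdPartition("id-1", "topic1", 0));

        // The call site from the diff, reduced to a single helper call.
        SortedSet<TopicPartition> allAssignedPartitions = toTopicPartitionSet(assignedPartitions);
        System.out.println(allAssignedPartitions);
    }
}
```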
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,18 +524,44 @@ public void transitionToJoining() {
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
- * This is expected to be invoked when the user calls the unsubscribe API.
+ * This is expected to be invoked when the user calls the {@link Consumer#close()} API.
+ *
+ * @return Future that will complete when the heartbeat to leave the group has been sent out.
+ */
+ public CompletableFuture<Void> leaveGroupOnClose() {
+ return leaveGroup(true);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+ * This is expected to be invoked when the user calls the {@link Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
+ return leaveGroup(false);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+ * This is expected to be invoked when the user calls the unsubscribe API.
+ *
+ * @param isClosing {@code true} if the Consumer is closing, {@code false} otherwise
+ *
+ * @return Future that will complete when the callback execution completes and the heartbeat
+ * to leave the group has been sent out.
+ */
+ protected CompletableFuture<Void> leaveGroup(boolean isClosing) {
Review Comment:
Would it be clearer to name this param after what it does rather than when it's used (i.e. runCallbacks)? Mainly because this manager doesn't really know anything about closing. It knows about leaving a group with or without callbacks.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1168,12 @@ private CompletableFuture<Void> assignPartitions(
if (exception == null) {
// Enable newly added partitions to start fetching and updating positions for them.
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+
+ Set<TopicPartition> allAssignedPartitions = assignedPartitions.stream()
+ .map(tip -> new TopicPartition(tip.topic(), tip.partition()))
+ .collect(Collectors.toSet());
+
+ notifyAssignmentChange(allAssignedPartitions);
Review Comment:
Uhm, this should probably go right after the subscription is updated, not after the callbacks complete. Having it here, I guess it would allow for a race if the app thread calls consumer.close while a reconciliation is running this code. Suppose the call to consumer.close happens after the callbacks complete on ln 1167 above, but before execution reaches this line. The app thread could then close, revoking partitions without including the ones just added here, right?
So we could just call this notify right after `updateSubscriptionAwaitingCallback`, which is the method that actually updates the subscription state. From that moment on, the partitions are effectively assigned, even though onPartitionsAssigned may not have completed yet.
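Here is a minimal single-threaded sketch of the suggested ordering. Strings stand in for partitions, and the methods are hypothetical analogues of `updateSubscriptionAwaitingCallback` and `notifyAssignmentChange`. Listeners are notified as soon as the subscription is updated, so a snapshot taken while `onPartitionsAssigned` is still pending already includes the new partitions. The real manager's threading is not modeled here.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class NotifyOrderingSketch {

    private final Set<String> assigned = new HashSet<>();
    private final List<Consumer<Set<String>>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<Set<String>> listener) {
        listeners.add(listener);
    }

    // Hypothetical analogue of notifyAssignmentChange: hands each listener an immutable snapshot.
    private void notifyAssignmentChange() {
        Set<String> snapshot = Set.copyOf(assigned);
        listeners.forEach(listener -> listener.accept(snapshot));
    }

    // Hypothetical analogue of updateSubscriptionAwaitingCallback: from here on the
    // partitions count as assigned, so listeners are told right away.
    private void updateSubscriptionAwaitingCallback(Set<String> added) {
        assigned.addAll(added);
        notifyAssignmentChange();
    }

    // The callback future may complete much later; there is nothing left to notify then.
    CompletableFuture<Void> reconcile(Set<String> added, CompletableFuture<Void> onPartitionsAssigned) {
        updateSubscriptionAwaitingCallback(added);
        return onPartitionsAssigned.thenRun(() -> {
            // enable fetching for the added partitions (omitted)
        });
    }

    public static void main(String[] args) {
        NotifyOrderingSketch manager = new NotifyOrderingSketch();
        List<Set<String>> seenByAppThread = new CopyOnWriteArrayList<>();
        manager.addListener(seenByAppThread::add);

        CompletableFuture<Void> callback = new CompletableFuture<>(); // callback still running
        manager.reconcile(Set.of("topic1-0"), callback);

        // A close() arriving now would already see topic1-0 in the snapshot.
        System.out.println("before callback completes: " + seenByAppThread);
        callback.complete(null);
    }
}
```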
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -616,6 +644,15 @@ void notifyEpochChange(Optional<Integer> epoch) {
stateUpdatesListeners.forEach(stateListener ->
stateListener.onMemberEpochUpdated(epoch, memberId));
}
+ /**
+ * Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)} callback for each listener when the
+ * set of assigned partitions changes. This includes on assignment changes, unsubscribing, and when leaving
Review Comment:
Typo: "unsubscribing"
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -1875,7 +1907,9 @@ private void completeUnsubscribeApplicationEventSuccessfully() {
private void completeAssignmentChangeEventSuccessfully() {
doAnswer(invocation -> {
AssignmentChangeEvent event = invocation.getArgument(0);
- consumer.subscriptions().assignFromUser(new HashSet<>(event.partitions()));
+ HashSet<TopicPartition> partitions = new HashSet<>(event.partitions());
+ consumer.subscriptions().assignFromUser(partitions);
+ consumer.setGroupAssignmentSnapshot(partitions);
Review Comment:
This function is about manual assignment, so I would expect we don't need to set the group assignment snapshot, right?
--
This is an automated message from the Apache Git Service.