lianetm commented on code in PR #16686:
URL: https://github.com/apache/kafka/pull/16686#discussion_r1829951349
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
}
}
- /**
- * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
- * 1. autocommit offsets
- * 2. release assignment. This is done via a background unsubscribe event that will
- * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
- */
- private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+ private void autoCommitOnClose(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;
if (autoCommitEnabled)
commitSyncAllConsumed(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
+ }
+
+ private void releaseAssignmentOnClose(final Timer timer) {
Review Comment:
nit: a matter of changed perception, I guess, but what about renaming this to make it clear that it runs the callbacks (something like `runRebalanceCallbacksOnClose`)? I see that as important given the challenges that callbacks bring here.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
}
}
- /**
- * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
- * 1. autocommit offsets
- * 2. release assignment. This is done via a background unsubscribe event that will
- * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
- */
- private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+ private void autoCommitOnClose(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;
if (autoCommitEnabled)
commitSyncAllConsumed(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
+ }
+
+ private void releaseAssignmentOnClose(final Timer timer) {
+ ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);
+
+ if (cgm == null)
+ return;
+
+ Set<TopicPartition> assignedPartitions = subscriptions.assignedPartitions();
+
+ if (assignedPartitions.isEmpty())
+ // Nothing to revoke.
+ return;
+
+ SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ droppedPartitions.addAll(assignedPartitions);
+
+ boolean isThreadInterrupted = Thread.currentThread().isInterrupted();
Review Comment:
This is not needed anymore, right?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -239,10 +243,13 @@ private void process(final AssignmentChangeEvent event) {
manager.maybeAutoCommitAsync();
}
- log.info("Assigned to partition(s): {}", event.partitions().stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
+ Collection<TopicPartition> partitions = event.partitions();
+ log.info("Assigned to partition(s): {}", partitions.stream().map(TopicPartition::toString).collect(Collectors.joining(", ")));
try {
- if (subscriptions.assignFromUser(new HashSet<>(event.partitions())))
+ if (subscriptions.assignFromUser(new HashSet<>(partitions))) {
metadata.requestUpdateForNewTopics();
+ requestManagers.consumerMembershipManager.ifPresent(cmm -> cmm.notifyAssignmentChange(new HashSet<>(partitions)));
Review Comment:
Hmm, this is tricky, and I wonder if it's dangerous. Here we are saying that the membership manager will send out a notification about an assignment that is not really a group assignment (it's a manual assign). That notification is then used on consumer close to trigger callbacks and send a request to the coordinator (I expect it will all be a no-op, or throw errors at some point?). I know there is a group check on those operations on close, but we could have a groupId and still not be in a group if `subscribe` was never called.
If what we really need in the consumer is a snapshot of the group assignment (needed to trigger callbacks), why don't we keep just that (with an explicit name: it's not the consumer assignment, it's the consumer group assignment), and leave it out of this call, which is about manual assign? Limiting the scope to what we really need seems to avoid trouble in this case.
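A minimal sketch of the separation being proposed, assuming partitions are modeled as plain strings. `ConsumerStateSketch`, `assignFromUser`, `onGroupAssignmentReconciled` and `partitionsForCloseCallbacks` are hypothetical names, not existing Kafka APIs. Manual assign only updates the consumer assignment, so the close-time callbacks never see partitions that did not come from the group:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: partitions are plain strings and none of these names are
// real Kafka APIs. The point is that two separate snapshots are kept.
final class ConsumerStateSketch {
    // Everything the consumer is assigned, whether via assign() or via the group.
    private volatile Set<String> consumerAssignment = Collections.emptySet();
    // Only what the group assigned; this is what close-time callbacks would read.
    private volatile Set<String> groupAssignment = Collections.emptySet();

    // Manual assign(): updates the consumer assignment and leaves the group snapshot alone.
    void assignFromUser(Set<String> partitions) {
        consumerAssignment = Collections.unmodifiableSet(new HashSet<>(partitions));
    }

    // Group reconciliation completed: record the full group assignment in both places.
    void onGroupAssignmentReconciled(Set<String> fullGroupAssignment) {
        Set<String> snapshot = Collections.unmodifiableSet(new HashSet<>(fullGroupAssignment));
        consumerAssignment = snapshot;
        groupAssignment = snapshot;
    }

    Set<String> consumerAssignment() {
        return consumerAssignment;
    }

    // On close, only partitions that came from the group should trigger callbacks.
    Set<String> partitionsForCloseCallbacks() {
        return groupAssignment;
    }
}
```

With this shape, a consumer that only ever called `assign()` would have an empty group snapshot on close, so there would be nothing to revoke and no reason to contact the coordinator.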
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1265,30 +1343,56 @@ private void close(Duration timeout, boolean swallowException) {
}
}
- /**
- * Prior to closing the network thread, we need to make sure the following operations happen in the right sequence:
- * 1. autocommit offsets
- * 2. release assignment. This is done via a background unsubscribe event that will
- * trigger the callbacks, clear the assignment on the subscription state and send the leave group request to the broker
- */
- private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+ private void autoCommitOnClose(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;
if (autoCommitEnabled)
commitSyncAllConsumed(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
+ }
+
+ private void releaseAssignmentOnClose(final Timer timer) {
+ ConsumerGroupMetadata cgm = groupMetadata.get().orElse(null);
+
+ if (cgm == null)
+ return;
Review Comment:
up to you, but what about:
```suggestion
if (!groupMetadata.get().isPresent())
return;
int memberEpoch = groupMetadata.get().get().generationId();
```
I find it reads more clearly later on to refer to the member epoch instead of the generation id when determining the callback.
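For illustration only, a sketch of what keying the close-time callback off the member epoch could look like. It assumes (an assumption of this sketch, following the KIP-848 protocol's use of positive epochs for active members) that an epoch greater than zero means the member still legitimately owns its partitions, so they are revoked, and otherwise treats them as lost. `CloseCallback` and `CloseCallbackChooser` are hypothetical names:

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical: which rebalance callback to run on close.
enum CloseCallback { PARTITIONS_REVOKED, PARTITIONS_LOST }

final class CloseCallbackChooser {
    // Returns the callback to run on close, or empty if nothing is assigned.
    // Assumes memberEpoch > 0 means the member still holds a valid membership.
    static Optional<CloseCallback> choose(int memberEpoch, Set<String> assignedPartitions) {
        if (assignedPartitions.isEmpty())
            return Optional.empty(); // nothing to revoke
        return Optional.of(memberEpoch > 0
            ? CloseCallback.PARTITIONS_REVOKED
            : CloseCallback.PARTITIONS_LOST);
    }
}
```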
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -1129,6 +1169,7 @@ private CompletableFuture<Void> assignPartitions(
if (exception == null) {
// Enable newly added partitions to start fetching and updating positions for them.
subscriptions.enablePartitionsAwaitingCallback(addedPartitions);
+ notifyAssignmentChange(addedPartitions);
Review Comment:
I would say we should notify with the full assignment here (`assignedPartitions`).
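To make the difference concrete, a small sketch with plain string sets standing in for `TopicPartition`. `AssignmentNotifySketch` and its private `notifyAssignmentChange` are hypothetical stand-ins that just record what they receive. Notifying with only the added partitions drops the ones already owned before this reconciliation, while the full assignment keeps them:

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: not the real AbstractMembershipManager.
final class AssignmentNotifySketch {
    // Last value passed to the notification hook.
    private Set<String> lastNotified = Collections.emptySet();

    private void notifyAssignmentChange(Set<String> partitions) {
        lastNotified = Collections.unmodifiableSet(new HashSet<>(partitions));
    }

    // Reconciliation step: 'owned' partitions are kept from before, 'added' ones are new.
    Set<String> reconcile(Set<String> owned, Set<String> added, boolean notifyFullAssignment) {
        Set<String> assignedPartitions = new HashSet<>(owned);
        assignedPartitions.addAll(added);
        notifyAssignmentChange(notifyFullAssignment ? assignedPartitions : added);
        return lastNotified;
    }
}
```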
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##########
@@ -523,12 +525,39 @@ public void transitionToJoining() {
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
- * This is expected to be invoked when the user calls the unsubscribe API.
+ * This is expected to be invoked when the user calls the {@link Consumer#close()} API.
+ *
+ * @return Future that will complete when the heartbeat to leave the group has been sent out.
+ */
+ public CompletableFuture<Void> leaveGroupOnClose() {
+ // We pass in an already completed Future because the callback was already executed.
+ return leaveGroup(() -> CompletableFuture.completedFuture(null));
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+ * This is expected to be invoked when the user calls the {@link Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
+ // We pass in the member leaving group Future because the callback may still need to be executed.
+ return leaveGroup(this::signalMemberLeavingGroup);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request and leave the group.
+ * This is expected to be invoked when the user calls the unsubscribe API.
+ *
+ * @param callbackFutureSupplier Used if the consumer needs to insert the step to execute the
+ * {@link ConsumerRebalanceListener} before completing unsubscribe
+ * @return Future that will complete when the callback execution completes and the heartbeat
+ * to leave the group has been sent out.
+ */
+ protected CompletableFuture<Void> leaveGroup(Supplier<CompletableFuture<Void>> callbackFutureSupplier) {
Review Comment:
Why pass a Supplier of CompletableFuture instead of simply passing the CompletableFuture itself? It seems more verbose/obfuscated, but I could be missing an upside.
In any case, maybe passing an Optional suits better, given that we want to represent that the leave may (or may not) receive a future to wait on before sending the request?
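A sketch of the `Optional` shape suggested above, with hypothetical names and the state transitions reduced to a list of strings. One trade-off worth weighing: with a `CompletableFuture` (or an `Optional` of one), the caller creates the future before `leaveGroup` runs, whereas a `Supplier` lets `leaveGroup` decide when to create it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a leaveGroup variant that takes an optional future to
// wait on (e.g. rebalance callbacks) before sending the leave heartbeat.
final class LeaveGroupSketch {
    // Records the transitions; not thread-safe, fine for a single-threaded demo.
    final List<String> steps = new ArrayList<>();

    CompletableFuture<Void> leaveGroup(Optional<CompletableFuture<Void>> callbackFuture) {
        steps.add("PREPARE_LEAVING");
        CompletableFuture<Void> before =
            callbackFuture.orElseGet(() -> CompletableFuture.completedFuture(null));
        // Only move on to the leave heartbeat once the callback future completes.
        return before.thenRun(() -> steps.add("LEAVING: send leave heartbeat"));
    }

    // close(): callbacks already ran on the application thread, nothing to wait on.
    CompletableFuture<Void> leaveGroupOnClose() {
        return leaveGroup(Optional.empty());
    }
}
```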
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+
+/**
+ * When the user calls {@link Consumer#close()}, this event is sent to signal the {@link ConsumerMembershipManager}
+ * to perform the necessary steps to leave the consumer group cleanly, if possible. The event's timeout is based on
+ * either the user-provided value to {@link Consumer#close(Duration)} or
+ * {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} if {@link Consumer#close()} was called. The event is considered
+ * complete when the membership manager sends the heartbeat message to leave the group. The event does not wait on a
Review Comment:
This event does wait for the response from the coordinator, right? It's called with `addAndGet`, and this event's future completes when `leaveGroupOnClose` in the membership manager completes, which only happens in `maybeCompleteLeaveInProgress`, when we receive a response to the leave HB (or skip the HB, or get an error).
We did that intentionally to keep the same behaviour as the classic consumer (its coordinator does `awaitPendingRequests` on close), and to avoid the responses to disconnected clients that we used to get when we sent the leave and carried on with the consumer close/shutdown without waiting for a response.
Makes sense?
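A simplified sketch of the blocking behaviour described above, assuming only what the comment states: the application thread waits on the event's future, and that future completes only once the leave is done. `LeaveOnCloseSketch` and `awaitLeaveOnClose` are hypothetical stand-ins, not the real `ApplicationEventHandler.addAndGet` or membership manager:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class LeaveOnCloseSketch {
    // Stands in for the event's future; completed from the background thread.
    private final CompletableFuture<Void> leaveInProgress = new CompletableFuture<>();

    // Background thread: invoked on the leave HB response, a skipped HB, or an error.
    void maybeCompleteLeaveInProgress() {
        leaveInProgress.complete(null);
    }

    // Application thread: blocks like addAndGet until the leave completes or the
    // close timeout expires. In this sketch a timeout simply returns false.
    boolean awaitLeaveOnClose(long timeoutMs) {
        try {
            leaveInProgress.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```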
--
This is an automated message from the Apache Git Service.