This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b6b2c9ebc45 KAFKA-16985: Ensure consumer attempts to send leave
request on close even if interrupted (#16686)
b6b2c9ebc45 is described below
commit b6b2c9ebc45bd60572c24355886620dbdc406ce9
Author: Kirk True <[email protected]>
AuthorDate: Wed Nov 13 11:26:40 2024 -0800
KAFKA-16985: Ensure consumer attempts to send leave request on close even
if interrupted (#16686)
Reviewers: Andrew Schofield <[email protected]>, Chia-Ping Tsai
<[email protected]>, Lianet Magrans <[email protected]>, Philip Nee
<[email protected]>
---
.../internals/AbstractMembershipManager.java | 96 ++++++++---
.../consumer/internals/AsyncKafkaConsumer.java | 179 +++++++++++++++++----
.../consumer/internals/MemberStateListener.java | 15 +-
.../consumer/internals/SubscriptionState.java | 2 +-
.../internals/events/ApplicationEvent.java | 2 +-
.../events/ApplicationEventProcessor.java | 13 ++
.../internals/events/LeaveGroupOnCloseEvent.java | 37 +++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 83 ++++++----
.../kafka/api/PlaintextConsumerTest.scala | 49 +++++-
9 files changed, 385 insertions(+), 91 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 2c4867f2f13..82b4e567d34 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -30,7 +30,6 @@ import org.slf4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -130,7 +129,8 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* requests in cases where a currently assigned topic is in the target
assignment (new
* partition assigned, or revoked), but it is not present in the Metadata
cache at that moment.
* The cache is cleared when the subscription changes ({@link
#transitionToJoining()}), the
- * member fails ({@link #transitionToFatal()} or leaves the group ({@link
#leaveGroup()}).
+ * member fails ({@link #transitionToFatal()}) or leaves the group
+ * ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
*/
private final Map<Uuid, String> assignedTopicNamesCache;
@@ -157,9 +157,9 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
private boolean rejoinedWhileReconciliationInProgress;
/**
- * If the member is currently leaving the group after a call to {@link
#leaveGroup()}}, this
- * will have a future that will complete when the ongoing leave operation
completes
- * (callbacks executed and heartbeat request to leave is sent out). This
will be empty is the
+ * If the member is currently leaving the group after a call to {@link
#leaveGroup()} or
+ * {@link #leaveGroupOnClose()}, this will have a future that will
complete when the ongoing leave operation
+ * completes (callbacks executed and heartbeat request to leave is sent
out). This will be empty if the
* member is not leaving.
*/
private Optional<CompletableFuture<Void>> leaveGroupInProgress =
Optional.empty();
@@ -481,6 +481,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
private void clearAssignment() {
if (subscriptions.hasAutoAssignedPartitions()) {
subscriptions.assignFromSubscribed(Collections.emptySet());
+ notifyAssignmentChange(Collections.emptySet());
}
currentAssignment = LocalAssignment.NONE;
clearPendingAssignmentsAndLocalNamesCache();
@@ -496,8 +497,9 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
*/
private void
updateSubscriptionAwaitingCallback(SortedSet<TopicIdPartition>
assignedPartitions,
SortedSet<TopicPartition>
addedPartitions) {
- Collection<TopicPartition> assignedTopicPartitions =
toTopicPartitionSet(assignedPartitions);
+ Set<TopicPartition> assignedTopicPartitions =
toTopicPartitionSet(assignedPartitions);
subscriptions.assignFromSubscribedAwaitingCallback(assignedTopicPartitions,
addedPartitions);
+ notifyAssignmentChange(assignedTopicPartitions);
}
/**
@@ -523,18 +525,45 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
- * This is expected to be invoked when the user calls the unsubscribe API.
+ * This is expected to be invoked when the user calls the {@link
Consumer#close()} API.
+ *
+ * @return Future that will complete when the heartbeat to leave the group
has been sent out.
+ */
+ public CompletableFuture<Void> leaveGroupOnClose() {
+ return leaveGroup(false);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
+ * This is expected to be invoked when the user calls the {@link
Consumer#unsubscribe()} API.
*
* @return Future that will complete when the callback execution completes
and the heartbeat
* to leave the group has been sent out.
*/
public CompletableFuture<Void> leaveGroup() {
+ return leaveGroup(true);
+ }
+
+ /**
+ * Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
+ * transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
+ * This is expected to be invoked when the user calls the unsubscribe API
or is closing the consumer.
+ *
+ * @param runCallbacks {@code true} to insert the step to execute the
{@link ConsumerRebalanceListener} callback,
+ * {@code false} to skip
+ *
+ * @return Future that will complete when the callback execution completes
and the heartbeat
+ * to leave the group has been sent out.
+ */
+ protected CompletableFuture<Void> leaveGroup(boolean runCallbacks) {
if (isNotInGroup()) {
if (state == MemberState.FENCED) {
clearAssignment();
transitionTo(MemberState.UNSUBSCRIBED);
}
subscriptions.unsubscribe();
+ notifyAssignmentChange(Collections.emptySet());
return CompletableFuture.completedFuture(null);
}
@@ -549,31 +578,39 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
CompletableFuture<Void> leaveResult = new CompletableFuture<>();
leaveGroupInProgress = Optional.of(leaveResult);
- CompletableFuture<Void> callbackResult = signalMemberLeavingGroup();
- callbackResult.whenComplete((result, error) -> {
- if (error != null) {
- log.error("Member {} callback to release assignment failed. It
will proceed " +
- "to clear its assignment and send a leave group
heartbeat", memberId, error);
- } else {
- log.info("Member {} completed callback to release assignment.
It will proceed " +
- "to clear its assignment and send a leave group
heartbeat", memberId);
- }
-
- // Clear the subscription, no matter if the callback execution
failed or succeeded.
- subscriptions.unsubscribe();
- clearAssignment();
+ if (runCallbacks) {
+ CompletableFuture<Void> callbackResult =
signalMemberLeavingGroup();
+ callbackResult.whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("Member {} callback to release assignment
failed. It will proceed " +
+ "to clear its assignment and send a leave group
heartbeat", memberId, error);
+ } else {
+ log.info("Member {} completed callback to release
assignment. It will proceed " +
+ "to clear its assignment and send a leave group
heartbeat", memberId);
+ }
- // Transition to ensure that a heartbeat request is sent out to
effectively leave the
- // group (even in the case where the member had no assignment to
release or when the
- // callback execution failed.)
- transitionToSendingLeaveGroup(false);
- });
+ // Clear the assignment, no matter if the callback execution
failed or succeeded.
+ clearAssignmentAndLeaveGroup();
+ });
+ } else {
+ clearAssignmentAndLeaveGroup();
+ }
// Return future to indicate that the leave group is done when the
callbacks
// complete, and the transition to send the heartbeat has been made.
return leaveResult;
}
+ private void clearAssignmentAndLeaveGroup() {
+ subscriptions.unsubscribe();
+ clearAssignment();
+
+ // Transition to ensure that a heartbeat request is sent out to
effectively leave the
+ // group (even in the case where the member had no assignment to
release or when the
+ // callback execution failed.)
+ transitionToSendingLeaveGroup(false);
+ }
+
/**
* Reset member epoch to the value required for the leave the group
heartbeat request, and
* transition to the {@link MemberState#LEAVING} state so that a heartbeat
request is sent
@@ -616,6 +653,15 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
stateUpdatesListeners.forEach(stateListener ->
stateListener.onMemberEpochUpdated(epoch, memberId));
}
+ /**
+ * Invokes the {@link MemberStateListener#onGroupAssignmentUpdated(Set)}
callback for each listener when the
+ * set of assigned partitions changes. This includes assignment
changes, unsubscribing, and leaving
+ * the group.
+ */
+ void notifyAssignmentChange(Set<TopicPartition> partitions) {
+ stateUpdatesListeners.forEach(stateListener ->
stateListener.onGroupAssignmentUpdated(partitions));
+ }
+
/**
* @return True if the member should send heartbeat to the coordinator
without waiting for
* the interval.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 7fda9a20c05..fcf688b7f8e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -54,6 +54,7 @@ import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEve
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
@@ -112,6 +113,7 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@@ -126,6 +128,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
+import static
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_JMX_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS;
@@ -170,12 +173,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
*/
private class BackgroundEventProcessor implements
EventProcessor<BackgroundEvent> {
- private final ConsumerRebalanceListenerInvoker
rebalanceListenerInvoker;
-
- public BackgroundEventProcessor(final ConsumerRebalanceListenerInvoker
rebalanceListenerInvoker) {
- this.rebalanceListenerInvoker = rebalanceListenerInvoker;
- }
-
@Override
public void process(final BackgroundEvent event) {
switch (event.type()) {
@@ -234,6 +231,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final IsolationLevel isolationLevel;
private final SubscriptionState subscriptions;
+
+ /**
+ * This is a snapshot of the partitions assigned to this consumer.
HOWEVER, this is only populated and used in
+ * the case where this consumer is in a consumer group. Self-assigned
partitions do not appear here.
+ */
+ private final AtomicReference<Set<TopicPartition>> groupAssignmentSnapshot
= new AtomicReference<>(Collections.emptySet());
private final ConsumerMetadata metadata;
private final Metrics metrics;
private final long retryBackoffMs;
@@ -247,6 +250,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private boolean cachedSubscriptionHasAllFetchPositions;
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
+ private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
// Last triggered async commit future. Used to wait until all previous
async commits are completed.
// We only need to keep track of the last one, since they are guaranteed
to complete in order.
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>>
lastPendingAsyncCommit = null;
@@ -256,6 +260,18 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
private final AtomicInteger refCount = new AtomicInteger(0);
+ private final MemberStateListener memberStateListener = new
MemberStateListener() {
+ @Override
+ public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String
memberId) {
+ updateGroupMetadata(memberEpoch, memberId);
+ }
+
+ @Override
+ public void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
+ setGroupAssignmentSnapshot(partitions);
+ }
+ };
+
AsyncKafkaConsumer(final ConsumerConfig config,
final Deserializer<K> keyDeserializer,
final Deserializer<V> valueDeserializer) {
@@ -348,7 +364,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter,
metrics,
offsetCommitCallbackInvoker,
- this::updateGroupMetadata
+ memberStateListener
);
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier =
ApplicationEventProcessor.supplier(logContext,
metadata,
@@ -363,15 +379,13 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
networkClientDelegateSupplier,
requestManagersSupplier);
- ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
+ this.rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
new RebalanceCallbackMetricsManager(metrics)
);
- this.backgroundEventProcessor = new BackgroundEventProcessor(
- rebalanceListenerInvoker
- );
+ this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper =
backgroundEventReaperFactory.build(logContext);
// The FetchCollector is only used on the application thread.
@@ -431,7 +445,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
this.backgroundEventQueue = backgroundEventQueue;
- this.backgroundEventProcessor = new
BackgroundEventProcessor(rebalanceListenerInvoker);
+ this.rebalanceListenerInvoker = rebalanceListenerInvoker;
+ this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = backgroundEventReaper;
this.metrics = metrics;
this.groupMetadata.set(initializeGroupMetadata(groupId,
Optional.empty()));
@@ -490,7 +505,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
this.backgroundEventQueue = new LinkedBlockingQueue<>();
BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
- ConsumerRebalanceListenerInvoker rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
+ this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
time,
@@ -521,7 +536,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter,
metrics,
offsetCommitCallbackInvoker,
- this::updateGroupMetadata
+ memberStateListener
);
Supplier<ApplicationEventProcessor> applicationEventProcessorSupplier
= ApplicationEventProcessor.supplier(
logContext,
@@ -536,7 +551,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
requestManagersSupplier);
- this.backgroundEventProcessor = new
BackgroundEventProcessor(rebalanceListenerInvoker);
+ this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
}
@@ -639,6 +654,10 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
);
}
+ void setGroupAssignmentSnapshot(final Set<TopicPartition> partitions) {
+ groupAssignmentSnapshot.set(Collections.unmodifiableSet(partitions));
+ }
+
@Override
public void registerMetricForSubscription(KafkaMetric metric) {
if (clientTelemetryReporter.isPresent()) {
@@ -1216,6 +1235,68 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
+ /**
+ * Please keep these tenets in mind for the implementation of the {@link
AsyncKafkaConsumer}’s
+ * {@link #close(Duration)} method. In the future, these tenets may be
made officially part of the top-level
+ * {@link KafkaConsumer#close(Duration)} API, but for now they remain here.
+ *
+ * <ol>
+ * <li>
+ * The execution of the {@link ConsumerRebalanceListener} callback
(if applicable) must be performed on
+ * the application thread to ensure it does not interfere with the
network I/O on the background thread.
+ * </li>
+ * <li>
+ * The {@link ConsumerRebalanceListener} callback execution must
complete before an attempt to leave
+ * the consumer group is performed. In this context, “complete”
does not necessarily imply
+ * <em>success</em>; execution is “complete” even if the execution
<em>fails</em> with an error.
+ * </li>
+ * <li>
+ * Any error thrown during the {@link ConsumerRebalanceListener}
callback execution will be caught to
+ * ensure it does not prevent execution of the remaining {@link
#close()} logic.
+ * </li>
+ * <li>
+ * The application thread will be blocked during the entire
duration of the execution of the
+ * {@link ConsumerRebalanceListener}. The consumer does not employ
a mechanism to short-circuit the
+ * callback execution, so execution is not bound by the timeout in
{@link #close(Duration)}.
+ * </li>
+ * <li>
+ * A given {@link ConsumerRebalanceListener} implementation may be
affected by the application thread's
+ * interrupt state. If the callback implementation performs any
blocking operations, it may result in
+ * an error. An implementation may choose to preemptively check
the thread's interrupt flag via
+ * {@link Thread#isInterrupted()} or {@link
Thread#interrupted()} and alter its behavior.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback, the thread's
interrupt state will be preserved for the
+ * {@link ConsumerRebalanceListener} execution.
+ * </li>
+ * <li>
+ * If the application thread was interrupted <em>prior</em> to the
execution of the
+ * {@link ConsumerRebalanceListener} callback <em>but</em> the
callback cleared out the interrupt state,
+ * the {@link #close()} method will not make any effort to restore
the application thread's interrupt
+ * state for the remainder of the execution of {@link #close()}.
+ * </li>
+ * <li>
+ * Leaving the consumer group is achieved by issuing a ‘leave
group’ network request. The consumer will
+ * attempt to leave the group on a “best-effort” basis. There is no
stated guarantee that the consumer will
+ * have successfully left the group before the {@link #close()}
method completes processing.
+ * </li>
+ * <li>
+ * The consumer will attempt to leave the group regardless of the
timeout elapsing or the application
+ * thread receiving an {@link InterruptException} or {@link
InterruptedException}.
+ * </li>
+ * <li>
+ * The application thread will wait for confirmation that the
consumer left the group until one of the
+ * following occurs:
+ *
+ * <ol>
+ * <li>Confirmation that the ‘leave group’ response was
received from the group coordinator</li>
+ * <li>The timeout provided by the user elapses</li>
+ * <li>An {@link InterruptException} or {@link
InterruptedException} is thrown</li>
+ * </ol>
+ * </li>
+ * </ol>
+ */
private void close(Duration timeout, boolean swallowException) {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
@@ -1227,9 +1308,15 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
clientTelemetryReporter.ifPresent(ClientTelemetryReporter::initiateClose);
closeTimer.update();
// Prepare shutting down the network thread
- swallow(log, Level.ERROR, "Failed to release assignment before closing
consumer",
- () -> releaseAssignmentAndLeaveGroup(closeTimer), firstException);
- swallow(log, Level.ERROR, "Failed invoking asynchronous commit
callback.",
+ // Prior to closing the network thread, we need to make sure the
following operations happen in the right
+ // sequence...
+ swallow(log, Level.ERROR, "Failed to auto-commit offsets",
+ () -> autoCommitOnClose(closeTimer), firstException);
+ swallow(log, Level.ERROR, "Failed to release group assignment",
+ () -> runRebalanceCallbacksOnClose(closeTimer), firstException);
+ swallow(log, Level.ERROR, "Failed to leave group while closing
consumer",
+ () -> leaveGroupOnClose(closeTimer), firstException);
+ swallow(log, Level.ERROR, "Failed invoking asynchronous commit
callbacks while closing consumer",
() ->
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false),
firstException);
if (applicationEventHandler != null)
closeQuietly(() ->
applicationEventHandler.close(Duration.ofMillis(closeTimer.remainingMs())),
"Failed shutting down network thread", firstException);
@@ -1257,13 +1344,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- /**
- * Prior to closing the network thread, we need to make sure the following
operations happen in the right sequence:
- * 1. autocommit offsets
- * 2. release assignment. This is done via a background unsubscribe event
that will
- * trigger the callbacks, clear the assignment on the subscription state
and send the leave group request to the broker
- */
- private void releaseAssignmentAndLeaveGroup(final Timer timer) {
+ private void autoCommitOnClose(final Timer timer) {
if (!groupMetadata.get().isPresent())
return;
@@ -1271,18 +1352,48 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
commitSyncAllConsumed(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
+ }
+
+ private void runRebalanceCallbacksOnClose(final Timer timer) {
+ if (groupMetadata.get().isEmpty())
+ return;
+
+ int memberEpoch = groupMetadata.get().get().generationId();
+
+ Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
+
+ if (assignedPartitions.isEmpty())
+ // Nothing to revoke.
+ return;
+
+ SortedSet<TopicPartition> droppedPartitions = new
TreeSet<>(TOPIC_PARTITION_COMPARATOR);
+ droppedPartitions.addAll(assignedPartitions);
+
+ try {
+ final Exception error;
+
+ if (memberEpoch > 0)
+ error =
rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
+ else
+ error =
rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
+
+ if (error != null)
+ throw ConsumerUtils.maybeWrapAsKafkaException(error);
+ } finally {
+ timer.update();
+ }
+ }
+
+ private void leaveGroupOnClose(final Timer timer) {
+ if (!groupMetadata.get().isPresent())
+ return;
- log.info("Releasing assignment and leaving group before closing
consumer");
- UnsubscribeEvent unsubscribeEvent = new
UnsubscribeEvent(calculateDeadlineMs(timer));
- applicationEventHandler.add(unsubscribeEvent);
+ log.debug("Leaving the consumer group during consumer close");
try {
- // If users subscribe to an invalid topic name, they will get
InvalidTopicException in error events,
- // because network thread keeps trying to send MetadataRequest in
the background.
- // Ignore it to avoid unsubscribe failed.
- processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e
instanceof InvalidTopicException);
- log.info("Completed releasing assignment and sending leave group
to close consumer");
+ applicationEventHandler.addAndGet(new
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer)));
+ log.info("Completed leaving the group");
} catch (TimeoutException e) {
- log.warn("Consumer triggered an unsubscribe event to leave the
group but couldn't " +
+ log.warn("Consumer attempted to leave the group but couldn't " +
"complete it within {} ms. It will proceed to close.",
timer.timeoutMs());
} finally {
timer.update();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
index 8b977eb5c35..98b6271fcc0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java
@@ -17,10 +17,13 @@
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.common.TopicPartition;
+
import java.util.Optional;
+import java.util.Set;
/**
- * Listener for getting notified of member epoch changes.
+ * Listener for getting notified of membership state changes.
*/
public interface MemberStateListener {
@@ -34,4 +37,14 @@ public interface MemberStateListener {
* @param memberId Current member ID. It won't change until the process
is terminated.
*/
void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId);
+
+ /**
+ * This callback is invoked when a group member's assigned set of
partitions changes. Assignments can change via
+ * group coordinator partition assignment changes, unsubscribing, and when
leaving the group.
+ *
+ * @param partitions New assignment, can be empty, but not {@code null}
+ */
+ default void onGroupAssignmentUpdated(Set<TopicPartition> partitions) {
+
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 310c7a3b8b1..8871694db45 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -100,7 +100,7 @@ public class SubscriptionState {
private final OffsetResetStrategy defaultResetStrategy;
/* User-provided listener to be invoked when assignment changes */
- private Optional<ConsumerRebalanceListener> rebalanceListener;
+ private Optional<ConsumerRebalanceListener> rebalanceListener =
Optional.empty();
private int assignmentId = 0;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index e11e702388c..9bd229a3fe8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -33,7 +33,7 @@ public abstract class ApplicationEvent {
LIST_OFFSETS, CHECK_AND_UPDATE_POSITIONS, RESET_OFFSET,
TOPIC_METADATA, ALL_TOPICS_METADATA,
TOPIC_SUBSCRIPTION_CHANGE, TOPIC_PATTERN_SUBSCRIPTION_CHANGE,
UPDATE_SUBSCRIPTION_METADATA,
UNSUBSCRIBE, CONSUMER_REBALANCE_LISTENER_CALLBACK_COMPLETED,
- COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS,
+ COMMIT_ON_CLOSE, CREATE_FETCH_REQUESTS, LEAVE_GROUP_ON_CLOSE,
SHARE_FETCH, SHARE_ACKNOWLEDGE_ASYNC, SHARE_ACKNOWLEDGE_SYNC,
SHARE_SUBSCRIPTION_CHANGE, SHARE_UNSUBSCRIBE,
SHARE_ACKNOWLEDGE_ON_CLOSE,
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 642e5e0cca2..7bd2d1f28b7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -136,6 +136,10 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
process((CommitOnCloseEvent) event);
return;
+ case LEAVE_GROUP_ON_CLOSE:
+ process((LeaveGroupOnCloseEvent) event);
+ return;
+
case CREATE_FETCH_REQUESTS:
process((CreateFetchRequestsEvent) event);
return;
@@ -394,6 +398,15 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
requestManagers.commitRequestManager.get().signalClose();
}
+ private void process(final LeaveGroupOnCloseEvent event) {
+ if (!requestManagers.consumerMembershipManager.isPresent())
+ return;
+
+ log.debug("Signal the ConsumerMembershipManager to leave the consumer
group since the consumer is closing");
+ CompletableFuture<Void> future =
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
+ future.whenComplete(complete(event.future()));
+ }
+
/**
* Process event that tells the share consume request manager to fetch
more records.
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
new file mode 100644
index 00000000000..4afc00390d4
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.events;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+
+/**
+ * When the user calls {@link Consumer#close()}, this event is sent to signal
the {@link ConsumerMembershipManager}
+ * to perform the necessary steps to leave the consumer group cleanly, if
possible. The event's timeout is based on
+ * either the user-provided value to {@link Consumer#close(Duration)} or
+ * {@link ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} if {@link Consumer#close()}
was called. The event is considered
+ * complete when the membership manager has sent out the heartbeat request to
leave the group.
+ */
+public class LeaveGroupOnCloseEvent extends CompletableApplicationEvent<Void> {
+
+ public LeaveGroupOnCloseEvent(final long deadlineMs) {
+ super(Type.LEAVE_GROUP_ON_CLOSE, deadlineMs);
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 54a41587b06..eaf974b91c4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -44,6 +44,7 @@ import
org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEve
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
+import
org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
@@ -80,6 +81,7 @@ import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
@@ -636,12 +638,13 @@ public class AsyncKafkaConsumerTest {
completeUnsubscribeApplicationEventSuccessfully();
doReturn(null).when(applicationEventHandler).addAndGet(any());
consumer.close();
- verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}
- @Test
- public void testUnsubscribeOnClose() {
+ @ParameterizedTest
+ @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+ public void testCloseLeavesGroup(long timeoutMs) {
SubscriptionState subscriptions = mock(SubscriptionState.class);
consumer = spy(newConsumer(
mock(FetchBuffer.class),
@@ -651,29 +654,64 @@ public class AsyncKafkaConsumerTest {
"group-id",
"client-id",
false));
- completeUnsubscribeApplicationEventSuccessfully();
- consumer.close(Duration.ZERO);
- verifyUnsubscribeEvent(subscriptions);
+ consumer.close(Duration.ofMillis(timeoutMs));
+
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}
@Test
- public void testFailedPartitionRevocationOnClose() {
+ public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
 // If rebalance listener failed to execute during close, we still send the leave group,
// and proceed with closing the consumer.
+ Throwable rootError = new KafkaException("Intentional error");
+ Set<TopicPartition> partitions = singleton(new
TopicPartition("topic1", 0));
SubscriptionState subscriptions = mock(SubscriptionState.class);
+ when(subscriptions.assignedPartitions()).thenReturn(partitions);
+ ConsumerRebalanceListenerInvoker invoker =
mock(ConsumerRebalanceListenerInvoker.class);
+ doAnswer(invocation ->
rootError).when(invoker).invokePartitionsLost(any(SortedSet.class));
+
consumer = spy(newConsumer(
mock(FetchBuffer.class),
new ConsumerInterceptors<>(Collections.emptyList()),
+ invoker,
+ subscriptions,
+ "group-id",
+ "client-id",
+ false));
+ consumer.setGroupAssignmentSnapshot(partitions);
+
+ Throwable t = assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
+ assertNotNull(t.getCause());
+ assertEquals(rootError, t.getCause());
+
+
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
+ }
+
+ @ParameterizedTest
+ @ValueSource(longs = {0, ConsumerUtils.DEFAULT_CLOSE_TIMEOUT_MS})
+ public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
+ Set<TopicPartition> partitions = singleton(new
TopicPartition("topic1", 0));
+ SubscriptionState subscriptions = mock(SubscriptionState.class);
+ when(subscriptions.assignedPartitions()).thenReturn(partitions);
+
when(applicationEventHandler.addAndGet(any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
+ consumer = spy(newConsumer(
+ mock(FetchBuffer.class),
+ mock(ConsumerInterceptors.class),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
"client-id",
false));
- doThrow(new
KafkaException()).when(consumer).processBackgroundEvents(any(), any(), any());
- assertThrows(KafkaException.class, () ->
consumer.close(Duration.ZERO));
- verifyUnsubscribeEvent(subscriptions);
- // Close operation should carry on even if the unsubscribe fails
- verify(applicationEventHandler).close(any(Duration.class));
+
+ Duration timeout = Duration.ofMillis(timeoutMs);
+
+ try {
+ assertThrows(InterruptException.class, () ->
consumer.close(timeout));
+ } finally {
+ Thread.interrupted();
+ }
+
+ verify(applicationEventHandler).add(any(CommitOnCloseEvent.class));
+
verify(applicationEventHandler).addAndGet(any(LeaveGroupOnCloseEvent.class));
}
@Test
@@ -1577,9 +1615,11 @@ public class AsyncKafkaConsumerTest {
final OffsetAndMetadata nextOffsetAndMetadata = new
OffsetAndMetadata(4, Optional.of(0), "");
 // On the first iteration, return no data; on the second, return two records
+ Set<TopicPartition> partitions = singleton(tp);
doAnswer(invocation -> {
 // Mock the subscription being assigned as the first fetch is collected
-
consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp));
+ consumer.subscriptions().assignFromSubscribed(partitions);
+ consumer.setGroupAssignmentSnapshot(partitions);
return Fetch.empty();
}).doAnswer(invocation ->
Fetch.forPartition(tp, records, true, nextOffsetAndMetadata)
@@ -1593,7 +1633,7 @@ public class AsyncKafkaConsumerTest {
assertEquals(Optional.of(0),
returnedRecords.nextOffsets().get(tp).leaderEpoch());
assertEquals(singleton(topicName), consumer.subscription());
- assertEquals(singleton(tp), consumer.assignment());
+ assertEquals(partitions, consumer.assignment());
}
/**
@@ -1771,18 +1811,6 @@ public class AsyncKafkaConsumerTest {
assertEquals(OffsetResetStrategy.LATEST,
resetOffsetEvent.offsetResetStrategy());
}
- private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
- // Check that an unsubscribe event was generated, and that the consumer waited for it to
- // complete processing background events.
- verify(applicationEventHandler).add(any(UnsubscribeEvent.class));
- verify(consumer).processBackgroundEvents(any(), any(), any());
-
- // The consumer should not clear the assignment in the app thread. The unsubscribe
- // event is the one responsible for updating the assignment in the background when it
- // completes.
- verify(subscriptions, never()).assignFromSubscribed(any());
- }
-
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
@@ -1875,7 +1903,8 @@ public class AsyncKafkaConsumerTest {
private void completeAssignmentChangeEventSuccessfully() {
doAnswer(invocation -> {
AssignmentChangeEvent event = invocation.getArgument(0);
- consumer.subscriptions().assignFromUser(new
HashSet<>(event.partitions()));
+ HashSet<TopicPartition> partitions = new
HashSet<>(event.partitions());
+ consumer.subscriptions().assignFromUser(partitions);
event.future().complete(null);
return null;
}).when(applicationEventHandler).addAndGet(ArgumentMatchers.isA(AssignmentChangeEvent.class));
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6ce0f6c00de..cbf763ec581 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.admin.{NewPartitions,
NewTopic}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
-import org.apache.kafka.common.errors.{InvalidGroupIdException,
InvalidTopicException, TimeoutException, WakeupException}
+import org.apache.kafka.common.errors.{InterruptException,
InvalidGroupIdException, InvalidTopicException, TimeoutException,
WakeupException}
import org.apache.kafka.common.record.{CompressionType, TimestampType}
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.{MetricName, TopicPartition}
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.Timeout
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
import scala.jdk.CollectionConverters._
@Timeout(600)
@@ -824,4 +824,49 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertThrows(classOf[WakeupException], () =>
consumer.position(topicPartition, Duration.ofSeconds(100)))
}
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+ def testCloseLeavesGroupOnInterrupt(quorum: String, groupProtocol: String):
Unit = {
+ val adminClient = createAdminClient()
+ val consumer = createConsumer()
+ val listener = new TestConsumerReassignmentListener()
+ consumer.subscribe(List(topic).asJava, listener)
+ awaitRebalance(consumer, listener)
+
+ assertEquals(1, listener.callsToAssigned)
+ assertEquals(0, listener.callsToRevoked)
+
+ try {
+ Thread.currentThread().interrupt()
+ assertThrows(classOf[InterruptException], () => consumer.close())
+ } finally {
+ // Clear the interrupted flag so we don't create problems for subsequent tests.
+ Thread.interrupted()
+ }
+
+ assertEquals(1, listener.callsToAssigned)
+ assertEquals(1, listener.callsToRevoked)
+
+ val config = new ConsumerConfig(consumerConfig)
+
+ // Set the wait timeout to be only *half* the configured session timeout. This way we can make sure that the
+ // consumer explicitly left the group as opposed to being kicked out by the broker.
+ val leaveGroupTimeoutMs =
config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG) / 2
+
+ TestUtils.waitUntilTrue(
+ () => {
+ try {
+ val groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG)
+ val groupDescription = adminClient.describeConsumerGroups
(Collections.singletonList (groupId) ).describedGroups.get (groupId).get
+ groupDescription.members.isEmpty
+ } catch {
+ case _: ExecutionException | _: InterruptedException =>
+ false
+ }
+ },
+ msg=s"Consumer did not leave the consumer group within
$leaveGroupTimeoutMs ms of close",
+ waitTimeMs=leaveGroupTimeoutMs
+ )
+ }
}