This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74acbd200df KAFKA-16758: Extend Consumer#close with an option to leave
the group or not (#17614)
74acbd200df is described below
commit 74acbd200dfc5c97cd1d74f1f56535a7c609b339
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Apr 6 13:02:45 2025 +0800
KAFKA-16758: Extend Consumer#close with an option to leave the group or not
(#17614)
JIRA: [KAFKA-16758](https://issues.apache.org/jira/browse/KAFKA-16758)
This PR aims to deliver
[KIP-1092](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=321719077),
please refer to KIP-1092 and KAFKA-16758 for further details.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Chia-Ping
Tsai <[email protected]>, Kirk True <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/consumer/CloseOptions.java | 124 +++++++++++++++++++++
.../apache/kafka/clients/consumer/Consumer.java | 7 ++
.../kafka/clients/consumer/KafkaConsumer.java | 12 ++
.../kafka/clients/consumer/MockConsumer.java | 6 +
.../consumer/internals/AbstractCoordinator.java | 40 ++++---
.../internals/AbstractHeartbeatRequestManager.java | 10 +-
.../internals/AbstractMembershipManager.java | 31 +++++-
.../consumer/internals/AsyncKafkaConsumer.java | 23 ++--
.../consumer/internals/ClassicKafkaConsumer.java | 39 +++++--
.../consumer/internals/ConsumerCoordinator.java | 5 +-
.../internals/ConsumerHeartbeatRequestManager.java | 10 ++
.../internals/ConsumerMembershipManager.java | 38 ++++++-
.../internals/ShareHeartbeatRequestManager.java | 5 +
.../events/ApplicationEventProcessor.java | 2 +-
.../internals/events/LeaveGroupOnCloseEvent.java | 13 ++-
.../internals/AbstractCoordinatorTest.java | 46 ++++++--
.../internals/ConsumerCoordinatorTest.java | 19 ++--
.../ConsumerHeartbeatRequestManagerTest.java | 69 ++++++++++--
.../internals/ConsumerMembershipManagerTest.java | 49 ++++++++
.../runtime/distributed/WorkerCoordinator.java | 3 +-
.../runtime/distributed/WorkerGroupMember.java | 3 +-
.../log/remote/metadata/storage/ConsumerTask.java | 1 +
23 files changed, 479 insertions(+), 78 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 78d1868598e..58136bf2d3e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -129,7 +129,7 @@
files="(OffsetFetcher|RequestResponse)Test.java"/>
<suppress checks="JavaNCSS"
-
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java"/>
+
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java|ConsumerMembershipManagerTest.java"/>
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
new file mode 100644
index 00000000000..0cbbcbca54d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+public class CloseOptions {
+ /**
+ * Enum to specify the group membership operation upon leaving group.
+ *
+ * <ul>
+ * <li><b>{@code LEAVE_GROUP}</b>: means the consumer will leave the
group.</li>
+ * <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in
the group.</li>
+ * <li><b>{@code DEFAULT}</b>: Applies the default behavior:
+ * <ul>
+ * <li>For <b>static members</b>: The consumer will remain in the
group.</li>
+ * <li>For <b>dynamic members</b>: The consumer will leave the
group.</li>
+ * </ul>
+ * </li>
+ * </ul>
+ */
+ public enum GroupMembershipOperation {
+ LEAVE_GROUP,
+ REMAIN_IN_GROUP,
+ DEFAULT
+ }
+
+ /**
+ * Specifies the group membership operation upon shutdown.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be applied,
which follows the consumer's default behavior.
+ */
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
+
+ /**
+ * Specifies the maximum amount of time to wait for the close process to
complete.
+ * This allows users to define a custom timeout for gracefully stopping
the consumer.
+ * If no value is set, the default timeout {@link
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+ */
+ protected Optional<Duration> timeout = Optional.empty();
+
+ private CloseOptions() {
+ }
+
+ protected CloseOptions(final CloseOptions option) {
+ this.operation = option.operation;
+ this.timeout = option.timeout;
+ }
+
+ /**
+ * Static method to create a {@code CloseOptions} with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
+ * @return a new {@code CloseOptions} instance with the specified timeout.
+ */
+ public static CloseOptions timeout(final Duration timeout) {
+ CloseOptions option = new CloseOptions();
+ option.timeout = Optional.ofNullable(timeout);
+ return option;
+ }
+
+ /**
+ * Static method to create a {@code CloseOptions} with a specified group
membership operation.
+ *
+ * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * or {@code DEFAULT}.
+ * @return a new {@code CloseOptions} instance with the specified group
membership operation.
+ */
+ public static CloseOptions groupMembershipOperation(final
GroupMembershipOperation operation) {
+ Objects.requireNonNull(operation, "operation should not be null");
+ CloseOptions option = new CloseOptions();
+ option.operation = operation;
+ return option;
+ }
+
+ /**
+ * Fluent method to set the timeout for the close process.
+ *
+ * @param timeout the maximum time to wait for the consumer to close. If
{@code null}, the default timeout will be used.
+ * @return this {@code CloseOptions} instance.
+ */
+ public CloseOptions withTimeout(final Duration timeout) {
+ this.timeout = Optional.ofNullable(timeout);
+ return this;
+ }
+
+ /**
+ * Fluent method to set the group membership operation upon shutdown.
+ *
+ * @param operation the group membership operation to apply. Must be one
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
+ * @return this {@code CloseOptions} instance.
+ */
+ public CloseOptions withGroupMembershipOperation(final
GroupMembershipOperation operation) {
+ Objects.requireNonNull(operation, "operation should not be null");
+ this.operation = operation;
+ return this;
+ }
+
+ public GroupMembershipOperation groupMembershipOperation() {
+ return operation;
+ }
+
+ public Optional<Duration> timeout() {
+ return timeout;
+ }
+
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 2c8376e5ccd..6c1c42c9d8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -277,8 +277,11 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0; use {@link
Consumer#close(CloseOptions)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
@@ -286,4 +289,8 @@ public interface Consumer<K, V> extends Closeable {
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOptions)
+ */
+ void close(final CloseOptions option);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 70c0f7cadd5..2364864a3c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1798,6 +1798,7 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
* @throws org.apache.kafka.common.KafkaException for any other error
during close
*/
@Override
+ @SuppressWarnings("deprecation")
public void close(Duration timeout) {
delegate.close(timeout);
}
@@ -1812,6 +1813,17 @@ public class KafkaConsumer<K, V> implements Consumer<K,
V> {
delegate.wakeup();
}
+ /**
+ * This method allows the caller to specify shutdown behavior using the
{@link CloseOptions} class.
+ * If {@code null} is provided, the default behavior will be applied,
equivalent to providing a new {@link CloseOptions} instance.
+ *
+ * @param option see {@link CloseOptions}
+ */
+ @Override
+ public void close(CloseOptions option) {
+ delegate.close(option);
+ }
+
// Functions below are for testing only
String clientId() {
return delegate.clientId();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b9e69806694..a15ede69ceb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -549,6 +549,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ @SuppressWarnings("deprecation")
public synchronized void close(Duration timeout) {
this.closed = true;
}
@@ -562,6 +563,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
wakeup.set(true);
}
+ @Override
+ public void close(CloseOptions option) {
+ this.closed = true;
+ }
+
/**
* Schedule a task to be executed during a poll(). One enqueued task will
be executed per {@link #poll(Duration)}
* invocation. You can use this repeatedly to mock out multiple responses
to poll invocations.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f8165f6656d..a07e12a518a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
@@ -85,6 +86,9 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+
/**
* AbstractCoordinator implements group management for a single group member
by interacting with
* a designated Kafka broker (the coordinator). Group semantics are provided
by extending this class.
@@ -1116,23 +1120,21 @@ public abstract class AbstractCoordinator implements
Closeable {
*/
@Override
public final void close() {
- close(time.timer(0));
+ close(time.timer(0), DEFAULT);
}
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- protected void close(Timer timer) {
+ protected void close(Timer timer, CloseOptions.GroupMembershipOperation
membershipOperation) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat
thread
// needs this lock to complete and terminate after close flag is
set.
synchronized (this) {
- if (rebalanceConfig.leaveGroupOnClose) {
- onLeavePrepare();
- maybeLeaveGroup("the consumer is being closed");
- }
+ onLeavePrepare();
+ maybeLeaveGroup(membershipOperation, "the consumer is being
closed");
// At this point, there may be pending commits (async commits
or sync commits that were
// interrupted using wakeup) and the leave group request which
have been queued, but not
@@ -1153,26 +1155,22 @@ public abstract class AbstractCoordinator implements
Closeable {
"either by increasing max.poll.interval.ms or by reducing the
maximum size of batches " +
"returned in poll() with max.poll.records.");
- maybeLeaveGroup("consumer poll timeout has expired.");
+ maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
}
/**
- * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this
member is using static membership or is already
- * not part of the group (ie does not have a valid member id, is in the
UNJOINED state, or the coordinator is unknown).
+ * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this
member is using static membership
+ * with the default consumer group membership operation, or is already not
part of the group (i.e., does not have a
+ * valid member ID, is in the UNJOINED state, or the coordinator is
unknown).
*
+ * @param membershipOperation the operation on consumer group membership
that the consumer will perform when closing
* @param leaveReason the reason to leave the group for logging
* @throws KafkaException if the rebalance callback throws exception
*/
- public synchronized RequestFuture<Void> maybeLeaveGroup(String
leaveReason) {
+ public synchronized RequestFuture<Void>
maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation,
String leaveReason) {
RequestFuture<Void> future = null;
- // Starting from 2.3, only dynamic members will send LeaveGroupRequest
to the broker,
- // consumer with valid group.instance.id is viewed as static member
that never sends LeaveGroup,
- // and the membership expiration is only controlled by session timeout.
- if (isDynamicMember() && !coordinatorUnknown() &&
- state != MemberState.UNJOINED && generation.hasMemberId()) {
- // this is a minimal effort attempt to leave the group. we do not
- // attempt any resending if the request fails or times out.
+ if (rebalanceConfig.leaveGroupOnClose &&
shouldSendLeaveGroupRequest(membershipOperation)) {
log.info("Member {} sending LeaveGroup request to coordinator {}
due to {}",
generation.memberId, coordinator, leaveReason);
LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
@@ -1189,6 +1187,14 @@ public abstract class AbstractCoordinator implements
Closeable {
return future;
}
+ private boolean
shouldSendLeaveGroupRequest(CloseOptions.GroupMembershipOperation
membershipOperation) {
+ if (!coordinatorUnknown() && state != MemberState.UNJOINED &&
generation.hasMemberId()) {
+ return membershipOperation == LEAVE_GROUP || (isDynamicMember() &&
membershipOperation == DEFAULT);
+ } else {
+ return false;
+ }
+ }
+
protected boolean isDynamicMember() {
return rebalanceConfig.groupInstanceId.isEmpty();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index cefa6f2c769..9d219907926 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -181,8 +181,9 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
return new
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(),
Collections.singletonList(leaveHeartbeat));
}
- // Case 1: The member is leaving
- boolean heartbeatNow = membershipManager().state() ==
MemberState.LEAVING ||
+ // Case 1: The member state is LEAVING - if the member is a share
consumer, we should immediately send leave;
+ // if the member is an async consumer, this will also depend on
leavingGroupOperation.
+ boolean heartbeatNow = shouldSendLeaveHeartbeatNow() ||
// Case 2: The member state indicates it should send a heartbeat
without waiting for the interval,
// and there is no heartbeat request currently in-flight
(membershipManager().shouldHeartbeatNow() &&
!heartbeatRequestState.requestInFlight());
@@ -201,6 +202,11 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
*/
public abstract AbstractMembershipManager<R> membershipManager();
+ /**
+ * @return whether the member should send the leave heartbeat immediately
+ */
+ protected abstract boolean shouldSendLeaveHeartbeatNow();
+
/**
* Generate a heartbeat request to leave the group if the state is still
LEAVING when this is
* called to close the consumer.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c276adc9c8d..41ad48bbc25 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
@@ -130,7 +131,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
* partition assigned, or revoked), but it is not present the Metadata
cache at that moment.
* The cache is cleared when the subscription changes ({@link
#transitionToJoining()}, the
* member fails ({@link #transitionToFatal()} or leaves the group
- * ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
+ * ({@link #leaveGroup()}/{@link
#leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}).
*/
private final Map<Uuid, String> assignedTopicNamesCache;
@@ -158,7 +159,7 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* If the member is currently leaving the group after a call to {@link
#leaveGroup()} or
- * {@link #leaveGroupOnClose()}, this will have a future that will
complete when the ongoing leave operation
+ * {@link #leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}, this
will have a future that will complete when the ongoing leave operation
* completes (callbacks executed and heartbeat request to leave is sent
out). This will be empty if the
* member is not leaving.
*/
@@ -201,6 +202,14 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
private final boolean autoCommitEnabled;
+ /**
+ * Indicate the operation on consumer group membership that the consumer
will perform when leaving the group.
+ * The property should remain {@code GroupMembershipOperation.DEFAULT}
until the consumer is closing.
+ *
+ * @see CloseOptions.GroupMembershipOperation
+ */
+ protected CloseOptions.GroupMembershipOperation leaveGroupOperation =
CloseOptions.GroupMembershipOperation.DEFAULT;
+
AbstractMembershipManager(String groupId,
SubscriptionState subscriptions,
ConsumerMetadata metadata,
@@ -275,6 +284,15 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
return memberEpoch;
}
+ /**
+ * @return the operation the consumer will perform on leaving the group.
+ *
+ * @see CloseOptions.GroupMembershipOperation
+ */
+ public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+ return leaveGroupOperation;
+ }
+
/**
* Update member info and transition member state based on a successful
heartbeat response.
*
@@ -529,11 +547,14 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
/**
* Transition to {@link MemberState#PREPARE_LEAVING} to release the
assignment. Once completed,
* transition to {@link MemberState#LEAVING} to send the heartbeat request
and leave the group.
+ * It also sets the membership operation to be performed on close.
* This is expected to be invoked when the user calls the {@link
Consumer#close()} API.
*
+ * @param membershipOperation the membership operation to be performed on
close
* @return Future that will complete when the heartbeat to leave the group
has been sent out.
*/
- public CompletableFuture<Void> leaveGroupOnClose() {
+ public CompletableFuture<Void>
leaveGroupOnClose(CloseOptions.GroupMembershipOperation membershipOperation) {
+ this.leaveGroupOperation = membershipOperation;
return leaveGroup(false);
}
@@ -1274,14 +1295,14 @@ public abstract class AbstractMembershipManager<R
extends AbstractResponse> impl
}
/**
- * Returns the epoch a member uses to join the group. This is group-type
specific.
+ * Returns the epoch a member uses to join the group. This is
group-type-specific.
*
* @return the epoch to join the group
*/
abstract int joinGroupEpoch();
/**
- * Returns the epoch a member uses to leave the group. This is group-type
specific.
+ * Returns the epoch a member uses to leave the group. This is
group-type-specific.
*
* @return the epoch to leave the group
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 0ebb10db65e..ba2e145d327 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -427,7 +428,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// call close methods if internal objects are already constructed;
this is to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which
means no internal objects were initialized.
if (this.log != null) {
- close(Duration.ZERO, true);
+ close(Duration.ZERO,
CloseOptions.GroupMembershipOperation.LEAVE_GROUP, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
@@ -1251,11 +1252,19 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
}
@Override
+ @SuppressWarnings("deprecation")
public void close(Duration timeout) {
+ close(CloseOptions.timeout(timeout));
+ }
+
+ @Override
+ public void close(CloseOptions option) {
+ Duration timeout = option.timeout().orElseGet(() ->
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
if (timeout.toMillis() < 0)
throw new IllegalArgumentException("The timeout cannot be
negative.");
acquire();
@@ -1263,7 +1272,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (!closed) {
// need to close before setting the flag since the close
function
// itself may trigger rebalance callback that needs the
consumer to be open still
- close(timeout, false);
+ close(timeout, option.groupMembershipOperation(), false);
}
} finally {
closed = true;
@@ -1333,7 +1342,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* </li>
* </ol>
*/
- private void close(Duration timeout, boolean swallowException) {
+ private void close(Duration timeout, CloseOptions.GroupMembershipOperation
membershipOperation, boolean swallowException) {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
@@ -1353,7 +1362,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
swallow(log, Level.ERROR, "Failed to release group assignment",
this::runRebalanceCallbacksOnClose, firstException);
swallow(log, Level.ERROR, "Failed to leave group while closing
consumer",
- () -> leaveGroupOnClose(closeTimer), firstException);
+ () -> leaveGroupOnClose(closeTimer, membershipOperation),
firstException);
swallow(log, Level.ERROR, "Failed invoking asynchronous commit
callbacks while closing consumer",
() ->
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false),
firstException);
if (applicationEventHandler != null)
@@ -1424,13 +1433,13 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
throw ConsumerUtils.maybeWrapAsKafkaException(error);
}
- private void leaveGroupOnClose(final Timer timer) {
+ private void leaveGroupOnClose(final Timer timer, final
CloseOptions.GroupMembershipOperation membershipOperation) {
if (groupMetadata.get().isEmpty())
return;
log.debug("Leaving the consumer group during consumer close");
try {
- applicationEventHandler.addAndGet(new
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer)));
+ applicationEventHandler.addAndGet(new
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer), membershipOperation));
log.info("Completed leaving the group");
} catch (TimeoutException e) {
log.warn("Consumer attempted to leave the group but couldn't " +
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 9a8c637d778..61fe70cba25 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
@@ -265,7 +266,10 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// call close methods if internal objects are already constructed;
this is to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which
means no internal objects were initialized.
if (this.log != null) {
- close(Duration.ZERO, true);
+ // If a consumer fails during initialization, it means it
hasn't joined the group yet.
+ // Since it's not a group member, we use REMAIN_IN_GROUP
option when closing
+ // to prevent sending an unnecessary leave request to the
coordinator.
+ close(Duration.ZERO,
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
@@ -578,7 +582,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
if (this.coordinator != null) {
this.coordinator.onLeavePrepare();
- this.coordinator.maybeLeaveGroup("the consumer unsubscribed
from all topics");
+
this.coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
"the consumer unsubscribed from all topics");
}
this.subscriptions.unsubscribe();
log.info("Unsubscribed all topics or patterns and assigned
partitions");
@@ -1102,11 +1106,23 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
}
@Override
+ @SuppressWarnings("deprecation")
public void close(Duration timeout) {
+ close(CloseOptions.timeout(timeout));
+ }
+
+ @Override
+ public void wakeup() {
+ this.client.wakeup();
+ }
+
+ @Override
+ public void close(CloseOptions option) {
+ Duration timeout = option.timeout().orElseGet(() ->
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
if (timeout.toMillis() < 0)
throw new IllegalArgumentException("The timeout cannot be
negative.");
acquire();
@@ -1114,7 +1130,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (!closed) {
// need to close before setting the flag since the close
function
// itself may trigger rebalance callback that needs the
consumer to be open still
- close(timeout, false);
+ close(timeout, option.groupMembershipOperation(), false);
}
} finally {
closed = true;
@@ -1122,18 +1138,13 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
}
- @Override
- public void wakeup() {
- this.client.wakeup();
- }
-
private Timer createTimerForRequest(final Duration timeout) {
// this.time could be null if an exception occurs in constructor prior
to setting the this.time field
final Time localTime = (time == null) ? Time.SYSTEM : time;
return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
}
- private void close(Duration timeout, boolean swallowException) {
+ private void close(Duration timeout, CloseOptions.GroupMembershipOperation
membershipOperation, boolean swallowException) {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
@@ -1145,7 +1156,13 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
// consumer.
if (coordinator != null) {
// This is a blocking call bound by the time remaining in
closeTimer
- swallow(log, Level.ERROR, "Failed to close coordinator with a
timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer),
firstException);
+ swallow(
+ log,
+ Level.ERROR,
+ "Failed to close coordinator with a timeout(ms)=" +
closeTimer.timeoutMs(),
+ () -> coordinator.close(closeTimer, membershipOperation),
+ firstException
+ );
}
if (fetcher != null) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 784907936f4..01fc605ea79 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -1012,7 +1013,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- public void close(final Timer timer) {
+ public void close(final Timer timer, CloseOptions.GroupMembershipOperation
membershipOperation) {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
try {
@@ -1023,7 +1024,7 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
invokeCompletedOffsetCommitCallbacks();
}
} finally {
- super.close(timer);
+ super.close(timer, membershipOperation);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index 722f76f4c03..2845f4bc9ee 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -39,6 +39,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
/**
@@ -211,6 +212,15 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
return membershipManager;
}
+ @Override
+ protected boolean shouldSendLeaveHeartbeatNow() {
+ // If the consumer has dynamic membership,
+ // we should skip the leaving heartbeat when leaveGroupOperation is
REMAIN_IN_GROUP
+ if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP
== membershipManager.leaveGroupOperation())
+ return false;
+ return membershipManager().state() == MemberState.LEAVING;
+ }
+
/**
* Builds the heartbeat requests correctly, ensuring that all information
is sent according to
* the protocol, but subsequent requests do not send information which has
not changed. This
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 57d6c21e48b..1003e39f908 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -45,6 +46,9 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
@@ -398,6 +402,26 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
}
}
+ @Override
+ public boolean isLeavingGroup() {
+ CloseOptions.GroupMembershipOperation leaveGroupOperation =
leaveGroupOperation();
+ if (REMAIN_IN_GROUP == leaveGroupOperation) {
+ return false;
+ }
+
+ MemberState state = state();
+ boolean isLeavingState = state == MemberState.PREPARE_LEAVING || state
== MemberState.LEAVING;
+
+ // Default operation: both static and dynamic consumers will send a
leave heartbeat
+ boolean hasLeaveOperation = DEFAULT == leaveGroupOperation ||
+ // Leave operation: both static and dynamic consumers will send a
leave heartbeat
+ LEAVE_GROUP == leaveGroupOperation ||
+ // Remain in group: only static consumers will send a leave
heartbeat, while dynamic members will not
+ groupInstanceId().isPresent();
+
+ return isLeavingState && hasLeaveOperation;
+ }
+
/**
* Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to
trigger the execution of the
* appropriate {@link ConsumerRebalanceListener} {@link
ConsumerRebalanceListenerMethodName method} on the
@@ -469,8 +493,16 @@ public class ConsumerMembershipManager extends
AbstractMembershipManager<Consume
*/
@Override
public int leaveGroupEpoch() {
- return groupInstanceId.isPresent() ?
- ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
- ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+ boolean isStaticMember = groupInstanceId.isPresent();
+ // Currently, the server doesn't have a mechanism for static members
to permanently leave the group.
+ // Therefore, we use LEAVE_GROUP_MEMBER_EPOCH to force the
GroupMetadataManager to fence
+ // this member, effectively removing it from the group.
+ if (LEAVE_GROUP == leaveGroupOperation) {
+ return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+ }
+
+ return isStaticMember ?
+ ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
+ ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
index 56c77f85ff0..21d598afb4c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
@@ -183,6 +183,11 @@ public class ShareHeartbeatRequestManager extends
AbstractHeartbeatRequestManage
return membershipManager;
}
+ @Override
+ protected boolean shouldSendLeaveHeartbeatNow() {
+ return membershipManager().state() == MemberState.LEAVING;
+ }
+
/**
* Builds the heartbeat requests correctly, ensuring that all information
is sent according to
* the protocol, but subsequent requests do not send information which has
not changed. This
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5c11e299790..42dd1711b96 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -467,7 +467,7 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
return;
log.debug("Signal the ConsumerMembershipManager to leave the consumer
group since the consumer is closing");
- CompletableFuture<Void> future =
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
+ CompletableFuture<Void> future =
requestManagers.consumerMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
future.whenComplete(complete(event.future()));
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
index 4afc00390d4..e7496c3671a 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals.events;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
@@ -31,7 +32,17 @@ import java.time.Duration;
*/
public class LeaveGroupOnCloseEvent extends CompletableApplicationEvent<Void> {
- public LeaveGroupOnCloseEvent(final long deadlineMs) {
+ /**
+ * @see
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation
+ */
+ private final CloseOptions.GroupMembershipOperation membershipOperation;
+
+ public LeaveGroupOnCloseEvent(final long deadlineMs, final
CloseOptions.GroupMembershipOperation membershipOperation) {
super(Type.LEAVE_GROUP_ON_CLOSE, deadlineMs);
+ this.membershipOperation = membershipOperation;
+ }
+
+ public CloseOptions.GroupMembershipOperation membershipOperation() {
+ return membershipOperation;
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index aedc4977a03..55d39db39a1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
@@ -60,6 +61,9 @@ import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -75,6 +79,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import static java.util.Collections.emptyMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -130,7 +135,12 @@ public class AbstractCoordinatorTest {
Optional.empty(), Optional.empty());
}
+
private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs,
int rebalanceTimeoutMs, Optional<String> groupInstanceId,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
+ setupCoordinator(retryBackoffMs, retryBackoffMaxMs,
rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier,
groupInstanceId.isEmpty());
+ }
+
+ private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs,
int rebalanceTimeoutMs, Optional<String> groupInstanceId,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean
leaveOnClose) {
LogContext logContext = new LogContext();
this.mockTime = new MockTime();
ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs,
retryBackoffMaxMs, 60 * 60 * 1000L,
@@ -158,7 +168,7 @@ public class AbstractCoordinatorTest {
groupInstanceId,
retryBackoffMs,
retryBackoffMaxMs,
-
groupInstanceId.isEmpty());
+
leaveOnClose);
this.coordinator = new DummyCoordinator(rebalanceConfig,
consumerClient,
metrics,
@@ -1095,8 +1105,29 @@ public class AbstractCoordinatorTest {
checkLeaveGroupRequestSent(Optional.of("groupInstanceId"));
}
- private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
- setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, groupInstanceId, Optional.empty());
+ @ParameterizedTest
+ @MethodSource("groupInstanceIdAndMembershipOperationMatrix")
+ public void
testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional<String>
groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
+ checkLeaveGroupRequestSent(groupInstanceId, operation,
Optional.empty(), true);
+ }
+
+ private static Stream<Arguments>
groupInstanceIdAndMembershipOperationMatrix() {
+ return Stream.of(
+ Arguments.of(Optional.empty(),
CloseOptions.GroupMembershipOperation.DEFAULT),
+ Arguments.of(Optional.empty(),
CloseOptions.GroupMembershipOperation.LEAVE_GROUP),
+ Arguments.of(Optional.empty(),
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP),
+ Arguments.of(Optional.of("groupInstanceId"),
CloseOptions.GroupMembershipOperation.DEFAULT),
+ Arguments.of(Optional.of("groupInstanceId"),
CloseOptions.GroupMembershipOperation.LEAVE_GROUP),
+ Arguments.of(Optional.of("groupInstanceId"),
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+ );
+ }
+
+ private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId)
{
+ checkLeaveGroupRequestSent(groupInstanceId,
CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(),
groupInstanceId.isEmpty());
+ }
+
+ private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId,
CloseOptions.GroupMembershipOperation operation,
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean
leaveOnClose) {
+ setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS,
Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose);
mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId,
leaderId, Errors.NONE));
@@ -1113,12 +1144,13 @@ public class AbstractCoordinatorTest {
try {
coordinator.ensureActiveGroup();
- coordinator.close();
- if (coordinator.isDynamicMember()) {
+ coordinator.close(new MockTime().timer(0), operation);
+ if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP == operation
||
+ (CloseOptions.GroupMembershipOperation.DEFAULT == operation &&
coordinator.isDynamicMember())) {
fail("Expected leavegroup to raise an error.");
}
} catch (RuntimeException exception) {
- if (coordinator.isDynamicMember()) {
+ if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP == operation
|| coordinator.isDynamicMember()) {
assertEquals(exception, e);
} else {
fail("Coordinator with group.instance.id set shouldn't send
leave group request.");
@@ -1206,7 +1238,7 @@ public class AbstractCoordinatorTest {
}, leaveGroupResponse);
coordinator.ensureActiveGroup();
- return coordinator.maybeLeaveGroup(leaveReason);
+ return
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
leaveReason);
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c41ea0029ba..5c9e06ff90d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
@@ -229,7 +230,7 @@ public abstract class ConsumerCoordinatorTest {
@AfterEach
public void teardown() {
this.metrics.close();
- this.coordinator.close(time.timer(0));
+ this.coordinator.close(time.timer(0),
CloseOptions.GroupMembershipOperation.DEFAULT);
}
@Test
@@ -1795,7 +1796,7 @@ public abstract class ConsumerCoordinatorTest {
return validateLeaveGroup(groupId, consumerId, leaveRequest);
}, new LeaveGroupResponse(
new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
- coordinator.close(time.timer(0));
+ coordinator.close(time.timer(0),
CloseOptions.GroupMembershipOperation.DEFAULT);
assertTrue(received.get());
}
@@ -1810,7 +1811,7 @@ public abstract class ConsumerCoordinatorTest {
LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
return validateLeaveGroup(groupId, consumerId, leaveRequest);
}, new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
- coordinator.maybeLeaveGroup("test maybe leave group");
+
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
"test maybe leave group");
assertTrue(received.get());
AbstractCoordinator.Generation generation =
coordinator.generationIfStable();
@@ -1854,7 +1855,7 @@ public abstract class ConsumerCoordinatorTest {
return validateLeaveGroup(groupId, consumerId, leaveRequest);
}, new LeaveGroupResponse(new
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
- coordinator.maybeLeaveGroup("pending member leaves");
+
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
"pending member leaves");
assertTrue(received.get());
}
@@ -2570,7 +2571,7 @@ public abstract class ConsumerCoordinatorTest {
client.prepareResponse(new LeaveGroupResponse(new
LeaveGroupResponseData()
.setErrorCode(Errors.NONE.code())));
subscriptions.unsubscribe();
- coordinator.maybeLeaveGroup("test commit after leave");
+
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
"test commit after leave");
subscriptions.assignFromUser(singleton(t1p));
// the client should not reuse generation/memberId from
auto-subscribed generation
@@ -3546,7 +3547,7 @@ public abstract class ConsumerCoordinatorTest {
assertEquals(memberId, coordinator.generation().memberId);
// Imitating heartbeat thread that clears generation data.
- coordinator.maybeLeaveGroup("Clear generation data.");
+
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT,
"Clear generation data.");
assertEquals(AbstractCoordinator.Generation.NO_GENERATION,
coordinator.generation());
@@ -3783,7 +3784,7 @@ public abstract class ConsumerCoordinatorTest {
// Run close on a different thread. Coordinator is locked by this
thread, so it is
// not safe to use the coordinator from the main thread until the
task completes.
Future<?> future = executor.submit(
- () -> coordinator.close(time.timer(Math.min(closeTimeoutMs,
requestTimeoutMs))));
+ () -> coordinator.close(time.timer(Math.min(closeTimeoutMs,
requestTimeoutMs)), CloseOptions.GroupMembershipOperation.DEFAULT));
// Wait for close to start. If coordinator is known, wait for
close to queue
// at least one request. Otherwise, sleep for a short time.
if (!coordinatorUnknown)
@@ -4078,7 +4079,7 @@ public abstract class ConsumerCoordinatorTest {
private void createRackAwareCoordinator(String rackId,
MockPartitionAssignor assignor) {
metrics.close();
- coordinator.close(time.timer(0));
+ coordinator.close(time.timer(0),
CloseOptions.GroupMembershipOperation.DEFAULT);
metrics = new Metrics(time);
@@ -4146,7 +4147,7 @@ public abstract class ConsumerCoordinatorTest {
private void createMockHeartbeatThreadCoordinator() {
metrics.close();
- coordinator.close(time.timer(0));
+ coordinator.close(time.timer(0),
CloseOptions.GroupMembershipOperation.DEFAULT);
metrics = new Metrics(time);
coordinator = new ConsumerCoordinator(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index ac27f569b89..2febd2085b8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
@@ -64,7 +65,11 @@ import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
+import java.util.stream.Stream;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+import static
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
import static
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
import static org.apache.kafka.common.utils.Utils.mkSortedSet;
@@ -777,6 +782,29 @@ public class ConsumerHeartbeatRequestManagerTest {
assertHeartbeat(heartbeatRequestManager,
DEFAULT_HEARTBEAT_INTERVAL_MS);
}
+ @ParameterizedTest
+ @MethodSource("pollOnLeavingMatrix")
+ public void testPollOnLeaving(Optional<String> groupInstanceId,
CloseOptions.GroupMembershipOperation operation) {
+ heartbeatRequestManager = createHeartbeatRequestManager(
+ coordinatorRequestManager,
+ membershipManager,
+ heartbeatState,
+ heartbeatRequestState,
+ backgroundEventHandler);
+ when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+ when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
+ when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+
+ if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
+ assertNoHeartbeat(heartbeatRequestManager);
+ verify(membershipManager, never()).onHeartbeatRequestGenerated();
+ } else {
+ assertHeartbeat(heartbeatRequestManager,
DEFAULT_HEARTBEAT_INTERVAL_MS);
+ verify(membershipManager).onHeartbeatRequestGenerated();
+ }
+
+ }
+
/**
* This is expected to be the case where a member is already leaving the
group and the poll
* timer expires. The poll timer expiration should not transition the
member to STALE, and
@@ -859,6 +887,7 @@ public class ConsumerHeartbeatRequestManagerTest {
assertEquals(0, result.unsentRequests.size(), "No heartbeat should be
sent while a previous one is in-flight");
when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+ when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
when(heartbeatState.buildRequestData()).thenReturn(new
ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
ConsumerGroupHeartbeatRequest heartbeatToLeave =
getHeartbeatRequest(heartbeatRequestManager, version);
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
heartbeatToLeave.data().memberEpoch());
@@ -914,17 +943,26 @@ public class ConsumerHeartbeatRequestManagerTest {
assertEquals(Collections.singletonList(partition),
topicPartitions.partitions());
}
- @Test
- public void testPollOnCloseGeneratesRequestIfNeeded() {
- when(membershipManager.isLeavingGroup()).thenReturn(true);
+ @ParameterizedTest
+ @MethodSource("pollOnLeavingMatrix")
+ public void testPollOnCloseGeneratesRequestIfNeeded(Optional<String>
groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
+ if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation)
+ when(membershipManager.isLeavingGroup()).thenReturn(false);
+ else
+ when(membershipManager.isLeavingGroup()).thenReturn(true);
+ when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
+ when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+ String membership = groupInstanceId.isEmpty() ? "dynamic" : "static";
NetworkClientDelegate.PollResult pollResult =
heartbeatRequestManager.pollOnClose(time.milliseconds());
- assertEquals(1, pollResult.unsentRequests.size(),
- "A request to leave the group should be generated if the member is
still leaving when closing the manager");
-
- when(membershipManager.isLeavingGroup()).thenReturn(false);
- pollResult = heartbeatRequestManager.pollOnClose(time.milliseconds());
- assertTrue(pollResult.unsentRequests.isEmpty(),
- "No requests should be generated on close if the member is not
leaving when closing the manager");
+ if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
+ assertTrue(pollResult.unsentRequests.isEmpty(),
+ "A request to leave the group should not be generated if the "
+ membership + " is still leaving when closing the manager " +
+ "and GroupMembershipOperation is " + operation.name());
+ } else {
+ assertEquals(1, pollResult.unsentRequests.size(),
+ "A request to leave the group should be generated if the " +
membership + " is still leaving when closing the manager " +
+ "and GroupMembershipOperation is " + operation.name());
+ }
}
@Test
@@ -1149,4 +1187,15 @@ public class ConsumerHeartbeatRequestManagerTest {
when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
}
+
+ private static Stream<Arguments> pollOnLeavingMatrix() {
+ return Stream.of(
+ Arguments.of(Optional.empty(), DEFAULT),
+ Arguments.of(Optional.empty(), LEAVE_GROUP),
+ Arguments.of(Optional.empty(), REMAIN_IN_GROUP),
+ Arguments.of(Optional.of("groupInstanceId"), DEFAULT),
+ Arguments.of(Optional.of("groupInstanceId"), LEAVE_GROUP),
+ Arguments.of(Optional.of("groupInstanceId"), REMAIN_IN_GROUP)
+ );
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 3a93c25072d..20cf5ea59e6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -448,6 +449,54 @@ public class ConsumerMembershipManagerTest {
membershipManager.memberEpoch());
}
+ @Test
+ public void testLeaveGroupEpochOnClose() {
+ // Static member should leave the group with epoch -2 with
GroupMembershipOperation.DEFAULT
+ ConsumerMembershipManager membershipManager =
createMemberInStableState("instance1");
+ mockLeaveGroup();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
+ verify(subscriptionState).unsubscribe();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
+ membershipManager.memberEpoch());
+
+ // Static member should leave the group with epoch -1 with
GroupMembershipOperation.LEAVE_GROUP
+ membershipManager = createMemberInStableState("instance1");
+ mockLeaveGroup();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ verify(subscriptionState).unsubscribe();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+ assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+ membershipManager.memberEpoch());
+
+ // Static member should leave the group with epoch -2 with
GroupMembershipOperation.REMAIN_IN_GROUP
+ membershipManager = createMemberInStableState("instance1");
+ mockLeaveGroup();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+ verify(subscriptionState).unsubscribe();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
+ membershipManager.memberEpoch());
+
+ // Dynamic member should leave the group with epoch -1 with
GroupMembershipOperation.DEFAULT
+ membershipManager = createMemberInStableState(null);
+ mockLeaveGroup();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
+ verify(subscriptionState).unsubscribe();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+ assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+ membershipManager.memberEpoch());
+
+ // Dynamic member should leave the group with epoch -1 with
GroupMembershipOperation.LEAVE_GROUP
+ membershipManager = createMemberInStableState(null);
+ mockLeaveGroup();
+
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ verify(subscriptionState).unsubscribe();
+ assertEquals(MemberState.LEAVING, membershipManager.state());
+ assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+ membershipManager.memberEpoch());
+ }
+
/**
* This is the case where a member is stuck reconciling and transitions
out of the RECONCILING
* state (due to failure). When the reconciliation completes it should not
be applied because
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index c8ddfe5b700..c2a0e7272ed 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.metrics.Measurable;
@@ -271,7 +272,7 @@ public class WorkerCoordinator extends AbstractCoordinator
implements Closeable
@Override
protected void handlePollTimeoutExpiry() {
listener.onPollTimeoutExpiry();
- maybeLeaveGroup("worker poll timeout has expired.");
+ maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, "worker
poll timeout has expired.");
}
/**
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c89eb33082f..a283ce7cf38 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -198,7 +199,7 @@ public class WorkerGroupMember {
}
public void maybeLeaveGroup(String leaveReason) {
- coordinator.maybeLeaveGroup(leaveReason);
+
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.LEAVE_GROUP,
leaveReason);
}
public String ownerUrl(String connector) {
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 1679053466f..f5c5755c4eb 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -154,6 +154,7 @@ class ConsumerTask implements Runnable, Closeable {
}
// visible for testing
+ @SuppressWarnings("deprecation")
void closeConsumer() {
try {
consumer.close(Duration.ofSeconds(30));