This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74acbd200df KAFKA-16758: Extend Consumer#close with an option to leave 
the group or not (#17614)
74acbd200df is described below

commit 74acbd200dfc5c97cd1d74f1f56535a7c609b339
Author: TengYao Chi <[email protected]>
AuthorDate: Sun Apr 6 13:02:45 2025 +0800

    KAFKA-16758: Extend Consumer#close with an option to leave the group or not 
(#17614)
    
    JIRA: [KAFKA-16758](https://issues.apache.org/jira/browse/KAFKA-16758)
    This PR aims to deliver
    
    
[KIP-1092](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=321719077),
    please refer to KIP-1092 and KAFKA-16758 for further details.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Chia-Ping
    Tsai <[email protected]>, Kirk True <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/clients/consumer/CloseOptions.java       | 124 +++++++++++++++++++++
 .../apache/kafka/clients/consumer/Consumer.java    |   7 ++
 .../kafka/clients/consumer/KafkaConsumer.java      |  12 ++
 .../kafka/clients/consumer/MockConsumer.java       |   6 +
 .../consumer/internals/AbstractCoordinator.java    |  40 ++++---
 .../internals/AbstractHeartbeatRequestManager.java |  10 +-
 .../internals/AbstractMembershipManager.java       |  31 +++++-
 .../consumer/internals/AsyncKafkaConsumer.java     |  23 ++--
 .../consumer/internals/ClassicKafkaConsumer.java   |  39 +++++--
 .../consumer/internals/ConsumerCoordinator.java    |   5 +-
 .../internals/ConsumerHeartbeatRequestManager.java |  10 ++
 .../internals/ConsumerMembershipManager.java       |  38 ++++++-
 .../internals/ShareHeartbeatRequestManager.java    |   5 +
 .../events/ApplicationEventProcessor.java          |   2 +-
 .../internals/events/LeaveGroupOnCloseEvent.java   |  13 ++-
 .../internals/AbstractCoordinatorTest.java         |  46 ++++++--
 .../internals/ConsumerCoordinatorTest.java         |  19 ++--
 .../ConsumerHeartbeatRequestManagerTest.java       |  69 ++++++++++--
 .../internals/ConsumerMembershipManagerTest.java   |  49 ++++++++
 .../runtime/distributed/WorkerCoordinator.java     |   3 +-
 .../runtime/distributed/WorkerGroupMember.java     |   3 +-
 .../log/remote/metadata/storage/ConsumerTask.java  |   1 +
 23 files changed, 479 insertions(+), 78 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 78d1868598e..58136bf2d3e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -129,7 +129,7 @@
               files="(OffsetFetcher|RequestResponse)Test.java"/>
 
     <suppress checks="JavaNCSS"
-              
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java"/>
+              
files="RequestResponseTest.java|FetcherTest.java|FetchRequestManagerTest.java|KafkaAdminClientTest.java|ConsumerMembershipManagerTest.java"/>
 
     <suppress checks="NPathComplexity"
               
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
new file mode 100644
index 00000000000..0cbbcbca54d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/CloseOptions.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+
+public class CloseOptions {
+    /**
+     * Enum to specify the group membership operation upon leaving group.
+     *
+     * <ul>
+     *   <li><b>{@code LEAVE_GROUP}</b>:  means the consumer will leave the 
group.</li>
+     *   <li><b>{@code REMAIN_IN_GROUP}</b>: means the consumer will remain in 
the group.</li>
+     *   <li><b>{@code DEFAULT}</b>: Applies the default behavior:
+     *     <ul>
+     *       <li>For <b>static members</b>: The consumer will remain in the 
group.</li>
+     *       <li>For <b>dynamic members</b>: The consumer will leave the 
group.</li>
+     *     </ul>
+     *   </li>
+     * </ul>
+     */
+    public enum GroupMembershipOperation {
+        LEAVE_GROUP,
+        REMAIN_IN_GROUP,
+        DEFAULT
+    }
+
+    /**
+     * Specifies the group membership operation upon shutdown.
+     * By default, {@code GroupMembershipOperation.DEFAULT} will be applied, 
which follows the consumer's default behavior.
+     */
+    protected GroupMembershipOperation operation = 
GroupMembershipOperation.DEFAULT;
+
+    /**
+     * Specifies the maximum amount of time to wait for the close process to 
complete.
+     * This allows users to define a custom timeout for gracefully stopping 
the consumer.
+     * If no value is set, the default timeout {@link 
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+     */
+    protected Optional<Duration> timeout = Optional.empty();
+
+    private CloseOptions() {
+    }
+
+    protected CloseOptions(final CloseOptions option) {
+        this.operation = option.operation;
+        this.timeout = option.timeout;
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a custom timeout.
+     *
+     * @param timeout the maximum time to wait for the consumer to close.
+     * @return a new {@code CloseOptions} instance with the specified timeout.
+     */
+    public static CloseOptions timeout(final Duration timeout) {
+        CloseOptions option = new CloseOptions();
+        option.timeout = Optional.ofNullable(timeout);
+        return option;
+    }
+
+    /**
+     * Static method to create a {@code CloseOptions} with a specified group 
membership operation.
+     *
+     * @param operation the group membership operation to apply. Must be one 
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+     *                  or {@code DEFAULT}.
+     * @return a new {@code CloseOptions} instance with the specified group 
membership operation.
+     */
+    public static CloseOptions groupMembershipOperation(final 
GroupMembershipOperation operation) {
+        Objects.requireNonNull(operation, "operation should not be null");
+        CloseOptions option = new CloseOptions();
+        option.operation = operation;
+        return option;
+    }
+
+    /**
+     * Fluent method to set the timeout for the close process.
+     *
+     * @param timeout the maximum time to wait for the consumer to close. If 
{@code null}, the default timeout will be used.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withTimeout(final Duration timeout) {
+        this.timeout = Optional.ofNullable(timeout);
+        return this;
+    }
+
+    /**
+     * Fluent method to set the group membership operation upon shutdown.
+     *
+     * @param operation the group membership operation to apply. Must be one 
of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
+     * @return this {@code CloseOptions} instance.
+     */
+    public CloseOptions withGroupMembershipOperation(final 
GroupMembershipOperation operation) {
+        Objects.requireNonNull(operation, "operation should not be null");
+        this.operation = operation;
+        return this;
+    }
+
+    public GroupMembershipOperation groupMembershipOperation() {
+        return operation;
+    }
+
+    public Optional<Duration> timeout() {
+        return timeout;
+    }
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 2c8376e5ccd..6c1c42c9d8c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -277,8 +277,11 @@ public interface Consumer<K, V> extends Closeable {
     void close();
 
     /**
+     * This method has been deprecated since Kafka 4.0; use {@link 
Consumer#close(CloseOptions)} instead.
+     *
      * @see KafkaConsumer#close(Duration)
      */
+    @Deprecated
     void close(Duration timeout);
 
     /**
@@ -286,4 +289,8 @@ public interface Consumer<K, V> extends Closeable {
      */
     void wakeup();
 
+    /**
+     * @see KafkaConsumer#close(CloseOptions)
+     */
+    void close(final CloseOptions option);
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 70c0f7cadd5..2364864a3c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1798,6 +1798,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
      * @throws org.apache.kafka.common.KafkaException for any other error 
during close
      */
     @Override
+    @SuppressWarnings("deprecation")
     public void close(Duration timeout) {
         delegate.close(timeout);
     }
@@ -1812,6 +1813,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         delegate.wakeup();
     }
 
+    /**
+     * This method allows the caller to specify shutdown behavior using the 
{@link CloseOptions} class.
+     * If {@code null} is provided, the default behavior will be applied, 
equivalent to providing a new {@link CloseOptions} instance.
+     *
+     * @param option see {@link CloseOptions}
+     */
+    @Override
+    public void close(CloseOptions option) {
+        delegate.close(option);
+    }
+
     // Functions below are for testing only
     String clientId() {
         return delegate.clientId();
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index b9e69806694..a15ede69ceb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -549,6 +549,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public synchronized void close(Duration timeout) {
         this.closed = true;
     }
@@ -562,6 +563,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         wakeup.set(true);
     }
 
+    @Override
+    public void close(CloseOptions option) {
+        this.closed = true;
+    }
+
     /**
      * Schedule a task to be executed during a poll(). One enqueued task will 
be executed per {@link #poll(Duration)}
      * invocation. You can use this repeatedly to mock out multiple responses 
to poll invocations.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f8165f6656d..a07e12a518a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -85,6 +86,9 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+
 /**
  * AbstractCoordinator implements group management for a single group member 
by interacting with
  * a designated Kafka broker (the coordinator). Group semantics are provided 
by extending this class.
@@ -1116,23 +1120,21 @@ public abstract class AbstractCoordinator implements 
Closeable {
      */
     @Override
     public final void close() {
-        close(time.timer(0));
+        close(time.timer(0), DEFAULT);
     }
 
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    protected void close(Timer timer) {
+    protected void close(Timer timer, CloseOptions.GroupMembershipOperation 
membershipOperation) {
         try {
             closeHeartbeatThread();
         } finally {
             // Synchronize after closing the heartbeat thread since heartbeat 
thread
             // needs this lock to complete and terminate after close flag is 
set.
             synchronized (this) {
-                if (rebalanceConfig.leaveGroupOnClose) {
-                    onLeavePrepare();
-                    maybeLeaveGroup("the consumer is being closed");
-                }
+                onLeavePrepare();
+                maybeLeaveGroup(membershipOperation, "the consumer is being 
closed");
 
                 // At this point, there may be pending commits (async commits 
or sync commits that were
                 // interrupted using wakeup) and the leave group request which 
have been queued, but not
@@ -1153,26 +1155,22 @@ public abstract class AbstractCoordinator implements 
Closeable {
             "either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
             "returned in poll() with max.poll.records.");
 
-        maybeLeaveGroup("consumer poll timeout has expired.");
+        maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
     }
 
     /**
-     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this 
member is using static membership or is already
-     * not part of the group (ie does not have a valid member id, is in the 
UNJOINED state, or the coordinator is unknown).
+     * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this 
member is using static membership
+     * with the default consumer group membership operation, or is already not 
part of the group (i.e., does not have a
+     * valid member ID, is in the UNJOINED state, or the coordinator is 
unknown).
      *
+     * @param membershipOperation the operation on consumer group membership 
that the consumer will perform when closing
      * @param leaveReason the reason to leave the group for logging
      * @throws KafkaException if the rebalance callback throws exception
      */
-    public synchronized RequestFuture<Void> maybeLeaveGroup(String 
leaveReason) {
+    public synchronized RequestFuture<Void> 
maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, 
String leaveReason) {
         RequestFuture<Void> future = null;
 
-        // Starting from 2.3, only dynamic members will send LeaveGroupRequest 
to the broker,
-        // consumer with valid group.instance.id is viewed as static member 
that never sends LeaveGroup,
-        // and the membership expiration is only controlled by session timeout.
-        if (isDynamicMember() && !coordinatorUnknown() &&
-            state != MemberState.UNJOINED && generation.hasMemberId()) {
-            // this is a minimal effort attempt to leave the group. we do not
-            // attempt any resending if the request fails or times out.
+        if (rebalanceConfig.leaveGroupOnClose && 
shouldSendLeaveGroupRequest(membershipOperation)) {
             log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
                 generation.memberId, coordinator, leaveReason);
             LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
@@ -1189,6 +1187,14 @@ public abstract class AbstractCoordinator implements 
Closeable {
         return future;
     }
 
+    private boolean 
shouldSendLeaveGroupRequest(CloseOptions.GroupMembershipOperation 
membershipOperation) {
+        if (!coordinatorUnknown() && state != MemberState.UNJOINED && 
generation.hasMemberId()) {
+            return membershipOperation == LEAVE_GROUP || (isDynamicMember() && 
membershipOperation == DEFAULT);
+        } else {
+            return false;
+        }
+    }
+
     protected boolean isDynamicMember() {
         return rebalanceConfig.groupInstanceId.isEmpty();
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index cefa6f2c769..9d219907926 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -181,8 +181,9 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
             return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(leaveHeartbeat));
         }
 
-        // Case 1: The member is leaving
-        boolean heartbeatNow = membershipManager().state() == 
MemberState.LEAVING ||
+        // Case 1: The member state is LEAVING - if the member is a share 
consumer, we should immediately send leave;
+        // if the member is an async consumer, this will also depend on 
leavingGroupOperation.
+        boolean heartbeatNow = shouldSendLeaveHeartbeatNow() ||
             // Case 2: The member state indicates it should send a heartbeat 
without waiting for the interval,
             // and there is no heartbeat request currently in-flight
             (membershipManager().shouldHeartbeatNow() && 
!heartbeatRequestState.requestInFlight());
@@ -201,6 +202,11 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
      */
     public abstract AbstractMembershipManager<R> membershipManager();
 
+    /**
+     * @return whether the member should send the leave heartbeat immediately
+     */
+    protected abstract boolean shouldSendLeaveHeartbeatNow();
+
     /**
      * Generate a heartbeat request to leave the group if the state is still 
LEAVING when this is
      * called to close the consumer.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index c276adc9c8d..41ad48bbc25 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
@@ -130,7 +131,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
      * partition assigned, or revoked), but it is not present the Metadata 
cache at that moment.
      * The cache is cleared when the subscription changes ({@link 
#transitionToJoining()}, the
      * member fails ({@link #transitionToFatal()} or leaves the group
-     * ({@link #leaveGroup()}/{@link #leaveGroupOnClose()}).
+     * ({@link #leaveGroup()}/{@link 
#leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}).
      */
     private final Map<Uuid, String> assignedTopicNamesCache;
 
@@ -158,7 +159,7 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     /**
      * If the member is currently leaving the group after a call to {@link 
#leaveGroup()} or
-     * {@link #leaveGroupOnClose()}, this will have a future that will 
complete when the ongoing leave operation
+     * {@link #leaveGroupOnClose(CloseOptions.GroupMembershipOperation)}, this 
will have a future that will complete when the ongoing leave operation
      * completes (callbacks executed and heartbeat request to leave is sent 
out). This will be empty if the
      * member is not leaving.
      */
@@ -201,6 +202,14 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     private final boolean autoCommitEnabled;
 
+    /**
+     * Indicate the operation on consumer group membership that the consumer 
will perform when leaving the group.
+     * The property should remain {@code GroupMembershipOperation.DEFAULT} 
until the consumer is closing.
+     *
+     * @see CloseOptions.GroupMembershipOperation
+     */
+    protected CloseOptions.GroupMembershipOperation leaveGroupOperation = 
CloseOptions.GroupMembershipOperation.DEFAULT;
+
     AbstractMembershipManager(String groupId,
                               SubscriptionState subscriptions,
                               ConsumerMetadata metadata,
@@ -275,6 +284,15 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
         return memberEpoch;
     }
 
+    /**
+     * @return the operation the consumer will perform on leaving the group.
+     *
+     * @see CloseOptions.GroupMembershipOperation
+     */
+    public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+        return leaveGroupOperation;
+    }
+
     /**
      * Update member info and transition member state based on a successful 
heartbeat response.
      *
@@ -529,11 +547,14 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     /**
      * Transition to {@link MemberState#PREPARE_LEAVING} to release the 
assignment. Once completed,
      * transition to {@link MemberState#LEAVING} to send the heartbeat request 
and leave the group.
+     * It also sets the membership operation to be performed on close.
      * This is expected to be invoked when the user calls the {@link 
Consumer#close()} API.
      *
+     * @param membershipOperation the membership operation to be performed on 
close
      * @return Future that will complete when the heartbeat to leave the group 
has been sent out.
      */
-    public CompletableFuture<Void> leaveGroupOnClose() {
+    public CompletableFuture<Void> 
leaveGroupOnClose(CloseOptions.GroupMembershipOperation membershipOperation) {
+        this.leaveGroupOperation = membershipOperation;
         return leaveGroup(false);
     }
 
@@ -1274,14 +1295,14 @@ public abstract class AbstractMembershipManager<R 
extends AbstractResponse> impl
     }
 
     /**
-     * Returns the epoch a member uses to join the group. This is group-type 
specific.
+     * Returns the epoch a member uses to join the group. This is 
group-type-specific.
      *
      * @return the epoch to join the group
      */
     abstract int joinGroupEpoch();
 
     /**
-     * Returns the epoch a member uses to leave the group. This is group-type 
specific.
+     * Returns the epoch a member uses to leave the group. This is 
group-type-specific.
      *
      * @return the epoch to leave the group
      */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 0ebb10db65e..ba2e145d327 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -427,7 +428,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // call close methods if internal objects are already constructed; 
this is to prevent resource leak. see KAFKA-2121
             // we do not need to call `close` at all when `log` is null, which 
means no internal objects were initialized.
             if (this.log != null) {
-                close(Duration.ZERO, true);
+                close(Duration.ZERO, 
CloseOptions.GroupMembershipOperation.LEAVE_GROUP, true);
             }
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
@@ -1251,11 +1252,19 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public void close() {
-        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+        
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public void close(Duration timeout) {
+        close(CloseOptions.timeout(timeout));
+    }
+
+    @Override
+    public void close(CloseOptions option) {
+        Duration timeout = option.timeout().orElseGet(() -> 
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
         if (timeout.toMillis() < 0)
             throw new IllegalArgumentException("The timeout cannot be 
negative.");
         acquire();
@@ -1263,7 +1272,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (!closed) {
                 // need to close before setting the flag since the close 
function
                 // itself may trigger rebalance callback that needs the 
consumer to be open still
-                close(timeout, false);
+                close(timeout, option.groupMembershipOperation(), false);
             }
         } finally {
             closed = true;
@@ -1333,7 +1342,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      *     </li>
      * </ol>
      */
-    private void close(Duration timeout, boolean swallowException) {
+    private void close(Duration timeout, CloseOptions.GroupMembershipOperation 
membershipOperation, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
 
@@ -1353,7 +1362,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         swallow(log, Level.ERROR, "Failed to release group assignment",
             this::runRebalanceCallbacksOnClose, firstException);
         swallow(log, Level.ERROR, "Failed to leave group while closing 
consumer",
-            () -> leaveGroupOnClose(closeTimer), firstException);
+            () -> leaveGroupOnClose(closeTimer, membershipOperation), 
firstException);
         swallow(log, Level.ERROR, "Failed invoking asynchronous commit 
callbacks while closing consumer",
             () -> 
awaitPendingAsyncCommitsAndExecuteCommitCallbacks(closeTimer, false), 
firstException);
         if (applicationEventHandler != null)
@@ -1424,13 +1433,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             throw ConsumerUtils.maybeWrapAsKafkaException(error);
     }
 
-    private void leaveGroupOnClose(final Timer timer) {
+    private void leaveGroupOnClose(final Timer timer, final 
CloseOptions.GroupMembershipOperation membershipOperation) {
         if (groupMetadata.get().isEmpty())
             return;
 
         log.debug("Leaving the consumer group during consumer close");
         try {
-            applicationEventHandler.addAndGet(new 
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer)));
+            applicationEventHandler.addAndGet(new 
LeaveGroupOnCloseEvent(calculateDeadlineMs(timer), membershipOperation));
             log.info("Completed leaving the group");
         } catch (TimeoutException e) {
             log.warn("Consumer attempted to leave the group but couldn't " +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 9a8c637d778..61fe70cba25 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
@@ -265,7 +266,10 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             // call close methods if internal objects are already constructed; 
this is to prevent resource leak. see KAFKA-2121
             // we do not need to call `close` at all when `log` is null, which 
means no internal objects were initialized.
             if (this.log != null) {
-                close(Duration.ZERO, true);
+                // If a consumer fails during initialization, it means it 
hasn't joined the group yet.
+                // Since it's not a group member, we use REMAIN_IN_GROUP 
option when closing
+                // to prevent sending an unnecessary leave request to the 
coordinator.
+                close(Duration.ZERO, 
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP, true);
             }
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka consumer", t);
@@ -578,7 +582,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             
fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet());
             if (this.coordinator != null) {
                 this.coordinator.onLeavePrepare();
-                this.coordinator.maybeLeaveGroup("the consumer unsubscribed 
from all topics");
+                
this.coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
"the consumer unsubscribed from all topics");
             }
             this.subscriptions.unsubscribe();
             log.info("Unsubscribed all topics or patterns and assigned 
partitions");
@@ -1102,11 +1106,23 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
     @Override
     public void close() {
-        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+        
close(CloseOptions.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
     }
 
     @Override
+    @SuppressWarnings("deprecation")
     public void close(Duration timeout) {
+        close(CloseOptions.timeout(timeout));
+    }
+
+    @Override
+    public void wakeup() {
+        this.client.wakeup();
+    }
+
+    @Override
+    public void close(CloseOptions option) {
+        Duration timeout = option.timeout().orElseGet(() -> 
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
         if (timeout.toMillis() < 0)
             throw new IllegalArgumentException("The timeout cannot be 
negative.");
         acquire();
@@ -1114,7 +1130,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             if (!closed) {
                 // need to close before setting the flag since the close 
function
                 // itself may trigger rebalance callback that needs the 
consumer to be open still
-                close(timeout, false);
+                close(timeout, option.groupMembershipOperation(), false);
             }
         } finally {
             closed = true;
@@ -1122,18 +1138,13 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         }
     }
 
-    @Override
-    public void wakeup() {
-        this.client.wakeup();
-    }
-
     private Timer createTimerForRequest(final Duration timeout) {
         // this.time could be null if an exception occurs in constructor prior 
to setting the this.time field
         final Time localTime = (time == null) ? Time.SYSTEM : time;
         return localTime.timer(Math.min(timeout.toMillis(), requestTimeoutMs));
     }
 
-    private void close(Duration timeout, boolean swallowException) {
+    private void close(Duration timeout, CloseOptions.GroupMembershipOperation 
membershipOperation, boolean swallowException) {
         log.trace("Closing the Kafka consumer");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
 
@@ -1145,7 +1156,13 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         // consumer.
         if (coordinator != null) {
             // This is a blocking call bound by the time remaining in 
closeTimer
-            swallow(log, Level.ERROR, "Failed to close coordinator with a 
timeout(ms)=" + closeTimer.timeoutMs(), () -> coordinator.close(closeTimer), 
firstException);
+            swallow(
+                log,
+                Level.ERROR,
+                "Failed to close coordinator with a timeout(ms)=" + 
closeTimer.timeoutMs(),
+                () -> coordinator.close(closeTimer, membershipOperation),
+                firstException
+            );
         }
 
         if (fetcher != null) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 784907936f4..01fc605ea79 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
@@ -1012,7 +1013,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
     /**
      * @throws KafkaException if the rebalance callback throws exception
      */
-    public void close(final Timer timer) {
+    public void close(final Timer timer, CloseOptions.GroupMembershipOperation 
membershipOperation) {
         // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
         try {
@@ -1023,7 +1024,7 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                 invokeCompletedOffsetCommitCallbacks();
             }
         } finally {
-            super.close(timer);
+            super.close(timer, membershipOperation);
         }
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index 722f76f4c03..2845f4bc9ee 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -39,6 +39,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
 
 /**
@@ -211,6 +212,15 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
         return membershipManager;
     }
 
+    @Override
+    protected boolean shouldSendLeaveHeartbeatNow() {
+        // If the consumer has dynamic membership,
+        // we should skip the leaving heartbeat when leaveGroupOperation is 
REMAIN_IN_GROUP
+        if (membershipManager.groupInstanceId().isEmpty() && REMAIN_IN_GROUP 
== membershipManager.leaveGroupOperation())
+            return false;
+        return membershipManager().state() == MemberState.LEAVING;
+    }
+
     /**
      * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
      * the protocol, but subsequent requests do not send information which has 
not changed. This
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index 57d6c21e48b..1003e39f908 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -45,6 +46,9 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
@@ -398,6 +402,26 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
         }
     }
 
+    @Override
+    public boolean isLeavingGroup() {
+        CloseOptions.GroupMembershipOperation leaveGroupOperation = 
leaveGroupOperation();
+        if (REMAIN_IN_GROUP == leaveGroupOperation) {
+            return false;
+        }
+
+        MemberState state = state();
+        boolean isLeavingState = state == MemberState.PREPARE_LEAVING || state 
== MemberState.LEAVING;
+
+        // Default operation: both static and dynamic consumers will send a 
leave heartbeat
+        boolean hasLeaveOperation = DEFAULT == leaveGroupOperation ||
+            // Leave operation: both static and dynamic consumers will send a 
leave heartbeat
+            LEAVE_GROUP == leaveGroupOperation ||
+            // Remain in group: only static consumers will send a leave 
heartbeat, while dynamic members will not
+            groupInstanceId().isPresent();
+
+        return isLeavingState && hasLeaveOperation;
+    }
+
     /**
      * Enqueue a {@link ConsumerRebalanceListenerCallbackNeededEvent} to 
trigger the execution of the
      * appropriate {@link ConsumerRebalanceListener} {@link 
ConsumerRebalanceListenerMethodName method} on the
@@ -469,8 +493,16 @@ public class ConsumerMembershipManager extends 
AbstractMembershipManager<Consume
      */
     @Override
     public int leaveGroupEpoch() {
-        return groupInstanceId.isPresent() ?
-                ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
-                ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        boolean isStaticMember = groupInstanceId.isPresent();
+        // Currently, the server doesn't have a mechanism for static members 
to permanently leave the group.
+        // Therefore, we use LEAVE_GROUP_MEMBER_EPOCH to force the 
GroupMetadataManager to fence
+        // this member, effectively removing it from the group.
+        if (LEAVE_GROUP == leaveGroupOperation) {
+            return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+        }
+
+        return isStaticMember ?
+            ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH :
+            ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
index 56c77f85ff0..21d598afb4c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.java
@@ -183,6 +183,11 @@ public class ShareHeartbeatRequestManager extends 
AbstractHeartbeatRequestManage
         return membershipManager;
     }
 
+    @Override
+    protected boolean shouldSendLeaveHeartbeatNow() {
+        return membershipManager().state() == MemberState.LEAVING;
+    }
+
     /**
      * Builds the heartbeat requests correctly, ensuring that all information 
is sent according to
      * the protocol, but subsequent requests do not send information which has 
not changed. This
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 5c11e299790..42dd1711b96 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -467,7 +467,7 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
             return;
 
         log.debug("Signal the ConsumerMembershipManager to leave the consumer 
group since the consumer is closing");
-        CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroupOnClose();
+        CompletableFuture<Void> future = 
requestManagers.consumerMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
         future.whenComplete(complete(event.future()));
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
index 4afc00390d4..e7496c3671a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/LeaveGroupOnCloseEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals.events;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
 import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
@@ -31,7 +32,17 @@ import java.time.Duration;
  */
 public class LeaveGroupOnCloseEvent extends CompletableApplicationEvent<Void> {
 
-    public LeaveGroupOnCloseEvent(final long deadlineMs) {
+    /**
+     * @see 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation
+     */
+    private final CloseOptions.GroupMembershipOperation membershipOperation;
+
+    public LeaveGroupOnCloseEvent(final long deadlineMs, final 
CloseOptions.GroupMembershipOperation membershipOperation) {
         super(Type.LEAVE_GROUP_ON_CLOSE, deadlineMs);
+        this.membershipOperation = membershipOperation;
+    }
+
+    public CloseOptions.GroupMembershipOperation membershipOperation() {
+        return membershipOperation;
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index aedc4977a03..55d39db39a1 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -60,6 +61,9 @@ import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -75,6 +79,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static java.util.Collections.emptyMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -130,7 +135,12 @@ public class AbstractCoordinatorTest {
             Optional.empty(), Optional.empty());
     }
 
+
     private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, 
int rebalanceTimeoutMs, Optional<String> groupInstanceId, 
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
+        setupCoordinator(retryBackoffMs, retryBackoffMaxMs, 
rebalanceTimeoutMs, groupInstanceId, heartbeatThreadSupplier, 
groupInstanceId.isEmpty());
+    }
+
+    private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, 
int rebalanceTimeoutMs, Optional<String> groupInstanceId, 
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean 
leaveOnClose) {
         LogContext logContext = new LogContext();
         this.mockTime = new MockTime();
         ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 
retryBackoffMaxMs, 60 * 60 * 1000L,
@@ -158,7 +168,7 @@ public class AbstractCoordinatorTest {
                                                                         
groupInstanceId,
                                                                         
retryBackoffMs,
                                                                         
retryBackoffMaxMs,
-                                                                        
groupInstanceId.isEmpty());
+                                                                        
leaveOnClose);
         this.coordinator = new DummyCoordinator(rebalanceConfig,
                                                 consumerClient,
                                                 metrics,
@@ -1095,8 +1105,29 @@ public class AbstractCoordinatorTest {
         checkLeaveGroupRequestSent(Optional.of("groupInstanceId"));
     }
 
-    private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
-        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, groupInstanceId, Optional.empty());
+    @ParameterizedTest
+    @MethodSource("groupInstanceIdAndMembershipOperationMatrix")
+    public void 
testLeaveGroupSentWithGroupInstanceIdUnSetAndDifferentGroupMembershipOperation(Optional<String>
 groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
+        checkLeaveGroupRequestSent(groupInstanceId, operation, 
Optional.empty(), true);
+    }
+
+    private static Stream<Arguments> 
groupInstanceIdAndMembershipOperationMatrix() {
+        return Stream.of(
+            Arguments.of(Optional.empty(), 
CloseOptions.GroupMembershipOperation.DEFAULT),
+            Arguments.of(Optional.empty(), 
CloseOptions.GroupMembershipOperation.LEAVE_GROUP),
+            Arguments.of(Optional.empty(), 
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP),
+            Arguments.of(Optional.of("groupInstanceId"), 
CloseOptions.GroupMembershipOperation.DEFAULT),
+            Arguments.of(Optional.of("groupInstanceId"), 
CloseOptions.GroupMembershipOperation.LEAVE_GROUP),
+            Arguments.of(Optional.of("groupInstanceId"), 
CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
+        );
+    }
+
+    private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId)  
{
+        checkLeaveGroupRequestSent(groupInstanceId, 
CloseOptions.GroupMembershipOperation.DEFAULT, Optional.empty(), 
groupInstanceId.isEmpty());
+    }
+
+    private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId, 
CloseOptions.GroupMembershipOperation operation, 
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier, boolean 
leaveOnClose) {
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, groupInstanceId, heartbeatThreadSupplier, leaveOnClose);
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1113,12 +1144,13 @@ public class AbstractCoordinatorTest {
 
         try {
             coordinator.ensureActiveGroup();
-            coordinator.close();
-            if (coordinator.isDynamicMember()) {
+            coordinator.close(new MockTime().timer(0), operation);
+            if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP == operation 
||
+                (CloseOptions.GroupMembershipOperation.DEFAULT == operation && 
coordinator.isDynamicMember())) {
                 fail("Expected leavegroup to raise an error.");
             }
         } catch (RuntimeException exception) {
-            if (coordinator.isDynamicMember()) {
+            if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP == operation 
|| coordinator.isDynamicMember()) {
                 assertEquals(exception, e);
             } else {
                 fail("Coordinator with group.instance.id set shouldn't send 
leave group request.");
@@ -1206,7 +1238,7 @@ public class AbstractCoordinatorTest {
         }, leaveGroupResponse);
 
         coordinator.ensureActiveGroup();
-        return coordinator.maybeLeaveGroup(leaveReason);
+        return 
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
leaveReason);
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index c41ea0029ba..5c9e06ff90d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
@@ -229,7 +230,7 @@ public abstract class ConsumerCoordinatorTest {
     @AfterEach
     public void teardown() {
         this.metrics.close();
-        this.coordinator.close(time.timer(0));
+        this.coordinator.close(time.timer(0), 
CloseOptions.GroupMembershipOperation.DEFAULT);
     }
 
     @Test
@@ -1795,7 +1796,7 @@ public abstract class ConsumerCoordinatorTest {
             return validateLeaveGroup(groupId, consumerId, leaveRequest);
         }, new LeaveGroupResponse(
             new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
-        coordinator.close(time.timer(0));
+        coordinator.close(time.timer(0), 
CloseOptions.GroupMembershipOperation.DEFAULT);
         assertTrue(received.get());
     }
 
@@ -1810,7 +1811,7 @@ public abstract class ConsumerCoordinatorTest {
             LeaveGroupRequest leaveRequest = (LeaveGroupRequest) body;
             return validateLeaveGroup(groupId, consumerId, leaveRequest);
         }, new LeaveGroupResponse(new 
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
-        coordinator.maybeLeaveGroup("test maybe leave group");
+        
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
"test maybe leave group");
         assertTrue(received.get());
 
         AbstractCoordinator.Generation generation = 
coordinator.generationIfStable();
@@ -1854,7 +1855,7 @@ public abstract class ConsumerCoordinatorTest {
             return validateLeaveGroup(groupId, consumerId, leaveRequest);
         }, new LeaveGroupResponse(new 
LeaveGroupResponseData().setErrorCode(Errors.NONE.code())));
 
-        coordinator.maybeLeaveGroup("pending member leaves");
+        
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
"pending member leaves");
         assertTrue(received.get());
     }
 
@@ -2570,7 +2571,7 @@ public abstract class ConsumerCoordinatorTest {
         client.prepareResponse(new LeaveGroupResponse(new 
LeaveGroupResponseData()
                 .setErrorCode(Errors.NONE.code())));
         subscriptions.unsubscribe();
-        coordinator.maybeLeaveGroup("test commit after leave");
+        
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
"test commit after leave");
         subscriptions.assignFromUser(singleton(t1p));
 
         // the client should not reuse generation/memberId from 
auto-subscribed generation
@@ -3546,7 +3547,7 @@ public abstract class ConsumerCoordinatorTest {
             assertEquals(memberId, coordinator.generation().memberId);
 
             // Imitating heartbeat thread that clears generation data.
-            coordinator.maybeLeaveGroup("Clear generation data.");
+            
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, 
"Clear generation data.");
 
             assertEquals(AbstractCoordinator.Generation.NO_GENERATION, 
coordinator.generation());
 
@@ -3783,7 +3784,7 @@ public abstract class ConsumerCoordinatorTest {
             // Run close on a different thread. Coordinator is locked by this 
thread, so it is
             // not safe to use the coordinator from the main thread until the 
task completes.
             Future<?> future = executor.submit(
-                () -> coordinator.close(time.timer(Math.min(closeTimeoutMs, 
requestTimeoutMs))));
+                () -> coordinator.close(time.timer(Math.min(closeTimeoutMs, 
requestTimeoutMs)), CloseOptions.GroupMembershipOperation.DEFAULT));
             // Wait for close to start. If coordinator is known, wait for 
close to queue
             // at least one request. Otherwise, sleep for a short time.
             if (!coordinatorUnknown)
@@ -4078,7 +4079,7 @@ public abstract class ConsumerCoordinatorTest {
 
     private void createRackAwareCoordinator(String rackId, 
MockPartitionAssignor assignor) {
         metrics.close();
-        coordinator.close(time.timer(0));
+        coordinator.close(time.timer(0), 
CloseOptions.GroupMembershipOperation.DEFAULT);
 
         metrics = new Metrics(time);
 
@@ -4146,7 +4147,7 @@ public abstract class ConsumerCoordinatorTest {
 
     private void createMockHeartbeatThreadCoordinator() {
         metrics.close();
-        coordinator.close(time.timer(0));
+        coordinator.close(time.timer(0), 
CloseOptions.GroupMembershipOperation.DEFAULT);
 
         metrics = new Metrics(time);
         coordinator = new ConsumerCoordinator(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index ac27f569b89..2febd2085b8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
@@ -64,7 +65,11 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.stream.Stream;
 
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.DEFAULT;
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.LEAVE_GROUP;
+import static 
org.apache.kafka.clients.consumer.CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG;
 import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.REGEX_RESOLUTION_NOT_SUPPORTED_MSG;
 import static org.apache.kafka.common.utils.Utils.mkSortedSet;
@@ -777,6 +782,29 @@ public class ConsumerHeartbeatRequestManagerTest {
         assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
     }
 
+    @ParameterizedTest
+    @MethodSource("pollOnLeavingMatrix")
+    public void testPollOnLeaving(Optional<String> groupInstanceId, 
CloseOptions.GroupMembershipOperation operation) {
+        heartbeatRequestManager = createHeartbeatRequestManager(
+            coordinatorRequestManager,
+            membershipManager,
+            heartbeatState,
+            heartbeatRequestState,
+            backgroundEventHandler);
+        when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+        when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
+        when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+
+        if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
+            assertNoHeartbeat(heartbeatRequestManager);
+            verify(membershipManager, never()).onHeartbeatRequestGenerated();
+        } else {
+            assertHeartbeat(heartbeatRequestManager, 
DEFAULT_HEARTBEAT_INTERVAL_MS);
+            verify(membershipManager).onHeartbeatRequestGenerated();
+        }
+
+    }
+
     /**
      * This is expected to be the case where a member is already leaving the 
group and the poll
      * timer expires. The poll timer expiration should not transition the 
member to STALE, and
@@ -859,6 +887,7 @@ public class ConsumerHeartbeatRequestManagerTest {
         assertEquals(0, result.unsentRequests.size(), "No heartbeat should be 
sent while a previous one is in-flight");
 
         when(membershipManager.state()).thenReturn(MemberState.LEAVING);
+        when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
         when(heartbeatState.buildRequestData()).thenReturn(new 
ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
         ConsumerGroupHeartbeatRequest heartbeatToLeave = 
getHeartbeatRequest(heartbeatRequestManager, version);
         assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, 
heartbeatToLeave.data().memberEpoch());
@@ -914,17 +943,26 @@ public class ConsumerHeartbeatRequestManagerTest {
         assertEquals(Collections.singletonList(partition), 
topicPartitions.partitions());
     }
 
-    @Test
-    public void testPollOnCloseGeneratesRequestIfNeeded() {
-        when(membershipManager.isLeavingGroup()).thenReturn(true);
+    @ParameterizedTest
+    @MethodSource("pollOnLeavingMatrix")
+    public void testPollOnCloseGeneratesRequestIfNeeded(Optional<String> 
groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
+        if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation)
+            when(membershipManager.isLeavingGroup()).thenReturn(false);
+        else
+            when(membershipManager.isLeavingGroup()).thenReturn(true);
+        when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
+        when(membershipManager.leaveGroupOperation()).thenReturn(operation);
+        String membership = groupInstanceId.isEmpty() ? "dynamic" : "static";
         NetworkClientDelegate.PollResult pollResult = 
heartbeatRequestManager.pollOnClose(time.milliseconds());
-        assertEquals(1, pollResult.unsentRequests.size(),
-            "A request to leave the group should be generated if the member is 
still leaving when closing the manager");
-
-        when(membershipManager.isLeavingGroup()).thenReturn(false);
-        pollResult = heartbeatRequestManager.pollOnClose(time.milliseconds());
-        assertTrue(pollResult.unsentRequests.isEmpty(),
-            "No requests should be generated on close if the member is not 
leaving when closing the manager");
+        if (groupInstanceId.isEmpty() && REMAIN_IN_GROUP == operation) {
+            assertTrue(pollResult.unsentRequests.isEmpty(),
+                "A request to leave the group should not be generated if the " 
+ membership + " is still leaving when closing the manager " +
+                    "and GroupMembershipOperation is " + operation.name());
+        } else {
+            assertEquals(1, pollResult.unsentRequests.size(),
+                "A request to leave the group should be generated if the " + 
membership + " is still leaving when closing the manager " +
+                    "and GroupMembershipOperation is " + operation.name());
+        }
     }
 
     @Test
@@ -1149,4 +1187,15 @@ public class ConsumerHeartbeatRequestManagerTest {
         when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
         
when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
     }
+
+    private static Stream<Arguments> pollOnLeavingMatrix() {
+        return Stream.of(
+            Arguments.of(Optional.empty(), DEFAULT),
+            Arguments.of(Optional.empty(), LEAVE_GROUP),
+            Arguments.of(Optional.empty(), REMAIN_IN_GROUP),
+            Arguments.of(Optional.of("groupInstanceId"), DEFAULT),
+            Arguments.of(Optional.of("groupInstanceId"), LEAVE_GROUP),
+            Arguments.of(Optional.of("groupInstanceId"), REMAIN_IN_GROUP)
+        );
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 3a93c25072d..20cf5ea59e6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
@@ -448,6 +449,54 @@ public class ConsumerMembershipManagerTest {
                 membershipManager.memberEpoch());
     }
 
+    @Test
+    public void testLeaveGroupEpochOnClose() {
+        // Static member should leave the group with epoch -2 with GroupMembershipOperation.DEFAULT
+        ConsumerMembershipManager membershipManager = 
createMemberInStableState("instance1");
+        mockLeaveGroup();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
+        verify(subscriptionState).unsubscribe();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
+            membershipManager.memberEpoch());
+
+        // Static member should leave the group with epoch -1 with GroupMembershipOperation.LEAVE_GROUP
+        membershipManager = createMemberInStableState("instance1");
+        mockLeaveGroup();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        verify(subscriptionState).unsubscribe();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+            membershipManager.memberEpoch());
+
+        // Static member should leave the group with epoch -2 with GroupMembershipOperation.REMAIN_IN_GROUP
+        membershipManager = createMemberInStableState("instance1");
+        mockLeaveGroup();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP);
+        verify(subscriptionState).unsubscribe();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        
assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH,
+            membershipManager.memberEpoch());
+
+        // Dynamic member should leave the group with epoch -1 with GroupMembershipOperation.DEFAULT
+        membershipManager = createMemberInStableState(null);
+        mockLeaveGroup();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.DEFAULT);
+        verify(subscriptionState).unsubscribe();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+            membershipManager.memberEpoch());
+
+        // Dynamic member should leave the group with epoch -1 with GroupMembershipOperation.LEAVE_GROUP
+        membershipManager = createMemberInStableState(null);
+        mockLeaveGroup();
+        
membershipManager.leaveGroupOnClose(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        verify(subscriptionState).unsubscribe();
+        assertEquals(MemberState.LEAVING, membershipManager.state());
+        assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH,
+            membershipManager.memberEpoch());
+    }
+
     /**
      * This is the case where a member is stuck reconciling and transitions out of the RECONCILING
      * state (due to failure). When the reconciliation completes it should not be applied because
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index c8ddfe5b700..c2a0e7272ed 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.metrics.Measurable;
@@ -271,7 +272,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
     @Override
     protected void handlePollTimeoutExpiry() {
         listener.onPollTimeoutExpiry();
-        maybeLeaveGroup("worker poll timeout has expired.");
+        maybeLeaveGroup(CloseOptions.GroupMembershipOperation.DEFAULT, "worker 
poll timeout has expired.");
     }
 
     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index c89eb33082f..a283ce7cf38 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.GroupRebalanceConfig;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.CloseOptions;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -198,7 +199,7 @@ public class WorkerGroupMember {
     }
 
     public void maybeLeaveGroup(String leaveReason) {
-        coordinator.maybeLeaveGroup(leaveReason);
+        
coordinator.maybeLeaveGroup(CloseOptions.GroupMembershipOperation.LEAVE_GROUP, 
leaveReason);
     }
 
     public String ownerUrl(String connector) {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 1679053466f..f5c5755c4eb 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -154,6 +154,7 @@ class ConsumerTask implements Runnable, Closeable {
     }
 
     // visible for testing
+    @SuppressWarnings("deprecation")
     void closeConsumer() {
         try {
             consumer.close(Duration.ofSeconds(30));

Reply via email to