chia7712 commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1886779860
##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0 and should use {@link
Consumer#close(CloseOption)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOption)
+ */
+ void close(final CloseOption option);
+
+ class CloseOption {
+
+ /**
+ * Specifies the group membership operation upon shutdown.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be
applied, which follows the consumer's default behavior.
+ */
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
+
+ /**
+ * Specifies the maximum amount of time to wait for the close process
to complete.
+ * This allows users to define a custom timeout for gracefully
stopping the consumer.
+ * If no value is set, the default timeout {@link
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+ */
+ protected Optional<Duration> timeout = Optional.empty();
+
+ private CloseOption() {
+ }
+
+ protected CloseOption(final CloseOption option) {
+ this.operation = option.operation;
+ this.timeout = option.timeout;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
+ * @return a new {@code CloseOption} instance with the specified
timeout.
+ */
+ public static CloseOption timeout(final Duration timeout) {
+ CloseOption option = new CloseOption();
+ option.timeout = Optional.ofNullable(timeout);
+ return option;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a specified
group membership operation.
+ *
+ * @param operation the group membership operation to apply. Must be
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * or {@code DEFAULT}.
+ * @return a new {@code CloseOption} instance with the specified group
membership operation.
+ */
+ public static CloseOption groupMembershipOperation(final
GroupMembershipOperation operation) {
+ CloseOption option = new CloseOption();
+ option.operation = operation;
Review Comment:
Please add a null check for `operation` here.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0 and should use {@link
Consumer#close(CloseOption)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOption)
+ */
+ void close(final CloseOption option);
+
+ class CloseOption {
+
+ /**
+ * Specifies the group membership operation upon shutdown.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be
applied, which follows the consumer's default behavior.
+ */
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
+
+ /**
+ * Specifies the maximum amount of time to wait for the close process
to complete.
+ * This allows users to define a custom timeout for gracefully
stopping the consumer.
+ * If no value is set, the default timeout {@link
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+ */
+ protected Optional<Duration> timeout = Optional.empty();
+
+ private CloseOption() {
+ }
+
+ protected CloseOption(final CloseOption option) {
+ this.operation = option.operation;
+ this.timeout = option.timeout;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
+ * @return a new {@code CloseOption} instance with the specified
timeout.
+ */
+ public static CloseOption timeout(final Duration timeout) {
+ CloseOption option = new CloseOption();
+ option.timeout = Optional.ofNullable(timeout);
+ return option;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a specified
group membership operation.
+ *
+ * @param operation the group membership operation to apply. Must be
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * or {@code DEFAULT}.
+ * @return a new {@code CloseOption} instance with the specified group
membership operation.
+ */
+ public static CloseOption groupMembershipOperation(final
GroupMembershipOperation operation) {
+ CloseOption option = new CloseOption();
+ option.operation = operation;
+ return option;
+ }
+
+ /**
+ * Fluent method to set the timeout for the close process.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
If {@code null}, the default timeout will be used.
+ * @return this {@code CloseOption} instance.
+ */
+ public CloseOption withTimeout(final Duration timeout) {
+ this.timeout = Optional.ofNullable(timeout);
+ return this;
+ }
+
+ /**
+ * Fluent method to set the group membership operation upon shutdown.
+ *
+ * @param operation the group membership operation to apply. Must be
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP}, or {@code DEFAULT}.
+ * @return this {@code CloseOption} instance.
+ */
+ public CloseOption withGroupMembershipOperation(final
GroupMembershipOperation operation) {
+ this.operation = operation;
Review Comment:
Please add a null check for `operation` here.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0 and should use {@link
Consumer#close(CloseOption)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOption)
+ */
+ void close(final CloseOption option);
+
+ class CloseOption {
Review Comment:
Perhaps we can move the `CloseOption` class out of the `Consumer` interface.
Currently, users need to call
`consumer.close(Consumer.CloseOption.timeout(Duration.xxx))`, which is somewhat
verbose. Simplifying this could enhance the user experience. :(
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java:
##########
@@ -1102,10 +1103,11 @@ public void enforceRebalance() {
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
close(CloseOption.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
}
@Override
+ @SuppressWarnings("deprecation")
public void close(Duration timeout) {
Review Comment:
Ditto: this implementation should call `close(CloseOption.timeout(timeout));`
directly.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0 and should use {@link
Consumer#close(CloseOption)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOption)
+ */
+ void close(final CloseOption option);
+
+ class CloseOption {
+
+ /**
+ * Specifies the group membership operation upon shutdown.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be
applied, which follows the consumer's default behavior.
+ */
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
+
+ /**
+ * Specifies the maximum amount of time to wait for the close process
to complete.
+ * This allows users to define a custom timeout for gracefully
stopping the consumer.
+ * If no value is set, the default timeout {@link
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+ */
+ protected Optional<Duration> timeout = Optional.empty();
+
+ private CloseOption() {
+ }
+
+ protected CloseOption(final CloseOption option) {
+ this.operation = option.operation;
+ this.timeout = option.timeout;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
+ * @return a new {@code CloseOption} instance with the specified
timeout.
+ */
+ public static CloseOption timeout(final Duration timeout) {
+ CloseOption option = new CloseOption();
+ option.timeout = Optional.ofNullable(timeout);
+ return option;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a specified
group membership operation.
+ *
+ * @param operation the group membership operation to apply. Must be
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * or {@code DEFAULT}.
+ * @return a new {@code CloseOption} instance with the specified group
membership operation.
+ */
+ public static CloseOption groupMembershipOperation(final
GroupMembershipOperation operation) {
Review Comment:
Could you please file a follow-up to increase the test coverage?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##########
@@ -172,24 +168,31 @@ public ConsumerMembershipManager(String groupId,
Time time,
RebalanceMetricsManager metricsManager) {
super(groupId,
+ groupInstanceId,
subscriptions,
metadata,
logContext.logger(ConsumerMembershipManager.class),
time,
metricsManager);
- this.groupInstanceId = groupInstanceId;
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.serverAssignor = serverAssignor;
this.commitRequestManager = commitRequestManager;
this.backgroundEventHandler = backgroundEventHandler;
}
/**
- * @return Instance ID used by the member when joining the group. If
non-empty, it will indicate that
- * this is a static member.
+ * {@inheritDoc}
*/
- public Optional<String> groupInstanceId() {
- return groupInstanceId;
+ @Override
+ public void leaveGroupOperationOnClose(GroupMembershipOperation operation)
{
+ if (GroupMembershipOperation.DEFAULT.equals(operation)) {
Review Comment:
Why do we need this check? We eventually need to handle `DEFAULT` for the
different cases, right?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##########
@@ -1114,22 +1115,26 @@ private boolean isProtocolTypeInconsistent(String
protocolType) {
*/
@Override
public final void close() {
- close(time.timer(0));
+ close(time.timer(0), GroupMembershipOperation.DEFAULT);
}
/**
* @throws KafkaException if the rebalance callback throws exception
*/
- protected void close(Timer timer) {
+ protected void close(Timer timer, GroupMembershipOperation
membershipOperation) {
try {
closeHeartbeatThread();
} finally {
// Synchronize after closing the heartbeat thread since heartbeat
thread
// needs this lock to complete and terminate after close flag is
set.
synchronized (this) {
- if (rebalanceConfig.leaveGroupOnClose) {
+ // If membershipOperation is REMAIN_IN_GROUP, never send leave
group request.
+ // If membershipOperation is DEFAULT, leave group based on
rebalanceConfig.leaveGroupOnClose.
+ // Otherwise, leave group only if membershipOperation is
LEAVE_GROUP.
+ if (GroupMembershipOperation.REMAIN_IN_GROUP !=
membershipOperation &&
+ (GroupMembershipOperation.LEAVE_GROUP ==
membershipOperation || rebalanceConfig.leaveGroupOnClose)) {
Review Comment:
I agree with @ableegoldman that we should check the new option in
`maybeLeaveGroup`. Additionally, since we haven't discussed it in the KIP,
should we skip the callback if the operation is `REMAIN_IN_GROUP`? I would
prefer not to modify the callback, in order to minimize changes.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1236,10 +1237,11 @@ public void enforceRebalance(String reason) {
@Override
public void close() {
- close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
+
close(CloseOption.timeout(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS)));
}
@Override
+ @SuppressWarnings("deprecation")
public void close(Duration timeout) {
Review Comment:
This implementation should call `close(CloseOption.timeout(timeout));`
directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]