frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1887087064
##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
void close();
/**
+ * This method has been deprecated since Kafka 4.0 and should use {@link
Consumer#close(CloseOption)} instead.
+ *
* @see KafkaConsumer#close(Duration)
*/
+ @Deprecated
void close(Duration timeout);
/**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
+ /**
+ * @see KafkaConsumer#close(CloseOption)
+ */
+ void close(final CloseOption option);
+
+ class CloseOption {
+
+ /**
+ * Specifies the group membership operation upon shutdown.
+ * By default, {@code GroupMembershipOperation.DEFAULT} will be
applied, which follows the consumer's default behavior.
+ */
+ protected GroupMembershipOperation operation =
GroupMembershipOperation.DEFAULT;
+
+ /**
+ * Specifies the maximum amount of time to wait for the close process
to complete.
+ * This allows users to define a custom timeout for gracefully
stopping the consumer.
+ * If no value is set, the default timeout {@link
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+ */
+ protected Optional<Duration> timeout = Optional.empty();
+
+ private CloseOption() {
+ }
+
+ protected CloseOption(final CloseOption option) {
+ this.operation = option.operation;
+ this.timeout = option.timeout;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a custom timeout.
+ *
+ * @param timeout the maximum time to wait for the consumer to close.
+ * @return a new {@code CloseOption} instance with the specified
timeout.
+ */
+ public static CloseOption timeout(final Duration timeout) {
+ CloseOption option = new CloseOption();
+ option.timeout = Optional.ofNullable(timeout);
+ return option;
+ }
+
+ /**
+ * Static method to create a {@code CloseOption} with a specified
group membership operation.
+ *
+ * @param operation the group membership operation to apply. Must be
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+ * or {@code DEFAULT}.
+ * @return a new {@code CloseOption} instance with the specified group
membership operation.
+ */
+ public static CloseOption groupMembershipOperation(final
GroupMembershipOperation operation) {
Review Comment:
https://issues.apache.org/jira/browse/KAFKA-18267
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]