TaiJuWu commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1859485006


##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -267,13 +269,92 @@ public interface Consumer<K, V> extends Closeable {
     void close();
 
     /**
+     * This method has been deprecated since Kafka 4.0 and should use {@link 
Consumer#close(CloseOption)} instead.
+     *
      * @see KafkaConsumer#close(Duration)
      */
+    @Deprecated
     void close(Duration timeout);
 
     /**
      * @see KafkaConsumer#wakeup()
      */
     void wakeup();
 
+    /**
+     * @see KafkaConsumer#close(CloseOption)
+     */
+    void close(final CloseOption option);
+
+    class CloseOption {
+
+        /**
+         * Specifies the group membership operation upon shutdown.
+         * By default, {@code GroupMembershipOperation.DEFAULT} will be 
applied, which follows the consumer's default behavior.
+         */
+        protected GroupMembershipOperation operation = 
GroupMembershipOperation.DEFAULT;
+
+        /**
+         * Specifies the maximum amount of time to wait for the close process 
to complete.
+         * This allows users to define a custom timeout for gracefully 
stopping the consumer.
+         * If no value is set, the default timeout {@link 
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+         */
+        protected Optional<Duration> timeout = Optional.empty();
+
+        private CloseOption() {
+        }
+
+        protected CloseOption(final CloseOption option) {
+            this.operation = option.operation;
+            this.timeout = option.timeout;
+        }
+
+        /**
+         * Static method to create a {@code CloseOption} with a custom timeout.
+         *
+         * @param timeout the maximum time to wait for the consumer to close.
+         * @return a new {@code CloseOption} instance with the specified 
timeout.
+         */
+        public static CloseOption timeout(final Duration timeout) {
+            CloseOption option = new CloseOption();
+            option.timeout = Optional.ofNullable(timeout);
+            return option;
+        }
+
+        /**
+         * Static method to create a {@code CloseOption} with a specified 
group membership operation.
+         *
+         * @param operation the group membership operation to apply. Must be 
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+         *                 or {@code DEFAULT}.
+         * @return a new {@code CloseOption} instance with the specified group 
membership operation.
+         */
+        public static CloseOption groupMembershipOperation(final 
GroupMembershipOperation operation) {
+            CloseOption option = new CloseOption();
+            option.operation = operation;
+            return option;
+        }
+
+        /**
+         * Fluent method to set the timeout for the close process.
+         *
+         * @param timeout the maximum time to wait for the consumer to close. 
If {@code null}, the default timeout will be used.
+         * @return this {@code CloseOption} instance.
+         */
+        public CloseOption withTimeout(final Duration timeout) {
+            this.timeout = Optional.ofNullable(timeout);
+            return this;
+        }
+
+        /**
+         * Fluent method to set the group membership operation upon shutdown.
+         *
+         * @param operation the group membership operation to apply. Must be 
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+     *                 or {@code DEFAULT}.

Review Comment:
   nit: align start



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to