frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1887087064


##########
clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java:
##########
@@ -277,13 +279,91 @@ public interface Consumer<K, V> extends Closeable {
     void close();
 
     /**
+     * This method has been deprecated since Kafka 4.0 and should use {@link 
Consumer#close(CloseOption)} instead.
+     *
      * @see KafkaConsumer#close(Duration)
      */
+    @Deprecated
     void close(Duration timeout);
 
     /**
      * @see KafkaConsumer#wakeup()
      */
     void wakeup();
 
+    /**
+     * @see KafkaConsumer#close(CloseOption)
+     */
+    void close(final CloseOption option);
+
+    class CloseOption {
+
+        /**
+         * Specifies the group membership operation upon shutdown.
+         * By default, {@code GroupMembershipOperation.DEFAULT} will be 
applied, which follows the consumer's default behavior.
+         */
+        protected GroupMembershipOperation operation = 
GroupMembershipOperation.DEFAULT;
+
+        /**
+         * Specifies the maximum amount of time to wait for the close process 
to complete.
+         * This allows users to define a custom timeout for gracefully 
stopping the consumer.
+         * If no value is set, the default timeout {@link 
ConsumerUtils#DEFAULT_CLOSE_TIMEOUT_MS} will be applied.
+         */
+        protected Optional<Duration> timeout = Optional.empty();
+
+        private CloseOption() {
+        }
+
+        protected CloseOption(final CloseOption option) {
+            this.operation = option.operation;
+            this.timeout = option.timeout;
+        }
+
+        /**
+         * Static method to create a {@code CloseOption} with a custom timeout.
+         *
+         * @param timeout the maximum time to wait for the consumer to close.
+         * @return a new {@code CloseOption} instance with the specified 
timeout.
+         */
+        public static CloseOption timeout(final Duration timeout) {
+            CloseOption option = new CloseOption();
+            option.timeout = Optional.ofNullable(timeout);
+            return option;
+        }
+
+        /**
+         * Static method to create a {@code CloseOption} with a specified 
group membership operation.
+         *
+         * @param operation the group membership operation to apply. Must be 
one of {@code LEAVE_GROUP}, {@code REMAIN_IN_GROUP},
+         *                 or {@code DEFAULT}.
+         * @return a new {@code CloseOption} instance with the specified group 
membership operation.
+         */
+        public static CloseOption groupMembershipOperation(final 
GroupMembershipOperation operation) {

Review Comment:
   https://issues.apache.org/jira/browse/KAFKA-18267



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to