kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334894116


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java:
##########
@@ -16,41 +16,149 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.internals.IdempotentCloser;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel;
 
 /**
  * {@code RequestManagers} provides a means to pass around the set of {@link 
RequestManager} instances in the system.
  * This allows callers to both use the specific {@link RequestManager} 
instance, or to iterate over the list via
  * the {@link #entries()} method.
  */
-public class RequestManagers {
+public class RequestManagers implements Closeable {
 
+    private final Logger log;
     public final Optional<CoordinatorRequestManager> coordinatorRequestManager;
     public final Optional<CommitRequestManager> commitRequestManager;
     public final OffsetsRequestManager offsetsRequestManager;
+    public final FetchRequestManager fetchRequestManager;
+
     private final List<Optional<? extends RequestManager>> entries;
+    private final IdempotentCloser closer = new IdempotentCloser();
 
-    public RequestManagers(OffsetsRequestManager offsetsRequestManager,
+    public RequestManagers(LogContext logContext,
+                           OffsetsRequestManager offsetsRequestManager,
+                           FetchRequestManager fetchRequestManager,
                            Optional<CoordinatorRequestManager> 
coordinatorRequestManager,
                            Optional<CommitRequestManager> 
commitRequestManager) {
-        this.offsetsRequestManager = requireNonNull(offsetsRequestManager,
-                "OffsetsRequestManager cannot be null");
+        this.log = logContext.logger(RequestManagers.class);
+        this.offsetsRequestManager = requireNonNull(offsetsRequestManager, 
"OffsetsRequestManager cannot be null");
         this.coordinatorRequestManager = coordinatorRequestManager;
         this.commitRequestManager = commitRequestManager;
+        this.fetchRequestManager = fetchRequestManager;
 
         List<Optional<? extends RequestManager>> list = new ArrayList<>();
         list.add(coordinatorRequestManager);
         list.add(commitRequestManager);
         list.add(Optional.of(offsetsRequestManager));
+        list.add(Optional.of(fetchRequestManager));
         entries = Collections.unmodifiableList(list);
     }
 
     public List<Optional<? extends RequestManager>> entries() {
         return entries;
     }
+
+    @Override
+    public void close() {
+        closer.close(
+                () -> {
+                    log.debug("Closing RequestManagers");
+
+                    entries.forEach(rm -> {
+                        rm.ifPresent(requestManager -> {
+                            try {
+                                requestManager.close();
+                            } catch (Throwable t) {
+                                log.debug("Error closing request manager {}", 
requestManager.getClass().getSimpleName(), t);
+                            }
+                        });
+                    });
+                    log.debug("RequestManagers has been closed");
+                },
+                () -> log.debug("RequestManagers was already closed"));
+
+    }
+
+    /**
+     * Creates a {@link Supplier} for deferred creation during invocation by
+     * {@link 
org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread}.
+     */
+    public static <K, V> Supplier<RequestManagers> supplier(final Time time,

Review Comment:
   Thanks for the catch!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to