kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1334894116
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java: ########## @@ -16,41 +16,149 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.internals.IdempotentCloser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; + +import java.io.Closeable; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.function.Supplier; import static java.util.Objects.requireNonNull; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchConfig; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.configuredIsolationLevel; /** * {@code RequestManagers} provides a means to pass around the set of {@link RequestManager} instances in the system. * This allows callers to both use the specific {@link RequestManager} instance, or to iterate over the list via * the {@link #entries()} method. */ -public class RequestManagers { +public class RequestManagers implements Closeable { + private final Logger log; public final Optional<CoordinatorRequestManager> coordinatorRequestManager; public final Optional<CommitRequestManager> commitRequestManager; public final OffsetsRequestManager offsetsRequestManager; + public final FetchRequestManager fetchRequestManager; + private final List<Optional<? extends RequestManager>> entries; + private final IdempotentCloser closer = new IdempotentCloser(); - public RequestManagers(OffsetsRequestManager offsetsRequestManager, + public RequestManagers(LogContext logContext, + OffsetsRequestManager offsetsRequestManager, + FetchRequestManager fetchRequestManager, Optional<CoordinatorRequestManager> coordinatorRequestManager, Optional<CommitRequestManager> commitRequestManager) { - this.offsetsRequestManager = requireNonNull(offsetsRequestManager, - "OffsetsRequestManager cannot be null"); + this.log = logContext.logger(RequestManagers.class); + this.offsetsRequestManager = requireNonNull(offsetsRequestManager, "OffsetsRequestManager cannot be null"); this.coordinatorRequestManager = coordinatorRequestManager; this.commitRequestManager = commitRequestManager; + this.fetchRequestManager = fetchRequestManager; List<Optional<? extends RequestManager>> list = new ArrayList<>(); list.add(coordinatorRequestManager); list.add(commitRequestManager); list.add(Optional.of(offsetsRequestManager)); + list.add(Optional.of(fetchRequestManager)); entries = Collections.unmodifiableList(list); } public List<Optional<? extends RequestManager>> entries() { return entries; } + + @Override + public void close() { + closer.close( + () -> { + log.debug("Closing RequestManagers"); + + entries.forEach(rm -> { + rm.ifPresent(requestManager -> { + try { + requestManager.close(); + } catch (Throwable t) { + log.debug("Error closing request manager {}", requestManager.getClass().getSimpleName(), t); + } + }); + }); + log.debug("RequestManagers has been closed"); + }, + () -> log.debug("RequestManagers was already closed")); + + } + + /** + * Creates a {@link Supplier} for deferred creation during invocation by + * {@link org.apache.kafka.clients.consumer.internals.DefaultBackgroundThread}. + */ + public static <K, V> Supplier<RequestManagers> supplier(final Time time, Review Comment: Thanks for the catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org