philipnee commented on code in PR #12672: URL: https://github.com/apache/kafka/pull/12672#discussion_r998548854
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java: ########## @@ -16,45 +16,185 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientUtils; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; import org.apache.kafka.clients.consumer.internals.events.EventHandler; +import org.apache.kafka.common.errors.InterruptException; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.ChannelBuilder; +import org.apache.kafka.common.network.Selector; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import java.net.InetSocketAddress; +import java.time.Duration; +import java.util.List; import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** - * This class interfaces the KafkaConsumer and the background thread. It allows the caller to enqueue {@link ApplicationEvent} - * to be consumed by the background thread and poll {@linkBackgroundEvent} produced by the background thread. + * An {@code EventHandler} that uses a single background thread to consume {@code ApplicationEvent} and produce + * {@code BackgroundEvent} from the {@ConsumerBackgroundThread}. */ public class DefaultEventHandler implements EventHandler { - private final BlockingQueue<ApplicationEvent> applicationEvents; - private final BlockingQueue<BackgroundEvent> backgroundEvents; + private static final String METRIC_GRP_PREFIX = "consumer"; + private final BlockingQueue<ApplicationEvent> applicationEventQueue; + private final BlockingQueue<BackgroundEvent> backgroundEventQueue; + private final DefaultBackgroundThread backgroundThread; - public DefaultEventHandler() { - this.applicationEvents = new LinkedBlockingQueue<>(); - this.backgroundEvents = new LinkedBlockingQueue<>(); - // TODO: a concreted implementation of how requests are being consumed, and how responses are being produced. + + public DefaultEventHandler(Time time, + ConsumerConfig config, + LogContext logContext, + BlockingQueue<ApplicationEvent> applicationEventQueue, + BlockingQueue<BackgroundEvent> backgroundEventQueue, + SubscriptionState subscriptionState, + ApiVersions apiVersions, + Metrics metrics, + ClusterResourceListeners clusterResourceListeners, + Sensor fetcherThrottleTimeSensor) { + this.applicationEventQueue = applicationEventQueue; + this.backgroundEventQueue = backgroundEventQueue; + ConsumerMetadata metadata = bootstrapMetadata(logContext, Review Comment: It should be fine for this call to block, I think because all it does is create a Node object with id = -1 and an IP address. The actual metadata refresh happens during the background thread loop, not during this function call, since there are no network response deps to this call, so it shouldn't be blocking for too long. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org