philipnee commented on code in PR #12672:
URL: https://github.com/apache/kafka/pull/12672#discussion_r998548854


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultEventHandler.java:
##########
@@ -16,45 +16,185 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import org.apache.kafka.clients.consumer.internals.events.EventHandler;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
- * This class interfaces the KafkaConsumer and the background thread.  It 
allows the caller to enqueue {@link ApplicationEvent}
- * to be consumed by the background thread and poll {@linkBackgroundEvent} 
produced by the background thread.
+ * An {@code EventHandler} that uses a single background thread to consume 
{@code ApplicationEvent} and produce
+ * {@code BackgroundEvent} from the {@ConsumerBackgroundThread}.
  */
 public class DefaultEventHandler implements EventHandler {
-    private final BlockingQueue<ApplicationEvent> applicationEvents;
-    private final BlockingQueue<BackgroundEvent> backgroundEvents;
+    private static final String METRIC_GRP_PREFIX = "consumer";
+    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final DefaultBackgroundThread backgroundThread;
 
-    public DefaultEventHandler() {
-        this.applicationEvents = new LinkedBlockingQueue<>();
-        this.backgroundEvents = new LinkedBlockingQueue<>();
-        // TODO: a concreted implementation of how requests are being 
consumed, and how responses are being produced.
+
+    public DefaultEventHandler(Time time,
+                               ConsumerConfig config,
+                               LogContext logContext,
+                               BlockingQueue<ApplicationEvent> 
applicationEventQueue,
+                               BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
+                               SubscriptionState subscriptionState,
+                               ApiVersions apiVersions,
+                               Metrics metrics,
+                               ClusterResourceListeners 
clusterResourceListeners,
+                               Sensor fetcherThrottleTimeSensor) {
+        this.applicationEventQueue = applicationEventQueue;
+        this.backgroundEventQueue = backgroundEventQueue;
+        ConsumerMetadata metadata = bootstrapMetadata(logContext,

Review Comment:
   It should be fine for this call to block, I think because all it does is 
create a Node object with id = -1 and an IP address. The actual metadata 
refresh happens during the background thread loop, not during this function 
call, since there are no network response deps to this call, so it shouldn't be 
blocking for too long.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to