kirktrue commented on code in PR #14670:
URL: https://github.com/apache/kafka/pull/14670#discussion_r1378277149


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -670,168 +582,11 @@ public KafkaConsumer(Properties properties,
     public KafkaConsumer(Map<String, Object> configs,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
-        this(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
-                keyDeserializer, valueDeserializer);
+        delegate = CREATOR.create(configs, keyDeserializer, valueDeserializer);
     }
 
-    @SuppressWarnings("unchecked")
     KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
-        try {
-            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
-                    GroupRebalanceConfig.ProtocolType.CONSUMER);
-
-            this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
-            this.clientId = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-            LogContext logContext = createLogContext(config, groupRebalanceConfig);
-            this.log = logContext.logger(getClass());
-            boolean enableAutoCommit = config.maybeOverrideEnableAutoCommit();
-            groupId.ifPresent(groupIdStr -> {
-                if (groupIdStr.isEmpty()) {
-                    log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
-                }
-            });
-
-            log.debug("Initializing the Kafka consumer");
-            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
-            this.time = Time.SYSTEM;
-            this.metrics = createMetrics(config, time);
-            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.retryBackoffMaxMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
-
-            List<ConsumerInterceptor<K, V>> interceptorList = configuredConsumerInterceptors(config);
-            this.interceptors = new ConsumerInterceptors<>(interceptorList);
-            this.deserializers = new Deserializers<>(config, keyDeserializer, valueDeserializer);
-            this.subscriptions = createSubscriptionState(config, logContext);
-            ClusterResourceListeners clusterResourceListeners = ClientUtils.configureClusterResourceListeners(
-                    metrics.reporters(),
-                    interceptorList,
-                    Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer));
-            this.metadata = new ConsumerMetadata(config, subscriptions, logContext, clusterResourceListeners);
-            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
-            this.metadata.bootstrap(addresses);
-
-            FetchMetricsManager fetchMetricsManager = createFetchMetricsManager(metrics);
-            FetchConfig fetchConfig = new FetchConfig(config);
-            this.isolationLevel = fetchConfig.isolationLevel;
-
-            ApiVersions apiVersions = new ApiVersions();
-            this.client = createConsumerNetworkClient(config,
-                    metrics,
-                    logContext,
-                    apiVersions,
-                    time,
-                    metadata,
-                    fetchMetricsManager.throttleTimeSensor(),
-                    retryBackoffMs);
-
-            this.assignors = ConsumerPartitionAssignor.getAssignorInstances(
-                    config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
-                    config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId))
-            );
-
-            // no coordinator will be constructed for the default (null) group id
-            if (!groupId.isPresent()) {
-                config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
-                config.ignore(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED);
-                this.coordinator = null;
-            } else {
-                this.coordinator = new ConsumerCoordinator(groupRebalanceConfig,
-                        logContext,
-                        this.client,
-                        assignors,
-                        this.metadata,
-                        this.subscriptions,
-                        metrics,
-                        CONSUMER_METRIC_GROUP_PREFIX,
-                        this.time,
-                        enableAutoCommit,
-                        config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
-                        this.interceptors,
-                        config.getBoolean(ConsumerConfig.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
-                        config.getString(ConsumerConfig.CLIENT_RACK_CONFIG));
-            }
-            this.fetcher = new Fetcher<>(
-                    logContext,
-                    this.client,
-                    this.metadata,
-                    this.subscriptions,
-                    fetchConfig,
-                    this.deserializers,
-                    fetchMetricsManager,
-                    this.time);
-            this.offsetFetcher = new OffsetFetcher(logContext,
-                    client,
-                    metadata,
-                    subscriptions,
-                    time,
-                    retryBackoffMs,
-                    requestTimeoutMs,
-                    isolationLevel,
-                    apiVersions);
-            this.topicMetadataFetcher = new TopicMetadataFetcher(logContext,
-                    client,
-                    retryBackoffMs,
-                    retryBackoffMaxMs);
-
-            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, CONSUMER_METRIC_GROUP_PREFIX);
-
-            config.logUnused();
-            AppInfoParser.registerAppInfo(CONSUMER_JMX_PREFIX, clientId, metrics, time.milliseconds());
-            log.debug("Kafka consumer initialized");
-        } catch (Throwable t) {
-            // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
-            // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
-            if (this.log != null) {
-                close(Duration.ZERO, true);
-            }
-            // now propagate the exception
-            throw new KafkaException("Failed to construct kafka consumer", t);
-        }
-    }
-
-    // visible for testing
-    KafkaConsumer(LogContext logContext,

Review Comment:
   Plus the testing dependencies differ wildly between the delegate implementations.
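
   For context, here is a minimal sketch of the creator/delegate seam implied by `delegate = CREATOR.create(...)` in the hunk above. Every name below except `Deserializer` is illustrative rather than taken from the PR; it only shows why a test can fake the seam itself while each concrete delegate still brings its own test dependencies.

```java
// Illustrative sketch only; the PR's actual creator/delegate types may differ.
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

interface ConsumerDelegate<K, V> extends AutoCloseable {
    // stand-in for the full Consumer<K, V> surface a delegate would implement
}

interface ConsumerDelegateCreator {
    <K, V> ConsumerDelegate<K, V> create(Map<String, Object> configs,
                                         Deserializer<K> keyDeserializer,
                                         Deserializer<V> valueDeserializer);
}

// A unit test can satisfy the seam with a trivial fake...
final class FakeDelegate<K, V> implements ConsumerDelegate<K, V> {
    @Override
    public void close() {
        // no-op: the fake holds no network, coordinator, or metrics resources
    }
}
// ...but exercising a real delegate still drags in that implementation's own
// fixtures, which is the mismatch in testing dependencies noted above.
```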



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
