This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4bd3fd840d KAFKA-14160: Streamline clusterId retrieval in Connect 
(#12536)
4bd3fd840d is described below

commit 4bd3fd840d0a6d61d20f17c4ed6b53b09ec21cf1
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Aug 23 17:09:22 2022 +0200

    KAFKA-14160: Streamline clusterId retrieval in Connect (#12536)
    
    Cache the Kafka cluster Id once it has been retrieved to avoid creating 
many Admin clients at startup.
    
    Reviewers: Chris Egerton <[email protected]>
---
 .../apache/kafka/connect/mirror/MirrorMaker.java   |  2 +-
 .../kafka/connect/cli/ConnectDistributed.java      |  2 +-
 .../kafka/connect/cli/ConnectStandalone.java       |  9 ++-
 .../org/apache/kafka/connect/runtime/Worker.java   |  3 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java | 46 ++++++++++++++-
 .../runtime/distributed/WorkerGroupMember.java     | 17 ++----
 .../connect/storage/KafkaConfigBackingStore.java   |  4 +-
 .../connect/storage/KafkaOffsetBackingStore.java   |  2 +-
 .../connect/storage/KafkaStatusBackingStore.java   |  2 +-
 .../apache/kafka/connect/util/ConnectUtils.java    | 30 ----------
 .../kafka/connect/runtime/WorkerConfigTest.java    | 68 ++++++++++++++++++++++
 .../apache/kafka/connect/runtime/WorkerTest.java   | 36 ++++++------
 .../runtime/distributed/WorkerGroupMemberTest.java | 10 ++--
 .../storage/KafkaConfigBackingStoreTest.java       | 10 ++--
 .../storage/KafkaOffsetBackingStoreTest.java       |  8 +--
 .../kafka/connect/util/ConnectUtilsTest.java       | 38 ------------
 16 files changed, 160 insertions(+), 127 deletions(-)

diff --git 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 7ac68319bc..cdc1d47fcf 100644
--- 
a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ 
b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -233,7 +233,7 @@ public class MirrorMaker {
         Plugins plugins = new Plugins(workerProps);
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig distributedConfig = new 
DistributedConfig(workerProps);
-        String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(distributedConfig);
+        String kafkaClusterId = distributedConfig.kafkaClusterId();
         // Create the admin client to be shared by all backing stores for this 
herder
         Map<String, Object> adminProps = new 
HashMap<>(distributedConfig.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, 
distributedConfig, kafkaClusterId);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8d93e79591..469f73add7 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -94,7 +94,7 @@ public class ConnectDistributed {
         plugins.compareAndSwapWithDelegatingLoader();
         DistributedConfig config = new DistributedConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        String kafkaClusterId = config.kafkaClusterId();
         log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
         RestServer rest = new RestServer(config);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 19cc115d9d..815be29fbe 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -32,7 +32,7 @@ import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class ConnectStandalone {
             plugins.compareAndSwapWithDelegatingLoader();
             StandaloneConfig config = new StandaloneConfig(workerProps);
 
-            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            String kafkaClusterId = config.kafkaClusterId();
             log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
             RestServer rest = new RestServer(config);
@@ -88,10 +88,13 @@ public class ConnectStandalone {
             URI advertisedUrl = rest.advertisedUrl();
             String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
+            OffsetBackingStore offsetBackingStore = new 
FileOffsetBackingStore();
+            offsetBackingStore.configure(config);
+
             ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy = plugins.newPlugin(
                 
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
                 config, ConnectorClientConfigOverridePolicy.class);
-            Worker worker = new Worker(workerId, time, plugins, config, new 
FileOffsetBackingStore(),
+            Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore,
                                        connectorClientConfigOverridePolicy);
 
             Herder herder = new StandaloneHerder(worker, kafkaClusterId, 
connectorClientConfigOverridePolicy);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 5bc67693d0..55e445ca8e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -153,7 +153,7 @@ public class Worker {
             ExecutorService executorService,
             ConnectorClientConfigOverridePolicy 
connectorClientConfigOverridePolicy
     ) {
-        this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        this.kafkaClusterId = config.kafkaClusterId();
         this.metrics = new ConnectMetrics(workerId, config, time, 
kafkaClusterId);
         this.executor = executorService;
         this.workerId = workerId;
@@ -168,7 +168,6 @@ public class Worker {
         this.internalValueConverter = plugins.newInternalConverter(false, 
JsonConverter.class.getName(), internalConverterConfig);
 
         this.globalOffsetBackingStore = globalOffsetBackingStore;
-        this.globalOffsetBackingStore.configure(config);
 
         this.workerConfigTransformer = initConfigTransformer();
 
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 38dbeb87e1..d8cec7173e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -27,6 +29,7 @@ import org.apache.kafka.common.config.SslClientAuth;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +41,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.regex.Pattern;
 
 import org.eclipse.jetty.util.StringUtil;
@@ -188,7 +192,7 @@ public class WorkerConfig extends AbstractConfig {
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = 
"connector.client.config.override.policy";
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
         "Class name or alias of implementation of 
<code>ConnectorClientConfigOverridePolicy</code>. Defines what client 
configurations can be "
-        + "overriden by the connector. The default implementation is `All`, 
meaning connector configurations can override all client properties. "
+        + "overridden by the connector. The default implementation is `All`, 
meaning connector configurations can override all client properties. "
         + "The other possible policies in the framework include `None` to 
disallow connectors from overriding client properties, "
         + "and `Principal` to allow connectors to override only client 
principals.";
     public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
@@ -306,6 +310,35 @@ public class WorkerConfig extends AbstractConfig {
                 .withClientSslSupport();
     }
 
+    private String kafkaClusterId;
+
+    public static String lookupKafkaClusterId(WorkerConfig config) {
+        log.info("Creating Kafka admin client");
+        try (Admin adminClient = Admin.create(config.originals())) {
+            return lookupKafkaClusterId(adminClient);
+        }
+    }
+
+    static String lookupKafkaClusterId(Admin adminClient) {
+        log.debug("Looking up Kafka cluster ID");
+        try {
+            KafkaFuture<String> clusterIdFuture = 
adminClient.describeCluster().clusterId();
+            if (clusterIdFuture == null) {
+                log.info("Kafka cluster version is too old to return cluster 
ID");
+                return null;
+            }
+            log.debug("Fetching Kafka cluster ID");
+            String kafkaClusterId = clusterIdFuture.get();
+            log.info("Kafka cluster ID: {}", kafkaClusterId);
+            return kafkaClusterId;
+        } catch (InterruptedException e) {
+            throw new ConnectException("Unexpectedly interrupted when looking 
up Kafka cluster info", e);
+        } catch (ExecutionException e) {
+            throw new ConnectException("Failed to connect to and describe 
Kafka cluster. "
+                                       + "Check worker's broker connection and 
security properties.", e);
+        }
+    }
+
     private void logInternalConverterRemovalWarnings(Map<String, String> 
props) {
         List<String> removedProperties = new ArrayList<>();
         for (String property : Arrays.asList("internal.key.converter", 
"internal.value.converter")) {
@@ -321,7 +354,7 @@ public class WorkerConfig extends AbstractConfig {
                             + "and specifying them will have no effect. "
                             + "Instead, an instance of the JsonConverter with 
schemas.enable "
                             + "set to false will be used. For more 
information, please visit "
-                            + "http://kafka.apache.org/documentation/#upgrade 
and consult the upgrade notes"
+                            + "https://kafka.apache.org/documentation/#upgrade 
and consult the upgrade notes"
                             + "for the 3.0 release.",
                     removedProperties);
         }
@@ -409,6 +442,13 @@ public class WorkerConfig extends AbstractConfig {
         return null;
     }
 
+    public String kafkaClusterId() {
+        if (kafkaClusterId == null) {
+            kafkaClusterId = lookupKafkaClusterId(this);
+        }
+        return kafkaClusterId;
+    }
+
     @Override
     protected Map<String, Object> postProcessParsedConfig(final Map<String, 
Object> parsedValues) {
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, 
parsedValues);
@@ -434,7 +474,7 @@ public class WorkerConfig extends AbstractConfig {
             String[] configTokens = config.trim().split("\\s+", 2);
             if (configTokens.length != 2) {
                 throw new ConfigException(String.format("Invalid format of 
header config '%s'. "
-                        + "Expected: '[ation] [header name]:[header value]'", 
config));
+                        + "Expected: '[action] [header name]:[header value]'", 
config));
             }
 
             // validate action
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4c1d6a5b9e..d7ad3f4eb3 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 
@@ -62,12 +61,9 @@ public class WorkerGroupMember {
     private static final String JMX_PREFIX = "kafka.connect";
 
     private final Logger log;
-    private final Time time;
     private final String clientId;
     private final ConsumerNetworkClient client;
     private final Metrics metrics;
-    private final Metadata metadata;
-    private final long retryBackoffMs;
     private final WorkerCoordinator coordinator;
 
     private boolean stopped = false;
@@ -80,7 +76,6 @@ public class WorkerGroupMember {
                              String clientId,
                              LogContext logContext) {
         try {
-            this.time = time;
             this.clientId = clientId;
             this.log = logContext.logger(WorkerGroupMember.class);
 
@@ -98,23 +93,23 @@ public class WorkerGroupMember {
 
             Map<String, Object> contextLabels = new HashMap<>();
             
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
-            contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, 
ConnectUtils.lookupKafkaClusterId(config));
+            contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, 
config.kafkaClusterId());
             contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, 
config.getString(DistributedConfig.GROUP_ID_CONFIG));
             MetricsContext metricsContext = new 
KafkaMetricsContext(JMX_PREFIX, contextLabels);
 
             this.metrics = new Metrics(metricConfig, reporters, time, 
metricsContext);
-            this.retryBackoffMs = 
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+            long retryBackoffMs = 
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+            Metadata metadata = new Metadata(retryBackoffMs, 
config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
                     logContext, new ClusterResourceListeners());
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
                     
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
                     
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
-            this.metadata.bootstrap(addresses);
+            metadata.bootstrap(addresses);
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config, time, logContext);
             NetworkClient netClient = new NetworkClient(
                     new 
Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), 
metrics, time, metricGrpPrefix, channelBuilder, logContext),
-                    this.metadata,
+                    metadata,
                     clientId,
                     100, // a fixed large enough value will suffice
                     
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
@@ -142,7 +137,7 @@ public class WorkerGroupMember {
                     this.client,
                     metrics,
                     metricGrpPrefix,
-                    this.time,
+                    time,
                     restUrl,
                     configStorage,
                     listener,
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 6edd0e5a76..edbab8ec04 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -378,7 +378,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
 
     private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
         Map<String, Object> producerProps = new 
HashMap<>(workerConfig.originals());
-        String kafkaClusterId = 
ConnectUtils.lookupKafkaClusterId(workerConfig);
+        String kafkaClusterId = workerConfig.kafkaClusterId();
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 
Integer.MAX_VALUE);
@@ -665,7 +665,7 @@ public class KafkaConfigBackingStore implements 
ConfigBackingStore {
     KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, 
final WorkerConfig config) {
         Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
 
-        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+        String clusterId = config.kafkaClusterId();
         Map<String, Object> originals = config.originals();
 
         Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 6693572002..28fd37709d 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -172,7 +172,7 @@ public class KafkaOffsetBackingStore implements 
OffsetBackingStore {
 
         this.exactlyOnce = config.exactlyOnceSourceEnabled();
 
-        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+        String clusterId = config.kafkaClusterId();
 
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 3ba6996da8..ebf4939bfd 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -164,7 +164,7 @@ public class KafkaStatusBackingStore implements 
StatusBackingStore {
         if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
             throw new ConfigException("Must specify topic for connector 
status.");
 
-        String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+        String clusterId = config.kafkaClusterId();
         Map<String, Object> originals = config.originals();
         Map<String, Object> producerProps = new HashMap<>(originals);
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 0af14cc7f3..a043d0f470 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.connector.Connector;
@@ -35,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -52,33 +49,6 @@ public final class ConnectUtils {
             throw new InvalidRecordException(String.format("Invalid record 
timestamp %d", timestamp));
     }
 
-    public static String lookupKafkaClusterId(WorkerConfig config) {
-        log.info("Creating Kafka admin client");
-        try (Admin adminClient = Admin.create(config.originals())) {
-            return lookupKafkaClusterId(adminClient);
-        }
-    }
-
-    static String lookupKafkaClusterId(Admin adminClient) {
-        log.debug("Looking up Kafka cluster ID");
-        try {
-            KafkaFuture<String> clusterIdFuture = 
adminClient.describeCluster().clusterId();
-            if (clusterIdFuture == null) {
-                log.info("Kafka cluster version is too old to return cluster 
ID");
-                return null;
-            }
-            log.debug("Fetching Kafka cluster ID");
-            String kafkaClusterId = clusterIdFuture.get();
-            log.info("Kafka cluster ID: {}", kafkaClusterId);
-            return kafkaClusterId;
-        } catch (InterruptedException e) {
-            throw new ConnectException("Unexpectedly interrupted when looking 
up Kafka cluster info", e);
-        } catch (ExecutionException e) {
-            throw new ConnectException("Failed to connect to and describe 
Kafka cluster. "
-                                       + "Check worker's broker connection and 
security properties.", e);
-        }
-    }
-
     /**
      * Ensure that the {@link Map properties} contain an expected value for 
the given key, inserting the
      * expected value into the properties if necessary.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index f02892888e..ec9a843d4d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -17,9 +17,16 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
 
 import java.util.Arrays;
 import java.util.HashMap;
@@ -31,6 +38,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
 
 public class WorkerConfigTest {
     private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
@@ -58,6 +68,20 @@ public class WorkerConfigTest {
             "set X-Frame-Options:DENY, add  :no-cache, no-store, 
must-revalidate "
     );
 
+    private static final String CLUSTER_ID = "cluster-id";
+    private MockedStatic<WorkerConfig> workerConfigMockedStatic;
+
+    @Before
+    public void setup() {
+        workerConfigMockedStatic = mockStatic(WorkerConfig.class, new 
CallsRealMethods());
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+    }
+
+    @After
+    public void teardown() {
+        workerConfigMockedStatic.close();
+    }
+
     @Test
     public void testListenersConfigAllowedValues() {
         Map<String, String> props = baseProps();
@@ -157,6 +181,50 @@ public class WorkerConfigTest {
         
assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
     }
 
+    @Test
+    public void testLookupKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient.Builder().
+                brokers(cluster).build();
+        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, 
WorkerConfig.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test
+    public void testLookupNullKafkaClusterId() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient.Builder().
+                brokers(cluster).clusterId(null).build();
+        assertNull(WorkerConfig.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test
+    public void testLookupKafkaClusterIdTimeout() {
+        final Node broker1 = new Node(0, "dummyHost-1", 1234);
+        final Node broker2 = new Node(1, "dummyHost-2", 1234);
+        List<Node> cluster = Arrays.asList(broker1, broker2);
+        MockAdminClient adminClient = new MockAdminClient.Builder().
+                brokers(cluster).build();
+        adminClient.timeoutNextRequest(1);
+
+        assertThrows(ConnectException.class, () -> 
WorkerConfig.lookupKafkaClusterId(adminClient));
+    }
+
+    @Test
+    public void testKafkaClusterId() {
+        Map<String, String> props = baseProps();
+        WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), 
props);
+        assertEquals(CLUSTER_ID, config.kafkaClusterId());
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
+
+        // next calls hit the cache
+        assertEquals(CLUSTER_ID, config.kafkaClusterId());
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
+    }
+
     private void assertInvalidHeaderConfig(String config) {
         assertThrows(ConfigException.class, () -> 
WorkerConfig.validateHttpResponseHeaderConfig(config));
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index ef9398ace3..9eb622cafb 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -59,7 +59,6 @@ import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.apache.kafka.connect.util.ParameterizedTest;
@@ -201,7 +200,7 @@ public class WorkerTest {
     private final boolean enableTopicCreation;
 
     private MockedStatic<Plugins> pluginsMockedStatic;
-    private MockedStatic<ConnectUtils> connectUtilsMockedStatic;
+    private MockedStatic<WorkerConfig> workerConfigMockedStatic;
     private MockedConstruction<WorkerSourceTask> sourceTaskMockedConstruction;
     private MockitoSession mockitoSession;
 
@@ -263,8 +262,8 @@ public class WorkerTest {
         pluginsMockedStatic = mockStatic(Plugins.class);
 
         // pass through things that aren't explicitly mocked out
-        connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new 
CallsRealMethods());
-        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+        workerConfigMockedStatic = mockStatic(WorkerConfig.class, new 
CallsRealMethods());
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
 
         // Make calls to new WorkerSourceTask() return a mock to avoid the 
source task trying to connect to a broker.
         sourceTaskMockedConstruction = 
mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
@@ -289,7 +288,7 @@ public class WorkerTest {
         // Ideal would be to use try-with-resources in an individual test, but 
it introduced a rather large level of
         // indentation of most test bodies, hence sticking with setup() / 
teardown()
         pluginsMockedStatic.close();
-        connectUtilsMockedStatic.close();
+        workerConfigMockedStatic.close();
         sourceTaskMockedConstruction.close();
 
         mockitoSession.finishMocking();
@@ -309,7 +308,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
-        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
                                 .thenReturn(CLUSTER_ID);
 
         worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -359,7 +358,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
 
         verify(sourceConnector).stop();
         verify(connectorStatusListener).onShutdown(CONNECTOR_ID);
@@ -387,7 +386,7 @@ public class WorkerTest {
         when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
         
when(delegatingLoader.connectorLoader(nonConnectorClass)).thenReturn(delegatingLoader);
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(delegatingLoader);
-        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
                                 .thenReturn("test-cluster");
 
         when(plugins.newConnector(anyString())).thenThrow(exception);
@@ -422,7 +421,7 @@ public class WorkerTest {
         verify(plugins).newConnector(anyString());
         verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), 
any(ConnectException.class));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -437,7 +436,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
         pluginsMockedStatic.when(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
-        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
                                 .thenReturn("test-cluster");
 
         connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@@ -479,7 +478,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -529,7 +528,7 @@ public class WorkerTest {
         verify(ctx).close();
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -676,7 +675,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -734,7 +733,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -806,7 +805,7 @@ public class WorkerTest {
 
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(pluginLoader), times(2));
         pluginsMockedStatic.verify(() -> 
Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -918,7 +917,7 @@ public class WorkerTest {
         mockInternalConverters();
         mockFileConfigProvider();
 
-        connectUtilsMockedStatic.when(() -> 
ConnectUtils.lookupKafkaClusterId(any())).thenReturn(CLUSTER_ID);
+        workerConfigMockedStatic.when(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
 
         worker = new Worker(WORKER_ID,
             new MockTime(),
@@ -935,7 +934,7 @@ public class WorkerTest {
         assertEquals(1L, (long) metricGroup.taskCounter("c2").metricValue(0L));
         assertEquals(0L, (long) 
metricGroup.taskCounter("fakeConnector").metricValue(0L));
 
-        connectUtilsMockedStatic.verify(() -> 
ConnectUtils.lookupKafkaClusterId(any()));
+        workerConfigMockedStatic.verify(() -> 
WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
     }
 
     @Test
@@ -1250,7 +1249,7 @@ public class WorkerTest {
         Map<String, String> props = new HashMap<>(workerProps);
         props.put("admin.client.id", "testid");
         props.put("admin.metadata.max.age.ms", "5000");
-        props.put("producer.bootstrap.servers", "cbeauho.com");
+        props.put("producer.bootstrap.servers", "localhost:1234");
         props.put("consumer.bootstrap.servers", "localhost:4761");
         WorkerConfig configWithOverrides = new StandaloneConfig(props);
 
@@ -1919,7 +1918,6 @@ public class WorkerTest {
 
 
     private void verifyStorage() {
-        verify(offsetBackingStore).configure(any(WorkerConfig.class));
         verify(offsetBackingStore).start();
         verify(herder).statusBackingStore();
         verify(offsetBackingStore).stop();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
index 563d71dbed..fa910bb54c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.MockConnectMetrics;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -65,12 +64,11 @@ public class WorkerGroupMemberTest {
 
 
         LogContext logContext = new LogContext("[Worker clientId=client-1 + 
groupId= group-1]");
-
-        try (MockedStatic<ConnectUtils> utilities = 
mockStatic(ConnectUtils.class)) {
-            utilities.when(() -> 
ConnectUtils.lookupKafkaClusterId(any())).thenReturn("cluster-1");
+        try (MockedStatic<WorkerConfig> utilities = 
mockStatic(WorkerConfig.class)) {
+            utilities.when(() -> 
WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1");
             member = new WorkerGroupMember(config, "", configBackingStore, 
null, Time.SYSTEM, "client-1", logContext);
-            utilities.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
-        }     
+            utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
+        }
 
         boolean entered = false;
         for (MetricsReporter reporter : member.metrics().reporters()) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index f3b74f0666..557d677525 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -33,9 +33,9 @@ import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.runtime.RestartRequest;
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TestFuture;
@@ -81,7 +81,7 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
+@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 public class KafkaConfigBackingStoreTest {
     private static final String TOPIC = "connect-configs";
@@ -190,9 +190,9 @@ public class KafkaConfigBackingStoreTest {
 
     @Before
     public void setUp() {
-        PowerMock.mockStaticPartial(ConnectUtils.class, 
"lookupKafkaClusterId");
-        
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
-        PowerMock.replay(ConnectUtils.class);
+        PowerMock.mockStaticPartial(WorkerConfig.class, 
"lookupKafkaClusterId");
+        
EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+        PowerMock.replay(WorkerConfig.class);
 
         createStore(DEFAULT_DISTRIBUTED_CONFIG, storeLog);
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index cf11230f3d..d019c98398 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
 import org.easymock.Capture;
@@ -66,7 +66,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class})
+@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
 @PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "connect-offsets";
@@ -491,8 +491,8 @@ public class KafkaOffsetBackingStoreTest {
     }
 
     private void expectClusterId() {
-        PowerMock.mockStaticPartial(ConnectUtils.class, 
"lookupKafkaClusterId");
-        
EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+        PowerMock.mockStaticPartial(WorkerConfig.class, 
"lookupKafkaClusterId");
+        
EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
     }
 
     private static ByteBuffer buffer(String v) {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index d1330277e8..d68add14eb 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -17,59 +17,21 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.MockAdminClient;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.junit.Test;
 
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
 
 public class ConnectUtilsTest {
 
-    @Test
-    public void testLookupKafkaClusterId() {
-        final Node broker1 = new Node(0, "dummyHost-1", 1234);
-        final Node broker2 = new Node(1, "dummyHost-2", 1234);
-        List<Node> cluster = Arrays.asList(broker1, broker2);
-        MockAdminClient adminClient = new MockAdminClient.Builder().
-            brokers(cluster).build();
-        assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, 
ConnectUtils.lookupKafkaClusterId(adminClient));
-    }
-
-    @Test
-    public void testLookupNullKafkaClusterId() {
-        final Node broker1 = new Node(0, "dummyHost-1", 1234);
-        final Node broker2 = new Node(1, "dummyHost-2", 1234);
-        List<Node> cluster = Arrays.asList(broker1, broker2);
-        MockAdminClient adminClient = new MockAdminClient.Builder().
-            brokers(cluster).clusterId(null).build();
-        assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
-    }
-
-    @Test
-    public void testLookupKafkaClusterIdTimeout() {
-        final Node broker1 = new Node(0, "dummyHost-1", 1234);
-        final Node broker2 = new Node(1, "dummyHost-2", 1234);
-        List<Node> cluster = Arrays.asList(broker1, broker2);
-        MockAdminClient adminClient = new MockAdminClient.Builder().
-            brokers(cluster).build();
-        adminClient.timeoutNextRequest(1);
-
-        assertThrows(ConnectException.class, () -> 
ConnectUtils.lookupKafkaClusterId(adminClient));
-    }
-
     @Test
     public void testAddMetricsContextPropertiesDistributed() {
         Map<String, String> props = new HashMap<>();

Reply via email to