This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4bd3fd840d KAFKA-14160: Streamline clusterId retrieval in Connect (#12536)
4bd3fd840d is described below
commit 4bd3fd840d0a6d61d20f17c4ed6b53b09ec21cf1
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Aug 23 17:09:22 2022 +0200
KAFKA-14160: Streamline clusterId retrieval in Connect (#12536)
Cache the Kafka cluster Id once it has been retrieved to avoid creating
many Admin clients at startup.
Reviewers: Chris Egerton <[email protected]>
---
.../apache/kafka/connect/mirror/MirrorMaker.java | 2 +-
.../kafka/connect/cli/ConnectDistributed.java | 2 +-
.../kafka/connect/cli/ConnectStandalone.java | 9 ++-
.../org/apache/kafka/connect/runtime/Worker.java | 3 +-
.../apache/kafka/connect/runtime/WorkerConfig.java | 46 ++++++++++++++-
.../runtime/distributed/WorkerGroupMember.java | 17 ++----
.../connect/storage/KafkaConfigBackingStore.java | 4 +-
.../connect/storage/KafkaOffsetBackingStore.java | 2 +-
.../connect/storage/KafkaStatusBackingStore.java | 2 +-
.../apache/kafka/connect/util/ConnectUtils.java | 30 ----------
.../kafka/connect/runtime/WorkerConfigTest.java | 68 ++++++++++++++++++++++
.../apache/kafka/connect/runtime/WorkerTest.java | 36 ++++++------
.../runtime/distributed/WorkerGroupMemberTest.java | 10 ++--
.../storage/KafkaConfigBackingStoreTest.java | 10 ++--
.../storage/KafkaOffsetBackingStoreTest.java | 8 +--
.../kafka/connect/util/ConnectUtilsTest.java | 38 ------------
16 files changed, 160 insertions(+), 127 deletions(-)
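For context, the heart of this change is a lazily cached cluster ID lookup on WorkerConfig. A minimal, self-contained sketch of the pattern follows; the class name is illustrative only and not part of the patch, and logging plus the Connect-specific exception wrapping are omitted (the real methods are kafkaClusterId() and lookupKafkaClusterId() in the WorkerConfig.java hunk further down).

    import org.apache.kafka.clients.admin.Admin;
    import java.util.Map;

    // Illustrative sketch of the caching pattern added to WorkerConfig;
    // logging and the Connect-specific exception wrapping are omitted.
    class CachedClusterIdExample {
        private final Map<String, Object> adminProps;
        private String kafkaClusterId;

        CachedClusterIdExample(Map<String, Object> adminProps) {
            this.adminProps = adminProps;
        }

        String kafkaClusterId() {
            if (kafkaClusterId == null) {
                // Only the first call creates an Admin client and queries the brokers;
                // later callers reuse the cached value.
                try (Admin admin = Admin.create(adminProps)) {
                    kafkaClusterId = admin.describeCluster().clusterId().get();
                } catch (InterruptedException | java.util.concurrent.ExecutionException e) {
                    throw new RuntimeException("Failed to look up Kafka cluster ID", e);
                }
            }
            return kafkaClusterId;
        }
    }

In the actual patch the cached value lives on WorkerConfig itself, so every component that already holds the worker config (Worker, the Kafka-backed stores, WorkerGroupMember) shares a single lookup instead of each creating its own Admin client.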
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
index 7ac68319bc..cdc1d47fcf 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java
@@ -233,7 +233,7 @@ public class MirrorMaker {
Plugins plugins = new Plugins(workerProps);
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig distributedConfig = new DistributedConfig(workerProps);
- String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(distributedConfig);
+ String kafkaClusterId = distributedConfig.kafkaClusterId();
// Create the admin client to be shared by all backing stores for this herder
Map<String, Object> adminProps = new HashMap<>(distributedConfig.originals());
ConnectUtils.addMetricsContextProperties(adminProps, distributedConfig, kafkaClusterId);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 8d93e79591..469f73add7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -94,7 +94,7 @@ public class ConnectDistributed {
plugins.compareAndSwapWithDelegatingLoader();
DistributedConfig config = new DistributedConfig(workerProps);
- String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 19cc115d9d..815be29fbe 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -32,7 +32,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class ConnectStandalone {
plugins.compareAndSwapWithDelegatingLoader();
StandaloneConfig config = new StandaloneConfig(workerProps);
- String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ String kafkaClusterId = config.kafkaClusterId();
log.debug("Kafka cluster ID: {}", kafkaClusterId);
RestServer rest = new RestServer(config);
@@ -88,10 +88,13 @@ public class ConnectStandalone {
URI advertisedUrl = rest.advertisedUrl();
String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+ OffsetBackingStore offsetBackingStore = new FileOffsetBackingStore();
+ offsetBackingStore.configure(config);
+
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy = plugins.newPlugin(
config.getString(WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG),
config, ConnectorClientConfigOverridePolicy.class);
- Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore(),
+ Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore,
connectorClientConfigOverridePolicy);
Herder herder = new StandaloneHerder(worker, kafkaClusterId, connectorClientConfigOverridePolicy);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 5bc67693d0..55e445ca8e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -153,7 +153,7 @@ public class Worker {
ExecutorService executorService,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy
) {
- this.kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+ this.kafkaClusterId = config.kafkaClusterId();
this.metrics = new ConnectMetrics(workerId, config, time, kafkaClusterId);
this.executor = executorService;
this.workerId = workerId;
@@ -168,7 +168,6 @@ public class Worker {
this.internalValueConverter = plugins.newInternalConverter(false, JsonConverter.class.getName(), internalConverterConfig);
this.globalOffsetBackingStore = globalOffsetBackingStore;
- this.globalOffsetBackingStore.configure(config);
this.workerConfigTransformer = initConfigTransformer();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 38dbeb87e1..d8cec7173e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -27,6 +29,7 @@ import org.apache.kafka.common.config.SslClientAuth;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.storage.SimpleHeaderConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +41,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.eclipse.jetty.util.StringUtil;
@@ -188,7 +192,7 @@ public class WorkerConfig extends AbstractConfig {
public static final String CONNECTOR_CLIENT_POLICY_CLASS_CONFIG = "connector.client.config.override.policy";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DOC =
"Class name or alias of implementation of <code>ConnectorClientConfigOverridePolicy</code>. Defines what client configurations can be "
- + "overriden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. "
+ + "overridden by the connector. The default implementation is `All`, meaning connector configurations can override all client properties. "
+ "The other possible policies in the framework include `None` to disallow connectors from overriding client properties, "
+ "and `Principal` to allow connectors to override only client principals.";
public static final String CONNECTOR_CLIENT_POLICY_CLASS_DEFAULT = "All";
@@ -306,6 +310,35 @@ public class WorkerConfig extends AbstractConfig {
.withClientSslSupport();
}
+ private String kafkaClusterId;
+
+ public static String lookupKafkaClusterId(WorkerConfig config) {
+ log.info("Creating Kafka admin client");
+ try (Admin adminClient = Admin.create(config.originals())) {
+ return lookupKafkaClusterId(adminClient);
+ }
+ }
+
+ static String lookupKafkaClusterId(Admin adminClient) {
+ log.debug("Looking up Kafka cluster ID");
+ try {
+ KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
+ if (clusterIdFuture == null) {
+ log.info("Kafka cluster version is too old to return cluster ID");
+ return null;
+ }
+ log.debug("Fetching Kafka cluster ID");
+ String kafkaClusterId = clusterIdFuture.get();
+ log.info("Kafka cluster ID: {}", kafkaClusterId);
+ return kafkaClusterId;
+ } catch (InterruptedException e) {
+ throw new ConnectException("Unexpectedly interrupted when looking
up Kafka cluster info", e);
+ } catch (ExecutionException e) {
+ throw new ConnectException("Failed to connect to and describe
Kafka cluster. "
+ + "Check worker's broker connection and
security properties.", e);
+ }
+ }
+
private void logInternalConverterRemovalWarnings(Map<String, String> props) {
List<String> removedProperties = new ArrayList<>();
for (String property : Arrays.asList("internal.key.converter", "internal.value.converter")) {
@@ -321,7 +354,7 @@ public class WorkerConfig extends AbstractConfig {
+ "and specifying them will have no effect. "
+ "Instead, an instance of the JsonConverter with
schemas.enable "
+ "set to false will be used. For more
information, please visit "
- + "http://kafka.apache.org/documentation/#upgrade
and consult the upgrade notes"
+ + "https://kafka.apache.org/documentation/#upgrade
and consult the upgrade notes"
+ "for the 3.0 release.",
removedProperties);
}
@@ -409,6 +442,13 @@ public class WorkerConfig extends AbstractConfig {
return null;
}
+ public String kafkaClusterId() {
+ if (kafkaClusterId == null) {
+ kafkaClusterId = lookupKafkaClusterId(this);
+ }
+ return kafkaClusterId;
+ }
+
@Override
protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
@@ -434,7 +474,7 @@ public class WorkerConfig extends AbstractConfig {
String[] configTokens = config.trim().split("\\s+", 2);
if (configTokens.length != 2) {
throw new ConfigException(String.format("Invalid format of header config '%s'. "
- + "Expected: '[ation] [header name]:[header value]'", config));
+ + "Expected: '[action] [header name]:[header value]'", config));
}
// validate action
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4c1d6a5b9e..d7ad3f4eb3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -39,7 +39,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -62,12 +61,9 @@ public class WorkerGroupMember {
private static final String JMX_PREFIX = "kafka.connect";
private final Logger log;
- private final Time time;
private final String clientId;
private final ConsumerNetworkClient client;
private final Metrics metrics;
- private final Metadata metadata;
- private final long retryBackoffMs;
private final WorkerCoordinator coordinator;
private boolean stopped = false;
@@ -80,7 +76,6 @@ public class WorkerGroupMember {
String clientId,
LogContext logContext) {
try {
- this.time = time;
this.clientId = clientId;
this.log = logContext.logger(WorkerGroupMember.class);
@@ -98,23 +93,23 @@ public class WorkerGroupMember {
Map<String, Object> contextLabels = new HashMap<>();
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
- contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(config));
+ contextLabels.put(WorkerConfig.CONNECT_KAFKA_CLUSTER_ID, config.kafkaClusterId());
contextLabels.put(WorkerConfig.CONNECT_GROUP_ID, config.getString(DistributedConfig.GROUP_ID_CONFIG));
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX, contextLabels);
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
- this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
- this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
+ long retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
+ Metadata metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG),
logContext, new ClusterResourceListeners());
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
- this.metadata.bootstrap(addresses);
+ metadata.bootstrap(addresses);
String metricGrpPrefix = "connect";
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
NetworkClient netClient = new NetworkClient(
new Selector(config.getLong(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
- this.metadata,
+ metadata,
clientId,
100, // a fixed large enough value will suffice
config.getLong(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG),
@@ -142,7 +137,7 @@ public class WorkerGroupMember {
this.client,
metrics,
metricGrpPrefix,
- this.time,
+ time,
restUrl,
configStorage,
listener,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 6edd0e5a76..edbab8ec04 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -378,7 +378,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
private Map<String, Object> baseProducerProps(WorkerConfig workerConfig) {
Map<String, Object> producerProps = new HashMap<>(workerConfig.originals());
- String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig);
+ String kafkaClusterId = workerConfig.kafkaClusterId();
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE);
@@ -665,7 +665,7 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final WorkerConfig config) {
Map<String, Object> producerProps = new HashMap<>(baseProducerProps);
- String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+ String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> consumerProps = new HashMap<>(originals);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 6693572002..28fd37709d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -172,7 +172,7 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
this.exactlyOnce = config.exactlyOnceSourceEnabled();
- String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+ String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index 3ba6996da8..ebf4939bfd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -164,7 +164,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
if (this.statusTopic == null || this.statusTopic.trim().length() == 0)
throw new ConfigException("Must specify topic for connector status.");
- String clusterId = ConnectUtils.lookupKafkaClusterId(config);
+ String clusterId = config.kafkaClusterId();
Map<String, Object> originals = config.originals();
Map<String, Object> producerProps = new HashMap<>(originals);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 0af14cc7f3..a043d0f470 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,8 +17,6 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.connect.connector.Connector;
@@ -35,7 +33,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
@@ -52,33 +49,6 @@ public final class ConnectUtils {
throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
}
- public static String lookupKafkaClusterId(WorkerConfig config) {
- log.info("Creating Kafka admin client");
- try (Admin adminClient = Admin.create(config.originals())) {
- return lookupKafkaClusterId(adminClient);
- }
- }
-
- static String lookupKafkaClusterId(Admin adminClient) {
- log.debug("Looking up Kafka cluster ID");
- try {
- KafkaFuture<String> clusterIdFuture = adminClient.describeCluster().clusterId();
- if (clusterIdFuture == null) {
- log.info("Kafka cluster version is too old to return cluster ID");
- return null;
- }
- log.debug("Fetching Kafka cluster ID");
- String kafkaClusterId = clusterIdFuture.get();
- log.info("Kafka cluster ID: {}", kafkaClusterId);
- return kafkaClusterId;
- } catch (InterruptedException e) {
- throw new ConnectException("Unexpectedly interrupted when looking
up Kafka cluster info", e);
- } catch (ExecutionException e) {
- throw new ConnectException("Failed to connect to and describe
Kafka cluster. "
- + "Check worker's broker connection and
security properties.", e);
- }
- }
-
/**
* Ensure that the {@link Map properties} contain an expected value for the given key, inserting the
* expected value into the properties if necessary.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
index f02892888e..ec9a843d4d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTest.java
@@ -17,9 +17,16 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.MockedStatic;
+import org.mockito.internal.stubbing.answers.CallsRealMethods;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,6 +38,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
public class WorkerConfigTest {
private static final List<String> VALID_HEADER_CONFIGS = Arrays.asList(
@@ -58,6 +68,20 @@ public class WorkerConfigTest {
"set X-Frame-Options:DENY, add :no-cache, no-store,
must-revalidate "
);
+ private static final String CLUSTER_ID = "cluster-id";
+ private MockedStatic<WorkerConfig> workerConfigMockedStatic;
+
+ @Before
+ public void setup() {
+ workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods());
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+ }
+
+ @After
+ public void teardown() {
+ workerConfigMockedStatic.close();
+ }
+
@Test
public void testListenersConfigAllowedValues() {
Map<String, String> props = baseProps();
@@ -157,6 +181,50 @@ public class WorkerConfigTest {
assertTrue(ce.getMessage().contains(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG));
}
+ @Test
+ public void testLookupKafkaClusterId() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).build();
+ assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, WorkerConfig.lookupKafkaClusterId(adminClient));
+ }
+
+ @Test
+ public void testLookupNullKafkaClusterId() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).clusterId(null).build();
+ assertNull(WorkerConfig.lookupKafkaClusterId(adminClient));
+ }
+
+ @Test
+ public void testLookupKafkaClusterIdTimeout() {
+ final Node broker1 = new Node(0, "dummyHost-1", 1234);
+ final Node broker2 = new Node(1, "dummyHost-2", 1234);
+ List<Node> cluster = Arrays.asList(broker1, broker2);
+ MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).build();
+ adminClient.timeoutNextRequest(1);
+
+ assertThrows(ConnectException.class, () -> WorkerConfig.lookupKafkaClusterId(adminClient));
+ }
+
+ @Test
+ public void testKafkaClusterId() {
+ Map<String, String> props = baseProps();
+ WorkerConfig config = new WorkerConfig(WorkerConfig.baseConfigDef(), props);
+ assertEquals(CLUSTER_ID, config.kafkaClusterId());
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
+
+ // next calls hit the cache
+ assertEquals(CLUSTER_ID, config.kafkaClusterId());
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)), times(1));
+ }
+
private void assertInvalidHeaderConfig(String config) {
assertThrows(ConfigException.class, () -> WorkerConfig.validateHttpResponseHeaderConfig(config));
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index ef9398ace3..9eb622cafb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -59,7 +59,6 @@ import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.apache.kafka.connect.util.ParameterizedTest;
@@ -201,7 +200,7 @@ public class WorkerTest {
private final boolean enableTopicCreation;
private MockedStatic<Plugins> pluginsMockedStatic;
- private MockedStatic<ConnectUtils> connectUtilsMockedStatic;
+ private MockedStatic<WorkerConfig> workerConfigMockedStatic;
private MockedConstruction<WorkerSourceTask> sourceTaskMockedConstruction;
private MockitoSession mockitoSession;
@@ -263,8 +262,8 @@ public class WorkerTest {
pluginsMockedStatic = mockStatic(Plugins.class);
// pass through things that aren't explicitly mocked out
- connectUtilsMockedStatic = mockStatic(ConnectUtils.class, new CallsRealMethods());
- connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
+ workerConfigMockedStatic = mockStatic(WorkerConfig.class, new CallsRealMethods());
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
// Make calls to new WorkerSourceTask() return a mock to avoid the source task trying to connect to a broker.
sourceTaskMockedConstruction = mockConstructionWithAnswer(WorkerSourceTask.class, invocation -> {
@@ -289,7 +288,7 @@ public class WorkerTest {
// Ideal would be to use try-with-resources in an individual test, but it introduced a rather large level of
// indentation of most test bodies, hence sticking with setup() / teardown()
pluginsMockedStatic.close();
- connectUtilsMockedStatic.close();
+ workerConfigMockedStatic.close();
sourceTaskMockedConstruction.close();
mockitoSession.finishMocking();
@@ -309,7 +308,7 @@ public class WorkerTest {
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
- connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn(CLUSTER_ID);
worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, noneConnectorClientConfigOverridePolicy);
@@ -359,7 +358,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
verify(sourceConnector).stop();
verify(connectorStatusListener).onShutdown(CONNECTOR_ID);
@@ -387,7 +386,7 @@ public class WorkerTest {
when(plugins.delegatingLoader()).thenReturn(delegatingLoader);
when(delegatingLoader.connectorLoader(nonConnectorClass)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(delegatingLoader);
- connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn("test-cluster");
when(plugins.newConnector(anyString())).thenThrow(exception);
@@ -422,7 +421,7 @@ public class WorkerTest {
verify(plugins).newConnector(anyString());
verify(connectorStatusListener).onFailure(eq(CONNECTOR_ID), any(ConnectException.class));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -437,7 +436,7 @@ public class WorkerTest {
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(pluginLoader)).thenReturn(delegatingLoader);
pluginsMockedStatic.when(() -> Plugins.compareAndSwapLoaders(delegatingLoader)).thenReturn(pluginLoader);
- connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)))
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)))
.thenReturn("test-cluster");
connectorProps.put(CONNECTOR_CLASS_CONFIG, connectorAlias);
@@ -479,7 +478,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -529,7 +528,7 @@ public class WorkerTest {
verify(ctx).close();
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -676,7 +675,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -734,7 +733,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -806,7 +805,7 @@ public class WorkerTest {
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(pluginLoader), times(2));
pluginsMockedStatic.verify(() -> Plugins.compareAndSwapLoaders(delegatingLoader), times(2));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any(WorkerConfig.class)));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -918,7 +917,7 @@ public class WorkerTest {
mockInternalConverters();
mockFileConfigProvider();
- connectUtilsMockedStatic.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn(CLUSTER_ID);
+ workerConfigMockedStatic.when(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class))).thenReturn(CLUSTER_ID);
worker = new Worker(WORKER_ID,
new MockTime(),
@@ -935,7 +934,7 @@ public class WorkerTest {
assertEquals(1L, (long) metricGroup.taskCounter("c2").metricValue(0L));
assertEquals(0L, (long) metricGroup.taskCounter("fakeConnector").metricValue(0L));
- connectUtilsMockedStatic.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
+ workerConfigMockedStatic.verify(() -> WorkerConfig.lookupKafkaClusterId(any(WorkerConfig.class)));
}
@Test
@@ -1250,7 +1249,7 @@ public class WorkerTest {
Map<String, String> props = new HashMap<>(workerProps);
props.put("admin.client.id", "testid");
props.put("admin.metadata.max.age.ms", "5000");
- props.put("producer.bootstrap.servers", "cbeauho.com");
+ props.put("producer.bootstrap.servers", "localhost:1234");
props.put("consumer.bootstrap.servers", "localhost:4761");
WorkerConfig configWithOverrides = new StandaloneConfig(props);
@@ -1919,7 +1918,6 @@ public class WorkerTest {
private void verifyStorage() {
- verify(offsetBackingStore).configure(any(WorkerConfig.class));
verify(offsetBackingStore).start();
verify(herder).statusBackingStore();
verify(offsetBackingStore).stop();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
index 563d71dbed..fa910bb54c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMemberTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.MockConnectMetrics;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -65,12 +64,11 @@ public class WorkerGroupMemberTest {
LogContext logContext = new LogContext("[Worker clientId=client-1 + groupId= group-1]");
-
- try (MockedStatic<ConnectUtils> utilities = mockStatic(ConnectUtils.class)) {
- utilities.when(() -> ConnectUtils.lookupKafkaClusterId(any())).thenReturn("cluster-1");
+ try (MockedStatic<WorkerConfig> utilities = mockStatic(WorkerConfig.class)) {
+ utilities.when(() -> WorkerConfig.lookupKafkaClusterId(any())).thenReturn("cluster-1");
member = new WorkerGroupMember(config, "", configBackingStore, null, Time.SYSTEM, "client-1", logContext);
- utilities.verify(() -> ConnectUtils.lookupKafkaClusterId(any()));
- }
+ utilities.verify(() -> WorkerConfig.lookupKafkaClusterId(any()));
+ }
boolean entered = false;
for (MetricsReporter reporter : member.metrics().reporters()) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index f3b74f0666..557d677525 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -33,9 +33,9 @@ import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TestFuture;
@@ -81,7 +81,7 @@ import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({KafkaConfigBackingStore.class, ConnectUtils.class})
+@PrepareForTest({KafkaConfigBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaConfigBackingStoreTest {
private static final String TOPIC = "connect-configs";
@@ -190,9 +190,9 @@ public class KafkaConfigBackingStoreTest {
@Before
public void setUp() {
- PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
- EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
- PowerMock.replay(ConnectUtils.class);
+ PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId");
+ EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ PowerMock.replay(WorkerConfig.class);
createStore(DEFAULT_DISTRIBUTED_CONFIG, storeLog);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index cf11230f3d..d019c98398 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.connect.util.TopicAdmin;
import org.easymock.Capture;
@@ -66,7 +66,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
-@PrepareForTest({KafkaOffsetBackingStore.class, ConnectUtils.class})
+@PrepareForTest({KafkaOffsetBackingStore.class, WorkerConfig.class})
@PowerMockIgnore({"javax.management.*", "javax.crypto.*"})
public class KafkaOffsetBackingStoreTest {
private static final String TOPIC = "connect-offsets";
@@ -491,8 +491,8 @@ public class KafkaOffsetBackingStoreTest {
}
private void expectClusterId() {
- PowerMock.mockStaticPartial(ConnectUtils.class, "lookupKafkaClusterId");
- EasyMock.expect(ConnectUtils.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
+ PowerMock.mockStaticPartial(WorkerConfig.class, "lookupKafkaClusterId");
+ EasyMock.expect(WorkerConfig.lookupKafkaClusterId(EasyMock.anyObject())).andReturn("test-cluster").anyTimes();
}
private static ByteBuffer buffer(String v) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
index d1330277e8..d68add14eb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConnectUtilsTest.java
@@ -17,59 +17,21 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.MockAdminClient;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.junit.Test;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
public class ConnectUtilsTest {
- @Test
- public void testLookupKafkaClusterId() {
- final Node broker1 = new Node(0, "dummyHost-1", 1234);
- final Node broker2 = new Node(1, "dummyHost-2", 1234);
- List<Node> cluster = Arrays.asList(broker1, broker2);
- MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).build();
- assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, ConnectUtils.lookupKafkaClusterId(adminClient));
- }
-
- @Test
- public void testLookupNullKafkaClusterId() {
- final Node broker1 = new Node(0, "dummyHost-1", 1234);
- final Node broker2 = new Node(1, "dummyHost-2", 1234);
- List<Node> cluster = Arrays.asList(broker1, broker2);
- MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).clusterId(null).build();
- assertNull(ConnectUtils.lookupKafkaClusterId(adminClient));
- }
-
- @Test
- public void testLookupKafkaClusterIdTimeout() {
- final Node broker1 = new Node(0, "dummyHost-1", 1234);
- final Node broker2 = new Node(1, "dummyHost-2", 1234);
- List<Node> cluster = Arrays.asList(broker1, broker2);
- MockAdminClient adminClient = new MockAdminClient.Builder().brokers(cluster).build();
- adminClient.timeoutNextRequest(1);
-
- assertThrows(ConnectException.class, () -> ConnectUtils.lookupKafkaClusterId(adminClient));
- }
-
@Test
public void testAddMetricsContextPropertiesDistributed() {
Map<String, String> props = new HashMap<>();