This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 28430c31f9e MINOR: Replace ClientPropertiesBuilder with admin()
factory methods in KafkaClusterTestKit (#21146)
28430c31f9e is described below
commit 28430c31f9ea54eb7e232666d3893355ec7ff059
Author: majialong <[email protected]>
AuthorDate: Fri Dec 19 15:40:51 2025 +0800
MINOR: Replace ClientPropertiesBuilder with admin() factory methods in
KafkaClusterTestKit (#21146)
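This PR simplifies `KafkaClusterTestKit` by removing
`ClientPropertiesBuilder` (together with `newClientPropertiesBuilder()`
and `clientProperties()`) and adding direct `admin()` factory methods
that align with the `ClusterInstance` style. The new factory methods
also set the client security protocol and, for SASL_PLAINTEXT
listeners, the SASL PLAIN mechanism and JAAS credentials.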
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/server/KRaftClusterTest.scala | 42 +++++--------
.../metadata/BrokerMetadataPublisherTest.scala | 6 +-
.../org/apache/kafka/server/KRaftClusterTest.java | 28 +++------
.../ReconfigurableQuorumIntegrationTest.java | 16 ++---
.../kafka/common/test/KafkaClusterTestKit.java | 70 ++++++++++++----------
.../tools/other/ReplicationQuotasTestRig.java | 2 +-
6 files changed, 74 insertions(+), 90 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 0b18c68aa16..40d00eb73b4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -49,7 +49,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{FileSystems, Files, Path, Paths}
import java.util
import java.util.concurrent.{ExecutionException, TimeUnit}
-import java.util.{Optional, OptionalLong, Properties}
+import java.util.{Optional, OptionalLong}
import scala.collection.{Seq, mutable}
import scala.jdk.CollectionConverters._
import scala.util.Using
@@ -97,7 +97,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
// Create the topic.
val assignments = new util.HashMap[Integer, util.List[Integer]]
@@ -265,7 +265,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
assertEquals(Seq(ApiError.NONE), incrementalAlter(admin, Seq(
(new ConfigResource(Type.BROKER, ""), Seq(
@@ -322,7 +322,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
Seq(log, log2).foreach(_.debug("setting log4j"))
@@ -378,7 +378,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
val createResults = admin.createTopics(util.List.of(
new NewTopic("foo", 1, 3.toShort),
@@ -400,13 +400,7 @@ class KRaftClusterTest {
}
def createAdminClient(cluster: KafkaClusterTestKit, bootstrapController:
Boolean): Admin = {
- var props: Properties = null
- props = if (bootstrapController)
-
cluster.newClientPropertiesBuilder().setUsingBootstrapControllers(true).build()
- else
- cluster.clientProperties()
- props.put(AdminClientConfig.CLIENT_ID_CONFIG, this.getClass.getName)
- Admin.create(props)
+ cluster.admin(util.Map.of(AdminClientConfig.CLIENT_ID_CONFIG,
this.getClass.getName), bootstrapController)
}
@Test
@@ -523,7 +517,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
admin.updateFeatures(
util.Map.of(MetadataVersion.FEATURE_NAME,
@@ -552,9 +546,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.newClientPropertiesBuilder().
- setUsingBootstrapControllers(usingBootstrapControlers).
- build())
+ val admin = cluster.admin(util.Map.of(), usingBootstrapControlers)
try {
val featureMetadata = admin.describeFeatures().featureMetadata().get()
assertEquals(new SupportedVersionRange(0, 1),
@@ -590,7 +582,7 @@ class KRaftClusterTest {
TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
// Create a test topic
val newTopic = util.List.of(new NewTopic("test-topic", 1, 1.toShort))
@@ -676,7 +668,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
val newTopics = new util.ArrayList[NewTopic]()
for (i <- 0 to 10000) {
@@ -758,9 +750,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- val admin = Admin.create(cluster.newClientPropertiesBuilder().
- setUsingBootstrapControllers(true).
- build())
+ val admin = cluster.admin(util.Map.of(), true)
try {
val exception = assertThrows(classOf[ExecutionException],
() => admin.describeCluster().clusterId().get(1, TimeUnit.MINUTES))
@@ -814,7 +804,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
val broker0 = cluster.brokers().get(0)
val broker1 = cluster.brokers().get(1)
@@ -869,7 +859,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
val broker0 = cluster.brokers().get(0)
val broker1 = cluster.brokers().get(1)
@@ -934,7 +924,7 @@ class KRaftClusterTest {
try {
cluster.format()
cluster.startup()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
val broker0 = cluster.brokers().get(0)
val broker1 = cluster.brokers().get(1)
@@ -1007,7 +997,7 @@ class KRaftClusterTest {
TestUtils.waitUntilTrue(() =>
cluster.raftManagers().get(0).client.leaderAndEpoch().leaderId.isPresent,
"RaftManager was not initialized.")
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
// Create a test topic
admin.createTopics(util.List.of(
@@ -1087,7 +1077,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
cluster.waitForReadyBrokers()
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
admin.incrementalAlterConfigs(
util.Map.of(new ConfigResource(Type.BROKER, ""),
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 90321acdced..dc3b987488f 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -27,7 +27,7 @@ import kafka.server.share.SharePartitionManager
import kafka.server.{BrokerServer, KafkaConfig, ReplicaManager}
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry,
NewTopic}
+import org.apache.kafka.clients.admin.{AlterConfigOp, ConfigEntry, NewTopic}
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.BROKER
@@ -124,7 +124,7 @@ class BrokerMetadataPublisherTest {
override def answer(invocation: InvocationOnMock): Unit =
numTimesReloadCalled.addAndGet(1)
})
broker.brokerMetadataPublisher.dynamicConfigPublisher = publisher
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
assertEquals(0, numTimesReloadCalled.get())
admin.incrementalAlterConfigs(singletonMap(
@@ -170,7 +170,7 @@ class BrokerMetadataPublisherTest {
broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1,
TimeUnit.MINUTES)
broker.metadataPublishers.remove(broker.brokerMetadataPublisher)
broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1,
TimeUnit.MINUTES)
- val admin = Admin.create(cluster.clientProperties())
+ val admin = cluster.admin()
try {
admin.createTopics(singletonList(new NewTopic("foo", 1,
1.toShort))).all().get()
} finally {
diff --git a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
index 3286d0f185c..475fb11cd87 100644
--- a/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
+++ b/server/src/test/java/org/apache/kafka/server/KRaftClusterTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.server;
-import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterClientQuotasResult;
@@ -74,7 +73,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -142,7 +140,7 @@ public class KRaftClusterTest {
"Broker never made it to RUNNING state.");
TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
"RaftManager was not initialized.");
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
assertEquals(cluster.nodes().clusterId(),
admin.describeCluster().clusterId().get());
}
@@ -163,7 +161,7 @@ public class KRaftClusterTest {
"Broker never made it to RUNNING state.");
TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
"RaftManager was not initialized.");
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
assertEquals(cluster.nodes().clusterId(),
admin.describeCluster().clusterId().get());
}
@@ -224,7 +222,7 @@ public class KRaftClusterTest {
cluster.waitForReadyBrokers();
assertConfigValue(cluster, 0);
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
admin.incrementalAlterConfigs(
Map.of(new ConfigResource(Type.BROKER, ""),
List.of(new AlterConfigOp(
@@ -264,7 +262,7 @@ public class KRaftClusterTest {
assertFoobarValue(cluster, 0);
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
admin.incrementalAlterConfigs(
Map.of(new ConfigResource(Type.BROKER, ""),
List.of(new AlterConfigOp(
@@ -304,7 +302,7 @@ public class KRaftClusterTest {
"RaftManager was not initialized.");
String testTopic = "test-topic";
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
// Create a test topic
List<NewTopic> newTopic = List.of(new NewTopic(testTopic, 1,
(short) 3));
CreateTopicsResult createTopicResult =
admin.createTopics(newTopic);
@@ -336,7 +334,7 @@ public class KRaftClusterTest {
TestUtils.waitForCondition(() ->
cluster.raftManagers().get(0).client().leaderAndEpoch().leaderId().isPresent(),
"RaftManager was not initialized.");
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
// Create many topics
List<NewTopic> newTopics = List.of(
new NewTopic("test-topic-1", 2, (short) 3),
@@ -382,7 +380,7 @@ public class KRaftClusterTest {
TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
"Broker never made it to RUNNING state.");
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
ClientQuotaEntity entity = new
ClientQuotaEntity(Map.of("user", "testkit"));
ClientQuotaFilter filter = ClientQuotaFilter.containsOnly(
List.of(ClientQuotaFilterComponent.ofEntity("user",
"testkit")));
@@ -470,7 +468,7 @@ public class KRaftClusterTest {
TestUtils.waitForCondition(() ->
cluster.brokers().get(0).brokerState() == BrokerState.RUNNING,
"Broker never made it to RUNNING state.");
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (Admin admin = cluster.admin()) {
ClientQuotaEntity defaultUser = new
ClientQuotaEntity(Collections.singletonMap("user", null));
ClientQuotaEntity bobUser = new
ClientQuotaEntity(Map.of("user", "bob"));
@@ -643,15 +641,7 @@ public class KRaftClusterTest {
}
private Admin createAdminClient(KafkaClusterTestKit cluster, boolean
usingBootstrapControllers) {
- Properties props = new Properties();
- if (usingBootstrapControllers) {
- props.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
cluster.bootstrapControllers());
- props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- } else {
- props = cluster.clientProperties();
- }
- props.put(AdminClientConfig.CLIENT_ID_CONFIG,
this.getClass().getName());
- return Admin.create(props);
+ return cluster.admin(Map.of(AdminClientConfig.CLIENT_ID_CONFIG,
this.getClass().getName()), usingBootstrapControllers);
}
public static class BadAuthorizer implements Authorizer {
diff --git
a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
index ad30708d7c5..c3a14875cfa 100644
---
a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -76,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@@ -94,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@@ -132,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -167,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003),
voters.keySet());
@@ -206,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -244,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -275,7 +275,7 @@ public class ReconfigurableQuorumIntegrationTest {
try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -327,7 +327,7 @@ public class ReconfigurableQuorumIntegrationTest {
try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
cluster.format();
cluster.startup();
- try (var admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = cluster.admin()) {
Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
var removeFuture = admin.removeRaftVoter(
3000,
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 471f4472e8d..4cf4e0770d0 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -26,8 +26,10 @@ import kafka.server.KafkaRaftServer;
import kafka.server.SharedServer;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
@@ -66,7 +68,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
-import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -560,45 +561,48 @@ public class KafkaClusterTestKit implements AutoCloseable
{
"Failed to wait for publisher to publish the metadata update to
each broker.");
}
- public class ClientPropertiesBuilder {
- private final Properties properties;
- private boolean usingBootstrapControllers = false;
-
- public ClientPropertiesBuilder() {
- this.properties = new Properties();
- }
-
- public ClientPropertiesBuilder(Properties properties) {
- this.properties = properties;
- }
-
- public ClientPropertiesBuilder setUsingBootstrapControllers(boolean
usingBootstrapControllers) {
- this.usingBootstrapControllers = usingBootstrapControllers;
- return this;
- }
+ public Admin admin() {
+ return admin(Map.of(), false);
+ }
- public Properties build() {
- if (usingBootstrapControllers) {
-
properties.setProperty(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
bootstrapControllers());
-
properties.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
- } else {
-
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
-
properties.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
- }
- return properties;
- }
+ public Admin admin(Map<String, Object> configs) {
+ return admin(configs, false);
}
- public ClientPropertiesBuilder newClientPropertiesBuilder(Properties
properties) {
- return new ClientPropertiesBuilder(properties);
+ public Admin admin(Map<String, Object> configs, boolean
usingBootstrapControllers) {
+ Map<String, Object> props = new HashMap<>(configs);
+ setBootstrapConfig(props, usingBootstrapControllers);
+ setClientSaslConfig(props, usingBootstrapControllers);
+ return Admin.create(props);
}
- public ClientPropertiesBuilder newClientPropertiesBuilder() {
- return new ClientPropertiesBuilder();
+ private void setBootstrapConfig(Map<String, Object> props, boolean
usingBootstrapControllers) {
+ if (usingBootstrapControllers) {
+ props.putIfAbsent(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG,
bootstrapControllers());
+ props.remove(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+ } else {
+ props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
+ props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
+ }
}
- public Properties clientProperties() {
- return new ClientPropertiesBuilder().build();
+ private void setClientSaslConfig(Map<String, Object> props, boolean
usingBootstrapControllers) {
+ SecurityProtocol protocol = usingBootstrapControllers ?
+ nodes.controllerListenerProtocol() :
nodes.brokerListenerProtocol();
+
+ props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
protocol.name);
+
+ if (protocol == SecurityProtocol.SASL_PLAINTEXT) {
+ props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+ props.putIfAbsent(
+ SaslConfigs.SASL_JAAS_CONFIG,
+ String.format(
+ "org.apache.kafka.common.security.plain.PlainLoginModule
required username=\"%s\" password=\"%s\";",
+ JaasUtils.KAFKA_PLAIN_ADMIN,
+ JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+ )
+ );
+ }
}
public String bootstrapServers() {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
index c2294be8ed5..8aa6dd8bc46 100644
---
a/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
+++
b/tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java
@@ -182,7 +182,7 @@ public class ReplicationQuotasTestRig {
throw new RuntimeException("Failed to start test Kafka
cluster", e);
}
- adminClient = Admin.create(cluster.clientProperties());
+ adminClient = cluster.admin();
}
public void tearDown() {