[ https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316912#comment-16316912 ]
ASF GitHub Bot commented on KAFKA-6363: --------------------------------------- guozhangwang closed pull request #4371: KAFKA-6363: Use MockAdminClient for any unit tests that depend on Adm… URL: https://github.com/apache/kafka/pull/4371 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java similarity index 91% rename from clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java rename to clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java index cca35ac22c9..10281fb6ffa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java @@ -35,21 +35,21 @@ * <p> * When finished, be sure to {@link #close() close} the environment object. 
*/ -public class MockKafkaAdminClientEnv implements AutoCloseable { +public class AdminClientUnitTestEnv implements AutoCloseable { private final Time time; private final Cluster cluster; private final MockClient mockClient; private final KafkaAdminClient adminClient; - public MockKafkaAdminClientEnv(Cluster cluster, String...vals) { + public AdminClientUnitTestEnv(Cluster cluster, String...vals) { this(Time.SYSTEM, cluster, vals); } - public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) { + public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) { this(time, cluster, newStrMap(vals)); } - public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, Object> config) { + public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, Object> config) { this.time = time; this.cluster = cluster; AdminClientConfig adminClientConfig = new AdminClientConfig(config); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index c0fe73c36ed..84588a9f3be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,10 +38,10 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse; import 
org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; @@ -75,8 +75,8 @@ import java.util.concurrent.Future; import static java.util.Arrays.asList; -import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.apache.kafka.common.requests.ResourceType.BROKER; +import static org.apache.kafka.common.requests.ResourceType.TOPIC; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -155,7 +155,7 @@ public void testGenerateClientId() { KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG, "myCustomId"))); } - private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) { + private static AdminClientUnitTestEnv mockClientEnv(String... configVals) { HashMap<Integer, Node> nodes = new HashMap<>(); nodes.put(0, new Node(0, "localhost", 8121)); nodes.put(1, new Node(1, "localhost", 8122)); @@ -163,12 +163,12 @@ private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) { Cluster cluster = new Cluster("mockClusterId", nodes.values(), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), nodes.get(0)); - return new MockKafkaAdminClientEnv(cluster, configVals); + return new AdminClientUnitTestEnv(cluster, configVals); } @Test public void testCloseAdminClient() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { } } @@ -190,7 +190,7 @@ private static void assertFutureError(Future<?> future, Class<? 
extends Throwabl */ @Test public void testTimeoutWithoutMetadata() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) { + try (AdminClientUnitTestEnv env = mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().setNode(new Node(0, "localhost", 8121)); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); @@ -203,7 +203,7 @@ public void testTimeoutWithoutMetadata() throws Exception { @Test public void testCreateTopics() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -226,7 +226,7 @@ public void testCreateTopics() throws Exception { @Test public void testDescribeAcls() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -250,7 +250,7 @@ public void testDescribeAcls() throws Exception { @Test public void testCreateAcls() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -279,7 +279,7 @@ public void testCreateAcls() throws Exception { @Test public void testDeleteAcls() throws 
Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -330,7 +330,7 @@ public void testHandleTimeout() throws Exception { Cluster cluster = new Cluster("mockClusterId", nodes.values(), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet(), nodes.get(0)); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(time, cluster, + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster, AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1", AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); @@ -374,7 +374,7 @@ public boolean conditionMet() { @Test public void testDescribeConfigs() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -390,7 +390,7 @@ public void testDescribeConfigs() throws Exception { @Test public void testCreatePartitions() throws Exception { - try (MockKafkaAdminClientEnv env = mockClientEnv()) { + try (AdminClientUnitTestEnv env = mockClientEnv()) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().controller()); @@ -443,7 +443,7 @@ public void testDeleteRecords() throws Exception { TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3); TopicPartition myTopicPartition4 = new 
TopicPartition("my_topic", 4); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); env.kafkaClient().setNode(env.cluster().nodes().get(0)); diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index b6a5888b5d2..c950163dc13 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -232,8 +232,32 @@ public DescribeTopicsResult describeTopics(Collection<String> topicNames, Descri } @Override - public DeleteTopicsResult deleteTopics(Collection<String> topics, DeleteTopicsOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) { + Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>(); + + if (timeoutNextRequests > 0) { + for (final String topicName : topicsToDelete) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + future.completeExceptionally(new TimeoutException()); + deleteTopicsResult.put(topicName, future); + } + + --timeoutNextRequests; + return new DeleteTopicsResult(deleteTopicsResult); + } + + for (final String topicName : topicsToDelete) { + KafkaFutureImpl<Void> future = new KafkaFutureImpl<>(); + + if (allTopics.remove(topicName) == null) { + future.completeExceptionally(new UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", topicName))); + } else { + future.complete(null); + } + deleteTopicsResult.put(topicName, future); + } + + return new DeleteTopicsResult(deleteTopicsResult); } @Override @@ -243,7 +267,12 @@ 
public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPar @Override public DeleteRecordsResult deleteRecords(Map<TopicPartition, RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) { - throw new UnsupportedOperationException("Not implemented yet"); + Map<TopicPartition, KafkaFuture<DeletedRecords>> deletedRecordsResult = new HashMap<>(); + if (recordsToDelete.isEmpty()) { + return new DeleteRecordsResult(deletedRecordsResult); + } else { + throw new UnsupportedOperationException("Not implemented yet"); + } } @Override diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java index 0a61d3e9042..c58d6741f58 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java @@ -17,11 +17,13 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; +import org.apache.kafka.clients.admin.AdminClientUnitTestEnv; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateTopicsResponse; @@ -47,7 +49,7 @@ public void returnNullWithApiVersionMismatch() { final NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { + try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) { 
env.kafkaClient().setNode(cluster.controller()); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); @@ -62,14 +64,11 @@ public void returnNullWithApiVersionMismatch() { public void shouldNotCreateTopicWhenItAlreadyExists() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertFalse(created); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList()); + mockAdminClient.addTopic(false, "myTopic", Collections.singletonList(topicPartitionInfo), null); + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); + assertFalse(admin.createTopic(newTopic)); } } @@ -77,14 +76,9 @@ public void shouldNotCreateTopicWhenItAlreadyExists() { public void shouldCreateTopicWhenItDoesNotExist() { NewTopic newTopic = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - 
env.kafkaClient().prepareResponse(createTopicResponse(newTopic)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); - boolean created = admin.createTopic(newTopic); - assertTrue(created); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); + assertTrue(admin.createTopic(newTopic)); } } @@ -93,12 +87,8 @@ public void shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName NewTopic newTopic1 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); NewTopic newTopic2 = TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build(); Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(createTopicResponse(newTopic1)); - TopicAdmin admin = new TopicAdmin(null, env.adminClient()); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); Set<String> newTopicNames = admin.createTopics(newTopic1, newTopic2); assertEquals(1, newTopicNames.size()); assertEquals(newTopic2.name(), newTopicNames.iterator().next()); @@ -108,11 +98,8 @@ public void shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName @Test public void shouldReturnFalseWhenSuppliedNullTopicDescription() { Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - TopicAdmin admin = new 
TopicAdmin(null, env.adminClient()); + try (MockAdminClient mockAdminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + TopicAdmin admin = new TopicAdmin(null, mockAdminClient); boolean created = admin.createTopic(null); assertFalse(created); } @@ -120,7 +107,7 @@ public void shouldReturnFalseWhenSuppliedNullTopicDescription() { private Cluster createCluster(int numNodes) { HashMap<Integer, Node> nodes = new HashMap<>(); - for (int i = 0; i != numNodes; ++i) { + for (int i = 0; i < numNodes; ++i) { nodes.put(i, new Node(i, "localhost", 8121 + i)); } Cluster cluster = new Cluster("mockClusterId", nodes.values(), @@ -129,14 +116,6 @@ private Cluster createCluster(int numNodes) { return cluster; } - private CreateTopicsResponse createTopicResponse(NewTopic... topics) { - return createTopicResponse(new ApiError(Errors.NONE, ""), topics); - } - - private CreateTopicsResponse createTopicResponseWithAlreadyExists(NewTopic... topics) { - return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, "Topic already exists"), topics); - } - private CreateTopicsResponse createTopicResponseWithUnsupportedVersion(NewTopic... 
topics) { return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, "This version of the API is not supported"), topics); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 42504655117..6b760c16d62 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -186,11 +187,23 @@ public boolean conditionMet() { assertEquals(thread.state(), StreamThread.State.DEAD); } + private Cluster createCluster(int numNodes) { + HashMap<Integer, Node> nodes = new HashMap<>(); + for (int i = 0; i < numNodes; ++i) { + nodes.put(i, new Node(i, "localhost", 8121 + i)); + } + return new Cluster("mockClusterId", nodes.values(), + Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), + Collections.<String>emptySet(), nodes.get(0)); + } + private StreamThread createStreamThread(final String clientId, final StreamsConfig config, final boolean eosEnabled) { if (eosEnabled) { clientSupplier.setApplicationIdForProducer(applicationId); } + clientSupplier.setClusterForAdminClient(createCluster(1)); + return StreamThread.create(internalTopologyBuilder, config, clientSupplier, diff --git a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java index c6b0f5f8252..dd32ad0786a 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java @@ -17,8 +17,7 @@ package org.apache.kafka.streams.tools; import kafka.tools.StreamsResetter; -import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.MockConsumer; @@ -27,8 +26,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.apache.kafka.common.TopicPartitionInfo; import org.junit.Before; import org.junit.Test; @@ -40,6 +38,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -235,20 +234,19 @@ public void shouldSeekToEndOffset() { } @Test - public void shouldDeleteTopic() { + public void shouldDeleteTopic() throws InterruptedException, ExecutionException { Cluster cluster = createCluster(1); - try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(cluster)) { - env.kafkaClient().setNode(cluster.controller()); - env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); - env.kafkaClient().prepareMetadataUpdate(env.cluster(), Collections.<String>emptySet()); - env.kafkaClient().prepareResponse(new DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE))); - streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), env.adminClient()); + try (MockAdminClient adminClient = new MockAdminClient(cluster.nodes(), cluster.nodeById(0))) { + 
TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList()); + adminClient.addTopic(false, TOPIC, Collections.singletonList(topicPartitionInfo), null); + streamsResetter.doDelete(Collections.singletonList(TOPIC), adminClient); + assertEquals(Collections.emptySet(), adminClient.listTopics().names().get()); } } private Cluster createCluster(int numNodes) { HashMap<Integer, Node> nodes = new HashMap<>(); - for (int i = 0; i != numNodes; ++i) { + for (int i = 0; i < numNodes; ++i) { nodes.put(i, new Node(i, "localhost", 8121 + i)); } return new Cluster("mockClusterId", nodes.values(), diff --git a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java index ae83c60275c..1ec28fab0a1 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java +++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java @@ -17,7 +17,7 @@ package org.apache.kafka.test; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv; +import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -26,7 +26,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.KafkaClientSupplier; import java.util.LinkedList; @@ -59,8 +58,7 @@ public void setClusterForAdminClient(final Cluster cluster) { @Override public AdminClient getAdminClient(final Map<String, Object> config) { - MockKafkaAdminClientEnv clientEnv = new MockKafkaAdminClientEnv(Time.SYSTEM, cluster, config); - return clientEnv.adminClient(); + return new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0)); } @Override ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Use MockAdminClient for any unit tests that depend on AdminClient > ----------------------------------------------------------------- > > Key: KAFKA-6363 > URL: https://issues.apache.org/jira/browse/KAFKA-6363 > Project: Kafka > Issue Type: Bug > Reporter: Guozhang Wang > Assignee: Filipe Agapito > Labels: newbie > > Today we have a few unit tests other than KafkaAdminClientTest that rely on > MockKafkaAdminClientEnv. > About this class and MockKafkaAdminClientEnv, my thoughts: > 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner > KafkaClient; it should only be used for the unit test of KafkaAdminClient > itself. > 2. For any other unit tests on classes that depend on AdminClient, we should > be using the MockAdminClient that mocks the whole AdminClient. > So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in > KafkaAdminClientTest use MockClient and add a new static constructor that > takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv. -- This message was sent by Atlassian JIRA (v6.4.14#64029)