[ 
https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16316912#comment-16316912
 ] 

ASF GitHub Bot commented on KAFKA-6363:
---------------------------------------

guozhangwang closed pull request #4371: KAFKA-6363: Use MockAdminClient for any 
unit tests that depend on AdminClient
URL: https://github.com/apache/kafka/pull/4371
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
similarity index 91%
rename from 
clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
rename to 
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
index cca35ac22c9..10281fb6ffa 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockKafkaAdminClientEnv.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientUnitTestEnv.java
@@ -35,21 +35,21 @@
  * <p>
  * When finished, be sure to {@link #close() close} the environment object.
  */
-public class MockKafkaAdminClientEnv implements AutoCloseable {
+public class AdminClientUnitTestEnv implements AutoCloseable {
     private final Time time;
     private final Cluster cluster;
     private final MockClient mockClient;
     private final KafkaAdminClient adminClient;
 
-    public MockKafkaAdminClientEnv(Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Cluster cluster, String...vals) {
         this(Time.SYSTEM, cluster, vals);
     }
 
-    public MockKafkaAdminClientEnv(Time time, Cluster cluster, String...vals) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, String...vals) {
         this(time, cluster, newStrMap(vals));
     }
 
-    public MockKafkaAdminClientEnv(Time time, Cluster cluster, Map<String, 
Object> config) {
+    public AdminClientUnitTestEnv(Time time, Cluster cluster, Map<String, 
Object> config) {
         this.time = time;
         this.cluster = cluster;
         AdminClientConfig adminClientConfig = new AdminClientConfig(config);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index c0fe73c36ed..84588a9f3be 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -38,10 +38,10 @@
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
+import org.apache.kafka.common.requests.CreatePartitionsResponse;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -75,8 +75,8 @@
 import java.util.concurrent.Future;
 
 import static java.util.Arrays.asList;
-import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.apache.kafka.common.requests.ResourceType.BROKER;
+import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -155,7 +155,7 @@ public void testGenerateClientId() {
                 
KafkaAdminClient.generateClientId(newConfMap(AdminClientConfig.CLIENT_ID_CONFIG,
 "myCustomId")));
     }
 
-    private static MockKafkaAdminClientEnv mockClientEnv(String... configVals) 
{
+    private static AdminClientUnitTestEnv mockClientEnv(String... configVals) {
         HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
         nodes.put(1, new Node(1, "localhost", 8122));
@@ -163,12 +163,12 @@ private static MockKafkaAdminClientEnv 
mockClientEnv(String... configVals) {
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
                 Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
                 Collections.<String>emptySet(), nodes.get(0));
-        return new MockKafkaAdminClientEnv(cluster, configVals);
+        return new AdminClientUnitTestEnv(cluster, configVals);
     }
 
     @Test
     public void testCloseAdminClient() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
         }
     }
 
@@ -190,7 +190,7 @@ private static void assertFutureError(Future<?> future, 
Class<? extends Throwabl
      */
     @Test
     public void testTimeoutWithoutMetadata() throws Exception {
-        try (MockKafkaAdminClientEnv env = 
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
+        try (AdminClientUnitTestEnv env = 
mockClientEnv(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
@@ -203,7 +203,7 @@ public void testTimeoutWithoutMetadata() throws Exception {
 
     @Test
     public void testCreateTopics() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -226,7 +226,7 @@ public void testCreateTopics() throws Exception {
 
     @Test
     public void testDescribeAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -250,7 +250,7 @@ public void testDescribeAcls() throws Exception {
 
     @Test
     public void testCreateAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -279,7 +279,7 @@ public void testCreateAcls() throws Exception {
 
     @Test
     public void testDeleteAcls() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -330,7 +330,7 @@ public void testHandleTimeout() throws Exception {
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
             Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
             Collections.<String>emptySet(), nodes.get(0));
-        try (MockKafkaAdminClientEnv env = new MockKafkaAdminClientEnv(time, 
cluster,
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster,
             AdminClientConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1",
                 AdminClientConfig.RECONNECT_BACKOFF_MS_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
@@ -374,7 +374,7 @@ public boolean conditionMet() {
 
     @Test
     public void testDescribeConfigs() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -390,7 +390,7 @@ public void testDescribeConfigs() throws Exception {
 
     @Test
     public void testCreatePartitions() throws Exception {
-        try (MockKafkaAdminClientEnv env = mockClientEnv()) {
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().controller());
@@ -443,7 +443,7 @@ public void testDeleteRecords() throws Exception {
         TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);
         TopicPartition myTopicPartition4 = new TopicPartition("my_topic", 4);
 
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
             env.kafkaClient().setNode(env.cluster().nodes().get(0));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index b6a5888b5d2..c950163dc13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -232,8 +232,32 @@ public DescribeTopicsResult 
describeTopics(Collection<String> topicNames, Descri
     }
 
     @Override
-    public DeleteTopicsResult deleteTopics(Collection<String> topics, 
DeleteTopicsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+    public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, 
DeleteTopicsOptions options) {
+        Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
+
+        if (timeoutNextRequests > 0) {
+            for (final String topicName : topicsToDelete) {
+                KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+                future.completeExceptionally(new TimeoutException());
+                deleteTopicsResult.put(topicName, future);
+            }
+
+            --timeoutNextRequests;
+            return new DeleteTopicsResult(deleteTopicsResult);
+        }
+
+        for (final String topicName : topicsToDelete) {
+            KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+
+            if (allTopics.remove(topicName) == null) {
+                future.completeExceptionally(new 
UnknownTopicOrPartitionException(String.format("Topic %s does not exist.", 
topicName)));
+            } else {
+                future.complete(null);
+            }
+            deleteTopicsResult.put(topicName, future);
+        }
+
+        return new DeleteTopicsResult(deleteTopicsResult);
     }
 
     @Override
@@ -243,7 +267,12 @@ public CreatePartitionsResult createPartitions(Map<String, 
NewPartitions> newPar
 
     @Override
     public DeleteRecordsResult deleteRecords(Map<TopicPartition, 
RecordsToDelete> recordsToDelete, DeleteRecordsOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        Map<TopicPartition, KafkaFuture<DeletedRecords>> deletedRecordsResult 
= new HashMap<>();
+        if (recordsToDelete.isEmpty()) {
+            return new DeleteRecordsResult(deletedRecordsResult);
+        } else {
+            throw new UnsupportedOperationException("Not implemented yet");
+        }
     }
 
     @Override
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
index 0a61d3e9042..c58d6741f58 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicAdminTest.java
@@ -17,11 +17,13 @@
 package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.AdminClientUnitTestEnv;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
@@ -47,7 +49,7 @@
     public void returnNullWithApiVersionMismatch() {
         final NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
             env.kafkaClient().setNode(cluster.controller());
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
@@ -62,14 +64,11 @@ public void returnNullWithApiVersionMismatch() {
     public void shouldNotCreateTopicWhenItAlreadyExists() {
         NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            
env.kafkaClient().prepareResponse(createTopicResponseWithAlreadyExists(newTopic));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertFalse(created);
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            mockAdminClient.addTopic(false, "myTopic", 
Collections.singletonList(topicPartitionInfo), null);
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            assertFalse(admin.createTopic(newTopic));
         }
     }
 
@@ -77,14 +76,9 @@ public void shouldNotCreateTopicWhenItAlreadyExists() {
     public void shouldCreateTopicWhenItDoesNotExist() {
         NewTopic newTopic = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(createTopicResponse(newTopic));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
-            boolean created = admin.createTopic(newTopic);
-            assertTrue(created);
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
+            assertTrue(admin.createTopic(newTopic));
         }
     }
 
@@ -93,12 +87,8 @@ public void 
shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName
         NewTopic newTopic1 = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         NewTopic newTopic2 = 
TopicAdmin.defineTopic("myTopic").partitions(1).compacted().build();
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(createTopicResponse(newTopic1));
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
             Set<String> newTopicNames = admin.createTopics(newTopic1, 
newTopic2);
             assertEquals(1, newTopicNames.size());
             assertEquals(newTopic2.name(), newTopicNames.iterator().next());
@@ -108,11 +98,8 @@ public void 
shouldCreateOneTopicWhenProvidedMultipleDefinitionsWithSameTopicName
     @Test
     public void shouldReturnFalseWhenSuppliedNullTopicDescription() {
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            TopicAdmin admin = new TopicAdmin(null, env.adminClient());
+        try (MockAdminClient mockAdminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicAdmin admin = new TopicAdmin(null, mockAdminClient);
             boolean created = admin.createTopic(null);
             assertFalse(created);
         }
@@ -120,7 +107,7 @@ public void 
shouldReturnFalseWhenSuppliedNullTopicDescription() {
 
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
-        for (int i = 0; i != numNodes; ++i) {
+        for (int i = 0; i < numNodes; ++i) {
             nodes.put(i, new Node(i, "localhost", 8121 + i));
         }
         Cluster cluster = new Cluster("mockClusterId", nodes.values(),
@@ -129,14 +116,6 @@ private Cluster createCluster(int numNodes) {
         return cluster;
     }
 
-    private CreateTopicsResponse createTopicResponse(NewTopic... topics) {
-        return createTopicResponse(new ApiError(Errors.NONE, ""), topics);
-    }
-
-    private CreateTopicsResponse 
createTopicResponseWithAlreadyExists(NewTopic... topics) {
-        return createTopicResponse(new ApiError(Errors.TOPIC_ALREADY_EXISTS, 
"Topic already exists"), topics);
-    }
-
     private CreateTopicsResponse 
createTopicResponseWithUnsupportedVersion(NewTopic... topics) {
         return createTopicResponse(new ApiError(Errors.UNSUPPORTED_VERSION, 
"This version of the API is not supported"), topics);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 42504655117..6b760c16d62 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
@@ -186,11 +187,23 @@ public boolean conditionMet() {
         assertEquals(thread.state(), StreamThread.State.DEAD);
     }
 
+    private Cluster createCluster(int numNodes) {
+        HashMap<Integer, Node> nodes = new HashMap<>();
+        for (int i = 0; i < numNodes; ++i) {
+            nodes.put(i, new Node(i, "localhost", 8121 + i));
+        }
+        return new Cluster("mockClusterId", nodes.values(),
+            Collections.<PartitionInfo>emptySet(), 
Collections.<String>emptySet(),
+            Collections.<String>emptySet(), nodes.get(0));
+    }
+
     private StreamThread createStreamThread(final String clientId, final 
StreamsConfig config, final boolean eosEnabled) {
         if (eosEnabled) {
             clientSupplier.setApplicationIdForProducer(applicationId);
         }
 
+        clientSupplier.setClusterForAdminClient(createCluster(1));
+
         return StreamThread.create(internalTopologyBuilder,
                                    config,
                                    clientSupplier,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
index c6b0f5f8252..dd32ad0786a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/tools/StreamsResetterTest.java
@@ -17,8 +17,7 @@
 package org.apache.kafka.streams.tools;
 
 import kafka.tools.StreamsResetter;
-import org.apache.kafka.clients.NodeApiVersions;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -27,8 +26,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -40,6 +38,7 @@
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -235,20 +234,19 @@ public void shouldSeekToEndOffset() {
     }
 
     @Test
-    public void shouldDeleteTopic() {
+    public void shouldDeleteTopic() throws InterruptedException, 
ExecutionException {
         Cluster cluster = createCluster(1);
-        try (MockKafkaAdminClientEnv env = new 
MockKafkaAdminClientEnv(cluster)) {
-            env.kafkaClient().setNode(cluster.controller());
-            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
-            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
-            env.kafkaClient().prepareResponse(new 
DeleteTopicsResponse(Collections.singletonMap(TOPIC, Errors.NONE)));
-            
streamsResetter.doDelete(Collections.singletonList(topicPartition.topic()), 
env.adminClient());
+        try (MockAdminClient adminClient = new 
MockAdminClient(cluster.nodes(), cluster.nodeById(0))) {
+            TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, 
cluster.nodeById(0), cluster.nodes(), Collections.<Node>emptyList());
+            adminClient.addTopic(false, TOPIC, 
Collections.singletonList(topicPartitionInfo), null);
+            streamsResetter.doDelete(Collections.singletonList(TOPIC), 
adminClient);
+            assertEquals(Collections.emptySet(), 
adminClient.listTopics().names().get());
         }
     }
 
     private Cluster createCluster(int numNodes) {
         HashMap<Integer, Node> nodes = new HashMap<>();
-        for (int i = 0; i != numNodes; ++i) {
+        for (int i = 0; i < numNodes; ++i) {
             nodes.put(i, new Node(i, "localhost", 8121 + i));
         }
         return new Cluster("mockClusterId", nodes.values(),
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java 
b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
index ae83c60275c..1ec28fab0a1 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.MockKafkaAdminClientEnv;
+import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -26,7 +26,6 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 
 import java.util.LinkedList;
@@ -59,8 +58,7 @@ public void setClusterForAdminClient(final Cluster cluster) {
 
     @Override
     public AdminClient getAdminClient(final Map<String, Object> config) {
-        MockKafkaAdminClientEnv clientEnv = new 
MockKafkaAdminClientEnv(Time.SYSTEM, cluster, config);
-        return clientEnv.adminClient();
+        return new MockAdminClient(cluster.nodes(), cluster.nodeById(0));
     }
 
     @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Use MockAdminClient for any unit tests that depend on AdminClient
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6363
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6363
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Filipe Agapito
>              Labels: newbie
>
> Today we have a few unit tests other than KafkaAdminClientTest that rely on 
> MockKafkaAdminClientEnv.
> About this class and MockKafkaAdminClientEnv, my thoughts:
> 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner 
> KafkaClient; it should only be used for the unit tests of KafkaAdminClient 
> itself.
> 2. For any other unit tests on classes that depend on AdminClient, we should 
> be using the MockAdminClient that mocks the whole AdminClient.
> So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in 
> KafkaAdminClientTest use MockClient and add a new static constructor that 
> takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to