hachikuji commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r588522951



##########
File path: core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
##########
@@ -199,12 +209,25 @@ private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
         }
 
         switch (type) {
-            case ZK:
-            case BOTH:
+            case ZK: {
                 ClusterConfig config = builder.build();
                 config.serverProperties().putAll(properties);
                 testInvocations.accept(new ZkClusterInvocationContext(config));
                 break;
+            } case RAFT: {
+                ClusterConfig config = builder.build();
+                config.serverProperties().putAll(properties);
+                testInvocations.accept(new RaftClusterInvocationContext(config));
+                break;
+            } case BOTH: {
+                ClusterConfig zkConfig = builder.build();

Review comment:
       nit: not a big deal, but I suspect there's a way to remove some of the duplication here in this switch. For example, perhaps we could add a method to `Type` to build the invocation context:
   ```java
     List<ClusterInvocationContext> invocationContexts(ClusterConfig config);
   ```
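To make the suggestion concrete, here is a minimal sketch of how that could look, assuming `Type` is the enum behind this switch and reusing the two context classes from this PR. The method name follows the snippet above; the shared `TestTemplateInvocationContext` supertype and the `BOTH` wiring are assumptions, since the current code builds a separate `zkConfig`:
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

// Hypothetical sketch: each Type variant builds its own invocation contexts, so
// the switch collapses to type.invocationContexts(config).forEach(testInvocations).
public enum Type {
    ZK {
        @Override
        public List<TestTemplateInvocationContext> invocationContexts(ClusterConfig config) {
            return Collections.singletonList(new ZkClusterInvocationContext(config));
        }
    },
    RAFT {
        @Override
        public List<TestTemplateInvocationContext> invocationContexts(ClusterConfig config) {
            return Collections.singletonList(new RaftClusterInvocationContext(config));
        }
    },
    BOTH {
        @Override
        public List<TestTemplateInvocationContext> invocationContexts(ClusterConfig config) {
            return Arrays.asList(
                new ZkClusterInvocationContext(config),
                new RaftClusterInvocationContext(config));
        }
    };

    public abstract List<TestTemplateInvocationContext> invocationContexts(ClusterConfig config);
}
```
Whether `BOTH` should hand the same `ClusterConfig` to both contexts or build one per context, as the current `zkConfig` code does, is a detail the helper would need to preserve.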

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?
+                KafkaClusterTestKit cluster = builder.build();
+                clusterReference.set(cluster);
+                cluster.format();
+                cluster.startup();
+                kafka.utils.TestUtils.waitUntilTrue(
+                    () -> cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+                    () -> "Broker never made it to RUNNING state.",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+            },
+            (AfterTestExecutionCallback) context -> clusterReference.get().close(),
+            new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+            new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+        );
+    }
+
+    public static class RaftClusterInstance implements ClusterInstance {
+
+        private final AtomicReference<KafkaClusterTestKit> clusterReference;
+        private final ClusterConfig clusterConfig;
+        final AtomicBoolean started = new AtomicBoolean(false);
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+
+        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
+            this.clusterReference = clusterReference;
+            this.clusterConfig = clusterConfig;
+        }
+
+        @Override
+        public String bootstrapServers() {
+            return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        }
+
+        @Override
+        public Collection<SocketServer> brokerSocketServers() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public ListenerName clientListener() {
+            return ListenerName.normalised("EXTERNAL");
+        }
+
+        @Override
+        public Collection<SocketServer> controllerSocketServers() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public SocketServer anyBrokerSocketServer() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker 
SocketServers found"));
+        }
+
+        @Override
+        public SocketServer anyControllerSocketServer() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No controller 
SocketServers found"));
+        }
+
+        @Override
+        public ClusterType clusterType() {
+            return ClusterType.RAFT;
+        }
+
+        @Override
+        public ClusterConfig config() {
+            return clusterConfig;
+        }
+
+        @Override
+        public Object getUnderlying() {

Review comment:
       Is there any benefit in letting this return the more specific `KafkaClusterTestKit` type?
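If so, a covariant override would be enough; Java lets the subclass narrow the return type while `ClusterInstance.getUnderlying` keeps returning `Object`. A sketch, not the PR's code:
```java
@Override
public KafkaClusterTestKit getUnderlying() {
    // Covariant return: narrows ClusterInstance.getUnderlying()'s Object,
    // so Raft-specific tests don't need a cast.
    return clusterReference.get();
}
```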

##########
File path: core/src/test/java/kafka/testkit/Kip500BrokerNode.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Kip500BrokerNode implements TestKitNode {
+    public static class Builder {
+        private int id = -1;
+        private Uuid incarnationId = null;
+        private String metadataDirectory = null;
+        private List<String> logDataDirectories = null;
+
+        public Builder setId(int id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder setLogDirectories(List<String> logDataDirectories) {
+            this.logDataDirectories = logDataDirectories;
+            return this;
+        }
+
+        public Kip500BrokerNode build() {
+            if (id == -1) {
+                throw new RuntimeException("You must set the node id");
+            }
+            if (incarnationId == null) {
+                incarnationId = Uuid.randomUuid();
+            }
+            if (metadataDirectory == null) {
+                metadataDirectory = String.format("kip500broker_%d_meta", id);

Review comment:
       nit: do we need the `kip500` prefix? We don't use it in `ControllerNode`. Similarly, maybe we can call this `BrokerNode` instead of `Kip500BrokerNode`?

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?
+                KafkaClusterTestKit cluster = builder.build();
+                clusterReference.set(cluster);
+                cluster.format();
+                cluster.startup();
+                kafka.utils.TestUtils.waitUntilTrue(
+                    () -> cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+                    () -> "Broker never made it to RUNNING state.",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+            },
+            (AfterTestExecutionCallback) context -> clusterReference.get().close(),
+            new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+            new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+        );
+    }
+
+    public static class RaftClusterInstance implements ClusterInstance {
+
+        private final AtomicReference<KafkaClusterTestKit> clusterReference;
+        private final ClusterConfig clusterConfig;
+        final AtomicBoolean started = new AtomicBoolean(false);
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+
+        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
+            this.clusterReference = clusterReference;
+            this.clusterConfig = clusterConfig;
+        }
+
+        @Override
+        public String bootstrapServers() {
+            return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        }
+
+        @Override
+        public Collection<SocketServer> brokerSocketServers() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public ListenerName clientListener() {
+            return ListenerName.normalised("EXTERNAL");
+        }
+
+        @Override
+        public Collection<SocketServer> controllerSocketServers() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public SocketServer anyBrokerSocketServer() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker 
SocketServers found"));
+        }
+
+        @Override
+        public SocketServer anyControllerSocketServer() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No controller 
SocketServers found"));
+        }
+
+        @Override
+        public ClusterType clusterType() {
+            return ClusterType.RAFT;
+        }
+
+        @Override
+        public ClusterConfig config() {
+            return clusterConfig;
+        }
+
+        @Override
+        public Object getUnderlying() {
+            return clusterReference.get();
+        }
+
+        @Override
+        public Admin createAdminClient(Properties configOverrides) {
+            return Admin.create(clusterReference.get().clientProperties());
+        }
+
+        @Override
+        public void start() {
+            if (started.compareAndSet(false, true)) {
+                try {
+                    clusterReference.get().startup();
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to start up Raft 
server", e);
+                }
+            }
+        }
+
+        @Override
+        public void stop() {
+            if (stopped.compareAndSet(false, true)) {
+                try {
+                    clusterReference.get().close();
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to stop up Raft 
server", e);

Review comment:
       nit: drop "up"

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?

Review comment:
       Maybe we can turn this into a JIRA?

##########
File path: core/src/test/java/kafka/testkit/Kip500BrokerNode.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Kip500BrokerNode implements TestKitNode {
+    public static class Builder {
+        private int id = -1;
+        private Uuid incarnationId = null;
+        private String metadataDirectory = null;

Review comment:
       It looks like the builder doesn't provide a way to set this. On a side note, I wonder if we should mimic the configuration behavior more closely. If `metadata.log.dir` is not set explicitly, then we use the first listed data directory.
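A sketch of the fallback described here, assuming the builder grows a `setMetadataDirectory` setter and that `logDataDirectories` is populated by the time `build()` runs (both are assumptions, not code from the PR):
```java
// Hypothetical setter the comment notes is missing today.
public Builder setMetadataDirectory(String metadataDirectory) {
    this.metadataDirectory = metadataDirectory;
    return this;
}

// Inside build(), after the id/incarnationId checks:
if (metadataDirectory == null && logDataDirectories != null && !logDataDirectories.isEmpty()) {
    // Mimic metadata.log.dir semantics: when no metadata directory is set
    // explicitly, fall back to the first listed data directory.
    metadataDirectory = logDataDirectories.get(0);
}
```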

##########
File path: core/src/test/java/kafka/testkit/TestKitNodes.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.MetaProperties;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestKitNodes {
+    public static class Builder {
+        private Uuid clusterId = null;
+        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+        private final NavigableMap<Integer, Kip500BrokerNode> kip500BrokerNodes = new TreeMap<>();
+
+        public Builder setClusterId(Uuid clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder addNodes(TestKitNode[] nodes) {
+            for (TestKitNode node : nodes) {
+                addNode(node);
+            }
+            return this;
+        }
+
+        public Builder addNode(TestKitNode node) {
+            if (node instanceof ControllerNode) {
+                ControllerNode controllerNode = (ControllerNode) node;
+                controllerNodes.put(node.id(), controllerNode);
+            } else if (node instanceof Kip500BrokerNode) {
+                Kip500BrokerNode brokerNode = (Kip500BrokerNode) node;
+                kip500BrokerNodes.put(node.id(), brokerNode);
+            } else {
+                throw new RuntimeException("Can't handle TestKitNode subclass 
" +
+                        node.getClass().getSimpleName());
+            }
+            return this;
+        }
+
+        public Builder setNumControllerNodes(int numControllerNodes) {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+            }
+            while (controllerNodes.size() > numControllerNodes) {

Review comment:
       nit: this loop is a little unconventional. Maybe we could use `pollFirstEntry` instead of the iterator? Similarly in `setNumKip500BrokerNodes`.
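For reference, the `pollFirstEntry` version might look like this; it removes and returns the lowest-keyed entry of the `TreeMap`, so no explicit iterator bookkeeping is needed (whether surplus nodes should be dropped from the low or the high end is a judgment call; `pollLastEntry` covers the other direction):
```java
// Inside setNumControllerNodes, replacing the iterator-based loop:
while (controllerNodes.size() > numControllerNodes) {
    controllerNodes.pollFirstEntry();
}
```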

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.Assertions._
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@Timeout(120000)
+class RaftClusterTest {
+
+  @Test
+  def testCreateClusterAndClose(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+        "RaftManager was not initialized.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(cluster.nodes().clusterId().toString,
+          admin.describeCluster().clusterId().get())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndListTopic(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+        "RaftManager was not initialized.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        // Create a test topic
+        val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 3.toShort))
+        val createTopicResult = admin.createTopics(newTopic)
+        createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+        // List created topic
+        TestUtils.waitUntilTrue(() => {
+          val listTopicsResult = admin.listTopics()
+          val result = listTopicsResult.names().get(5, TimeUnit.SECONDS).size() == newTopic.size()
+          if (result) {
+            newTopic forEach(topic => {
+              assertTrue(listTopicsResult.names().get().contains(topic.name()))
+            })
+          }
+          result
+        }, "Topics created were not listed.")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndManyTopics(): Unit = {

Review comment:
       Not sure there's much benefit in covering the single-topic case separately. If there is, we can probably factor out a generalized helper. We can probably cover `testCreateClusterAndCreateAndManyTopicsWithManyPartitions` with the helper as well.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();

Review comment:
       nit: I think we could use a more convenient type, such as `Map<Integer, InetAddressSpec>`. Ultimately this just needs to make it down to `KafkaNetworkChannel.updateEndpoint` so the conversion to the config value is unnecessary.
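A sketch of that shape, assuming `RaftConfig.InetAddressSpec` wraps an `InetSocketAddress` (the endpoint form `KafkaNetworkChannel.updateEndpoint` consumes), plus `java.net.InetSocketAddress` and `java.util.NavigableMap` imports on top of the file's existing ones:
```java
private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
    private final int expectedControllers;
    private final CompletableFuture<Map<Integer, RaftConfig.InetAddressSpec>> future =
        new CompletableFuture<>();
    private final NavigableMap<Integer, RaftConfig.InetAddressSpec> voters = new TreeMap<>();

    ControllerQuorumVotersFutureManager(int expectedControllers) {
        this.expectedControllers = expectedControllers;
    }

    synchronized void registerPort(int nodeId, int port) {
        voters.put(nodeId, new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", port)));
        if (voters.size() >= expectedControllers) {
            // Hand the endpoints to the raft layer directly; no round trip
            // through the controller.quorum.voters string encoding.
            future.complete(Collections.unmodifiableMap(voters));
        }
    }

    void fail(Throwable e) {
        future.completeExceptionally(e);
    }

    @Override
    public void close() {
        future.cancel(true);
    }
}
```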

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), 
e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = (Total number of brokers + Total number of controllers) * 2
+                                  (one Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;

Review comment:
       nit: this seems unnecessary. We're already using the constant below anyway.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), 
e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = (Total number of brokers + Total number of controllers) * 2
+                                  (one Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {

Review comment:
       It might be nice to factor out a helper to build the controller and broker nodes. It would make it a little easier to process this method visually.
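As one possible shape, the per-controller config assembly could move into a private helper so `build()` reads as an outline. A sketch only; the helper name is invented, and the property values mirror the ones this loop sets inline:
```java
private Map<String, String> controllerConfig(ControllerNode node, String quorumVotersPlaceholder) {
    Map<String, String> props = new HashMap<>(configProps);
    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
    props.put(KafkaConfig$.MODULE$.NodeIdProp(), Integer.toString(node.id()));
    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(), node.metadataDirectory());
    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT");
    props.put(KafkaConfig$.MODULE$.ListenersProp(), "CONTROLLER://localhost:0");
    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(), "CONTROLLER");
    // Placeholder until the real controller ports are known.
    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, quorumVotersPlaceholder);
    return props;
}
```
A matching `brokerConfig(...)` helper would do the same for the broker loop.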

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), 
e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = (Total number of brokers + Total number of controllers) * 2
+                                  (one Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick. Set it to a dummy string
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);

Review comment:
       nit: Instead of calling it `dummy`, which makes it sound hacky, maybe we could call it `uninitializedQuorumVotersString` or something like that. We have tried to make configuring with the `0.0.0.0:0` endpoint an explicitly supported feature.
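With that rename, the declaration further up would read (same value, clearer intent):
```java
// 0.0.0.0:0 is an explicitly supported "endpoint not yet known" value,
// so the placeholder is uninitialized rather than a hack.
String uninitializedQuorumVotersString = nodes.controllerNodes().keySet().stream().
    map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
    collect(Collectors.joining(","));
```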

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick. Set it to a dummy string
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {

Review comment:
       By the way, I sort of feel it would make our lives easier if we used 
`KafkaRaftServer` directly instead of building the controller, broker, and raft 
managers ourselves. For one thing, that would make it trivial to support mixed 
mode. We don't have to do that here, but I'm kind of curious if there is a 
reason that we don't.
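
       Very roughly, and assuming `KafkaRaftServer`'s constructor keeps its
current shape (this is a sketch, not a drop-in replacement for the existing
wiring):
   ```java
     // Hypothetical per-node startup through KafkaRaftServer, which would
     // assemble the raft manager, controller, and broker internally based on
     // process.roles -- including mixed mode.
     KafkaConfig config = new KafkaConfig(props, false, Option.empty());
     KafkaRaftServer server = new KafkaRaftServer(config, Time.SYSTEM,
         Option.apply(threadNamePrefix));
     server.startup();
   ```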

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                    props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.LogDirsProp(),
+                        String.join(",", node.logDataDirectories()));
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "EXTERNAL://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(),
+                        node.logDataDirectories());
+
+                    // Just like above, we set a placeholder voter list here until we
+                    // find out what ports the controllers picked.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("broker%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                            metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                            Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    BrokerServer broker = new BrokerServer(
+                        config,
+                        nodes.brokerProperties(node.id()),
+                        metaLogShim,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
+                        connectFutureManager.future,
+                        Server.SUPPORTED_FEATURES()
+                    );
+                    kip500Brokers.put(node.id(), broker);
+                    raftManagers.put(node.id(), raftManager);
+                }
+            } catch (Exception e) {
+                if (executorService != null) {
+                    executorService.shutdownNow();
+                    executorService.awaitTermination(1, TimeUnit.DAYS);

Review comment:
       Would we really wait one day for tests to shut down? Maybe one minute is
enough?
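
       Something like the following would bound the wait (a sketch; the
one-minute figure is arbitrary):
   ```java
     executorService.shutdownNow();
     // Bound the wait so a wedged task cannot stall test cleanup indefinitely.
     if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
         log.warn("Executor service did not terminate within one minute.");
     }
   ```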

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+                if (executorService != null) {
+                    executorService.shutdownNow();
+                    executorService.awaitTermination(1, TimeUnit.DAYS);
+                }
+                for (ControllerServer controller : controllers.values()) {
+                    controller.shutdown();
+                }
+                for (BrokerServer brokerServer : kip500Brokers.values()) {
+                    brokerServer.shutdown();
+                }
+                for (KafkaRaftManager raftManager : raftManagers.values()) {
+                    raftManager.shutdown();
+                }
+                connectFutureManager.close();
+                if (baseDirectory != null) {
+                    Utils.delete(baseDirectory);
+                }
+                throw e;
+            }
+            return new KafkaClusterTestKit(executorService, nodes, controllers,
+                kip500Brokers, raftManagers, connectFutureManager, 
baseDirectory);
+        }
+
+        static private void setupNodeDirectories(File baseDirectory,
+                                                 String metadataDirectory,
+                                                 Collection<String> 
logDataDirectories) throws Exception {
+            Files.createDirectories(new File(baseDirectory, "local").toPath());
+            Files.createDirectories(Paths.get(metadataDirectory));
+            for (String logDataDirectory : logDataDirectories) {
+                Files.createDirectories(Paths.get(logDataDirectory));
+            }
+        }
+    }
+
+    private final ExecutorService executorService;
+    private final TestKitNodes nodes;
+    private final Map<Integer, ControllerServer> controllers;
+    private final Map<Integer, BrokerServer> kip500Brokers;
+    private final Map<Integer, KafkaRaftManager> raftManagers;
+    private final ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager;
+    private final File baseDirectory;
+
+    private KafkaClusterTestKit(ExecutorService executorService,
+                                TestKitNodes nodes,
+                                Map<Integer, ControllerServer> controllers,
+                                Map<Integer, BrokerServer> kip500Brokers,
+                                Map<Integer, KafkaRaftManager> raftManagers,
+                                ControllerQuorumVotersFutureManager 
controllerQuorumVotersFutureManager,
+                                File baseDirectory) {
+        this.executorService = executorService;
+        this.nodes = nodes;
+        this.controllers = controllers;
+        this.kip500Brokers = kip500Brokers;
+        this.raftManagers = raftManagers;
+        this.controllerQuorumVotersFutureManager = 
controllerQuorumVotersFutureManager;
+        this.baseDirectory = baseDirectory;
+    }
+
+    public void format() throws Exception {
+        List<Future<?>> futures = new ArrayList<>();
+        try {
+            for (Entry<Integer, ControllerServer> entry : 
controllers.entrySet()) {
+                int nodeId = entry.getKey();

Review comment:
       We really ought to be able to factor out a helper here. The code looks 
identical for both the controller and broker cases.
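
       Something along these lines, perhaps (a sketch; `formatNodeStorage` and
its parameters are hypothetical, since the full loop bodies are not shown
here):
   ```java
     // Hypothetical shared helper: submits the storage formatting for one node
     // to the executor, so the controller and broker loops can both call it.
     private Future<?> formatNodeStorage(int nodeId, String metadataLogDir) {
         return executorService.submit(() -> {
             // ... the per-node formatting logic currently duplicated in the
             // controller and broker branches would live here ...
         });
     }
   ```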

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);

Review comment:
       I don't suppose we could reference the constant defined in 
`KafkaRaftServer`?
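
       If the companion object's constant is reachable from Java, that would
look something like this (the accessor path is an assumption):
   ```java
     // Assumed accessor for the constant on KafkaRaftServer's companion object.
     TopicPartition metadataPartition = KafkaRaftServer$.MODULE$.MetadataPartition();
   ```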

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.Assertions._
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@Timeout(120000)
+class RaftClusterTest {
+
+  @Test
+  def testCreateClusterAndClose(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+      "RaftManager was not initialized.")

Review comment:
       nit: alignment looks off

##########
File path: core/src/test/resources/log4j.properties
##########
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
-log4j.logger.org.apache.kafka=ERROR
+log4j.logger.kafka=INFO

Review comment:
       Guessing this was just added for debugging.

##########
File path: core/src/test/java/kafka/testkit/TestKitNodes.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.MetaProperties;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestKitNodes {
+    public static class Builder {
+        private Uuid clusterId = null;
+        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();

Review comment:
       It looks like we don't support mixed mode testing. That seems worth a 
follow-up JIRA. It is definitely an interesting case from the perspective of 
the raft implementation since it involves two listeners.
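
       For reference, mixed mode would presumably mean a node configured with
both roles and both listeners, e.g. (a sketch, not tested here):
   ```java
     // Hypothetical combined-mode node config; both the broker and controller
     // listeners would need to be declared on the same node.
     props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker,controller");
     props.put(KafkaConfig$.MODULE$.ListenersProp(),
         "EXTERNAL://localhost:0,CONTROLLER://localhost:0");
   ```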

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(cluster.nodes().clusterId().toString,
+          admin.describeCluster().clusterId().get())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndListTopic(): Unit = {

Review comment:
       Maybe we can let this cover topic deletion as well.
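
       For example (a sketch; the topic name stands in for whatever the test
creates, and deletion is asynchronous, so a retry loop may be needed):
   ```java
     // Delete the topic created earlier in the test and confirm it is gone.
     admin.deleteTopics(Collections.singletonList("test-topic")).all().get();
     // Deletion completes asynchronously; in practice this check may need to
     // be retried until the topic disappears.
     assertFalse(admin.listTopics().names().get().contains("test-topic"));
   ```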

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().

Review comment:
       Just want to point out that this assumes all controllers are voters. It 
would be worth a follow-up to support controllers as observers as well.
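
       A follow-up could let each controller node declare whether it is a
voter, e.g. (a sketch; `isVoter()` is a hypothetical accessor that does not
exist yet):
   ```java
     // Hypothetical: build the voter string only from controllers marked as
     // voters, letting the rest join the quorum as observers.
     String quorumVotersString = nodes.controllerNodes().values().stream().
         filter(node -> node.isVoter()).
         map(node -> String.format("%d@0.0.0.0:0", node.id())).
         collect(Collectors.joining(","));
   ```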



