This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9689a31  MINOR: Drop enable.metadata.quorum config (#9934)
9689a31 is described below

commit 9689a313f52a7b45f38faecddf018dc8ab02dc62
Author: Jason Gustafson <[email protected]>
AuthorDate: Thu Jan 21 15:16:15 2021 -0800

    MINOR: Drop enable.metadata.quorum config (#9934)
    
    The primary purpose of this patch is to remove the internal 
`enable.metadata.quorum` configuration. Instead, we rely on `process.roles` to 
determine if the self-managed quorum has been enabled. As a part of this, I've 
done the following:
    
    1. Replace the notion of "disabled" APIs with "controller-only" APIs. We 
previously marked some APIs which were intended only for the KIP-500 controller as 
"disabled" so that they would not be unintentionally exposed. For example, the 
Raft quorum APIs were disabled. Marking them as "controller-only" carries the 
same effect, but makes the intent that they should be only exposed by the 
KIP-500 controller clearer.
    2. Make `ForwardingManager` optional in `KafkaServer` and `KafkaApis`. 
Previously we used `null` if forwarding was not enabled and relied on the metadata 
quorum check.
    3. Make `zookeeper.connect` an optional configuration if `process.roles` is 
defined.
    4. Update raft README to remove reference to `zookeeper.connect`.
    
    Reviewers: Colin Patrick McCabe <[email protected]>, Boyang Chen 
<[email protected]>
---
 .../org/apache/kafka/clients/NodeApiVersions.java  |  4 +--
 .../org/apache/kafka/common/protocol/ApiKeys.java  | 30 +++++++++++-----------
 .../org/apache/kafka/common/protocol/Protocol.java |  2 +-
 .../kafka/common/requests/ApiVersionsResponse.java |  4 +--
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  8 +++---
 .../common/requests/ApiVersionsResponseTest.java   |  6 ++---
 core/src/main/scala/kafka/Kafka.scala              |  5 ++--
 .../main/scala/kafka/network/RequestChannel.scala  |  8 +++---
 .../main/scala/kafka/network/SocketServer.scala    | 14 +++++-----
 .../scala/kafka/server/ForwardingManager.scala     |  4 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   | 28 +++++++++-----------
 core/src/main/scala/kafka/server/KafkaConfig.scala | 21 ++++++++-------
 core/src/main/scala/kafka/server/KafkaServer.scala | 19 +++++++-------
 .../main/scala/kafka/tools/TestRaftServer.scala    |  7 ++++-
 .../kafka/admin/BrokerApiVersionsCommandTest.scala |  2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |  2 +-
 .../kafka/integration/KafkaServerTestHarness.scala | 11 ++++++--
 .../server/AbstractApiVersionsRequestTest.scala    |  2 +-
 .../CreateTopicsRequestWithForwardingTest.scala    |  6 +----
 .../unit/kafka/server/ForwardingManagerTest.scala  |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 13 +++++++---
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 17 ++++++++++++
 .../scala/unit/kafka/server/RequestQuotaTest.scala |  6 ++---
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  6 ++++-
 .../jmh/metadata/MetadataRequestBenchmark.java     |  6 ++---
 raft/README.md                                     |  9 +------
 26 files changed, 132 insertions(+), 110 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java 
b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 7588dee..658d481 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -62,7 +62,7 @@ public class NodeApiVersions {
      */
     public static NodeApiVersions create(Collection<ApiVersion> overrides) {
         List<ApiVersion> apiVersions = new LinkedList<>(overrides);
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             boolean exists = false;
             for (ApiVersion apiVersion : apiVersions) {
                 if (apiVersion.apiKey() == apiKey.id) {
@@ -170,7 +170,7 @@ public class NodeApiVersions {
 
         // Also handle the case where some apiKey types are not specified at 
all in the given ApiVersions,
         // which may happen when the remote is too old.
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (!apiKeysText.containsKey(apiKey.id)) {
                 StringBuilder bld = new StringBuilder();
                 bld.append(apiKey.name).append("(").
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ca1f4b8..67586d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -90,15 +90,15 @@ public enum ApiKeys {
     ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
     
DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
     ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, 
false, true),
-    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
-    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, 
RecordBatch.MAGIC_VALUE_V0, false, false),
-    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, 
RecordBatch.MAGIC_VALUE_V0, false, false),
-    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, 
RecordBatch.MAGIC_VALUE_V0, false, false),
+    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, 
RecordBatch.MAGIC_VALUE_V0, false, true),
+    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, 
RecordBatch.MAGIC_VALUE_V0, false, true),
+    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, 
RecordBatch.MAGIC_VALUE_V0, false, true),
     ALTER_ISR(ApiMessageType.ALTER_ISR, true),
     UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
-    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, 
false),
-    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, 
RecordBatch.MAGIC_VALUE_V0, false, false),
-    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false, 
RecordBatch.MAGIC_VALUE_V0, false, true);
+    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, 
true),
+    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, 
RecordBatch.MAGIC_VALUE_V0, false, true),
+    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
 
     // The generator ensures every `ApiMessageType` has a unique id
     private static final Map<Integer, ApiKeys> ID_TO_TYPE = 
Arrays.stream(ApiKeys.values())
@@ -116,8 +116,8 @@ public enum ApiKeys {
     /** indicates the minimum required inter broker magic required to support 
the API */
     public final byte minRequiredInterBrokerMagic;
 
-    /** indicates whether the API is enabled and should be exposed in 
ApiVersions **/
-    public final boolean isEnabled;
+    /** indicates whether this is an API which is only exposed by the KIP-500 
controller **/
+    public final boolean isControllerOnlyApi;
 
     /** indicates whether the API is enabled for forwarding **/
     public final boolean forwardable;
@@ -139,7 +139,7 @@ public enum ApiKeys {
     }
 
     ApiKeys(ApiMessageType messageType, boolean clusterAction, byte 
minRequiredInterBrokerMagic, boolean forwardable) {
-        this(messageType, clusterAction, minRequiredInterBrokerMagic, 
forwardable, true);
+        this(messageType, clusterAction, minRequiredInterBrokerMagic, 
forwardable, false);
     }
 
     ApiKeys(
@@ -147,14 +147,14 @@ public enum ApiKeys {
         boolean clusterAction,
         byte minRequiredInterBrokerMagic,
         boolean forwardable,
-        boolean isEnabled
+        boolean isControllerOnlyApi
     ) {
         this.messageType = messageType;
         this.id = messageType.apiKey();
         this.name = messageType.name;
         this.clusterAction = clusterAction;
         this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
-        this.isEnabled = isEnabled;
+        this.isControllerOnlyApi = isControllerOnlyApi;
 
         this.requiresDelayedAllocation = forwardable || 
shouldRetainsBufferReference(messageType.requestSchemas());
         this.forwardable = forwardable;
@@ -210,7 +210,7 @@ public enum ApiKeys {
         b.append("<th>Name</th>\n");
         b.append("<th>Key</th>\n");
         b.append("</tr>");
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             b.append("<tr>\n");
             b.append("<td>");
             b.append("<a href=\"#The_Messages_" + key.name + "\">" + key.name 
+ "</a>");
@@ -242,9 +242,9 @@ public enum ApiKeys {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> enabledApis() {
+    public static List<ApiKeys> brokerApis() {
         return Arrays.stream(values())
-            .filter(api -> api.isEnabled)
+            .filter(api -> !api.isControllerOnlyApi)
             .collect(Collectors.toList());
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 0d573db..f31c613 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -133,7 +133,7 @@ public class Protocol {
             b.append("</pre>\n");
             schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
         }
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             // Key
             b.append("<h5>");
             b.append("<a name=\"The_Messages_" + key.name + "\">");
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 3a6ff2f..2bf9360 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,7 @@ public class ApiVersionsResponse extends AbstractResponse {
 
     public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
                 apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
             }
@@ -137,7 +137,7 @@ public class ApiVersionsResponse extends AbstractResponse {
     public static ApiVersionCollection intersectControllerApiVersions(final 
byte minMagic,
                                                                       final 
Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
                 ApiVersion brokerApiVersion = toApiVersion(apiKey);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 01d8cf6..7c19d9f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -38,7 +38,7 @@ public class NodeApiVersionsTest {
         NodeApiVersions versions = new NodeApiVersions(new 
ApiVersionCollection());
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             bld.append(prefix).append(apiKey.name).
                     append("(").append(apiKey.id).append("): UNSUPPORTED");
             prefix = ", ";
@@ -143,10 +143,10 @@ public class NodeApiVersionsTest {
                 .setMaxVersion((short) 1));
         NodeApiVersions versions = new NodeApiVersions(versionList);
         for (ApiKeys apiKey: ApiKeys.values()) {
-            if (apiKey.isEnabled) {
-                assertEquals(apiKey.latestVersion(), 
versions.latestUsableVersion(apiKey));
-            } else {
+            if (apiKey.isControllerOnlyApi) {
                 assertNull(versions.apiVersion(apiKey));
+            } else {
+                assertEquals(apiKey.latestVersion(), 
versions.latestUsableVersion(apiKey));
             }
         }
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index f387a06..38a5869 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {
 
     @Test
     public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
-        
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE),
 new HashSet<>(ApiKeys.enabledApis()));
+        
assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE),
 new HashSet<>(ApiKeys.brokerApis()));
         
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
         
assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
         assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, 
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
@@ -49,9 +49,9 @@ public class ApiVersionsResponseTest {
     @Test
     public void shouldHaveCorrectDefaultApiVersionsResponse() {
         Collection<ApiVersion> apiVersions = 
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
-        assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API 
versions for all API keys must be maintained.");
+        assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API 
versions for all API keys must be maintained.");
 
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             ApiVersion version = 
ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
             assertNotNull(version, "Could not find ApiVersion for API " + 
key.name);
             assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect 
min version for Api " + key.name);
diff --git a/core/src/main/scala/kafka/Kafka.scala 
b/core/src/main/scala/kafka/Kafka.scala
index 1e1f345..4e278c9 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -65,11 +65,12 @@ object Kafka extends Logging {
 
   private def buildServer(props: Properties): Server = {
     val config = KafkaConfig.fromProps(props, false)
-    if (config.processRoles.isEmpty) {
+    if (config.requiresZookeeper) {
       new KafkaServer(
         config,
         Time.SYSTEM,
-        threadNamePrefix = None
+        threadNamePrefix = None,
+        enableForwarding = false
       )
     } else {
       new KafkaRaftServer(
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index 777c940..7d31125 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -59,11 +59,11 @@ object RequestChannel extends Logging {
     val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
   }
 
-  class Metrics(allowDisabledApis: Boolean = false) {
+  class Metrics(allowControllerOnlyApis: Boolean = false) {
 
     private val metricsMap = mutable.Map[String, RequestMetrics]()
 
-    (ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name) 
++
+    (ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || 
allowControllerOnlyApis).map(_.name) ++
         Seq(RequestMetrics.consumerFetchMetricName, 
RequestMetrics.followFetchMetricName)).foreach { name =>
       metricsMap.put(name, new RequestMetrics(name))
     }
@@ -337,9 +337,9 @@ object RequestChannel extends Logging {
 class RequestChannel(val queueSize: Int,
                      val metricNamePrefix: String,
                      time: Time,
-                     allowDisabledApis: Boolean = false) extends 
KafkaMetricsGroup {
+                     allowControllerOnlyApis: Boolean = false) extends 
KafkaMetricsGroup {
   import RequestChannel._
-  val metrics = new RequestChannel.Metrics(allowDisabledApis)
+  val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = 
metricNamePrefix.concat(RequestQueueSizeMetric)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 9345452..da02c5e 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -77,7 +77,7 @@ class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
-                   val allowDisabledApis: Boolean = false)
+                   val allowControllerOnlyApis: Boolean = false)
   extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
@@ -93,12 +93,12 @@ class SocketServer(val config: KafkaConfig,
   // data-plane
   private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneMetricPrefix, time, allowDisabledApis)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneMetricPrefix, time, allowControllerOnlyApis)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = 
config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, 
allowControllerOnlyApis))
 
   private var nextProcessorId = 0
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -429,7 +429,7 @@ class SocketServer(val config: KafkaConfig,
       memoryPool,
       logContext,
       isPrivilegedListener = isPrivilegedListener,
-      allowDisabledApis = allowDisabledApis
+      allowControllerOnlyApis = allowControllerOnlyApis
     )
   }
 
@@ -790,7 +790,7 @@ private[kafka] class Processor(val id: Int,
                                logContext: LogContext,
                                connectionQueueSize: Int = ConnectionQueueSize,
                                isPrivilegedListener: Boolean = false,
-                               allowDisabledApis: Boolean = false) extends 
AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               allowControllerOnlyApis: Boolean = false) 
extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -981,10 +981,10 @@ private[kafka] class Processor(val id: Int,
 
   protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
     val header = RequestHeader.parse(buffer)
-    if (header.apiKey.isEnabled || allowDisabledApis) {
+    if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
       header
     } else {
-      throw new InvalidRequestException("Received request for disabled api key 
" + header.apiKey)
+      throw new InvalidRequestException("Received request for KIP-500 
controller-only api key " + header.apiKey)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala 
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 8d285e8..20fda29 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -36,7 +36,7 @@ trait ForwardingManager {
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit
 
-  def controllerApiVersions(): Option[NodeApiVersions]
+  def controllerApiVersions: Option[NodeApiVersions]
 
   def start(): Unit = {}
 
@@ -140,7 +140,7 @@ class ForwardingManagerImpl(
     channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
   }
 
-  override def controllerApiVersions(): Option[NodeApiVersions] =
+  override def controllerApiVersions: Option[NodeApiVersions] =
     channelManager.controllerApiVersions()
 
   private def parseResponse(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 09c1ba4..9004cef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,7 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
-                val forwardingManager: ForwardingManager,
+                val forwardingManager: Option[ForwardingManager],
                 val zkClient: KafkaZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
@@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
-    config.metadataQuorumEnabled && request.context.principalSerde.isPresent
+    forwardingManager.isDefined && request.context.principalSerde.isPresent
   }
 
   private def maybeForwardToController(
@@ -149,12 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (!request.isForwarded && !controller.isActive && 
isForwardingEnabled(request)) {
-      forwardingManager.forwardRequest(request, responseCallback)
-    } else {
-      // When the KIP-500 mode is off or the principal serde is undefined, 
forwarding is not supported,
-      // therefore requests are handled directly.
-      handler(request)
+    forwardingManager match {
+      case Some(mgr) if !request.isForwarded && !controller.isActive =>
+        mgr.forwardRequest(request, responseCallback)
+
+      case _ =>
+        handler(request)
     }
   }
 
@@ -1742,11 +1742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       else {
         val supportedFeatures = brokerFeatures.supportedFeatures
         val finalizedFeaturesOpt = finalizedFeatureCache.get
-        val controllerApiVersions = if (isForwardingEnabled(request)) {
-          forwardingManager.controllerApiVersions()
-        } else {
-          None
-        }
+        val controllerApiVersions = 
forwardingManager.flatMap(_.controllerApiVersions)
 
         val apiVersionsResponse =
           finalizedFeaturesOpt match {
@@ -3251,9 +3247,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // If forwarding is not yet enabled or this request has been received on 
an invalid endpoint,
     // then we treat the request as unparsable and close the connection.
-    if (!config.metadataQuorumEnabled) {
+    if (!isForwardingEnabled(request)) {
       info(s"Closing connection ${request.context.connectionId} because it 
sent an `Envelope` " +
-        s"request, which is not accepted without enabling the internal config 
${KafkaConfig.EnableMetadataQuorumProp}")
+        "request even though forwarding has not been enabled")
       requestHelper.closeConnection(request, Collections.emptyMap())
       return
     } else if (!request.context.fromPrivilegedListener) {
@@ -3279,7 +3275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val forwardedRequestHeader = 
parseForwardedRequestHeader(forwardedRequestBuffer)
 
     val forwardedApi = forwardedRequestHeader.apiKey
-    if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
+    if (!forwardedApi.forwardable) {
       throw new InvalidRequestException(s"API $forwardedApi is not enabled or 
is not eligible for forwarding")
     }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fac691e..e933459 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -363,7 +363,6 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
-  val EnableMetadataQuorumProp = "enable.metadata.quorum"
   val ProcessRolesProp = "process.roles"
 
   /************* Authorizer Configuration ***********/
@@ -648,6 +647,9 @@ object KafkaConfig {
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMsDoc = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMaxMsDoc = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
+  val ProcessRolesDoc = "The roles that this process plays: 'broker', 
'controller', or 'broker,controller' if it is both. " +
+    "This configuration is only for clusters upgraded for KIP-500, which 
replaces the dependence on Zookeeper with " +
+    "a self-managed Raft quorum. Leave this config undefined or empty for 
Zookeeper clusters."
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that 
implements s${classOf[Authorizer].getName}" +
   " interface, which is used by the broker for authorization. This config also 
supports authorizers that implement the deprecated" +
@@ -997,9 +999,6 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthDoc =  "The key length used for encoding 
dynamically configured passwords."
   val PasswordEncoderIterationsDoc =  "The iteration count used for encoding 
dynamically configured passwords."
 
-  /** ********* Experimental metadata quorum configuration ***********/
-  val ProcessRolesDoc = "This configuration determines what roles this process 
should play: broker, controller, or both"
-
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -1009,7 +1008,7 @@ object KafkaConfig {
     new ConfigDef()
 
       /** ********* Zookeeper Configuration ***********/
-      .define(ZkConnectProp, STRING, HIGH, ZkConnectDoc)
+      .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
       .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, 
ZkSessionTimeoutMsDoc)
       .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, 
ZkConnectionTimeoutMsDoc)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, 
ZkSyncTimeMsDoc)
@@ -1044,9 +1043,6 @@ object KafkaConfig {
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, 
RequestTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMsProp, LONG, 
Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMaxMsProp, LONG, 
Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
-
-      // Experimental flag to turn on APIs required for the internal metadata 
quorum (KIP-500)
-      .defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
       .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), 
ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
 
       /************* Authorizer Configuration ***********/
@@ -1480,6 +1476,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val processRoles = parseProcessRoles()
 
+  def requiresZookeeper: Boolean = processRoles.isEmpty
+
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
       case "broker" => BrokerRole
@@ -1619,9 +1617,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   /** ********* Feature configuration ***********/
   def isFeatureVersioningSupported = interBrokerProtocolVersion >= 
KAFKA_2_7_IV0
 
-  /** ********* Experimental metadata quorum configuration ***********/
-  def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)
-
   /** ********* Group coordinator configuration ***********/
   val groupMinSessionTimeoutMs = 
getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
   val groupMaxSessionTimeoutMs = 
getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
@@ -1910,5 +1905,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
         
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs 
should always be less than" +
         s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to 
prevent failed" +
         s" authentication responses from timing out")
+
+    if (requiresZookeeper && zkConnect == null) {
+      throw new ConfigException(s"Missing required configuration 
'${KafkaConfig.ZkConnectProp}' which has no default value.")
+    }
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index b1748ef..fabd019 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -87,6 +87,7 @@ class KafkaServer(
   val config: KafkaConfig,
   time: Time = Time.SYSTEM,
   threadNamePrefix: Option[String] = None,
+  enableForwarding: Boolean = false
 ) extends Server with Logging with KafkaMetricsGroup {
 
   private val startupComplete = new AtomicBoolean(false)
@@ -129,7 +130,7 @@ class KafkaServer(
 
   var kafkaController: KafkaController = null
 
-  var forwardingManager: ForwardingManager = null
+  var forwardingManager: Option[ForwardingManager] = None
 
   var alterIsrManager: AlterIsrManager = null
 
@@ -254,10 +255,10 @@ class KafkaServer(
         // Delay starting processors until the end of the initialization 
sequence to ensure
         // that credentials have been loaded before processing authentications.
         //
-        // Note that we allow the use of disabled APIs when experimental 
support for
-        // the internal metadata quorum has been enabled
+        // Note that we allow the use of KIP-500 controller APIs when 
forwarding is enabled
+        // so that the Envelope request is exposed. This is only used in 
testing currently.
         socketServer = new SocketServer(config, metrics, time, 
credentialProvider,
-          allowDisabledApis = config.metadataQuorumEnabled)
+          allowControllerOnlyApis = enableForwarding)
         socketServer.startup(startProcessingRequests = false)
 
         /* start replica manager */
@@ -293,15 +294,15 @@ class KafkaServer(
         kafkaController = new KafkaController(config, zkClient, time, metrics, 
brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, 
threadNamePrefix)
         kafkaController.startup()
 
-        if (config.metadataQuorumEnabled) {
-          forwardingManager = ForwardingManager(
+        if (enableForwarding) {
+          this.forwardingManager = Some(ForwardingManager(
             config,
             metadataCache,
             time,
             metrics,
             threadNamePrefix
-          )
-          forwardingManager.start()
+          ))
+          forwardingManager.foreach(_.start())
         }
 
         adminManager = new ZkAdminManager(config, metrics, metadataCache, 
zkClient)
@@ -685,7 +686,7 @@ class KafkaServer(
           CoreUtils.swallow(alterIsrManager.shutdown(), this)
 
         if (forwardingManager != null)
-          CoreUtils.swallow(forwardingManager.shutdown(), this)
+          CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
 
         if (logManager != null)
           CoreUtils.swallow(logManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 9ac3f0f..68b2d06 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -66,7 +66,7 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, 
tokenCache)
 
-    socketServer = new SocketServer(config, metrics, time, credentialProvider, 
allowDisabledApis = true)
+    socketServer = new SocketServer(config, metrics, time, credentialProvider, 
allowControllerOnlyApis = true)
     socketServer.startup(startProcessingRequests = false)
 
     raftManager = new KafkaRaftManager[Array[Byte]](
@@ -413,6 +413,11 @@ object TestRaftServer extends Logging {
 
       val configFile = opts.options.valueOf(opts.configOpt)
       val serverProps = Utils.loadProps(configFile)
+
+      // KafkaConfig requires either `process.roles` or `zookeeper.connect`. 
Neither is
+      // actually used by the test server, so we fill in `process.roles` with 
an arbitrary value.
+      serverProps.put(KafkaConfig.ProcessRolesProp, "controller")
+
       val config = KafkaConfig.fromProps(serverProps, doLog = false)
       val throughput = opts.options.valueOf(opts.throughputOpt)
       val recordSize = opts.options.valueOf(opts.recordSizeOpt)
diff --git 
a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
 
b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 5dc6ba4..6224591 100644
--- 
a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
     val nodeApiVersions = NodeApiVersions.create
-    val enabledApis = ApiKeys.enabledApis.asScala
+    val enabledApis = ApiKeys.brokerApis.asScala
     for (apiKey <- enabledApis) {
       val apiVersion = nodeApiVersions.apiVersion(apiKey)
       assertNotNull(apiVersion)
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala 
b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index f3c16fc..1e86876 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -232,7 +232,7 @@ class ApiVersionTest {
       Features.emptySupportedFeatures,
       None
     )
-    assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), 
apiKeysInResponse(response))
+    assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis), 
apiKeysInResponse(response))
     assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, 
response.throttleTimeMs)
     assertTrue(response.data.supportedFeatures.isEmpty)
     assertTrue(response.data.finalizedFeatures.isEmpty)
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8eae07e..63ba976 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -85,6 +85,7 @@ abstract class KafkaServerTestHarness extends 
ZooKeeperTestHarness {
   protected def serverSaslProperties: Option[Properties] = None
   protected def clientSaslProperties: Option[Properties] = None
   protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
+  protected def enableForwarding: Boolean = false
 
   @BeforeEach
   override def setUp(): Unit = {
@@ -98,8 +99,14 @@ abstract class KafkaServerTestHarness extends 
ZooKeeperTestHarness {
 
     // Add each broker to `servers` buffer as soon as it is created to ensure 
that brokers
     // are shutdown cleanly in tearDown even if a subsequent broker fails to 
start
-    for (config <- configs)
-      servers += TestUtils.createServer(config, time = 
brokerTime(config.brokerId))
+    for (config <- configs) {
+      servers += TestUtils.createServer(
+        config,
+        time = brokerTime(config.brokerId),
+        threadNamePrefix = None,
+        enableForwarding
+      )
+    }
     brokerList = TestUtils.bootstrapServers(servers, listenerName)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index fd599c6..d71764e 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -47,7 +47,7 @@ abstract class AbstractApiVersionsRequestTest extends 
BaseRequestTest {
   }
 
   def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, 
listenerName: ListenerName = interBrokerListenerName): Unit = {
-    val expectedApis = ApiKeys.enabledApis()
+    val expectedApis = ApiKeys.brokerApis()
     if (listenerName == controlPlaneListenerName) {
       expectedApis.add(ApiKeys.ENVELOPE)
     }
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
index b35286d..a538ceb 100644
--- 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
@@ -17,8 +17,6 @@
 
 package kafka.server
 
-import java.util.Properties
-
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
@@ -27,9 +25,7 @@ import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestWithForwardingTest extends 
AbstractCreateTopicsRequestTest {
 
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
-  }
+  override def enableForwarding: Boolean = true
 
   @Test
   def testForwardToController(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d94d065..200635f 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -191,7 +191,7 @@ class ForwardingManagerTest {
       startTimeNanos = time.nanoseconds(),
       memoryPool = MemoryPool.NONE,
       buffer = requestBuffer,
-      metrics = new RequestChannel.Metrics(allowDisabledApis = true),
+      metrics = new RequestChannel.Metrics(allowControllerOnlyApis = true),
       envelope = None
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 22a97ce..909ecc6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -124,14 +124,19 @@ class KafkaApisTest {
     val properties = TestUtils.createBrokerConfig(brokerId, "zk")
     properties.put(KafkaConfig.InterBrokerProtocolVersionProp, 
interBrokerProtocolVersion.toString)
     properties.put(KafkaConfig.LogMessageFormatVersionProp, 
interBrokerProtocolVersion.toString)
-    properties.put(KafkaConfig.EnableMetadataQuorumProp, 
enableForwarding.toString)
+
+    val forwardingManagerOpt = if (enableForwarding)
+      Some(this.forwardingManager)
+    else
+      None
+
     new KafkaApis(requestChannel,
       replicaManager,
       adminManager,
       groupCoordinator,
       txnCoordinator,
       controller,
-      forwardingManager,
+      forwardingManagerOpt,
       zkClient,
       brokerId,
       new KafkaConfig(properties),
@@ -601,7 +606,7 @@ class KafkaApisTest {
     val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, 
ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
 
     val permittedVersion: Short = 0
-    EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(
+    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(
       Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, 
permittedVersion)))
 
     val capturedResponse = expectNoThrottling()
@@ -637,7 +642,7 @@ class KafkaApisTest {
 
     val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, 
ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
 
-    EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None)
+    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(None)
 
     val capturedResponse = expectNoThrottling()
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 863dfd8..6cbf04a 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -995,4 +995,21 @@ class KafkaConfigTest {
     val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
+
+  @Test
+  def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    assertTrue(isValidKafkaConfig(props))
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ba074b8..698ecef 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -171,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def testUnauthorizedThrottle(): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.enabledApis.asScala) {
+    for (apiKey <- ApiKeys.brokerApis.asScala) {
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
     }
 
@@ -739,9 +739,9 @@ class RequestQuotaTest extends BaseRequestTest {
 }
 
 object RequestQuotaTest {
-  val ClusterActions = 
ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet
+  val ClusterActions = ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
   val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
-  val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions -- 
SaslActions
+  val ClientActions = ApiKeys.brokerApis.asScala.toSet -- ClusterActions -- 
SaslActions
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"Unauthorized")
   // Principal used for all client connections. This is modified by tests which
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ee4c9f7..f786240 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -156,7 +156,11 @@ object TestUtils extends Logging {
   }
 
   def createServer(config: KafkaConfig, time: Time, threadNamePrefix: 
Option[String]): KafkaServer = {
-    val server = new KafkaServer(config, time, threadNamePrefix = 
threadNamePrefix)
+    createServer(config, time, threadNamePrefix, enableForwarding = false)
+  }
+
+  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: 
Option[String], enableForwarding: Boolean): KafkaServer = {
+    val server = new KafkaServer(config, time, threadNamePrefix, 
enableForwarding)
     server.startup()
     server
   }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 32cb417..2089739 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -22,7 +22,6 @@ import kafka.coordinator.group.GroupCoordinator;
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.network.RequestConvertToJson;
-import kafka.server.ZkAdminManager;
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.ClientQuotaManager;
@@ -30,7 +29,6 @@ import kafka.server.ClientRequestQuotaManager;
 import kafka.server.ControllerMutationQuotaManager;
 import kafka.server.FetchManager;
 import kafka.server.FinalizedFeatureCache;
-import kafka.server.ForwardingManager;
 import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
@@ -38,6 +36,7 @@ import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
+import kafka.server.ZkAdminManager;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
 import 
org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@@ -99,7 +98,6 @@ public class MetadataRequestBenchmark {
     private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
     private TransactionCoordinator transactionCoordinator = 
Mockito.mock(TransactionCoordinator.class);
     private KafkaController kafkaController = 
Mockito.mock(KafkaController.class);
-    private ForwardingManager forwardingManager = 
Mockito.mock(ForwardingManager.class);
     private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
     private Metrics metrics = new Metrics();
     private int brokerId = 1;
@@ -176,7 +174,7 @@ public class MetadataRequestBenchmark {
             groupCoordinator,
             transactionCoordinator,
             kafkaController,
-            forwardingManager,
+            Option.empty(),
             kafkaZkClient,
             brokerId,
             new KafkaConfig(kafkaProps),
diff --git a/raft/README.md b/raft/README.md
index 383470b..6b41879 100644
--- a/raft/README.md
+++ b/raft/README.md
@@ -12,8 +12,7 @@ Below we describe the details to set this up.
     bin/test-raft-server-start.sh config/raft.properties
 
 ### Run Multi Node Quorum ###
-Create 3 separate raft quorum properties as the following
-(note that the `zookeeper.connect` config is required, but unused):
+Create 3 separate raft quorum properties files as follows:
 
 `cat << EOF >> config/raft-quorum-1.properties`
     
@@ -21,8 +20,6 @@ Create 3 separate raft quorum properties as the following
     listeners=PLAINTEXT://localhost:9092
     controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
     log.dirs=/tmp/raft-logs-1
-    
-    zookeeper.connect=localhost:2181
     EOF
 
 `cat << EOF >> config/raft-quorum-2.properties`
@@ -31,8 +28,6 @@ Create 3 separate raft quorum properties as the following
     listeners=PLAINTEXT://localhost:9093
     controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
     log.dirs=/tmp/raft-logs-2
-    
-    zookeeper.connect=localhost:2181
     EOF
     
 `cat << EOF >> config/raft-quorum-3.properties`
@@ -41,8 +36,6 @@ Create 3 separate raft quorum properties as the following
     listeners=PLAINTEXT://localhost:9094
     controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
     log.dirs=/tmp/raft-logs-3
-    
-    zookeeper.connect=localhost:2181
     EOF
  
 Open up 3 separate terminals, and run individual commands:

Reply via email to