This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit aa532b44199f9610f1b6e587acecc50a9e1a9b9c
Author: Akhilesh C <[email protected]>
AuthorDate: Thu Dec 15 14:16:41 2022 -0800

    KAFKA-14446: API forwarding support from zkBrokers to the Controller 
(#12961)
    
    This PR enables brokers which are upgrading from ZK mode to KRaft mode to 
forward certain metadata
    change requests to the controller instead of applying them directly through 
ZK. To facilitate this,
    we now support EnvelopeRequest on zkBrokers (instead of only on KRaft 
nodes).
    
    In BrokerToControllerChannelManager, we can now reinitialize our 
NetworkClient. This is needed to
    handle the case when we transition from forwarding requests to a ZK-based 
broker over the
    inter-broker listener, to forwarding requests to a quorum node over the 
controller listener.
    
    In MetadataCache.scala, distinguish between KRaft and ZK controller nodes 
with a new type,
    CachedControllerId.
    
    In LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest, 
switch from sending both a
    zk and a KRaft controller ID to sending a single controller ID plus a 
boolean to express whether it
    is KRaft. The previous scheme was ambiguous as to whether the system was in 
KRaft or ZK mode when
    both IDs were -1 (although this case is unlikely to come up in practice). 
The new scheme avoids
    this ambiguity and is simpler to understand.
    
    Reviewers: dengziming <[email protected]>, David Arthur 
<[email protected]>, Colin P. McCabe <[email protected]>
---
 .../common/requests/AbstractControlRequest.java    |  10 +-
 .../kafka/common/requests/LeaderAndIsrRequest.java |  18 ++-
 .../kafka/common/requests/StopReplicaRequest.java  |  18 ++-
 .../common/requests/UpdateMetadataRequest.java     |  26 ++++-
 .../resources/common/message/EnvelopeRequest.json  |   2 +-
 .../common/message/LeaderAndIsrRequest.json        |   4 +-
 .../common/message/StopReplicaRequest.json         |   4 +-
 .../common/message/UpdateMetadataRequest.json      |   4 +-
 .../common/requests/ApiVersionsResponseTest.java   |   1 -
 core/src/main/scala/kafka/Kafka.scala              |   8 +-
 .../scala/kafka/common/InterBrokerSendThread.scala |   2 +-
 .../server/BrokerToControllerChannelManager.scala  | 121 ++++++++++++++-------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  18 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  22 +++-
 .../main/scala/kafka/server/MetadataCache.scala    |  18 ++-
 .../kafka/server/metadata/KRaftMetadataCache.scala |  11 +-
 .../kafka/server/metadata/ZkMetadataCache.scala    |  58 ++++++++--
 .../kafka/api/IntegrationTestHarness.scala         |   4 +
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |   2 +-
 .../kafka/server/QuorumTestHarness.scala           |   4 +
 .../BrokerToControllerRequestThreadTest.scala      |  52 ++++++---
 .../src/test/scala/kafka/utils/TestInfoUtils.scala |   8 ++
 .../kafka/integration/KafkaServerTestHarness.scala |   3 +-
 .../unit/kafka/server/ApiVersionManagerTest.scala  |   6 +-
 .../kafka/server/BrokerLifecycleManagerTest.scala  |   9 +-
 .../server/BrokerRegistrationRequestTest.scala     |  15 ++-
 .../kafka/server/CreateTopicsRequestTest.scala     |  10 +-
 .../kafka/server/DeleteTopicsRequestTest.scala     |   5 +-
 .../unit/kafka/server/ForwardingManagerTest.scala  |  24 ++--
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   2 +-
 .../MockBrokerToControllerChannelManager.scala     |   2 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   8 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   7 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   3 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   3 +-
 .../apache/kafka/jmh/server/CheckpointBench.java   |   4 +-
 .../kafka/jmh/server/PartitionCreationBench.java   |   6 +-
 .../kafka/server/common/MetadataVersion.java       |   4 +
 38 files changed, 383 insertions(+), 143 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
index dc4a1e21e8d..789a4abeaba 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java
@@ -27,14 +27,20 @@ public abstract class AbstractControlRequest extends 
AbstractRequest {
         protected final int controllerId;
         protected final int controllerEpoch;
         protected final long brokerEpoch;
+        protected final boolean kraftController;
 
         protected Builder(ApiKeys api, short version, int controllerId, int 
controllerEpoch, long brokerEpoch) {
+            this(api, version, controllerId, controllerEpoch, brokerEpoch, 
false);
+        }
+
+        protected Builder(ApiKeys api, short version, int controllerId, int 
controllerEpoch,
+                          long brokerEpoch, boolean kraftController) {
             super(api, version);
             this.controllerId = controllerId;
             this.controllerEpoch = controllerEpoch;
             this.brokerEpoch = brokerEpoch;
+            this.kraftController = kraftController;
         }
-
     }
 
     protected AbstractControlRequest(ApiKeys api, short version) {
@@ -43,6 +49,8 @@ public abstract class AbstractControlRequest extends 
AbstractRequest {
 
     public abstract int controllerId();
 
+    public abstract boolean isKRaftController();
+
     public abstract int controllerEpoch();
 
     public abstract long brokerEpoch();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
index d7382863176..257d8e78bfc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
@@ -51,7 +51,14 @@ public class LeaderAndIsrRequest extends 
AbstractControlRequest {
         public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
                        List<LeaderAndIsrPartitionState> partitionStates, 
Map<String, Uuid> topicIds,
                        Collection<Node> liveLeaders) {
-            super(ApiKeys.LEADER_AND_ISR, version, controllerId, 
controllerEpoch, brokerEpoch);
+            this(version, controllerId, controllerEpoch, brokerEpoch, 
partitionStates, topicIds,
+                liveLeaders, false);
+        }
+
+        public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
+                       List<LeaderAndIsrPartitionState> partitionStates, 
Map<String, Uuid> topicIds,
+                       Collection<Node> liveLeaders, boolean kraftController) {
+            super(ApiKeys.LEADER_AND_ISR, version, controllerId, 
controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.topicIds = topicIds;
             this.liveLeaders = liveLeaders;
@@ -71,6 +78,10 @@ public class LeaderAndIsrRequest extends 
AbstractControlRequest {
                 .setBrokerEpoch(brokerEpoch)
                 .setLiveLeaders(leaders);
 
+            if (version >= 7) {
+                data.setIsKRaftController(kraftController);
+            }
+
             if (version >= 2) {
                 Map<String, LeaderAndIsrTopicState> topicStatesMap = 
groupByTopic(partitionStates, topicIds);
                 data.setTopicStates(new ArrayList<>(topicStatesMap.values()));
@@ -168,6 +179,11 @@ public class LeaderAndIsrRequest extends 
AbstractControlRequest {
         return data.controllerId();
     }
 
+    @Override
+    public boolean isKRaftController() {
+        return data.isKRaftController();
+    }
+
     @Override
     public int controllerEpoch() {
         return data.controllerEpoch();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 4326aaffd87..df746b56c84 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -45,7 +45,14 @@ public class StopReplicaRequest extends 
AbstractControlRequest {
 
         public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
                        boolean deletePartitions, List<StopReplicaTopicState> 
topicStates) {
-            super(ApiKeys.STOP_REPLICA, version, controllerId, 
controllerEpoch, brokerEpoch);
+            this(version, controllerId, controllerEpoch, brokerEpoch, 
deletePartitions,
+                topicStates, false);
+        }
+
+        public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
+                       boolean deletePartitions, List<StopReplicaTopicState> 
topicStates,
+                       boolean kraftController) {
+            super(ApiKeys.STOP_REPLICA, version, controllerId, 
controllerEpoch, brokerEpoch, kraftController);
             this.deletePartitions = deletePartitions;
             this.topicStates = topicStates;
         }
@@ -56,6 +63,10 @@ public class StopReplicaRequest extends 
AbstractControlRequest {
                 .setControllerEpoch(controllerEpoch)
                 .setBrokerEpoch(brokerEpoch);
 
+            if (version >= 4) {
+                data.setIsKRaftController(kraftController);
+            }
+
             if (version >= 3) {
                 data.setTopicStates(topicStates);
             } else if (version >= 1) {
@@ -196,6 +207,11 @@ public class StopReplicaRequest extends 
AbstractControlRequest {
         return data.controllerId();
     }
 
+    @Override
+    public boolean isKRaftController() {
+        return data.isKRaftController();
+    }
+
     @Override
     public int controllerEpoch() {
         return data.controllerEpoch();
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 845bdd92111..c0fd3000cc5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -51,7 +51,14 @@ public class UpdateMetadataRequest extends 
AbstractControlRequest {
         public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
                        List<UpdateMetadataPartitionState> partitionStates, 
List<UpdateMetadataBroker> liveBrokers,
                        Map<String, Uuid> topicIds) {
-            super(ApiKeys.UPDATE_METADATA, version, controllerId, 
controllerEpoch, brokerEpoch);
+            this(version, controllerId, controllerEpoch, brokerEpoch, 
partitionStates,
+                liveBrokers, topicIds, false);
+        }
+
+        public Builder(short version, int controllerId, int controllerEpoch, 
long brokerEpoch,
+                       List<UpdateMetadataPartitionState> partitionStates, 
List<UpdateMetadataBroker> liveBrokers,
+                       Map<String, Uuid> topicIds, boolean kraftController) {
+            super(ApiKeys.UPDATE_METADATA, version, controllerId, 
controllerEpoch, brokerEpoch, kraftController);
             this.partitionStates = partitionStates;
             this.liveBrokers = liveBrokers;
             this.topicIds = topicIds;
@@ -81,10 +88,14 @@ public class UpdateMetadataRequest extends 
AbstractControlRequest {
             }
 
             UpdateMetadataRequestData data = new UpdateMetadataRequestData()
-                    .setControllerId(controllerId)
-                    .setControllerEpoch(controllerEpoch)
-                    .setBrokerEpoch(brokerEpoch)
-                    .setLiveBrokers(liveBrokers);
+                .setControllerId(controllerId)
+                .setControllerEpoch(controllerEpoch)
+                .setBrokerEpoch(brokerEpoch)
+                .setLiveBrokers(liveBrokers);
+
+            if (version >= 8) {
+                data.setIsKRaftController(kraftController);
+            }
 
             if (version >= 5) {
                 Map<String, UpdateMetadataTopicState> topicStatesMap = 
groupByTopic(topicIds, partitionStates);
@@ -180,6 +191,11 @@ public class UpdateMetadataRequest extends 
AbstractControlRequest {
         return data.controllerId();
     }
 
+    @Override
+    public boolean isKRaftController() {
+        return data.isKRaftController();
+    }
+
     @Override
     public int controllerEpoch() {
         return data.controllerEpoch();
diff --git a/clients/src/main/resources/common/message/EnvelopeRequest.json 
b/clients/src/main/resources/common/message/EnvelopeRequest.json
index 1f6ff62de8d..a30a50ba684 100644
--- a/clients/src/main/resources/common/message/EnvelopeRequest.json
+++ b/clients/src/main/resources/common/message/EnvelopeRequest.json
@@ -16,7 +16,7 @@
 {
   "apiKey": 58,
   "type": "request",
-  "listeners": ["controller"],
+  "listeners": ["controller", "zkBroker"],
   "name": "EnvelopeRequest",
   // Request struct for forwarding.
   "validVersions": "0",
diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json 
b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
index 7042ec84d5b..e049d088c53 100644
--- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
+++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json
@@ -36,8 +36,8 @@
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The current controller ID." },
-    { "name": "KRaftControllerId", "type": "int32", "versions": "7+", 
"entityType": "brokerId", "default": "-1",
-      "about": "The KRaft controller id, used during migration. See KIP-866" },
+    { "name": "isKRaftController", "type": "bool", "versions": "7+", 
"default": "false",
+      "about": "If KRaft controller id is used during migration. See KIP-866" 
},
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": 
true, "default": "-1",
diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json 
b/clients/src/main/resources/common/message/StopReplicaRequest.json
index 67ed752a555..7c82c97aa71 100644
--- a/clients/src/main/resources/common/message/StopReplicaRequest.json
+++ b/clients/src/main/resources/common/message/StopReplicaRequest.json
@@ -31,8 +31,8 @@
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The controller id." },
-    { "name": "KRaftControllerId", "type": "int32", "versions": "4+", 
"entityType": "brokerId", "default": "-1",
-      "about": "The KRaft controller id, used during migration. See KIP-866" },
+    { "name": "isKRaftController", "type": "bool", "versions": "4+", 
"default": "false",
+      "about": "If KRaft controller id is used during migration. See KIP-866" 
},
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": 
"-1", "ignorable": true,
diff --git 
a/clients/src/main/resources/common/message/UpdateMetadataRequest.json 
b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
index 287b0ed1a4b..e876caa2bac 100644
--- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json
+++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json
@@ -36,8 +36,8 @@
   "fields": [
     { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": 
"brokerId",
       "about": "The controller id." },
-    { "name": "KRaftControllerId", "type": "int32", "versions": "8+", 
"entityType": "brokerId",
-      "about": "The KRaft controller id, used during migration." },
+    { "name": "isKRaftController", "type": "bool", "versions": "8+", 
"default": "false",
+      "about": "If KRaft controller id is used during migration. See KIP-866" 
},
     { "name": "ControllerEpoch", "type": "int32", "versions": "0+",
       "about": "The controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": 
true, "default": "-1",
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index 62571c6986a..2e498339256 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -188,7 +188,6 @@ public class ApiVersionsResponseTest {
 
         // Ensure that APIs needed for the KRaft mode are not exposed through 
ApiVersions until we are ready for them
         HashSet<ApiKeys> exposedApis = apiKeysInResponse(response);
-        assertFalse(exposedApis.contains(ApiKeys.ENVELOPE));
         assertFalse(exposedApis.contains(ApiKeys.VOTE));
         assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH));
         assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH));
diff --git a/core/src/main/scala/kafka/Kafka.scala 
b/core/src/main/scala/kafka/Kafka.scala
index e1bd575f0c2..dad462dcad6 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -63,6 +63,12 @@ object Kafka extends Logging {
     props
   }
 
+  // For Zk mode, the API forwarding is currently enabled only under migration 
flag. We can
+  // directly do a static IBP check to see API forwarding is enabled here 
because IBP check is
+  // static in Zk mode.
+  private def enableApiForwarding(config: KafkaConfig) =
+    config.migrationEnabled && 
config.interBrokerProtocolVersion.isApiForwardingEnabled
+
   private def buildServer(props: Properties): Server = {
     val config = KafkaConfig.fromProps(props, false)
     if (config.requiresZookeeper) {
@@ -70,7 +76,7 @@ object Kafka extends Logging {
         config,
         Time.SYSTEM,
         threadNamePrefix = None,
-        enableForwarding = false
+        enableForwarding = enableApiForwarding(config)
       )
     } else {
       new KafkaRaftServer(
diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala 
b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
index c2724e24f1d..e1bea72ddd0 100644
--- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
+++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala
@@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._
  */
 abstract class InterBrokerSendThread(
   name: String,
-  networkClient: KafkaClient,
+  var networkClient: KafkaClient,
   requestTimeoutMs: Int,
   time: Time,
   isInterruptible: Boolean = true
diff --git 
a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala 
b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 3d1e5d3f63c..3446f83b647 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque
 import java.util.concurrent.atomic.AtomicReference
 import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
 import kafka.raft.RaftManager
+import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.common.{Node, Reconfigurable}
@@ -37,42 +38,55 @@ import scala.collection.Seq
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
-trait ControllerNodeProvider {
-  def get(): Option[Node]
-  def listenerName: ListenerName
-  def securityProtocol: SecurityProtocol
-  def saslMechanism: String
-}
+case class ControllerInformation(node: Option[Node],
+                                 listenerName: ListenerName,
+                                 securityProtocol: SecurityProtocol,
+                                 saslMechanism: String,
+                                 isZkController: Boolean)
 
-object MetadataCacheControllerNodeProvider {
-  def apply(
-    config: KafkaConfig,
-    metadataCache: kafka.server.MetadataCache
-  ): MetadataCacheControllerNodeProvider = {
-    val listenerName = config.controlPlaneListenerName
-      .getOrElse(config.interBrokerListenerName)
-
-    val securityProtocol = config.controlPlaneSecurityProtocol
-      .getOrElse(config.interBrokerSecurityProtocol)
-
-    new MetadataCacheControllerNodeProvider(
-      metadataCache,
-      listenerName,
-      securityProtocol,
-      config.saslMechanismInterBrokerProtocol
-    )
-  }
+trait ControllerNodeProvider {
+  def getControllerInfo(): ControllerInformation
 }
 
 class MetadataCacheControllerNodeProvider(
-  val metadataCache: kafka.server.MetadataCache,
-  val listenerName: ListenerName,
-  val securityProtocol: SecurityProtocol,
-  val saslMechanism: String
+  val metadataCache: ZkMetadataCache,
+  val config: KafkaConfig
 ) extends ControllerNodeProvider {
-  override def get(): Option[Node] = {
-    metadataCache.getControllerId
-      .flatMap(metadataCache.getAliveBrokerNode(_, listenerName))
+
+  private val zkControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+  private val zkControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+  private val zkControllerSaslMechanism = 
config.saslMechanismInterBrokerProtocol
+
+  private val kraftControllerListenerName = if 
(config.controllerListenerNames.nonEmpty)
+    new ListenerName(config.controllerListenerNames.head) else null
+  private val kraftControllerSecurityProtocol = 
Option(kraftControllerListenerName)
+    .map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse(
+      listener, SecurityProtocol.forName(kraftControllerListenerName.value())))
+    .orNull
+  private val kraftControllerSaslMechanism = 
config.saslMechanismControllerProtocol
+
+  private val emptyZkControllerInfo =  ControllerInformation(
+    None,
+    zkControllerListenerName,
+    zkControllerSecurityProtocol,
+    zkControllerSaslMechanism,
+    isZkController = true)
+
+  override def getControllerInfo(): ControllerInformation = {
+    metadataCache.getControllerId.map {
+      case ZkCachedControllerId(id) => ControllerInformation(
+        metadataCache.getAliveBrokerNode(id, zkControllerListenerName),
+        zkControllerListenerName,
+        zkControllerSecurityProtocol,
+        zkControllerSaslMechanism,
+        isZkController = true)
+      case KRaftCachedControllerId(id) => ControllerInformation(
+        metadataCache.getAliveBrokerNode(id, kraftControllerListenerName),
+        kraftControllerListenerName,
+        kraftControllerSecurityProtocol,
+        kraftControllerSaslMechanism,
+        isZkController = false)
+    }.getOrElse(emptyZkControllerInfo)
   }
 }
 
@@ -108,9 +122,9 @@ class RaftControllerNodeProvider(
 ) extends ControllerNodeProvider with Logging {
   val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> 
node).toMap
 
-  override def get(): Option[Node] = {
-    raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode)
-  }
+  override def getControllerInfo(): ControllerInformation =
+    
ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode),
+      listenerName, securityProtocol, saslMechanism, isZkController = false)
 }
 
 object BrokerToControllerChannelManager {
@@ -176,13 +190,13 @@ class BrokerToControllerChannelManagerImpl(
   }
 
   private[server] def newRequestThread = {
-    val networkClient = {
+    def networkClient(controllerInfo: ControllerInformation) = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        controllerNodeProvider.securityProtocol,
+        controllerInfo.securityProtocol,
         JaasContext.Type.SERVER,
         config,
-        controllerNodeProvider.listenerName,
-        controllerNodeProvider.saslMechanism,
+        controllerInfo.listenerName,
+        controllerInfo.saslMechanism,
         time,
         config.saslInterBrokerHandshakeRequestEnable,
         logContext
@@ -276,17 +290,38 @@ case class BrokerToControllerQueueItem(
 )
 
 class BrokerToControllerRequestThread(
-  networkClient: KafkaClient,
+  networkClientFactory: ControllerInformation => KafkaClient,
   metadataUpdater: ManualMetadataUpdater,
   controllerNodeProvider: ControllerNodeProvider,
   config: KafkaConfig,
   time: Time,
   threadName: String,
   retryTimeoutMs: Long
-) extends InterBrokerSendThread(threadName, networkClient, 
Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, 
retryTimeoutMs)).toInt, time, isInterruptible = false) {
+) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, 
Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, 
isInterruptible = false) {
+
+  var isZkController = false
+  private def maybeResetNetworkClient(controllerInformation: 
ControllerInformation,
+                                      initialize: Boolean = false): Unit = {
+    if (initialize || isZkController != controllerInformation.isZkController) {
+      if (!initialize) {
+        debug("Controller changed to " + (if (isZkController) "zk" else 
"kraft") + " mode. " +
+          "Resetting network client")
+      }
+      // Close existing network client.
+      if (networkClient != null) {
+        networkClient.initiateClose()
+        networkClient.close()
+      }
+      isZkController = controllerInformation.isZkController
+      updateControllerAddress(controllerInformation.node.orNull)
+      controllerInformation.node.foreach(n => 
metadataUpdater.setNodes(Seq(n).asJava))
+      networkClient = networkClientFactory(controllerInformation)
+    }
+  }
 
   private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]()
   private val activeController = new AtomicReference[Node](null)
+  maybeResetNetworkClient(controllerNodeProvider.getControllerInfo(), 
initialize = true)
 
   // Used for testing
   @volatile
@@ -364,11 +399,13 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {
+    val controllerInformation = controllerNodeProvider.getControllerInfo()
+    maybeResetNetworkClient(controllerInformation)
     if (activeControllerAddress().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
-      debug("Controller isn't known, checking with controller provider")
-      controllerNodeProvider.get() match {
+      debug("Controller isn't cached, looking for local metadata changes")
+      controllerInformation.node match {
         case Some(controllerNode) =>
           info(s"Recorded new controller, from now on will use node 
$controllerNode")
           updateControllerAddress(controllerNode)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4839e00f8c2..ebe208d6e8f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -80,7 +80,6 @@ import java.util
 import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Collections, Optional}
-
 import scala.annotation.nowarn
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.jdk.CollectionConverters._
@@ -1317,6 +1316,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     trace("Sending topic metadata %s and brokers %s for correlation id %d to 
client %s".format(completeTopicMetadata.mkString(","),
       brokers.mkString(","), request.header.correlationId, 
request.header.clientId))
+    val controllerId = {
+      metadataCache.getControllerId.flatMap {
+        case ZkCachedControllerId(id) => Some(id)
+        case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
+      }
+    }
 
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
        MetadataResponse.prepareResponse(
@@ -1324,7 +1329,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          requestThrottleMs,
          brokers.toList.asJava,
          clusterId,
-         
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+         controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
          completeTopicMetadata.asJava,
          clusterAuthorizedOperations
       ))
@@ -3332,13 +3337,18 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val brokers = 
metadataCache.getAliveBrokerNodes(request.context.listenerName)
-    val controllerId = 
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+    val controllerId = {
+      metadataCache.getControllerId.flatMap {
+        case ZkCachedControllerId(id) => Some(id)
+        case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId
+      }
+    }
 
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
       val data = new DescribeClusterResponseData()
         .setThrottleTimeMs(requestThrottleMs)
         .setClusterId(clusterId)
-        .setControllerId(controllerId)
+        
.setControllerId(controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID))
         .setClusterAuthorizedOperations(clusterAuthorizedOperations)
 
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 79a621c6b54..2f880a118e1 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -147,6 +147,7 @@ class KafkaServer(
 
   var kafkaScheduler: KafkaScheduler = _
 
+  var kraftControllerNodes: Seq[Node] = Seq.empty
   @volatile var metadataCache: ZkMetadataCache = _
   var quotaManagers: QuotaFactory.QuotaManagers = _
 
@@ -272,8 +273,16 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
-        metadataCache = MetadataCache.zkMetadataCache(config.brokerId, 
config.interBrokerProtocolVersion, brokerFeatures)
-        val controllerNodeProvider = 
MetadataCacheControllerNodeProvider(config, metadataCache)
+        if (config.migrationEnabled) {
+          kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
+            RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
+        }
+        metadataCache = MetadataCache.zkMetadataCache(
+          config.brokerId,
+          config.interBrokerProtocolVersion,
+          brokerFeatures,
+          kraftControllerNodes)
+        val controllerNodeProvider = new 
MetadataCacheControllerNodeProvider(metadataCache, config)
 
         /* initialize feature change listener */
         _featureChangeListener = new 
FinalizedFeatureChangeListener(metadataCache, _zkClient)
@@ -614,7 +623,14 @@ class KafkaServer(
   private def controlledShutdown(): Unit = {
     val socketTimeoutMs = config.controllerSocketTimeoutMs
 
+    // TODO (KAFKA-14447): Handle controlled shutdown for zkBroker when we 
have KRaft controller.
     def doControlledShutdown(retries: Int): Boolean = {
+      if (config.requiresZookeeper &&
+        
metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) {
+        info("ZkBroker currently has a KRaft controller. Controlled shutdown 
will be handled " +
+          "through broker life cycle manager")
+        return false
+      }
       val metadataUpdater = new ManualMetadataUpdater()
       val networkClient = {
         val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -668,7 +684,7 @@ class KafkaServer(
 
           // 1. Find the controller and establish a connection to it.
           // If the controller id or the broker registration are missing, we 
sleep and retry (if there are remaining retries)
-          metadataCache.getControllerId match {
+          
metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id)
 match {
             case Some(controllerId) =>
               metadataCache.getAliveBrokerNode(controllerId, 
config.interBrokerListenerName) match {
                 case Some(broker) =>
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala 
b/core/src/main/scala/kafka/server/MetadataCache.scala
index b20d4f6414c..e0501ef1ebe 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -32,6 +32,27 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, 
Short], epoch: Long)
   }
 }
 
+/**
+ * A controller id held by a MetadataCache. The concrete subtype records whether the id
+ * was cached as a ZooKeeper-based or a KRaft controller id.
+ */
+sealed trait CachedControllerId {
+  /** The numeric id wrapped by this entry. */
+  val id: Int
+}
+
+/**
+ * Cached id of a ZooKeeper-based controller. ZkMetadataCache stores this when the
+ * UpdateMetadataRequest it applies is not flagged isKRaftController.
+ */
+case class ZkCachedControllerId(id: Int) extends CachedControllerId
+/**
+ * Cached id tagged as belonging to a KRaft controller. ZkMetadataCache stores this when the
+ * UpdateMetadataRequest it applies is flagged isKRaftController; KRaftMetadataCache wraps a
+ * randomly chosen alive broker id in it.
+ */
+case class KRaftCachedControllerId(id: Int) extends CachedControllerId
+
 trait MetadataCache {
 
   /**
@@ -92,7 +99,7 @@ trait MetadataCache {
 
   def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: 
ListenerName): Map[Int, Node]
 
-  def getControllerId: Option[Int]
+  def getControllerId: Option[CachedControllerId]
 
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster
 
@@ -103,13 +110,18 @@ trait MetadataCache {
   def metadataVersion(): MetadataVersion
 
   def features(): FinalizedFeaturesAndEpoch
+
+  def getRandomAliveBrokerId: Option[Int]
 }
 
 object MetadataCache {
   def zkMetadataCache(brokerId: Int,
                       metadataVersion: MetadataVersion,
-                      brokerFeatures: BrokerFeatures = 
BrokerFeatures.createEmpty()): ZkMetadataCache = {
-    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures)
+                      brokerFeatures: BrokerFeatures = 
BrokerFeatures.createEmpty(),
+                      kraftControllerNodes: collection.Seq[Node] = null)
+  : ZkMetadataCache = {
+    new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures,
+      Option(kraftControllerNodes).getOrElse(collection.Seq.empty[Node]))
   }
 
   def kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = {
diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
index 7bd1c6343d9..7e6ad7bfd09 100644
--- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala
@@ -18,7 +18,7 @@
 package kafka.server.metadata
 
 import kafka.controller.StateChangeLogger
-import kafka.server.{FinalizedFeaturesAndEpoch, MetadataCache}
+import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, 
KRaftCachedControllerId, MetadataCache}
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition,
 MetadataResponseTopic}
@@ -287,14 +287,19 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
     result.toMap
   }
 
-  override def getControllerId: Option[Int] = 
getRandomAliveBroker(_currentImage)
-
   /**
    * Choose a random broker node to report as the controller. We do this 
because we want
    * the client to send requests destined for the controller to a random 
broker.
    * Clients do not have direct access to the controller in the KRaft world, 
as explained
    * in KIP-590.
    */
+  override def getControllerId: Option[CachedControllerId] =
+    getRandomAliveBroker(_currentImage).map(KRaftCachedControllerId)
+
+  override def getRandomAliveBrokerId: Option[Int] = {
+    getRandomAliveBroker(_currentImage)
+  }
+
   private def getRandomAliveBroker(image: MetadataImage): Option[Int] = {
     val aliveBrokers = getAliveBrokers(image).toList
     if (aliveBrokers.isEmpty) {
diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala 
b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index d69785f90f6..d774cd41a5c 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.api._
 import kafka.controller.StateChangeLogger
-import kafka.server.{BrokerFeatures, FinalizedFeaturesAndEpoch, MetadataCache}
+import kafka.server.{BrokerFeatures, CachedControllerId, 
FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, 
ZkCachedControllerId}
 import kafka.utils.CoreUtils._
 import kafka.utils.Logging
 import kafka.utils.Implicits._
@@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.{ApiVersionsResponse, 
MetadataResponse,
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.server.common.MetadataVersion
 
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
 import scala.concurrent.TimeoutException
 import scala.math.max
 
@@ -60,7 +60,11 @@ trait ZkFinalizedFeatureCache {
  *  A cache for the state (e.g., current leader) of each partition. This cache 
is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same 
cache, asynchronously.
  */
-class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, 
brokerFeatures: BrokerFeatures)
+class ZkMetadataCache(
+  brokerId: Int,
+  metadataVersion: MetadataVersion,
+  brokerFeatures: BrokerFeatures,
+  kraftControllerNodes: Seq[Node] = Seq.empty)
   extends MetadataCache with ZkFinalizedFeatureCache with Logging {
 
   private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -68,8 +72,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
   //replace the value with a completely new one. this means reads (which are 
not under any lock) need to grab
   //the value of this var (into a val) ONCE and retain that read copy for the 
duration of their operation.
   //multiple reads of this value risk getting different snapshots.
-  @volatile private var metadataSnapshot: MetadataSnapshot = 
MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty,
-    topicIds = Map.empty, controllerId = None, aliveBrokers = 
mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty)
+  @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(
+    partitionStates = mutable.AnyRefMap.empty,
+    topicIds = Map.empty,
+    controllerId = None,
+    aliveBrokers = mutable.LongMap.empty,
+    aliveNodes = mutable.LongMap.empty)
 
   this.logIdent = s"[MetadataCache brokerId=$brokerId] "
   private val stateChangeLogger = new StateChangeLogger(brokerId, 
inControllerContext = false, None)
@@ -79,6 +87,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
   private val featureLock = new ReentrantLock()
   private val featureCond = featureLock.newCondition()
 
+  private val kraftControllerNodeMap = kraftControllerNodes.map(node => 
node.id() -> node).toMap
+
   // This method is the main hotspot when it comes to the performance of 
metadata requests,
   // we should be careful about adding additional logic here. Relatedly, 
`brokers` is
   // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
@@ -248,7 +258,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
   }
 
   override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): 
Option[Node] = {
-    
metadataSnapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
+    val snapshot = metadataSnapshot
+    brokerId match {
+      case id if 
snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id
 == id) =>
+        kraftControllerNodeMap.get(id)
+      case _ => 
snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName))
+    }
   }
 
   override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] 
= {
@@ -315,7 +330,24 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
     }.getOrElse(Map.empty[Int, Node])
   }
 
-  def getControllerId: Option[Int] = metadataSnapshot.controllerId
+  def getControllerId: Option[CachedControllerId] = {
+    metadataSnapshot.controllerId
+  }
+
+  /**
+   * Pick the id of one broker from the current snapshot's alive brokers at random.
+   *
+   * @return the chosen broker id, or None when the snapshot has no alive brokers
+   */
+  def getRandomAliveBrokerId: Option[Int] = {
+    val aliveBrokers = metadataSnapshot.aliveBrokers.values.toIndexedSeq
+    if (aliveBrokers.isEmpty) {
+      // ThreadLocalRandom.nextInt(0) throws IllegalArgumentException, so bail out first.
+      None
+    } else {
+      Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id)
+    }
+  }
 
   def getClusterMetadata(clusterId: String, listenerName: ListenerName): 
Cluster = {
     val snapshot = metadataSnapshot
@@ -329,6 +351,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
       nodes.getOrElse(id.toLong, new Node(id, "", -1))
     }
 
+    def controllerId(snapshot: MetadataSnapshot): Option[Node] = {
+      snapshot.controllerId.flatMap {
+        case ZkCachedControllerId(id) => getAliveBrokerNode(id, listenerName)
+        case KRaftCachedControllerId(_) => 
getRandomAliveBrokerId.flatMap(getAliveBrokerNode(_, listenerName))
+      }
+    }
+
     val partitions = getAllPartitions(snapshot)
       .filter { case (_, state) => state.leader != 
LeaderAndIsr.LeaderDuringDelete }
       .map { case (tp, state) =>
@@ -342,7 +371,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
     new Cluster(clusterId, nodes.values.toBuffer.asJava,
       partitions.toBuffer.asJava,
       unauthorizedTopics, internalTopics,
-      snapshot.controllerId.map(id => node(id)).orNull)
+      controllerId(snapshot).orNull)
   }
 
   // This method returns the deleted TopicPartitions received from 
UpdateMetadataRequest
@@ -351,9 +380,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
 
       val aliveBrokers = new 
mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size)
       val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, 
Node]](metadataSnapshot.aliveNodes.size)
-      val controllerIdOpt = updateMetadataRequest.controllerId match {
+      val controllerIdOpt: Option[CachedControllerId] = 
updateMetadataRequest.controllerId match {
         case id if id < 0 => None
-        case id => Some(id)
+        case id =>
+          if (updateMetadataRequest.isKRaftController)
+            Some(KRaftCachedControllerId(id))
+          else
+            Some(ZkCachedControllerId(id))
       }
 
       updateMetadataRequest.liveBrokers.forEach { broker =>
@@ -386,7 +419,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
 
       val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
       if (!updateMetadataRequest.partitionStates.iterator.hasNext) {
-        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, 
topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes)
+        metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, 
topicIds.toMap,
+          controllerIdOpt, aliveBrokers, aliveNodes)
       } else {
         //since kafka may do partial metadata updates, we start by copying the 
previous state
         val partitionStates = new mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size)
@@ -446,7 +480,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: 
MetadataVersion, brokerFea
 
   case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
                               topicIds: Map[String, Uuid],
-                              controllerId: Option[Int],
+                              controllerId: Option[CachedControllerId],
                               aliveBrokers: mutable.LongMap[Broker],
                               aliveNodes: 
mutable.LongMap[collection.Map[ListenerName, Node]]) {
     val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, 
topicId) => (topicId, topicName) }
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 3b459875b60..f4ab8e041be 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -60,10 +60,14 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   }
 
   override def generateConfigs: Seq[KafkaConfig] = {
+
     val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, 
interBrokerSecurityProtocol = Some(securityProtocol),
       trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, 
logDirCount = logDirCount)
     configureListeners(cfgs)
     modifyConfigs(cfgs)
+    if (isZkMigrationTest()) {
+      cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true"))
+    }
     insertControllerListenersIfNeeded(cfgs)
     cfgs.map(KafkaConfig.fromProps)
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 80f6cd758b3..2a3e0a18117 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -215,7 +215,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       val brokerIds = brokers.map(_.config.brokerId).toSet
       assertTrue(brokerIds.contains(controller.id))
     } else {
-      
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
+      
assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.map(_.id).
         getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id)
     }
 
diff --git 
a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala 
b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index f746cec53d7..51720db924a 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -160,6 +160,10 @@ abstract class QuorumTestHarness extends Logging {
     TestInfoUtils.isKRaft(testInfo)
   }
 
+  def isZkMigrationTest(): Boolean = {
+    TestInfoUtils.isZkMigrationTest(testInfo)
+  }
+
   def checkIsZKTest(): Unit = {
     if (isKRaftTest()) {
       throw new RuntimeException("This function can't be accessed when running 
the test " +
diff --git 
a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala 
b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index bee1aefaca2..eea5c7517a0 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -25,9 +25,10 @@ import 
kafka.utils.TestUtils.TestControllerRequestCompletionHandler
 import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, 
Metadata, MockClient, NodeApiVersions}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.message.{EnvelopeResponseData, 
MetadataRequestData}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, 
EnvelopeResponse, MetadataRequest, RequestTestUtils}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.utils.MockTime
 import org.junit.jupiter.api.Assertions._
@@ -37,6 +38,14 @@ import org.mockito.Mockito._
 
 class BrokerToControllerRequestThreadTest {
 
+  private def controllerInfo(node: Option[Node]): ControllerInformation = {
+    ControllerInformation(node, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "", isZkController = true)
+  }
+
+  private def emptyControllerInfo: ControllerInformation = {
+    controllerInfo(None)
+  }
+
   @Test
   def testRetryTimeoutWhileControllerNotAvailable(): Unit = {
     val time = new MockTime()
@@ -45,10 +54,10 @@ class BrokerToControllerRequestThreadTest {
     val mockClient = new MockClient(time, metadata)
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
 
-    when(controllerNodeProvider.get()).thenReturn(None)
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val retryTimeoutMs = 30000
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -82,10 +91,10 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     val activeController = new Node(controllerId, "host", 1234)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(activeController))
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(2, 
Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
     mockClient.prepareResponse(expectedResponse)
@@ -124,10 +133,11 @@ class BrokerToControllerRequestThreadTest {
     val oldController = new Node(oldControllerId, "host1", 1234)
     val newController = new Node(newControllerId, "host2", 1234)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(oldController), 
Some(newController))
+    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+      emptyControllerInfo, controllerInfo(Some(oldController)), 
controllerInfo(Some(newController)))
 
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(),
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(),
       controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -171,13 +181,14 @@ class BrokerToControllerRequestThreadTest {
     val oldController = new Node(oldControllerId, "host1", port)
     val newController = new Node(newControllerId, "host2", port)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(oldController), 
Some(newController))
+    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+      emptyControllerInfo, controllerInfo(Some(oldController)), 
controllerInfo(Some(newController)))
 
     val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -231,7 +242,11 @@ class BrokerToControllerRequestThreadTest {
     val oldController = new Node(oldControllerId, "host1", port)
     val newController = new Node(newControllerId, "host2", port)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(oldController), 
Some(newController))
+    when(controllerNodeProvider.getControllerInfo()).thenReturn(
+      emptyControllerInfo,                  // call to create network client.
+      controllerInfo(Some(oldController)),
+      controllerInfo(Some(newController))
+    )
 
     // create an envelopeResponse with NOT_CONTROLLER error
     val envelopeResponseWithNotControllerError = new EnvelopeResponse(
@@ -240,7 +255,7 @@ class BrokerToControllerRequestThreadTest {
     // response for retry request after receiving NOT_CONTROLLER error
     val expectedResponse = RequestTestUtils.metadataUpdateWith(3, 
Collections.singletonMap("a", 2))
 
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -296,13 +311,13 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     val controller = new Node(controllerId, "host1", 1234)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(controller))
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo,
 controllerInfo(Some(controller)))
 
     val retryTimeoutMs = 30000
     val responseWithNotControllerError = 
RequestTestUtils.metadataUpdateWith("cluster1", 2,
       Collections.singletonMap("a", Errors.NOT_CONTROLLER),
       Collections.singletonMap("a", 2))
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs)
     testRequestThread.started = true
 
@@ -344,7 +359,7 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     val activeController = new Node(controllerId, "host", 1234)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(activeController))
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
 
     val callbackResponse = new AtomicReference[ClientResponse]()
     val completionHandler = new ControllerRequestCompletionHandler {
@@ -360,7 +375,7 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == 
ApiKeys.METADATA)
 
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -381,7 +396,7 @@ class BrokerToControllerRequestThreadTest {
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
     val activeController = new Node(controllerId, "host", 1234)
 
-    when(controllerNodeProvider.get()).thenReturn(Some(activeController))
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController)))
 
     val callbackResponse = new AtomicReference[ClientResponse]()
     val completionHandler = new ControllerRequestCompletionHandler {
@@ -397,7 +412,7 @@ class BrokerToControllerRequestThreadTest {
 
     mockClient.createPendingAuthenticationError(activeController, 50)
 
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
     testRequestThread.started = true
 
@@ -416,8 +431,9 @@ class BrokerToControllerRequestThreadTest {
     val mockClient = new MockClient(time, metadata)
 
     val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    
when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
-    val testRequestThread = new BrokerToControllerRequestThread(mockClient, 
new ManualMetadataUpdater(), controllerNodeProvider,
+    val testRequestThread = new BrokerToControllerRequestThread(_ => 
mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
       config, time, "", retryTimeoutMs = Long.MaxValue)
 
     val completionHandler = new TestControllerRequestCompletionHandler(None)
diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala 
b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
index fa48024f313..ba93fa36b99 100644
--- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala
+++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala
@@ -43,5 +43,20 @@ object TestInfoUtils {
       false
     }
   }
+
+  /**
+   * Reports whether testInfo describes a ZK migration test: isKRaft(testInfo) must be true and
+   * the display name must contain "quorum=zkMigration".
+   *
+   * NOTE(review): isKRaft is defined outside this hunk. If it answers false for display names
+   * that contain "quorum=zk", this method can never return true -- TODO confirm.
+   */
+  def isZkMigrationTest(testInfo: TestInfo): Boolean = {
+    if (!isKRaft(testInfo)) {
+      false
+    } else {
+      testInfo.getDisplayName().contains("quorum=zkMigration")
+    }
+  }
   final val TestWithParameterizedQuorumName = "{displayName}.quorum={0}"
 }
diff --git 
a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 5dbac1bb4dc..1ef4d47995a 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -357,7 +357,8 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
         config,
         time = brokerTime(config.brokerId),
         threadNamePrefix = None,
-        startup = false
+        startup = false,
+        enableZkApiForwarding = isZkMigrationTest()
       )
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
index 9936f8deaed..5d94319ed35 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
@@ -124,11 +124,11 @@ class ApiVersionManagerTest {
       features = brokerFeatures,
       metadataCache = metadataCache
     )
-    assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
-    assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+    assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE))
+    assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
 
     val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs 
= 0)
-    assertNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
+    assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id))
   }
 
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index 1a0fac443c0..da6d9a8aa80 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -57,13 +57,14 @@ class BrokerLifecycleManagerTest {
   class SimpleControllerNodeProvider extends ControllerNodeProvider {
     val node = new AtomicReference[Node](null)
 
-    override def get(): Option[Node] = Option(node.get())
+    def listenerName: ListenerName = new ListenerName("PLAINTEXT")
 
-    override def listenerName: ListenerName = new ListenerName("PLAINTEXT")
+    def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT;
 
-    override def securityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT;
+    def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
 
-    override def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM
+    override def getControllerInfo(): ControllerInformation = 
ControllerInformation(Option(node.get()),
+      listenerName, securityProtocol, saslMechanism, isZkController = false)
   }
 
   class BrokerLifecycleManagerTestContext(properties: Properties) {
diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
index 2bb1314a789..cab586f23e4 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala
@@ -17,7 +17,7 @@
 
 package unit.kafka.server
 
-import kafka.server.{BrokerToControllerChannelManager, ControllerNodeProvider, 
ControllerRequestCompletionHandler}
+import kafka.server.{BrokerToControllerChannelManager, ControllerInformation, 
ControllerNodeProvider, ControllerRequestCompletionHandler}
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type}
 import kafka.test.junit.ClusterTestExtensions
@@ -48,17 +48,22 @@ class BrokerRegistrationRequestTest {
   def brokerToControllerChannelManager(clusterInstance: ClusterInstance): 
BrokerToControllerChannelManager = {
     BrokerToControllerChannelManager(
       new ControllerNodeProvider() {
-        override def get(): Option[Node] = Some(new Node(
+        def node: Option[Node] = Some(new Node(
           clusterInstance.anyControllerSocketServer().config.nodeId,
           "127.0.0.1",
           
clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()),
         ))
 
-        override def listenerName: ListenerName = 
clusterInstance.controllerListenerName().get()
+        def listenerName: ListenerName = 
clusterInstance.controllerListenerName().get()
 
-        override def securityProtocol: SecurityProtocol = 
SecurityProtocol.PLAINTEXT
+        val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
 
-        override def saslMechanism: String = ""
+        val saslMechanism: String = ""
+
+        def isZkController: Boolean = !clusterInstance.isKRaftTest
+
+        override def getControllerInfo(): ControllerInformation =
+          ControllerInformation(node, listenerName, securityProtocol, 
saslMechanism, isZkController)
       },
       Time.SYSTEM,
       new Metrics(),
diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
index a193db284c4..c6892566968 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala
@@ -22,15 +22,16 @@ import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.CreateTopicsRequestData
 import 
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.CreateTopicsRequest
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
+
 import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
   @ValueSource(strings = Array("zk", "kraft"))
   def testValidCreateTopicsRequests(quorum: String): Unit = {
@@ -148,13 +149,14 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest {
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk", "zkMigration"))
   def testNotController(quorum: String): Unit = {
     // Note: we don't run this test when in KRaft mode, because KRaft doesn't 
have this
     // behavior of returning NOT_CONTROLLER. Instead, the request is forwarded.
     val req = topicsReq(Seq(topicReq("topic1")))
     val response = sendCreateTopicRequest(req, notControllerSocketServer)
-    assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER))
+    val error = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER
+    assertEquals(1, response.errorCounts().get(error))
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
index 629f203169d..ff14323b019 100644
--- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala
@@ -221,7 +221,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
    * Instead, the request is forwarded.
    */
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
-  @ValueSource(strings = Array("zk"))
+  @ValueSource(strings = Array("zk", "zkMigration"))
   def testNotController(quorum: String): Unit = {
     val request = new DeleteTopicsRequest.Builder(
         new DeleteTopicsRequestData()
@@ -229,8 +229,9 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging {
           .setTimeoutMs(1000)).build()
     val response = sendDeleteTopicsRequest(request, notControllerSocketServer)
 
+    val expectedError = if (isZkMigrationTest()) Errors.NONE else 
Errors.NOT_CONTROLLER
     val error = response.data.responses.find("not-controller").errorCode()
-    assertEquals(Errors.NOT_CONTROLLER.code,  error, "Expected controller 
error when routed incorrectly")
+    assertEquals(expectedError.code(),  error)
   }
 
   private def validateTopicIsDeleted(topic: String): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d0fc30fbdec..21c7d0d9807 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -60,6 +60,14 @@ class ForwardingManagerTest {
     NodeApiVersions.create(List(envelopeApiVersion).asJava)
   }
 
+  private def controllerInfo = {
+    ControllerInformation(Some(new Node(0, "host", 1234)), new 
ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true)
+  }
+
+  private def emptyControllerInfo = {
+    ControllerInformation(None, new ListenerName(""), 
SecurityProtocol.PLAINTEXT, "", isZkController = true)
+  }
+
   @Test
   def testResponseCorrelationIdMismatch(): Unit = {
     val requestCorrelationId = 27
@@ -71,7 +79,7 @@ class ForwardingManagerTest {
     val responseBuffer = 
RequestTestUtils.serializeResponseWithHeader(responseBody, 
requestHeader.apiVersion,
       requestCorrelationId + 1)
 
-    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, 
"host", 1234)))
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareResponse(isEnvelopeRequest, new 
EnvelopeResponse(responseBuffer, Errors.NONE));
 
@@ -95,7 +103,7 @@ class ForwardingManagerTest {
     val responseBuffer = 
RequestTestUtils.serializeResponseWithHeader(responseBody,
       requestHeader.apiVersion, requestCorrelationId)
 
-    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, 
"host", 1234)))
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
     val isEnvelopeRequest: RequestMatcher = request => 
request.isInstanceOf[EnvelopeRequest]
     client.prepareResponse(isEnvelopeRequest, new 
EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION));
 
@@ -112,7 +120,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    Mockito.when(controllerNodeProvider.get()).thenReturn(None)
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -136,7 +144,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, 
"host", 1234)))
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
@@ -162,8 +170,7 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    val controllerNode = new Node(0, "host", 1234)
-    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
 
     client.prepareUnsupportedVersionResponse(req => req.apiKey == 
requestHeader.apiKey)
 
@@ -183,10 +190,9 @@ class ForwardingManagerTest {
     val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, 
requestCorrelationId)
     val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
 
-    val controllerNode = new Node(0, "host", 1234)
-    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
+    
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo)
 
-    client.createPendingAuthenticationError(controllerNode, 50)
+    client.createPendingAuthenticationError(controllerInfo.node.get, 50)
 
     val response = new AtomicReference[AbstractResponse]()
     forwardingManager.forwardRequest(request, res => res.foreach(response.set))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 5019a57b0ed..a8e5d5464a4 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3571,7 +3571,7 @@ class KafkaApisTest {
     val capturedResponse = verifyNoThrottling(request)
     val describeClusterResponse = 
capturedResponse.getValue.asInstanceOf[DescribeClusterResponse]
 
-    assertEquals(metadataCache.getControllerId.get, 
describeClusterResponse.data.controllerId)
+    assertEquals(metadataCache.getControllerId.get.id, 
describeClusterResponse.data.controllerId)
     assertEquals(clusterId, describeClusterResponse.data.clusterId)
     assertEquals(8096, 
describeClusterResponse.data.clusterAuthorizedOperations)
     assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet,
diff --git a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
index febd06f354d..1752e2c6644 100644
--- a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
@@ -73,7 +73,7 @@ class MockBrokerToControllerChannelManager(
         queueItem.callback.onTimeout()
         unsentIterator.remove()
       } else {
-        controllerNodeProvider.get() match {
+        controllerNodeProvider.getControllerInfo().node match {
           case Some(controller) if client.ready(controller, 
time.milliseconds()) =>
             val clientRequest = client.newClientRequest(
               controller.idString,
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 82c19949e3b..8679706777d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -159,7 +159,8 @@ class RequestQuotaTest extends BaseRequestTest {
 
   @Test
   def testExemptRequestTime(): Unit = {
-    for (apiKey <- RequestQuotaTest.ClusterActions -- 
RequestQuotaTest.ClusterActionsWithThrottle) {
+    val actions = RequestQuotaTest.ClusterActions -- 
RequestQuotaTest.ClusterActionsWithThrottle -- RequestQuotaTest.Envelope
+    for (apiKey <- actions) {
       submitTest(apiKey, () => checkExemptRequestMetric(apiKey))
     }
 
@@ -170,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def testUnauthorizedThrottle(): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.zkBrokerApis.asScala) {
+    for (apiKey <- ApiKeys.zkBrokerApis.asScala.toSet -- 
RequestQuotaTest.Envelope) {
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
     }
 
@@ -765,7 +766,8 @@ object RequestQuotaTest {
   val ClusterActions = 
ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet
   val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, 
ApiKeys.UPDATE_FEATURES)
   val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
-  val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- 
SaslActions
+  val Envelope = Set(ApiKeys.ENVELOPE)
+  val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- 
SaslActions -- Envelope
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"Unauthorized")
   // Principal used for all client connections. This is modified by tests which
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 0162ebdd5e3..7110237ce4c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -179,7 +179,12 @@ object TestUtils extends Logging {
   }
 
   def createServer(config: KafkaConfig, time: Time, threadNamePrefix: 
Option[String], startup: Boolean): KafkaServer = {
-    val server = new KafkaServer(config, time, threadNamePrefix, 
enableForwarding = false)
+    createServer(config, time, threadNamePrefix, startup, 
enableZkApiForwarding = false)
+  }
+
+  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: 
Option[String],
+                   startup: Boolean, enableZkApiForwarding: Boolean) = {
+    val server = new KafkaServer(config, time, threadNamePrefix, 
enableForwarding = enableZkApiForwarding)
     if (startup) server.startup()
     server
   }
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 3e57f29f8d4..6cbf2438c8c 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -218,7 +218,8 @@ public class ReplicaFetcherThreadBenchmark {
                 0, 0, 0, updatePartitionState, Collections.emptyList(), 
topicIds).build();
 
         // TODO: fix to support raft
-        ZkMetadataCache metadataCache = new ZkMetadataCache(0, 
config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty());
+        ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0,
+            config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), 
null);
         metadataCache.updateMetadata(0, updateMetadataRequest);
 
         replicaManager = new ReplicaManagerBuilder().
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 80376948b89..f65fdcdaa24 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -110,7 +110,8 @@ public class MetadataRequestBenchmark {
     private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
     private Metrics metrics = new Metrics();
     private int brokerId = 1;
-    private ZkMetadataCache metadataCache = 
MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest(), 
BrokerFeatures.createEmpty());
+    private ZkMetadataCache metadataCache = 
MetadataCache.zkMetadataCache(brokerId,
+        MetadataVersion.latest(), BrokerFeatures.createEmpty(), null);
     private ClientQuotaManager clientQuotaManager = 
Mockito.mock(ClientQuotaManager.class);
     private ClientRequestQuotaManager clientRequestQuotaManager = 
Mockito.mock(ClientRequestQuotaManager.class);
     private ControllerMutationQuotaManager controllerMutationQuotaManager = 
Mockito.mock(ControllerMutationQuotaManager.class);
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index 9a60efbeabd..d83f748bcad 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -112,7 +112,9 @@ public class CheckpointBench {
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
         final MetadataCache metadataCache =
-                
MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), 
this.brokerProperties.interBrokerProtocolVersion(), 
BrokerFeatures.createEmpty());
+                MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
+                    this.brokerProperties.interBrokerProtocolVersion(),
+                    BrokerFeatures.createEmpty(), null);
         this.quotaManagers =
                 QuotaFactory.instantiate(this.brokerProperties,
                         this.metrics,
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
index efb69007888..3a8343772cb 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java
@@ -26,6 +26,7 @@ import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import kafka.server.LogDirFailureChannel;
+import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.builders.LogManagerBuilder;
@@ -33,7 +34,6 @@ import kafka.server.builders.ReplicaManagerBuilder;
 import kafka.server.checkpoints.OffsetCheckpoints;
 import kafka.server.metadata.ConfigRepository;
 import kafka.server.metadata.MockConfigRepository;
-import kafka.server.metadata.ZkMetadataCache;
 import kafka.utils.KafkaScheduler;
 import kafka.utils.Scheduler;
 import kafka.utils.TestUtils;
@@ -160,7 +160,9 @@ public class PartitionCreationBench {
             setLogManager(logManager).
             setQuotaManagers(quotaManagers).
             setBrokerTopicStats(brokerTopicStats).
-            setMetadataCache(new 
ZkMetadataCache(this.brokerProperties.brokerId(), 
this.brokerProperties.interBrokerProtocolVersion(), 
BrokerFeatures.createEmpty())).
+            
setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(),
+                this.brokerProperties.interBrokerProtocolVersion(), 
BrokerFeatures.createEmpty(),
+                null)).
             setLogDirFailureChannel(failureChannel).
             setAlterPartitionManager(alterPartitionManager).
             build();
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 9fb7c0ff4ea..15f8fee9bde 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -243,6 +243,10 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_3_IV1);
     }
 
+    public boolean isApiForwardingEnabled() {
+        return this.isAtLeast(IBP_3_4_IV0);
+    }
+
     public boolean isKRaftSupported() {
         return this.featureLevel > 0;
     }

Reply via email to