This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit aa532b44199f9610f1b6e587acecc50a9e1a9b9c Author: Akhilesh C <[email protected]> AuthorDate: Thu Dec 15 14:16:41 2022 -0800 KAFKA-14446: API forwarding support from zkBrokers to the Controller (#12961) This PR enables brokers which are upgrading from ZK mode to KRaft mode to forward certain metadata change requests to the controller instead of applying them directly through ZK. To facilitate this, we now support EnvelopeRequest on zkBrokers (instead of only on KRaft nodes). In BrokerToControllerChannelManager, we can now reinitialize our NetworkClient. This is needed to handle the case when we transition from forwarding requests to a ZK-based broker over the inter-broker listener, to forwarding requests to a quorum node over the controller listener. In MetadataCache.scala, distinguish between KRaft and ZK controller nodes with a new type, CachedControllerId. In LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest, switch from sending both a zk and a KRaft controller ID to sending a single controller ID plus a boolean to express whether it is KRaft. The previous scheme was ambiguous as to whether the system was in KRaft or ZK mode when both IDs were -1 (although this case is unlikely to come up in practice). The new scheme avoids this ambiguity and is simpler to understand. Reviewers: dengziming <[email protected]>, David Arthur <[email protected]>, Colin P. 
McCabe <[email protected]> --- .../common/requests/AbstractControlRequest.java | 10 +- .../kafka/common/requests/LeaderAndIsrRequest.java | 18 ++- .../kafka/common/requests/StopReplicaRequest.java | 18 ++- .../common/requests/UpdateMetadataRequest.java | 26 ++++- .../resources/common/message/EnvelopeRequest.json | 2 +- .../common/message/LeaderAndIsrRequest.json | 4 +- .../common/message/StopReplicaRequest.json | 4 +- .../common/message/UpdateMetadataRequest.json | 4 +- .../common/requests/ApiVersionsResponseTest.java | 1 - core/src/main/scala/kafka/Kafka.scala | 8 +- .../scala/kafka/common/InterBrokerSendThread.scala | 2 +- .../server/BrokerToControllerChannelManager.scala | 121 ++++++++++++++------- core/src/main/scala/kafka/server/KafkaApis.scala | 18 ++- core/src/main/scala/kafka/server/KafkaServer.scala | 22 +++- .../main/scala/kafka/server/MetadataCache.scala | 18 ++- .../kafka/server/metadata/KRaftMetadataCache.scala | 11 +- .../kafka/server/metadata/ZkMetadataCache.scala | 58 ++++++++-- .../kafka/api/IntegrationTestHarness.scala | 4 + .../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +- .../kafka/server/QuorumTestHarness.scala | 4 + .../BrokerToControllerRequestThreadTest.scala | 52 ++++++--- .../src/test/scala/kafka/utils/TestInfoUtils.scala | 8 ++ .../kafka/integration/KafkaServerTestHarness.scala | 3 +- .../unit/kafka/server/ApiVersionManagerTest.scala | 6 +- .../kafka/server/BrokerLifecycleManagerTest.scala | 9 +- .../server/BrokerRegistrationRequestTest.scala | 15 ++- .../kafka/server/CreateTopicsRequestTest.scala | 10 +- .../kafka/server/DeleteTopicsRequestTest.scala | 5 +- .../unit/kafka/server/ForwardingManagerTest.scala | 24 ++-- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- .../MockBrokerToControllerChannelManager.scala | 2 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 7 +- .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java | 3 +- 
.../jmh/metadata/MetadataRequestBenchmark.java | 3 +- .../apache/kafka/jmh/server/CheckpointBench.java | 4 +- .../kafka/jmh/server/PartitionCreationBench.java | 6 +- .../kafka/server/common/MetadataVersion.java | 4 + 38 files changed, 383 insertions(+), 143 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java index dc4a1e21e8d..789a4abeaba 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractControlRequest.java @@ -27,14 +27,20 @@ public abstract class AbstractControlRequest extends AbstractRequest { protected final int controllerId; protected final int controllerEpoch; protected final long brokerEpoch; + protected final boolean kraftController; protected Builder(ApiKeys api, short version, int controllerId, int controllerEpoch, long brokerEpoch) { + this(api, version, controllerId, controllerEpoch, brokerEpoch, false); + } + + protected Builder(ApiKeys api, short version, int controllerId, int controllerEpoch, + long brokerEpoch, boolean kraftController) { super(api, version); this.controllerId = controllerId; this.controllerEpoch = controllerEpoch; this.brokerEpoch = brokerEpoch; + this.kraftController = kraftController; } - } protected AbstractControlRequest(ApiKeys api, short version) { @@ -43,6 +49,8 @@ public abstract class AbstractControlRequest extends AbstractRequest { public abstract int controllerId(); + public abstract boolean isKRaftController(); + public abstract int controllerEpoch(); public abstract long brokerEpoch(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java index d7382863176..257d8e78bfc 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java @@ -51,7 +51,14 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds, Collection<Node> liveLeaders) { - super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch); + this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, topicIds, + liveLeaders, false); + } + + public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, + List<LeaderAndIsrPartitionState> partitionStates, Map<String, Uuid> topicIds, + Collection<Node> liveLeaders, boolean kraftController) { + super(ApiKeys.LEADER_AND_ISR, version, controllerId, controllerEpoch, brokerEpoch, kraftController); this.partitionStates = partitionStates; this.topicIds = topicIds; this.liveLeaders = liveLeaders; @@ -71,6 +78,10 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { .setBrokerEpoch(brokerEpoch) .setLiveLeaders(leaders); + if (version >= 7) { + data.setIsKRaftController(kraftController); + } + if (version >= 2) { Map<String, LeaderAndIsrTopicState> topicStatesMap = groupByTopic(partitionStates, topicIds); data.setTopicStates(new ArrayList<>(topicStatesMap.values())); @@ -168,6 +179,11 @@ public class LeaderAndIsrRequest extends AbstractControlRequest { return data.controllerId(); } + @Override + public boolean isKRaftController() { + return data.isKRaftController(); + } + @Override public int controllerEpoch() { return data.controllerEpoch(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index 4326aaffd87..df746b56c84 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -45,7 +45,14 @@ public class StopReplicaRequest extends AbstractControlRequest { public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, boolean deletePartitions, List<StopReplicaTopicState> topicStates) { - super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch); + this(version, controllerId, controllerEpoch, brokerEpoch, deletePartitions, + topicStates, false); + } + + public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, + boolean deletePartitions, List<StopReplicaTopicState> topicStates, + boolean kraftController) { + super(ApiKeys.STOP_REPLICA, version, controllerId, controllerEpoch, brokerEpoch, kraftController); this.deletePartitions = deletePartitions; this.topicStates = topicStates; } @@ -56,6 +63,10 @@ public class StopReplicaRequest extends AbstractControlRequest { .setControllerEpoch(controllerEpoch) .setBrokerEpoch(brokerEpoch); + if (version >= 4) { + data.setIsKRaftController(kraftController); + } + if (version >= 3) { data.setTopicStates(topicStates); } else if (version >= 1) { @@ -196,6 +207,11 @@ public class StopReplicaRequest extends AbstractControlRequest { return data.controllerId(); } + @Override + public boolean isKRaftController() { + return data.isKRaftController(); + } + @Override public int controllerEpoch() { return data.controllerEpoch(); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 845bdd92111..c0fd3000cc5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -51,7 +51,14 @@ public class UpdateMetadataRequest 
extends AbstractControlRequest { public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers, Map<String, Uuid> topicIds) { - super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch); + this(version, controllerId, controllerEpoch, brokerEpoch, partitionStates, + liveBrokers, topicIds, false); + } + + public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, + List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers, + Map<String, Uuid> topicIds, boolean kraftController) { + super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, kraftController); this.partitionStates = partitionStates; this.liveBrokers = liveBrokers; this.topicIds = topicIds; @@ -81,10 +88,14 @@ public class UpdateMetadataRequest extends AbstractControlRequest { } UpdateMetadataRequestData data = new UpdateMetadataRequestData() - .setControllerId(controllerId) - .setControllerEpoch(controllerEpoch) - .setBrokerEpoch(brokerEpoch) - .setLiveBrokers(liveBrokers); + .setControllerId(controllerId) + .setControllerEpoch(controllerEpoch) + .setBrokerEpoch(brokerEpoch) + .setLiveBrokers(liveBrokers); + + if (version >= 8) { + data.setIsKRaftController(kraftController); + } if (version >= 5) { Map<String, UpdateMetadataTopicState> topicStatesMap = groupByTopic(topicIds, partitionStates); @@ -180,6 +191,11 @@ public class UpdateMetadataRequest extends AbstractControlRequest { return data.controllerId(); } + @Override + public boolean isKRaftController() { + return data.isKRaftController(); + } + @Override public int controllerEpoch() { return data.controllerEpoch(); diff --git a/clients/src/main/resources/common/message/EnvelopeRequest.json b/clients/src/main/resources/common/message/EnvelopeRequest.json index 1f6ff62de8d..a30a50ba684 100644 --- 
a/clients/src/main/resources/common/message/EnvelopeRequest.json +++ b/clients/src/main/resources/common/message/EnvelopeRequest.json @@ -16,7 +16,7 @@ { "apiKey": 58, "type": "request", - "listeners": ["controller"], + "listeners": ["controller", "zkBroker"], "name": "EnvelopeRequest", // Request struct for forwarding. "validVersions": "0", diff --git a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json index 7042ec84d5b..e049d088c53 100644 --- a/clients/src/main/resources/common/message/LeaderAndIsrRequest.json +++ b/clients/src/main/resources/common/message/LeaderAndIsrRequest.json @@ -36,8 +36,8 @@ "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The current controller ID." }, - { "name": "KRaftControllerId", "type": "int32", "versions": "7+", "entityType": "brokerId", "default": "-1", - "about": "The KRaft controller id, used during migration. See KIP-866" }, + { "name": "isKRaftController", "type": "bool", "versions": "7+", "default": "false", + "about": "If KRaft controller id is used during migration. See KIP-866" }, { "name": "ControllerEpoch", "type": "int32", "versions": "0+", "about": "The current controller epoch." }, { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1", diff --git a/clients/src/main/resources/common/message/StopReplicaRequest.json b/clients/src/main/resources/common/message/StopReplicaRequest.json index 67ed752a555..7c82c97aa71 100644 --- a/clients/src/main/resources/common/message/StopReplicaRequest.json +++ b/clients/src/main/resources/common/message/StopReplicaRequest.json @@ -31,8 +31,8 @@ "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." 
}, - { "name": "KRaftControllerId", "type": "int32", "versions": "4+", "entityType": "brokerId", "default": "-1", - "about": "The KRaft controller id, used during migration. See KIP-866" }, + { "name": "isKRaftController", "type": "bool", "versions": "4+", "default": "false", + "about": "If KRaft controller id is used during migration. See KIP-866" }, { "name": "ControllerEpoch", "type": "int32", "versions": "0+", "about": "The controller epoch." }, { "name": "BrokerEpoch", "type": "int64", "versions": "1+", "default": "-1", "ignorable": true, diff --git a/clients/src/main/resources/common/message/UpdateMetadataRequest.json b/clients/src/main/resources/common/message/UpdateMetadataRequest.json index 287b0ed1a4b..e876caa2bac 100644 --- a/clients/src/main/resources/common/message/UpdateMetadataRequest.json +++ b/clients/src/main/resources/common/message/UpdateMetadataRequest.json @@ -36,8 +36,8 @@ "fields": [ { "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId", "about": "The controller id." }, - { "name": "KRaftControllerId", "type": "int32", "versions": "8+", "entityType": "brokerId", - "about": "The KRaft controller id, used during migration." }, + { "name": "isKRaftController", "type": "bool", "versions": "8+", "default": "false", + "about": "If KRaft controller id is used during migration. See KIP-866" }, { "name": "ControllerEpoch", "type": "int32", "versions": "0+", "about": "The controller epoch." 
}, { "name": "BrokerEpoch", "type": "int64", "versions": "5+", "ignorable": true, "default": "-1", diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java index 62571c6986a..2e498339256 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -188,7 +188,6 @@ public class ApiVersionsResponseTest { // Ensure that APIs needed for the KRaft mode are not exposed through ApiVersions until we are ready for them HashSet<ApiKeys> exposedApis = apiKeysInResponse(response); - assertFalse(exposedApis.contains(ApiKeys.ENVELOPE)); assertFalse(exposedApis.contains(ApiKeys.VOTE)); assertFalse(exposedApis.contains(ApiKeys.BEGIN_QUORUM_EPOCH)); assertFalse(exposedApis.contains(ApiKeys.END_QUORUM_EPOCH)); diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index e1bd575f0c2..dad462dcad6 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -63,6 +63,12 @@ object Kafka extends Logging { props } + // For Zk mode, the API forwarding is currently enabled only under migration flag. We can + // directly do a static IBP check to see API forwarding is enabled here because IBP check is + // static in Zk mode. 
+ private def enableApiForwarding(config: KafkaConfig) = + config.migrationEnabled && config.interBrokerProtocolVersion.isApiForwardingEnabled + private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) if (config.requiresZookeeper) { @@ -70,7 +76,7 @@ object Kafka extends Logging { config, Time.SYSTEM, threadNamePrefix = None, - enableForwarding = false + enableForwarding = enableApiForwarding(config) ) } else { new KafkaRaftServer( diff --git a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala index c2724e24f1d..e1bea72ddd0 100644 --- a/core/src/main/scala/kafka/common/InterBrokerSendThread.scala +++ b/core/src/main/scala/kafka/common/InterBrokerSendThread.scala @@ -33,7 +33,7 @@ import scala.jdk.CollectionConverters._ */ abstract class InterBrokerSendThread( name: String, - networkClient: KafkaClient, + var networkClient: KafkaClient, requestTimeoutMs: Int, time: Time, isInterruptible: Boolean = true diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala index 3d1e5d3f63c..3446f83b647 100644 --- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala @@ -21,6 +21,7 @@ import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicReference import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} import kafka.raft.RaftManager +import kafka.server.metadata.ZkMetadataCache import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.common.{Node, Reconfigurable} @@ -37,42 +38,55 @@ import scala.collection.Seq import scala.compat.java8.OptionConverters._ import scala.jdk.CollectionConverters._ -trait ControllerNodeProvider { - def get(): Option[Node] - def listenerName: ListenerName - 
def securityProtocol: SecurityProtocol - def saslMechanism: String -} +case class ControllerInformation(node: Option[Node], + listenerName: ListenerName, + securityProtocol: SecurityProtocol, + saslMechanism: String, + isZkController: Boolean) -object MetadataCacheControllerNodeProvider { - def apply( - config: KafkaConfig, - metadataCache: kafka.server.MetadataCache - ): MetadataCacheControllerNodeProvider = { - val listenerName = config.controlPlaneListenerName - .getOrElse(config.interBrokerListenerName) - - val securityProtocol = config.controlPlaneSecurityProtocol - .getOrElse(config.interBrokerSecurityProtocol) - - new MetadataCacheControllerNodeProvider( - metadataCache, - listenerName, - securityProtocol, - config.saslMechanismInterBrokerProtocol - ) - } +trait ControllerNodeProvider { + def getControllerInfo(): ControllerInformation } class MetadataCacheControllerNodeProvider( - val metadataCache: kafka.server.MetadataCache, - val listenerName: ListenerName, - val securityProtocol: SecurityProtocol, - val saslMechanism: String + val metadataCache: ZkMetadataCache, + val config: KafkaConfig ) extends ControllerNodeProvider { - override def get(): Option[Node] = { - metadataCache.getControllerId - .flatMap(metadataCache.getAliveBrokerNode(_, listenerName)) + + private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) + private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol + + private val kraftControllerListenerName = if (config.controllerListenerNames.nonEmpty) + new ListenerName(config.controllerListenerNames.head) else null + private val kraftControllerSecurityProtocol = Option(kraftControllerListenerName) + .map( listener => config.effectiveListenerSecurityProtocolMap.getOrElse( + listener, 
SecurityProtocol.forName(kraftControllerListenerName.value()))) + .orNull + private val kraftControllerSaslMechanism = config.saslMechanismControllerProtocol + + private val emptyZkControllerInfo = ControllerInformation( + None, + zkControllerListenerName, + zkControllerSecurityProtocol, + zkControllerSaslMechanism, + isZkController = true) + + override def getControllerInfo(): ControllerInformation = { + metadataCache.getControllerId.map { + case ZkCachedControllerId(id) => ControllerInformation( + metadataCache.getAliveBrokerNode(id, zkControllerListenerName), + zkControllerListenerName, + zkControllerSecurityProtocol, + zkControllerSaslMechanism, + isZkController = true) + case KRaftCachedControllerId(id) => ControllerInformation( + metadataCache.getAliveBrokerNode(id, kraftControllerListenerName), + kraftControllerListenerName, + kraftControllerSecurityProtocol, + kraftControllerSaslMechanism, + isZkController = false) + }.getOrElse(emptyZkControllerInfo) } } @@ -108,9 +122,9 @@ class RaftControllerNodeProvider( ) extends ControllerNodeProvider with Logging { val idToNode = controllerQuorumVoterNodes.map(node => node.id() -> node).toMap - override def get(): Option[Node] = { - raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode) - } + override def getControllerInfo(): ControllerInformation = + ControllerInformation(raftManager.leaderAndEpoch.leaderId.asScala.map(idToNode), + listenerName, securityProtocol, saslMechanism, isZkController = false) } object BrokerToControllerChannelManager { @@ -176,13 +190,13 @@ class BrokerToControllerChannelManagerImpl( } private[server] def newRequestThread = { - val networkClient = { + def networkClient(controllerInfo: ControllerInformation) = { val channelBuilder = ChannelBuilders.clientChannelBuilder( - controllerNodeProvider.securityProtocol, + controllerInfo.securityProtocol, JaasContext.Type.SERVER, config, - controllerNodeProvider.listenerName, - controllerNodeProvider.saslMechanism, + controllerInfo.listenerName, + 
controllerInfo.saslMechanism, time, config.saslInterBrokerHandshakeRequestEnable, logContext @@ -276,17 +290,38 @@ case class BrokerToControllerQueueItem( ) class BrokerToControllerRequestThread( - networkClient: KafkaClient, + networkClientFactory: ControllerInformation => KafkaClient, metadataUpdater: ManualMetadataUpdater, controllerNodeProvider: ControllerNodeProvider, config: KafkaConfig, time: Time, threadName: String, retryTimeoutMs: Long -) extends InterBrokerSendThread(threadName, networkClient, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) { +) extends InterBrokerSendThread(threadName, null, Math.min(Int.MaxValue, Math.min(config.controllerSocketTimeoutMs, retryTimeoutMs)).toInt, time, isInterruptible = false) { + + var isZkController = false + private def maybeResetNetworkClient(controllerInformation: ControllerInformation, + initialize: Boolean = false): Unit = { + if (initialize || isZkController != controllerInformation.isZkController) { + if (!initialize) { + debug("Controller changed to " + (if (isZkController) "zk" else "kraft") + " mode. " + + "Resetting network client") + } + // Close existing network client. 
+ if (networkClient != null) { + networkClient.initiateClose() + networkClient.close() + } + isZkController = controllerInformation.isZkController + updateControllerAddress(controllerInformation.node.orNull) + controllerInformation.node.foreach(n => metadataUpdater.setNodes(Seq(n).asJava)) + networkClient = networkClientFactory(controllerInformation) + } + } private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem]() private val activeController = new AtomicReference[Node](null) + maybeResetNetworkClient(controllerNodeProvider.getControllerInfo(), initialize = true) // Used for testing @volatile @@ -364,11 +399,13 @@ class BrokerToControllerRequestThread( } override def doWork(): Unit = { + val controllerInformation = controllerNodeProvider.getControllerInfo() + maybeResetNetworkClient(controllerInformation) if (activeControllerAddress().isDefined) { super.pollOnce(Long.MaxValue) } else { - debug("Controller isn't known, checking with controller provider") - controllerNodeProvider.get() match { + debug("Controller isn't cached, looking for local metadata changes") + controllerInformation.node match { case Some(controllerNode) => info(s"Recorded new controller, from now on will use node $controllerNode") updateControllerAddress(controllerNode) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4839e00f8c2..ebe208d6e8f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -80,7 +80,6 @@ import java.util import java.util.concurrent.{CompletableFuture, ConcurrentHashMap} import java.util.concurrent.atomic.AtomicInteger import java.util.{Collections, Optional} - import scala.annotation.nowarn import scala.collection.{Map, Seq, Set, immutable, mutable} import scala.jdk.CollectionConverters._ @@ -1317,6 +1316,12 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending topic metadata %s and brokers %s for 
correlation id %d to client %s".format(completeTopicMetadata.mkString(","), brokers.mkString(","), request.header.correlationId, request.header.clientId)) + val controllerId = { + metadataCache.getControllerId.flatMap { + case ZkCachedControllerId(id) => Some(id) + case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId + } + } requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => MetadataResponse.prepareResponse( @@ -1324,7 +1329,7 @@ class KafkaApis(val requestChannel: RequestChannel, requestThrottleMs, brokers.toList.asJava, clusterId, - metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), + controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID), completeTopicMetadata.asJava, clusterAuthorizedOperations )) @@ -3332,13 +3337,18 @@ class KafkaApis(val requestChannel: RequestChannel, } val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName) - val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID) + val controllerId = { + metadataCache.getControllerId.flatMap { + case ZkCachedControllerId(id) => Some(id) + case KRaftCachedControllerId(_) => metadataCache.getRandomAliveBrokerId + } + } requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { val data = new DescribeClusterResponseData() .setThrottleTimeMs(requestThrottleMs) .setClusterId(clusterId) - .setControllerId(controllerId) + .setControllerId(controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)) .setClusterAuthorizedOperations(clusterAuthorizedOperations) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 79a621c6b54..2f880a118e1 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -147,6 +147,7 @@ class KafkaServer( var kafkaScheduler: KafkaScheduler = _ + var kraftControllerNodes: Seq[Node] = Seq.empty @volatile var metadataCache: 
ZkMetadataCache = _ var quotaManagers: QuotaFactory.QuotaManagers = _ @@ -272,8 +273,16 @@ class KafkaServer( _brokerState = BrokerState.RECOVERY logManager.startup(zkClient.getAllTopicsInCluster()) - metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures) - val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache) + if (config.migrationEnabled) { + kraftControllerNodes = RaftConfig.voterConnectionsToNodes( + RaftConfig.parseVoterConnections(config.quorumVoters)).asScala + } + metadataCache = MetadataCache.zkMetadataCache( + config.brokerId, + config.interBrokerProtocolVersion, + brokerFeatures, + kraftControllerNodes) + val controllerNodeProvider = new MetadataCacheControllerNodeProvider(metadataCache, config) /* initialize feature change listener */ _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient) @@ -614,7 +623,14 @@ class KafkaServer( private def controlledShutdown(): Unit = { val socketTimeoutMs = config.controllerSocketTimeoutMs + // TODO (KAFKA-14447): Handle controlled shutdown for zkBroker when we have KRaft controller. def doControlledShutdown(retries: Int): Boolean = { + if (config.requiresZookeeper && + metadataCache.getControllerId.exists(_.isInstanceOf[KRaftCachedControllerId])) { + info("ZkBroker currently has a KRaft controller. Controlled shutdown will be handled " + + "through broker life cycle manager") + return false + } val metadataUpdater = new ManualMetadataUpdater() val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( @@ -668,7 +684,7 @@ class KafkaServer( // 1. Find the controller and establish a connection to it. 
// If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries) - metadataCache.getControllerId match { + metadataCache.getControllerId.filter(_.isInstanceOf[ZkCachedControllerId]).map(_.id) match { case Some(controllerId) => metadataCache.getAliveBrokerNode(controllerId, config.interBrokerListenerName) match { case Some(broker) => diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index b20d4f6414c..e0501ef1ebe 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -32,6 +32,13 @@ case class FinalizedFeaturesAndEpoch(features: Map[String, Short], epoch: Long) } } +sealed trait CachedControllerId { + val id: Int +} + +case class ZkCachedControllerId(id: Int) extends CachedControllerId +case class KRaftCachedControllerId(id: Int) extends CachedControllerId + trait MetadataCache { /** @@ -92,7 +99,7 @@ trait MetadataCache { def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] - def getControllerId: Option[Int] + def getControllerId: Option[CachedControllerId] def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster @@ -103,13 +110,18 @@ trait MetadataCache { def metadataVersion(): MetadataVersion def features(): FinalizedFeaturesAndEpoch + + def getRandomAliveBrokerId: Option[Int] } object MetadataCache { def zkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, - brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty()): ZkMetadataCache = { - new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures) + brokerFeatures: BrokerFeatures = BrokerFeatures.createEmpty(), + kraftControllerNodes: collection.Seq[Node] = null) + : ZkMetadataCache = { + new ZkMetadataCache(brokerId, metadataVersion, brokerFeatures, + Option(kraftControllerNodes).getOrElse(collection.Seq.empty[Node])) } def 
kRaftMetadataCache(brokerId: Int): KRaftMetadataCache = { diff --git a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala index 7bd1c6343d9..7e6ad7bfd09 100644 --- a/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala @@ -18,7 +18,7 @@ package kafka.server.metadata import kafka.controller.StateChangeLogger -import kafka.server.{FinalizedFeaturesAndEpoch, MetadataCache} +import kafka.server.{CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache} import kafka.utils.Logging import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} @@ -287,14 +287,19 @@ class KRaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging w result.toMap } - override def getControllerId: Option[Int] = getRandomAliveBroker(_currentImage) - /** * Choose a random broker node to report as the controller. We do this because we want * the client to send requests destined for the controller to a random broker. * Clients do not have direct access to the controller in the KRaft world, as explained * in KIP-590. 
*/ + override def getControllerId: Option[CachedControllerId] = + getRandomAliveBroker(_currentImage).map(KRaftCachedControllerId) + + override def getRandomAliveBrokerId: Option[Int] = { + getRandomAliveBroker(_currentImage) + } + private def getRandomAliveBroker(image: MetadataImage): Option[Int] = { val aliveBrokers = getAliveBrokers(image).toList if (aliveBrokers.isEmpty) { diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d69785f90f6..d774cd41a5c 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._ import kafka.cluster.{Broker, EndPoint} import kafka.api._ import kafka.controller.StateChangeLogger -import kafka.server.{BrokerFeatures, FinalizedFeaturesAndEpoch, MetadataCache} +import kafka.server.{BrokerFeatures, CachedControllerId, FinalizedFeaturesAndEpoch, KRaftCachedControllerId, MetadataCache, ZkCachedControllerId} import kafka.utils.CoreUtils._ import kafka.utils.Logging import kafka.utils.Implicits._ @@ -42,7 +42,7 @@ import org.apache.kafka.common.requests.{ApiVersionsResponse, MetadataResponse, import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.server.common.MetadataVersion -import java.util.concurrent.TimeUnit +import java.util.concurrent.{ThreadLocalRandom, TimeUnit} import scala.concurrent.TimeoutException import scala.math.max @@ -60,7 +60,11 @@ trait ZkFinalizedFeatureCache { * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. 
*/ -class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFeatures: BrokerFeatures) +class ZkMetadataCache( + brokerId: Int, + metadataVersion: MetadataVersion, + brokerFeatures: BrokerFeatures, + kraftControllerNodes: Seq[Node] = Seq.empty) extends MetadataCache with ZkFinalizedFeatureCache with Logging { private val partitionMetadataLock = new ReentrantReadWriteLock() @@ -68,8 +72,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea //replace the value with a completely new one. this means reads (which are not under any lock) need to grab //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. //multiple reads of this value risk getting different snapshots. - @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot(partitionStates = mutable.AnyRefMap.empty, - topicIds = Map.empty, controllerId = None, aliveBrokers = mutable.LongMap.empty, aliveNodes = mutable.LongMap.empty) + @volatile private var metadataSnapshot: MetadataSnapshot = MetadataSnapshot( + partitionStates = mutable.AnyRefMap.empty, + topicIds = Map.empty, + controllerId = None, + aliveBrokers = mutable.LongMap.empty, + aliveNodes = mutable.LongMap.empty) this.logIdent = s"[MetadataCache brokerId=$brokerId] " private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) @@ -79,6 +87,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea private val featureLock = new ReentrantLock() private val featureCond = featureLock.newCondition() + private val kraftControllerNodeMap = kraftControllerNodes.map(node => node.id() -> node).toMap + // This method is the main hotspot when it comes to the performance of metadata requests, // we should be careful about adding additional logic here. Relatedly, `brokers` is // `List[Integer]` instead of `List[Int]` to avoid a collection copy. 
@@ -248,7 +258,12 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea } override def getAliveBrokerNode(brokerId: Int, listenerName: ListenerName): Option[Node] = { - metadataSnapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) + val snapshot = metadataSnapshot + brokerId match { + case id if snapshot.controllerId.filter(_.isInstanceOf[KRaftCachedControllerId]).exists(_.id == id) => + kraftControllerNodeMap.get(id) + case _ => snapshot.aliveBrokers.get(brokerId).flatMap(_.getNode(listenerName)) + } } override def getAliveBrokerNodes(listenerName: ListenerName): Iterable[Node] = { @@ -315,7 +330,14 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea }.getOrElse(Map.empty[Int, Node]) } - def getControllerId: Option[Int] = metadataSnapshot.controllerId + def getControllerId: Option[CachedControllerId] = { + metadataSnapshot.controllerId + } + + def getRandomAliveBrokerId: Option[Int] = { + val aliveBrokers = metadataSnapshot.aliveBrokers.values.toList + Some(aliveBrokers(ThreadLocalRandom.current().nextInt(aliveBrokers.size)).id) + } def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = { val snapshot = metadataSnapshot @@ -329,6 +351,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea nodes.getOrElse(id.toLong, new Node(id, "", -1)) } + def controllerId(snapshot: MetadataSnapshot): Option[Node] = { + snapshot.controllerId.flatMap { + case ZkCachedControllerId(id) => getAliveBrokerNode(id, listenerName) + case KRaftCachedControllerId(_) => getRandomAliveBrokerId.flatMap(getAliveBrokerNode(_, listenerName)) + } + } + val partitions = getAllPartitions(snapshot) .filter { case (_, state) => state.leader != LeaderAndIsr.LeaderDuringDelete } .map { case (tp, state) => @@ -342,7 +371,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea new Cluster(clusterId, nodes.values.toBuffer.asJava, 
partitions.toBuffer.asJava, unauthorizedTopics, internalTopics, - snapshot.controllerId.map(id => node(id)).orNull) + controllerId(snapshot).orNull) } // This method returns the deleted TopicPartitions received from UpdateMetadataRequest @@ -351,9 +380,13 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea val aliveBrokers = new mutable.LongMap[Broker](metadataSnapshot.aliveBrokers.size) val aliveNodes = new mutable.LongMap[collection.Map[ListenerName, Node]](metadataSnapshot.aliveNodes.size) - val controllerIdOpt = updateMetadataRequest.controllerId match { + val controllerIdOpt: Option[CachedControllerId] = updateMetadataRequest.controllerId match { case id if id < 0 => None - case id => Some(id) + case id => + if (updateMetadataRequest.isKRaftController) + Some(KRaftCachedControllerId(id)) + else + Some(ZkCachedControllerId(id)) } updateMetadataRequest.liveBrokers.forEach { broker => @@ -386,7 +419,8 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea val deletedPartitions = new mutable.ArrayBuffer[TopicPartition] if (!updateMetadataRequest.partitionStates.iterator.hasNext) { - metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap, controllerIdOpt, aliveBrokers, aliveNodes) + metadataSnapshot = MetadataSnapshot(metadataSnapshot.partitionStates, topicIds.toMap, + controllerIdOpt, aliveBrokers, aliveNodes) } else { //since kafka may do partial metadata updates, we start by copying the previous state val partitionStates = new mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]](metadataSnapshot.partitionStates.size) @@ -446,7 +480,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], topicIds: Map[String, Uuid], - controllerId: Option[Int], + controllerId: Option[CachedControllerId], aliveBrokers: 
mutable.LongMap[Broker], aliveNodes: mutable.LongMap[collection.Map[ListenerName, Node]]) { val topicNames: Map[Uuid, String] = topicIds.map { case (topicName, topicId) => (topicId, topicName) } diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 3b459875b60..f4ab8e041be 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -60,10 +60,14 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness { } override def generateConfigs: Seq[KafkaConfig] = { + val cfgs = TestUtils.createBrokerConfigs(brokerCount, zkConnectOrNull, interBrokerSecurityProtocol = Some(securityProtocol), trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties, logDirCount = logDirCount) configureListeners(cfgs) modifyConfigs(cfgs) + if (isZkMigrationTest()) { + cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true")) + } insertControllerListenersIfNeeded(cfgs) cfgs.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 80f6cd758b3..2a3e0a18117 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -215,7 +215,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { val brokerIds = brokers.map(_.config.brokerId).toSet assertTrue(brokerIds.contains(controller.id)) } else { - assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId. + assertEquals(brokers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.map(_.id). 
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id) } diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala index f746cec53d7..51720db924a 100755 --- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala +++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala @@ -160,6 +160,10 @@ abstract class QuorumTestHarness extends Logging { TestInfoUtils.isKRaft(testInfo) } + def isZkMigrationTest(): Boolean = { + TestInfoUtils.isZkMigrationTest(testInfo) + } + def checkIsZKTest(): Unit = { if (isKRaftTest()) { throw new RuntimeException("This function can't be accessed when running the test " + diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala index bee1aefaca2..eea5c7517a0 100644 --- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala +++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala @@ -25,9 +25,10 @@ import kafka.utils.TestUtils.TestControllerRequestCompletionHandler import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient, NodeApiVersions} import org.apache.kafka.common.Node import org.apache.kafka.common.message.{EnvelopeResponseData, MetadataRequestData} +import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, EnvelopeResponse, MetadataRequest, RequestTestUtils} -import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder import org.apache.kafka.common.utils.MockTime import org.junit.jupiter.api.Assertions._ @@ -37,6 +38,14 @@ import 
org.mockito.Mockito._ class BrokerToControllerRequestThreadTest { + private def controllerInfo(node: Option[Node]): ControllerInformation = { + ControllerInformation(node, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true) + } + + private def emptyControllerInfo: ControllerInformation = { + controllerInfo(None) + } + @Test def testRetryTimeoutWhileControllerNotAvailable(): Unit = { val time = new MockTime() @@ -45,10 +54,10 @@ class BrokerToControllerRequestThreadTest { val mockClient = new MockClient(time, metadata) val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) - when(controllerNodeProvider.get()).thenReturn(None) + when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo) val retryTimeoutMs = 30000 - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs) testRequestThread.started = true @@ -82,10 +91,10 @@ class BrokerToControllerRequestThreadTest { val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) val activeController = new Node(controllerId, "host", 1234) - when(controllerNodeProvider.get()).thenReturn(Some(activeController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController))) val expectedResponse = RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("a", 2)) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true mockClient.prepareResponse(expectedResponse) @@ -124,10 +133,11 @@ class 
BrokerToControllerRequestThreadTest { val oldController = new Node(oldControllerId, "host1", 1234) val newController = new Node(newControllerId, "host2", 1234) - when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn( + emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController))) val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2)) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true @@ -171,13 +181,14 @@ class BrokerToControllerRequestThreadTest { val oldController = new Node(oldControllerId, "host1", port) val newController = new Node(newControllerId, "host2", port) - when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn( + emptyControllerInfo, controllerInfo(Some(oldController)), controllerInfo(Some(newController))) val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", 2)) val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2)) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true @@ -231,7 +242,11 @@ class BrokerToControllerRequestThreadTest { val oldController = new Node(oldControllerId, "host1", 
port) val newController = new Node(newControllerId, "host2", port) - when(controllerNodeProvider.get()).thenReturn(Some(oldController), Some(newController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn( + emptyControllerInfo, // call to create network client. + controllerInfo(Some(oldController)), + controllerInfo(Some(newController)) + ) // create an envelopeResponse with NOT_CONTROLLER error val envelopeResponseWithNotControllerError = new EnvelopeResponse( @@ -240,7 +255,7 @@ class BrokerToControllerRequestThreadTest { // response for retry request after receiving NOT_CONTROLLER error val expectedResponse = RequestTestUtils.metadataUpdateWith(3, Collections.singletonMap("a", 2)) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true @@ -296,13 +311,13 @@ class BrokerToControllerRequestThreadTest { val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) val controller = new Node(controllerId, "host1", 1234) - when(controllerNodeProvider.get()).thenReturn(Some(controller)) + when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo, controllerInfo(Some(controller))) val retryTimeoutMs = 30000 val responseWithNotControllerError = RequestTestUtils.metadataUpdateWith("cluster1", 2, Collections.singletonMap("a", Errors.NOT_CONTROLLER), Collections.singletonMap("a", 2)) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs) testRequestThread.started = true @@ -344,7 +359,7 @@ class 
BrokerToControllerRequestThreadTest { val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) val activeController = new Node(controllerId, "host", 1234) - when(controllerNodeProvider.get()).thenReturn(Some(activeController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController))) val callbackResponse = new AtomicReference[ClientResponse]() val completionHandler = new ControllerRequestCompletionHandler { @@ -360,7 +375,7 @@ class BrokerToControllerRequestThreadTest { mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true @@ -381,7 +396,7 @@ class BrokerToControllerRequestThreadTest { val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) val activeController = new Node(controllerId, "host", 1234) - when(controllerNodeProvider.get()).thenReturn(Some(activeController)) + when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo(Some(activeController))) val callbackResponse = new AtomicReference[ClientResponse]() val completionHandler = new ControllerRequestCompletionHandler { @@ -397,7 +412,7 @@ class BrokerToControllerRequestThreadTest { mockClient.createPendingAuthenticationError(activeController, 50) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) testRequestThread.started = true @@ -416,8 +431,9 @@ class BrokerToControllerRequestThreadTest { val 
mockClient = new MockClient(time, metadata) val controllerNodeProvider = mock(classOf[ControllerNodeProvider]) + when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo) - val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider, + val testRequestThread = new BrokerToControllerRequestThread(_ => mockClient, new ManualMetadataUpdater(), controllerNodeProvider, config, time, "", retryTimeoutMs = Long.MaxValue) val completionHandler = new TestControllerRequestCompletionHandler(None) diff --git a/core/src/test/scala/kafka/utils/TestInfoUtils.scala b/core/src/test/scala/kafka/utils/TestInfoUtils.scala index fa48024f313..ba93fa36b99 100644 --- a/core/src/test/scala/kafka/utils/TestInfoUtils.scala +++ b/core/src/test/scala/kafka/utils/TestInfoUtils.scala @@ -43,5 +43,13 @@ object TestInfoUtils { false } } + + def isZkMigrationTest(testInfo: TestInfo): Boolean = { + if (!isKRaft(testInfo)) { + false + } else { + testInfo.getDisplayName().contains("quorum=zkMigration") + } + } final val TestWithParameterizedQuorumName = "{displayName}.quorum={0}" } diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 5dbac1bb4dc..1ef4d47995a 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -357,7 +357,8 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { config, time = brokerTime(config.brokerId), threadNamePrefix = None, - startup = false + startup = false, + enableZkApiForwarding = isZkMigrationTest() ) } } diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala index 9936f8deaed..5d94319ed35 100644 --- 
a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala @@ -124,11 +124,11 @@ class ApiVersionManagerTest { features = brokerFeatures, metadataCache = metadataCache ) - assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE)) - assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) + assertTrue(versionManager.isApiEnabled(ApiKeys.ENVELOPE)) + assertTrue(versionManager.enabledApis.contains(ApiKeys.ENVELOPE)) val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs = 0) - assertNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)) + assertNotNull(apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)) } } diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala index 1a0fac443c0..da6d9a8aa80 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala @@ -57,13 +57,14 @@ class BrokerLifecycleManagerTest { class SimpleControllerNodeProvider extends ControllerNodeProvider { val node = new AtomicReference[Node](null) - override def get(): Option[Node] = Option(node.get()) + def listenerName: ListenerName = new ListenerName("PLAINTEXT") - override def listenerName: ListenerName = new ListenerName("PLAINTEXT") + def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT; - override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT; + def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM - override def saslMechanism: String = SaslConfigs.DEFAULT_SASL_MECHANISM + override def getControllerInfo(): ControllerInformation = ControllerInformation(Option(node.get()), + listenerName, securityProtocol, saslMechanism, isZkController = false) } class BrokerLifecycleManagerTestContext(properties: Properties) { diff --git 
a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala index 2bb1314a789..cab586f23e4 100644 --- a/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BrokerRegistrationRequestTest.scala @@ -17,7 +17,7 @@ package unit.kafka.server -import kafka.server.{BrokerToControllerChannelManager, ControllerNodeProvider, ControllerRequestCompletionHandler} +import kafka.server.{BrokerToControllerChannelManager, ControllerInformation, ControllerNodeProvider, ControllerRequestCompletionHandler} import kafka.test.ClusterInstance import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type} import kafka.test.junit.ClusterTestExtensions @@ -48,17 +48,22 @@ class BrokerRegistrationRequestTest { def brokerToControllerChannelManager(clusterInstance: ClusterInstance): BrokerToControllerChannelManager = { BrokerToControllerChannelManager( new ControllerNodeProvider() { - override def get(): Option[Node] = Some(new Node( + def node: Option[Node] = Some(new Node( clusterInstance.anyControllerSocketServer().config.nodeId, "127.0.0.1", clusterInstance.anyControllerSocketServer().boundPort(clusterInstance.controllerListenerName().get()), )) - override def listenerName: ListenerName = clusterInstance.controllerListenerName().get() + def listenerName: ListenerName = clusterInstance.controllerListenerName().get() - override def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT + val securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT - override def saslMechanism: String = "" + val saslMechanism: String = "" + + def isZkController: Boolean = !clusterInstance.isKRaftTest + + override def getControllerInfo(): ControllerInformation = + ControllerInformation(node, listenerName, securityProtocol, saslMechanism, isZkController) }, Time.SYSTEM, new Metrics(), diff --git 
a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index a193db284c4..c6892566968 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -22,15 +22,16 @@ import org.apache.kafka.common.Uuid import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection -import org.apache.kafka.common.protocol.ApiKeys -import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.CreateTopicsRequest import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource + import scala.jdk.CollectionConverters._ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) @ValueSource(strings = Array("zk", "kraft")) def testValidCreateTopicsRequests(quorum: String): Unit = { @@ -148,13 +149,14 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk")) + @ValueSource(strings = Array("zk", "zkMigration")) def testNotController(quorum: String): Unit = { // Note: we don't run this test when in KRaft mode, because KRaft doesn't have this // behavior of returning NOT_CONTROLLER. Instead, the request is forwarded. 
val req = topicsReq(Seq(topicReq("topic1"))) val response = sendCreateTopicRequest(req, notControllerSocketServer) - assertEquals(1, response.errorCounts().get(Errors.NOT_CONTROLLER)) + val error = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER + assertEquals(1, response.errorCounts().get(error)) } @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 629f203169d..ff14323b019 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -221,7 +221,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { * Instead, the request is forwarded. */ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk")) + @ValueSource(strings = Array("zk", "zkMigration")) def testNotController(quorum: String): Unit = { val request = new DeleteTopicsRequest.Builder( new DeleteTopicsRequestData() @@ -229,8 +229,9 @@ class DeleteTopicsRequestTest extends BaseRequestTest with Logging { .setTimeoutMs(1000)).build() val response = sendDeleteTopicsRequest(request, notControllerSocketServer) + val expectedError = if (isZkMigrationTest()) Errors.NONE else Errors.NOT_CONTROLLER val error = response.data.responses.find("not-controller").errorCode() - assertEquals(Errors.NOT_CONTROLLER.code, error, "Expected controller error when routed incorrectly") + assertEquals(expectedError.code(), error) } private def validateTopicIsDeleted(topic: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala index d0fc30fbdec..21c7d0d9807 100644 --- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala @@ -60,6 +60,14 @@ class ForwardingManagerTest { NodeApiVersions.create(List(envelopeApiVersion).asJava) } + private def controllerInfo = { + ControllerInformation(Some(new Node(0, "host", 1234)), new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true) + } + + private def emptyControllerInfo = { + ControllerInformation(None, new ListenerName(""), SecurityProtocol.PLAINTEXT, "", isZkController = true) + } + @Test def testResponseCorrelationIdMismatch(): Unit = { val requestCorrelationId = 27 @@ -71,7 +79,7 @@ class ForwardingManagerTest { val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion, requestCorrelationId + 1) - Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234))) + Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo) val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest] client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.NONE)); @@ -95,7 +103,7 @@ class ForwardingManagerTest { val responseBuffer = RequestTestUtils.serializeResponseWithHeader(responseBody, requestHeader.apiVersion, requestCorrelationId) - Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234))) + Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo) val isEnvelopeRequest: RequestMatcher = request => request.isInstanceOf[EnvelopeRequest] client.prepareResponse(isEnvelopeRequest, new EnvelopeResponse(responseBuffer, Errors.UNSUPPORTED_VERSION)); @@ -112,7 +120,7 @@ class ForwardingManagerTest { val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId) val request = buildRequest(requestHeader, requestBuffer, clientPrincipal) - Mockito.when(controllerNodeProvider.get()).thenReturn(None) + 
Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(emptyControllerInfo) val response = new AtomicReference[AbstractResponse]() forwardingManager.forwardRequest(request, res => res.foreach(response.set)) @@ -136,7 +144,7 @@ class ForwardingManagerTest { val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId) val request = buildRequest(requestHeader, requestBuffer, clientPrincipal) - Mockito.when(controllerNodeProvider.get()).thenReturn(Some(new Node(0, "host", 1234))) + Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo) val response = new AtomicReference[AbstractResponse]() forwardingManager.forwardRequest(request, res => res.foreach(response.set)) @@ -162,8 +170,7 @@ class ForwardingManagerTest { val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId) val request = buildRequest(requestHeader, requestBuffer, clientPrincipal) - val controllerNode = new Node(0, "host", 1234) - Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode)) + Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo) client.prepareUnsupportedVersionResponse(req => req.apiKey == requestHeader.apiKey) @@ -183,10 +190,9 @@ class ForwardingManagerTest { val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId) val request = buildRequest(requestHeader, requestBuffer, clientPrincipal) - val controllerNode = new Node(0, "host", 1234) - Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode)) + Mockito.when(controllerNodeProvider.getControllerInfo()).thenReturn(controllerInfo) - client.createPendingAuthenticationError(controllerNode, 50) + client.createPendingAuthenticationError(controllerInfo.node.get, 50) val response = new AtomicReference[AbstractResponse]() forwardingManager.forwardRequest(request, res => res.foreach(response.set)) diff --git 
a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 5019a57b0ed..a8e5d5464a4 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -3571,7 +3571,7 @@ class KafkaApisTest { val capturedResponse = verifyNoThrottling(request) val describeClusterResponse = capturedResponse.getValue.asInstanceOf[DescribeClusterResponse] - assertEquals(metadataCache.getControllerId.get, describeClusterResponse.data.controllerId) + assertEquals(metadataCache.getControllerId.get.id, describeClusterResponse.data.controllerId) assertEquals(clusterId, describeClusterResponse.data.clusterId) assertEquals(8096, describeClusterResponse.data.clusterAuthorizedOperations) assertEquals(metadataCache.getAliveBrokerNodes(plaintextListener).toSet, diff --git a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala index febd06f354d..1752e2c6644 100644 --- a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala +++ b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala @@ -73,7 +73,7 @@ class MockBrokerToControllerChannelManager( queueItem.callback.onTimeout() unsentIterator.remove() } else { - controllerNodeProvider.get() match { + controllerNodeProvider.getControllerInfo().node match { case Some(controller) if client.ready(controller, time.milliseconds()) => val clientRequest = client.newClientRequest( controller.idString, diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 82c19949e3b..8679706777d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -159,7 +159,8 @@ class RequestQuotaTest extends 
BaseRequestTest { @Test def testExemptRequestTime(): Unit = { - for (apiKey <- RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle) { + val actions = RequestQuotaTest.ClusterActions -- RequestQuotaTest.ClusterActionsWithThrottle -- RequestQuotaTest.Envelope + for (apiKey <- actions) { submitTest(apiKey, () => checkExemptRequestMetric(apiKey)) } @@ -170,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest { def testUnauthorizedThrottle(): Unit = { RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal - for (apiKey <- ApiKeys.zkBrokerApis.asScala) { + for (apiKey <- ApiKeys.zkBrokerApis.asScala.toSet -- RequestQuotaTest.Envelope) { submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey)) } @@ -765,7 +766,8 @@ object RequestQuotaTest { val ClusterActions = ApiKeys.zkBrokerApis.asScala.filter(_.clusterAction).toSet val ClusterActionsWithThrottle = Set(ApiKeys.ALLOCATE_PRODUCER_IDS, ApiKeys.UPDATE_FEATURES) val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE) - val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions + val Envelope = Set(ApiKeys.ENVELOPE) + val ClientActions = ApiKeys.zkBrokerApis.asScala.toSet -- ClusterActions -- SaslActions -- Envelope val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized") // Principal used for all client connections. 
This is modified by tests which diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 0162ebdd5e3..7110237ce4c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -179,7 +179,12 @@ object TestUtils extends Logging { } def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], startup: Boolean): KafkaServer = { - val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = false) + createServer(config, time, threadNamePrefix, startup, enableZkApiForwarding = false) + } + + def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], + startup: Boolean, enableZkApiForwarding: Boolean) = { + val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding = enableZkApiForwarding) if (startup) server.startup() server } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java index 3e57f29f8d4..6cbf2438c8c 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java @@ -218,7 +218,8 @@ public class ReplicaFetcherThreadBenchmark { 0, 0, 0, updatePartitionState, Collections.emptyList(), topicIds).build(); // TODO: fix to support raft - ZkMetadataCache metadataCache = new ZkMetadataCache(0, config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty()); + ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(0, + config.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), null); metadataCache.updateMetadata(0, updateMetadataRequest); replicaManager = new ReplicaManagerBuilder(). 
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java index 80376948b89..f65fdcdaa24 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java @@ -110,7 +110,8 @@ public class MetadataRequestBenchmark { private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class); private Metrics metrics = new Metrics(); private int brokerId = 1; - private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, MetadataVersion.latest(), BrokerFeatures.createEmpty()); + private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId, + MetadataVersion.latest(), BrokerFeatures.createEmpty(), null); private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class); private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class); private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index 9a60efbeabd..d83f748bcad 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -112,7 +112,9 @@ public class CheckpointBench { scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(); final MetadataCache metadataCache = - MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty()); + MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), + 
this.brokerProperties.interBrokerProtocolVersion(), + BrokerFeatures.createEmpty(), null); this.quotaManagers = QuotaFactory.instantiate(this.brokerProperties, this.metrics, diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java index efb69007888..3a8343772cb 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/PartitionCreationBench.java @@ -26,6 +26,7 @@ import kafka.server.BrokerFeatures; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; import kafka.server.LogDirFailureChannel; +import kafka.server.MetadataCache; import kafka.server.QuotaFactory; import kafka.server.ReplicaManager; import kafka.server.builders.LogManagerBuilder; @@ -33,7 +34,6 @@ import kafka.server.builders.ReplicaManagerBuilder; import kafka.server.checkpoints.OffsetCheckpoints; import kafka.server.metadata.ConfigRepository; import kafka.server.metadata.MockConfigRepository; -import kafka.server.metadata.ZkMetadataCache; import kafka.utils.KafkaScheduler; import kafka.utils.Scheduler; import kafka.utils.TestUtils; @@ -160,7 +160,9 @@ public class PartitionCreationBench { setLogManager(logManager). setQuotaManagers(quotaManagers). setBrokerTopicStats(brokerTopicStats). - setMetadataCache(new ZkMetadataCache(this.brokerProperties.brokerId(), this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty())). + setMetadataCache(MetadataCache.zkMetadataCache(this.brokerProperties.brokerId(), + this.brokerProperties.interBrokerProtocolVersion(), BrokerFeatures.createEmpty(), + null)). setLogDirFailureChannel(failureChannel). setAlterPartitionManager(alterPartitionManager). 
build(); diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 9fb7c0ff4ea..15f8fee9bde 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -243,6 +243,10 @@ public enum MetadataVersion { return this.isAtLeast(IBP_3_3_IV1); } + public boolean isApiForwardingEnabled() { + return this.isAtLeast(IBP_3_4_IV0); + } + public boolean isKRaftSupported() { return this.featureLevel > 0; }
