This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5f2a68b1501 KAFKA-19119 Move ApiVersionManager/SimpleApiVersionManager to server (#19426)
5f2a68b1501 is described below

commit 5f2a68b150175def0f27c873469d509efd74addd
Author: Mickael Maison <[email protected]>
AuthorDate: Tue Apr 15 08:32:44 2025 +0200

    KAFKA-19119 Move ApiVersionManager/SimpleApiVersionManager to server (#19426)
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control-server.xml               |   1 +
 .../kafka/server/builders/KafkaApisBuilder.java    |   2 +-
 .../main/scala/kafka/network/SocketServer.scala    |   6 +-
 .../scala/kafka/server/ApiVersionManager.scala     | 176 ---------------------
 .../src/main/scala/kafka/server/BrokerServer.scala |  10 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   2 +-
 .../main/scala/kafka/server/ControllerServer.scala |   2 +-
 .../scala/kafka/server/ForwardingManager.scala     |   7 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 .../scala/kafka/tools/TestRaftRequestHandler.scala |   5 +-
 .../main/scala/kafka/tools/TestRaftServer.scala    |   3 +-
 .../scala/unit/kafka/network/ProcessorTest.scala   |  29 ++--
 .../unit/kafka/network/SocketServerTest.scala      |   1 +
 .../unit/kafka/server/ControllerApisTest.scala     |   1 +
 ...st.scala => DefaultApiVersionManagerTest.scala} |  71 +++++----
 .../scala/unit/kafka/server/KafkaApisTest.scala    |   9 +-
 .../metadata/KRaftMetadataRequestBenchmark.java    |   2 +-
 .../org/apache/kafka/server/ApiVersionManager.java |  73 +++++++++
 .../kafka/server/DefaultApiVersionManager.java     | 107 +++++++++++++
 .../kafka/server/SimpleApiVersionManager.java      |  85 ++++++++++
 20 files changed, 345 insertions(+), 249 deletions(-)

diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index a35719e8761..509d9fa27c4 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -85,6 +85,7 @@
     <allow pkg="javax.crypto" />
     <allow pkg="org.apache.kafka.server" />
     <allow pkg="org.apache.kafka.image" />
+    <allow pkg="org.apache.kafka.network.metrics" />
     <allow pkg="org.apache.kafka.storage.internals.log" />
     <allow pkg="org.apache.kafka.storage.internals.checkpoint" />
     <subpackage name="metrics">
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 8724fc3cb8e..9fe44c487a3 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -19,7 +19,6 @@ package kafka.server.builders;
 
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
-import kafka.server.ApiVersionManager;
 import kafka.server.AutoTopicCreationManager;
 import kafka.server.FetchManager;
 import kafka.server.ForwardingManager;
@@ -36,6 +35,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
 import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.ApiVersionManager;
 import org.apache.kafka.server.ClientMetricsManager;
 import org.apache.kafka.server.DelegationTokenManager;
 import org.apache.kafka.server.authorizer.Authorizer;
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 79cd0bc8ce2..4163b563f01 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.atomic._
 import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, 
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.network.SocketServer._
-import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
+import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import org.apache.kafka.network.EndPoint
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import kafka.utils._
@@ -46,7 +46,7 @@ import org.apache.kafka.common.utils.{KafkaThread, 
LogContext, Time, Utils}
 import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, 
Reconfigurable}
 import org.apache.kafka.network.{ConnectionQuotaEntity, 
ConnectionThrottledException, SocketServerConfigs, TooManyConnectionsException}
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.ServerSocketFactory
+import org.apache.kafka.server.{ApiVersionManager, ServerSocketFactory}
 import org.apache.kafka.server.config.QuotaConfig
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.ConnectionDisconnectListener
@@ -872,7 +872,7 @@ private[kafka] class Processor(
       credentialProvider.tokenCache,
       time,
       logContext,
-      version => apiVersionManager.apiVersionResponse(throttleTimeMs = 0, 
version < 4)
+      version => apiVersionManager.apiVersionResponse(0, version < 4)
     )
   )
 
diff --git a/core/src/main/scala/kafka/server/ApiVersionManager.scala 
b/core/src/main/scala/kafka/server/ApiVersionManager.scala
deleted file mode 100644
index 9cedbaf1c9d..00000000000
--- a/core/src/main/scala/kafka/server/ApiVersionManager.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import org.apache.kafka.common.feature.SupportedVersionRange
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.apache.kafka.metadata.MetadataCache
-import org.apache.kafka.network.metrics.RequestChannelMetrics
-import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
-import org.apache.kafka.server.common.FinalizedFeatures
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-trait ApiVersionManager {
-  def enableUnstableLastVersion: Boolean
-  def listenerType: ListenerType
-  def enabledApis: collection.Set[ApiKeys]
-
-  def apiVersionResponse(throttleTimeMs: Int, alterFeatureLevel0: Boolean): 
ApiVersionsResponse
-
-  def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
-    apiKey != null && apiKey.inScope(listenerType) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
-  }
-  def newRequestMetrics: RequestChannelMetrics = new 
RequestChannelMetrics(enabledApis.asJava)
-
-  def features: FinalizedFeatures
-}
-
-object ApiVersionManager {
-  def apply(
-    listenerType: ListenerType,
-    config: KafkaConfig,
-    forwardingManager: ForwardingManager,
-    supportedFeatures: BrokerFeatures,
-    metadataCache: MetadataCache,
-    clientMetricsManager: Option[ClientMetricsManager]
-  ): ApiVersionManager = {
-    new DefaultApiVersionManager(
-      listenerType,
-      forwardingManager,
-      supportedFeatures,
-      metadataCache,
-      config.unstableApiVersionsEnabled,
-      clientMetricsManager
-    )
-  }
-}
-
-/**
- * A simple ApiVersionManager that does not support forwarding and does not 
have metadata cache, used in kraft controller.
- * its enabled apis are determined by the listener type, its finalized 
features are dynamically determined by the controller.
- *
- * @param listenerType the listener type
- * @param enabledApis the enabled apis, which are computed by the listener type
- * @param brokerFeatures the broker features
- * @param enableUnstableLastVersion whether to enable unstable last version, 
see [[KafkaConfig.unstableApiVersionsEnabled]]
- * @param featuresProvider a provider to the finalized features supported
- */
-class SimpleApiVersionManager(
-  val listenerType: ListenerType,
-  val enabledApis: collection.Set[ApiKeys],
-  brokerFeatures: 
org.apache.kafka.common.feature.Features[SupportedVersionRange],
-  val enableUnstableLastVersion: Boolean,
-  val featuresProvider: () => FinalizedFeatures
-) extends ApiVersionManager {
-
-  def this(
-    listenerType: ListenerType,
-    enableUnstableLastVersion: Boolean,
-    featuresProvider: () => FinalizedFeatures
-  ) = {
-    this(
-      listenerType,
-      ApiKeys.apisForListener(listenerType).asScala,
-      BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion),
-      enableUnstableLastVersion,
-      featuresProvider
-    )
-  }
-
-  private val apiVersions = ApiVersionsResponse.collectApis(listenerType, 
enabledApis.asJava, enableUnstableLastVersion)
-
-  override def apiVersionResponse(
-    throttleTimeMs: Int,
-    alterFeatureLevel0: Boolean
-  ): ApiVersionsResponse = {
-    val currentFeatures = features
-    new ApiVersionsResponse.Builder().
-      setThrottleTimeMs(throttleTimeMs).
-      setApiVersions(apiVersions).
-      setSupportedFeatures(brokerFeatures).
-      setFinalizedFeatures(currentFeatures.finalizedFeatures()).
-      setFinalizedFeaturesEpoch(currentFeatures.finalizedFeaturesEpoch()).
-      setZkMigrationEnabled(false).
-      setAlterFeatureLevel0(alterFeatureLevel0).
-      build()
-  }
-
-  override def features: FinalizedFeatures = featuresProvider.apply()
-}
-
-/**
- * The default ApiVersionManager that supports forwarding and has metadata 
cache, used in broker and zk controller.
- * When forwarding is enabled, the enabled apis are determined by the broker 
listener type and the controller apis,
- * otherwise the enabled apis are determined by the broker listener type, 
which is the same with SimpleApiVersionManager.
- *
- * @param listenerType the listener type
- * @param forwardingManager the forwarding manager,
- * @param brokerFeatures the broker features
- * @param metadataCache the metadata cache, used to get the finalized features 
and the metadata version
- * @param enableUnstableLastVersion whether to enable unstable last version, 
see [[KafkaConfig.unstableApiVersionsEnabled]]
- * @param clientMetricsManager the client metrics manager, helps to determine 
whether client telemetry is enabled
- */
-class DefaultApiVersionManager(
-  val listenerType: ListenerType,
-  forwardingManager: ForwardingManager,
-  brokerFeatures: BrokerFeatures,
-  metadataCache: MetadataCache,
-  val enableUnstableLastVersion: Boolean,
-  val clientMetricsManager: Option[ClientMetricsManager] = None
-) extends ApiVersionManager {
-
-  val enabledApis: mutable.Set[ApiKeys] = 
ApiKeys.apisForListener(listenerType).asScala
-
-  override def apiVersionResponse(
-    throttleTimeMs: Int,
-    alterFeatureLevel0: Boolean
-  ): ApiVersionsResponse = {
-    val finalizedFeatures = metadataCache.features()
-    val controllerApiVersions = forwardingManager.controllerApiVersions
-    val clientTelemetryEnabled = clientMetricsManager match {
-      case Some(manager) => manager.isTelemetryReceiverConfigured
-      case None => false
-    }
-    val apiVersions = if (controllerApiVersions.isDefined) {
-      ApiVersionsResponse.controllerApiVersions(
-        controllerApiVersions.get,
-        listenerType,
-        enableUnstableLastVersion,
-        clientTelemetryEnabled)
-    } else {
-      ApiVersionsResponse.brokerApiVersions(
-        listenerType,
-        enableUnstableLastVersion,
-        clientTelemetryEnabled)
-    }
-    new ApiVersionsResponse.Builder().
-      setThrottleTimeMs(throttleTimeMs).
-      setApiVersions(apiVersions).
-      setSupportedFeatures(brokerFeatures.supportedFeatures).
-      setFinalizedFeatures(finalizedFeatures.finalizedFeatures()).
-      setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch()).
-      setZkMigrationEnabled(false).
-      setAlterFeatureLevel0(alterFeatureLevel0).
-      build()
-  }
-
-  override def features: FinalizedFeatures = metadataCache.features()
-}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 398c085d0d5..63666ef3290 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -54,7 +54,7 @@ import 
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, 
DelegationTokenManager, ProcessRole}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -252,13 +252,13 @@ class BrokerServer(
       forwardingManager = new 
ForwardingManagerImpl(clientToControllerChannelManager, metrics)
       clientMetricsManager = new 
ClientMetricsManager(clientMetricsReceiverPlugin, 
config.clientTelemetryMaxBytes, time, metrics)
 
-      val apiVersionManager = ApiVersionManager(
+      val apiVersionManager = new DefaultApiVersionManager(
         ListenerType.BROKER,
-        config,
-        forwardingManager,
+        () => forwardingManager.controllerApiVersions,
         brokerFeatures,
         metadataCache,
-        Some(clientMetricsManager)
+        config.unstableApiVersionsEnabled,
+        Optional.of(clientMetricsManager)
       )
 
       val connectionDisconnectListeners = 
Seq(clientMetricsManager.connectionDisconnectListener())
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index fbcb0e8572d..21246102c45 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -56,7 +56,7 @@ import 
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
 import org.apache.kafka.metadata.{BrokerHeartbeatReply, 
BrokerRegistrationReply}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, 
ProcessRole}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index c2826ec8bfc..83dcb99a5cc 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole, 
SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.Authorizer
 import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
NodeToControllerChannelManager}
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala 
b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 45c95e38db8..c067000bf0c 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
EnvelopeRequest, EnvelopeResponse, RequestContext, RequestHeader}
 import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, 
NodeToControllerChannelManager}
 
+import java.util.Optional
 import java.util.concurrent.TimeUnit
 import scala.jdk.OptionConverters.RichOptional
 
@@ -85,7 +86,7 @@ trait ForwardingManager {
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit
 
-  def controllerApiVersions: Option[NodeApiVersions]
+  def controllerApiVersions: Optional[NodeApiVersions]
 }
 
 object ForwardingManager {
@@ -187,8 +188,8 @@ class ForwardingManagerImpl(
   override def close(): Unit =
     forwardingManagerMetrics.close()
 
-  override def controllerApiVersions: Option[NodeApiVersions] =
-    channelManager.controllerApiVersions.toScala
+  override def controllerApiVersions: Optional[NodeApiVersions] =
+    channelManager.controllerApiVersions
 
   private def parseResponse(
     buffer: ByteBuffer,
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 1d482344a43..81cb249f086 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,7 +60,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.{Group, GroupConfig, 
GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager, 
ProcessRole}
+import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager, 
DelegationTokenManager, ProcessRole}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
TransactionVersion}
 import org.apache.kafka.server.config.DelegationTokenManagerConfigs
diff --git a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala 
b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
index 733e8228b7c..2e9d8e2bb8a 100644
--- a/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftRequestHandler.scala
@@ -19,13 +19,14 @@ package kafka.tools
 
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
-import kafka.server.{ApiRequestHandler, ApiVersionManager}
+import kafka.server.ApiRequestHandler
 import kafka.utils.Logging
 import org.apache.kafka.common.internals.FatalExitError
 import org.apache.kafka.common.message.{BeginQuorumEpochResponseData, 
EndQuorumEpochResponseData, FetchResponseData, FetchSnapshotResponseData, 
VoteResponseData}
 import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage}
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
BeginQuorumEpochResponse, EndQuorumEpochResponse, FetchResponse, 
FetchSnapshotResponse, VoteResponse}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.ApiVersionManager
 import org.apache.kafka.server.common.RequestLocal
 
 /**
@@ -65,7 +66,7 @@ class TestRaftRequestHandler(
   }
 
   private def handleApiVersions(request: RequestChannel.Request): Unit = {
-    requestChannel.sendResponse(request, 
apiVersionManager.apiVersionResponse(throttleTimeMs = 0, 
request.header.apiVersion() < 4), None)
+    requestChannel.sendResponse(request, 
apiVersionManager.apiVersionResponse(0, request.header.apiVersion() < 4), None)
   }
 
   private def handleVote(request: RequestChannel.Request): Unit = {
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 69d296fe467..c07538aadad 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{CompletableFuture, 
CountDownLatch, LinkedBlockingDe
 import joptsimple.{OptionException, OptionSpec}
 import kafka.network.SocketServer
 import kafka.raft.{DefaultExternalKRaftMetrics, KafkaRaftManager, RaftManager}
-import kafka.server.{KafkaConfig, KafkaRequestHandlerPool, 
SimpleApiVersionManager}
+import kafka.server.{KafkaConfig, KafkaRequestHandlerPool}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.errors.InvalidConfigurationException
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,6 +38,7 @@ import org.apache.kafka.common.{TopicPartition, Uuid, 
protocol}
 import org.apache.kafka.raft.errors.NotLeaderException
 import org.apache.kafka.raft.{Batch, BatchReader, Endpoints, LeaderAndEpoch, 
QuorumConfig, RaftClient}
 import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.SimpleApiVersionManager
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
 import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.config.KRaftConfigs
diff --git a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala 
b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
index 66f3c5d5c77..575f004fe0f 100644
--- a/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
+++ b/core/src/test/scala/unit/kafka/network/ProcessorTest.scala
@@ -18,20 +18,21 @@
 package kafka.network
 
 import kafka.server.metadata.KRaftMetadataCache
-import kafka.server.{DefaultApiVersionManager, ForwardingManager, 
SimpleApiVersionManager}
+import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.errors.{InvalidRequestException, 
UnsupportedVersionException}
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.RequestHeaderData
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
-import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager, 
SimpleApiVersionManager}
 import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, 
MetadataVersion}
 import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.function.Executable
 import org.mockito.Mockito.mock
 
-import java.util.Collections
+import java.util.function.Supplier
+import java.util.{Collections, Optional}
 
 class ProcessorTest {
 
@@ -44,7 +45,7 @@ class ProcessorTest {
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "INIT_PRODUCER_ID with listener type CONTROLLER should throw 
InvalidRequestException exception")
-    assertTrue(e.toString.contains("disabled api"));
+    assertTrue(e.toString.contains("disabled api"))
   }
 
   @Test
@@ -55,26 +56,26 @@ class ProcessorTest {
       .setRequestApiKey(ApiKeys.LEADER_AND_ISR.id)
       .setRequestApiVersion(headerVersion)
       .setClientId("clientid")
-      .setCorrelationId(0);
+      .setCorrelationId(0)
     val requestHeader = RequestTestUtils.serializeRequestHeader(new 
RequestHeader(requestHeaderData, headerVersion))
-    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[ForwardingManager]),
-      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
+    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
     val e = assertThrows(classOf[InvalidRequestException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "LEADER_AND_ISR should throw InvalidRequestException exception")
-    assertTrue(e.toString.contains("Unsupported api"));
+    assertTrue(e.toString.contains("Unsupported api"))
   }
 
   @Test
   def testParseRequestHeaderWithUnsupportedApiVersion(): Unit = {
     val requestHeader = RequestTestUtils.serializeRequestHeader(
       new RequestHeader(ApiKeys.FETCH, 0, "clientid", 0))
-    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[ForwardingManager]),
-      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
+    val apiVersionManager = new DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+      BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
     val e = assertThrows(classOf[UnsupportedVersionException],
       (() => Processor.parseRequestHeader(apiVersionManager, requestHeader)): 
Executable,
       "FETCH v0 should throw UnsupportedVersionException exception")
-    assertTrue(e.toString.contains("unsupported version"));
+    assertTrue(e.toString.contains("unsupported version"))
   }
 
   /**
@@ -86,12 +87,12 @@ class ProcessorTest {
     for (version <- 0 to 2) {
       val requestHeader = RequestTestUtils.serializeRequestHeader(
         new RequestHeader(ApiKeys.PRODUCE, version.toShort, "clientid", 0))
-      val apiVersionManager = new 
DefaultApiVersionManager(ListenerType.BROKER, mock(classOf[ForwardingManager]),
-        BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true)
+      val apiVersionManager = new 
DefaultApiVersionManager(ListenerType.BROKER, 
mock(classOf[Supplier[Optional[NodeApiVersions]]]),
+        BrokerFeatures.createDefault(true), new KRaftMetadataCache(0, () => 
KRaftVersion.LATEST_PRODUCTION), true, Optional.empty)
       val e = assertThrows(classOf[UnsupportedVersionException],
         (() => Processor.parseRequestHeader(apiVersionManager, 
requestHeader)): Executable,
         s"PRODUCE $version should throw UnsupportedVersionException exception")
-      assertTrue(e.toString.contains("unsupported version"));
+      assertTrue(e.toString.contains("unsupported version"))
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index f7fe1bba446..1d793e726b8 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.network.RequestConvertToJson
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.EndPoint
 import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
 import org.apache.kafka.server.config.QuotaConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 7bfbd4eee68..19e221c42a5 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -55,6 +55,7 @@ import 
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.network.metrics.RequestChannelMetrics
 import org.apache.kafka.raft.QuorumConfig
+import org.apache.kafka.server.SimpleApiVersionManager
 import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, 
AuthorizationResult, Authorizer}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, 
RequestLocal}
 import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
similarity index 70%
rename from core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
rename to 
core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
index 2d89c471249..fa1209c917e 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DefaultApiVersionManagerTest.scala
@@ -22,7 +22,7 @@ import 
org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metadata.FeatureLevelRecord
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.server.BrokerFeatures
+import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
 import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
@@ -30,13 +30,15 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.EnumSource
 import org.mockito.Mockito
 
+import java.util.Optional
+import java.util.function.Supplier
 import scala.jdk.CollectionConverters._
 
-class ApiVersionManagerTest {
+class DefaultApiVersionManagerTest {
   private val brokerFeatures = BrokerFeatures.createDefault(true)
   private val metadataCache = {
     val cache = new KRaftMetadataCache(1, () => KRaftVersion.LATEST_PRODUCTION)
-    val delta = new MetadataDelta(MetadataImage.EMPTY);
+    val delta = new MetadataDelta(MetadataImage.EMPTY)
     delta.replay(new FeatureLevelRecord()
       .setName(MetadataVersion.FEATURE_NAME)
       .setFeatureLevel(MetadataVersion.latestProduction().featureLevel())
@@ -48,15 +50,15 @@ class ApiVersionManagerTest {
   @ParameterizedTest
   @EnumSource(classOf[ListenerType])
   def testApiScope(apiScope: ListenerType): Unit = {
-    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+    val nodeApiVersionsSupplier = 
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
     val versionManager = new DefaultApiVersionManager(
-      listenerType = apiScope,
-      forwardingManager = forwardingManager,
-      brokerFeatures = brokerFeatures,
-      metadataCache = metadataCache,
-      enableUnstableLastVersion = true
+      apiScope,
+      nodeApiVersionsSupplier,
+      brokerFeatures,
+      metadataCache,
+      true,
+      Optional.empty
     )
-    assertEquals(ApiKeys.apisForListener(apiScope).asScala, 
versionManager.enabledApis)
     assertTrue(ApiKeys.apisForListener(apiScope).asScala.forall { apiKey =>
       apiKey.allVersions.asScala.forall { version =>
         versionManager.isApiEnabled(apiKey, version)
@@ -67,13 +69,14 @@ class ApiVersionManagerTest {
   @ParameterizedTest
   @EnumSource(classOf[ListenerType])
   def testDisabledApis(apiScope: ListenerType): Unit = {
-    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+    val nodeApiVersionsSupplier = 
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
     val versionManager = new DefaultApiVersionManager(
-      listenerType = apiScope,
-      forwardingManager = forwardingManager,
-      brokerFeatures = brokerFeatures,
-      metadataCache = metadataCache,
-      enableUnstableLastVersion = false
+      apiScope,
+      nodeApiVersionsSupplier,
+      brokerFeatures,
+      metadataCache,
+      false,
+      Optional.empty
     )
 
     ApiKeys.apisForListener(apiScope).forEach { apiKey =>
@@ -89,23 +92,24 @@ class ApiVersionManagerTest {
     val controllerMinVersion: Short = 3
     val controllerMaxVersion: Short = 5
 
-    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
+    val nodeApiVersionsSupplier = 
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
 
-    
Mockito.when(forwardingManager.controllerApiVersions).thenReturn(Some(NodeApiVersions.create(
+    
Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.of(NodeApiVersions.create(
       ApiKeys.CREATE_TOPICS.id,
       controllerMinVersion,
       controllerMaxVersion
     )))
 
     val versionManager = new DefaultApiVersionManager(
-      listenerType = ListenerType.BROKER,
-      forwardingManager = forwardingManager,
-      brokerFeatures = brokerFeatures,
-      metadataCache = metadataCache,
-      enableUnstableLastVersion = true
+      ListenerType.BROKER,
+      nodeApiVersionsSupplier,
+      brokerFeatures,
+      metadataCache,
+      true,
+      Optional.empty
     )
 
-    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs 
= 0, false)
+    val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
     val alterConfigVersion = 
apiVersionsResponse.data.apiKeys.find(ApiKeys.CREATE_TOPICS.id)
     assertNotNull(alterConfigVersion)
     assertEquals(controllerMinVersion, alterConfigVersion.minVersion)
@@ -114,20 +118,21 @@ class ApiVersionManagerTest {
 
   @Test
   def testEnvelopeDisabledForKRaftBroker(): Unit = {
-    val forwardingManager = Mockito.mock(classOf[ForwardingManager])
-    Mockito.when(forwardingManager.controllerApiVersions).thenReturn(None)
+    val nodeApiVersionsSupplier = 
Mockito.mock(classOf[Supplier[Optional[NodeApiVersions]]])
+    Mockito.when(nodeApiVersionsSupplier.get).thenReturn(Optional.empty())
 
     val versionManager = new DefaultApiVersionManager(
-      listenerType = ListenerType.BROKER,
-      forwardingManager = forwardingManager,
-      brokerFeatures = brokerFeatures,
-      metadataCache = metadataCache,
-      enableUnstableLastVersion = true
+      ListenerType.BROKER,
+      nodeApiVersionsSupplier,
+      brokerFeatures,
+      metadataCache,
+      true,
+      Optional.empty
     )
     assertFalse(versionManager.isApiEnabled(ApiKeys.ENVELOPE, 
ApiKeys.ENVELOPE.latestVersion))
-    assertFalse(versionManager.enabledApis.contains(ApiKeys.ENVELOPE))
+    
assertFalse(ApiKeys.apisForListener(versionManager.listenerType()).contains(ApiKeys.ENVELOPE))
 
-    val apiVersionsResponse = versionManager.apiVersionResponse(throttleTimeMs 
= 0, false)
+    val apiVersionsResponse = versionManager.apiVersionResponse(0, false)
     val envelopeVersion = 
apiVersionsResponse.data.apiKeys.find(ApiKeys.ENVELOPE.id)
     assertNull(envelopeVersion)
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 2283ca36582..0057556161d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -86,7 +86,7 @@ import org.apache.kafka.metadata.{ConfigRepository, 
MetadataCache, MockConfigRep
 import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.authorizer.AclEntry
-import org.apache.kafka.server.{BrokerFeatures, ClientMetricsManager}
+import org.apache.kafka.server.{ClientMetricsManager, SimpleApiVersionManager}
 import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, 
Authorizer}
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
@@ -179,13 +179,8 @@ class KafkaApisTest extends Logging {
     overrideProperties.foreach( p => properties.put(p._1, p._2))
     val config = new KafkaConfig(properties)
 
-    val listenerType = ListenerType.BROKER
-    val enabledApis = ApiKeys.apisForListener(listenerType).asScala
-
     val apiVersionManager = new SimpleApiVersionManager(
-      listenerType,
-      enabledApis,
-      BrokerFeatures.defaultSupportedFeatures(true),
+      ListenerType.BROKER,
       true,
       () => new FinalizedFeatures(MetadataVersion.latestTesting(), 
Collections.emptyMap[String, java.lang.Short], 0))
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
index b4779a5fd3b..a1020201a32 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java
@@ -30,7 +30,6 @@ import kafka.server.KafkaConfig;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
-import kafka.server.SimpleApiVersionManager;
 import kafka.server.builders.KafkaApisBuilder;
 import kafka.server.metadata.KRaftMetadataCache;
 import kafka.server.share.SharePartitionManager;
@@ -60,6 +59,7 @@ import org.apache.kafka.network.RequestConvertToJson;
 import org.apache.kafka.network.metrics.RequestChannelMetrics;
 import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.ClientMetricsManager;
+import org.apache.kafka.server.SimpleApiVersionManager;
 import org.apache.kafka.server.common.FinalizedFeatures;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
diff --git 
a/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java 
b/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
new file mode 100644
index 00000000000..4cc60150844
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/ApiVersionManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.network.metrics.RequestChannelMetrics;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+/**
+ * ApiVersionManagers are used to define the APIs supported by servers
+ */
+public interface ApiVersionManager {
+
+    /**
+     * Whether to mark unstable API versions as enabled
+     * @return true if unstable API versions are enabled, otherwise false
+     */
+    boolean enableUnstableLastVersion();
+
+    /**
+     * The listener type
+     * @return Broker or Controller depending on the server's role
+     */
+    ApiMessageType.ListenerType listenerType();
+
+    /**
+     * The ApiVersionsResponse to send back to client when they send an 
ApiVersionsRequest
+     * @param throttleTimeMs The throttle time in milliseconds
+     * @param alterFeatureLevel0 Whether to filter feature v0 in the response
+     * @return the ApiVersionsResponse to send back to the client
+     */
+    ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean 
alterFeatureLevel0);
+
+    /**
+     * The features supported by the server
+     * @return the FinalizedFeatures
+     */
+    FinalizedFeatures features();
+
+    /**
+     * Whether the specified API and version is supported
+     * @param apiKey the API key
+     * @param apiVersion the API version
+     * @return true if the API key and version is supported, otherwise false
+     */
+    default boolean isApiEnabled(ApiKeys apiKey, short apiVersion) {
+        return apiKey != null && apiKey.inScope(listenerType()) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion());
+    }
+
+    /**
+     * Create a new RequestChannelMetrics for the enabled APIs
+     * @return the RequestChannelMetrics
+     */
+    default RequestChannelMetrics newRequestMetrics() {
+        return new 
RequestChannelMetrics(ApiKeys.apisForListener(listenerType()));
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java 
b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
new file mode 100644
index 00000000000..9eb835ccfbf
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * The default ApiVersionManager that supports forwarding and has metadata 
cache, used in brokers.
+ * The enabled APIs are determined by the broker listener type and the 
controller APIs.
+ */
+public class DefaultApiVersionManager implements ApiVersionManager {
+
+    private final ApiMessageType.ListenerType listenerType;
+    private final Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier;
+    private final BrokerFeatures brokerFeatures;
+    private final MetadataCache metadataCache;
+    private final boolean enableUnstableLastVersion;
+    private final Optional<ClientMetricsManager> clientMetricsManager;
+
+    /**
+     * DefaultApiVersionManager constructor
+     * @param listenerType the listener type
+     * @param nodeApiVersionsSupplier the supplier of NodeApiVersions
+     * @param brokerFeatures the broker features
+     * @param metadataCache the metadata cache, used to get the finalized 
features and the metadata version
+     * @param enableUnstableLastVersion whether to enable unstable last 
version, see
+     *   {@link 
org.apache.kafka.server.config.ServerConfigs#UNSTABLE_API_VERSIONS_ENABLE_CONFIG}
+     * @param clientMetricsManager the client metrics manager, helps to 
determine whether client telemetry is enabled
+     */
+    public DefaultApiVersionManager(
+            ApiMessageType.ListenerType listenerType,
+            Supplier<Optional<NodeApiVersions>> nodeApiVersionsSupplier,
+            BrokerFeatures brokerFeatures,
+            MetadataCache metadataCache,
+            boolean enableUnstableLastVersion,
+            Optional<ClientMetricsManager> clientMetricsManager) {
+        this.listenerType = listenerType;
+        this.nodeApiVersionsSupplier = nodeApiVersionsSupplier;
+        this.brokerFeatures = brokerFeatures;
+        this.metadataCache = metadataCache;
+        this.enableUnstableLastVersion = enableUnstableLastVersion;
+        this.clientMetricsManager = clientMetricsManager;
+    }
+
+    @Override
+    public boolean enableUnstableLastVersion() {
+        return enableUnstableLastVersion;
+    }
+
+    @Override
+    public ApiMessageType.ListenerType listenerType() {
+        return listenerType;
+    }
+
+    @Override
+    public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean 
alterFeatureLevel0) {
+        FinalizedFeatures finalizedFeatures = metadataCache.features();
+        Optional<NodeApiVersions> controllerApiVersions = 
nodeApiVersionsSupplier.get();
+        boolean clientTelemetryEnabled = 
clientMetricsManager.map(ClientMetricsManager::isTelemetryReceiverConfigured).orElse(false);
+        ApiVersionsResponseData.ApiVersionCollection apiVersions = 
controllerApiVersions
+                .map(nodeApiVersions -> 
ApiVersionsResponse.controllerApiVersions(
+                    nodeApiVersions,
+                    listenerType,
+                    enableUnstableLastVersion,
+                    clientTelemetryEnabled))
+                .orElseGet(() -> ApiVersionsResponse.brokerApiVersions(
+                    listenerType,
+                    enableUnstableLastVersion,
+                    clientTelemetryEnabled));
+
+        return new ApiVersionsResponse.Builder()
+            .setThrottleTimeMs(throttleTimeMs)
+            .setApiVersions(apiVersions)
+            .setSupportedFeatures(brokerFeatures.supportedFeatures())
+            .setFinalizedFeatures(finalizedFeatures.finalizedFeatures())
+            
.setFinalizedFeaturesEpoch(finalizedFeatures.finalizedFeaturesEpoch())
+            .setAlterFeatureLevel0(alterFeatureLevel0)
+            .build();
+    }
+
+    @Override
+    public FinalizedFeatures features() {
+        return metadataCache.features();
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java 
b/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
new file mode 100644
index 00000000000..67d1ef4abe6
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/SimpleApiVersionManager.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.feature.Features;
+import org.apache.kafka.common.feature.SupportedVersionRange;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.ApiVersionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.server.common.FinalizedFeatures;
+
+import java.util.function.Supplier;
+
+/**
+ * A simple ApiVersionManager used in controllers. It does not support forwarding and does not have metadata cache.
+ * The API version collection and supported features are computed once at construction from the listener type
+ * and the unstable-version flag; finalized features are fetched from the given supplier on every call.
+ */
+public class SimpleApiVersionManager implements ApiVersionManager {
+
+    private final ApiMessageType.ListenerType listenerType;
+    private final Features<SupportedVersionRange> brokerFeatures;
+    private final boolean enableUnstableLastVersion;
+    private final Supplier<FinalizedFeatures> featuresProvider;
+    private final ApiVersionsResponseData.ApiVersionCollection apiVersions;
+
+    /**
+     * Creates a SimpleApiVersionManager.
+     * @param listenerType the listener type
+     * @param enableUnstableLastVersion whether to enable unstable last version, see
+     *   {@link org.apache.kafka.server.config.ServerConfigs#UNSTABLE_API_VERSIONS_ENABLE_CONFIG}
+     * @param featuresProvider a provider to the finalized features supported
+     */
+    public SimpleApiVersionManager(ApiMessageType.ListenerType listenerType,
+                                   boolean enableUnstableLastVersion,
+                                   Supplier<FinalizedFeatures> featuresProvider) {
+        // Plain inputs first.
+        this.listenerType = listenerType;
+        this.enableUnstableLastVersion = enableUnstableLastVersion;
+        this.featuresProvider = featuresProvider;
+
+        // Derived state, computed once: supported features, then the API version collection.
+        this.brokerFeatures = BrokerFeatures.defaultSupportedFeatures(enableUnstableLastVersion);
+        var listenerApis = ApiKeys.apisForListener(listenerType);
+        this.apiVersions = ApiVersionsResponse.collectApis(listenerType, listenerApis, enableUnstableLastVersion);
+    }
+
+    @Override
+    public boolean enableUnstableLastVersion() {
+        return enableUnstableLastVersion;
+    }
+
+    @Override
+    public ApiMessageType.ListenerType listenerType() {
+        return listenerType;
+    }
+
+    /**
+     * Builds the response from the precomputed API versions and supported features, plus the
+     * finalized features returned by {@link #features()} at call time.
+     */
+    @Override
+    public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean alterFeatureLevel0) {
+        // Take one snapshot so the finalized features and their epoch come from the same object.
+        FinalizedFeatures snapshot = features();
+        return new ApiVersionsResponse.Builder()
+                .setThrottleTimeMs(throttleTimeMs)
+                .setApiVersions(apiVersions)
+                .setSupportedFeatures(brokerFeatures)
+                .setFinalizedFeatures(snapshot.finalizedFeatures())
+                .setFinalizedFeaturesEpoch(snapshot.finalizedFeaturesEpoch())
+                .setAlterFeatureLevel0(alterFeatureLevel0)
+                .build();
+    }
+
+    @Override
+    public FinalizedFeatures features() {
+        return featuresProvider.get();
+    }
+}

Reply via email to