dajac commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1098634689


##########
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##########
@@ -45,55 +45,70 @@ object ApiVersionManager {
       listenerType,
       forwardingManager,
       supportedFeatures,
-      metadataCache
+      metadataCache,
+      config.unstableApiVersionsEnabled
     )
   }
 }
 
 class SimpleApiVersionManager(
   val listenerType: ListenerType,
   val enabledApis: collection.Set[ApiKeys],
-  brokerFeatures: Features[SupportedVersionRange]
+  brokerFeatures: Features[SupportedVersionRange],
+  enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
-  def this(listenerType: ListenerType) = {
-    this(listenerType, ApiKeys.apisForListener(listenerType).asScala, 
BrokerFeatures.defaultSupportedFeatures())
+  def this(
+    listenerType: ListenerType,
+    enableUnstableLastVersion: Boolean
+  ) = {
+    this(
+      listenerType,
+      ApiKeys.apisForListener(listenerType).asScala,
+      BrokerFeatures.defaultSupportedFeatures(),
+      enableUnstableLastVersion
+    )
   }
 
-  private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava)
+  private val apiVersions = 
ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion)
 
   override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse 
= {
     ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, 
apiVersions, brokerFeatures)
   }
+
+  override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
+    apiKey != null && apiKey.inScope(listenerType) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)
+  }
 }
 
 class DefaultApiVersionManager(
   val listenerType: ListenerType,
   forwardingManager: Option[ForwardingManager],
   features: BrokerFeatures,
-  metadataCache: MetadataCache
+  metadataCache: MetadataCache,
+  enableUnstableLastVersion: Boolean
 ) extends ApiVersionManager {
 
+  val enabledApis = ApiKeys.apisForListener(listenerType).asScala
+
   override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = {
     val supportedFeatures = features.supportedFeatures
     val finalizedFeatures = metadataCache.features()
     val controllerApiVersions = 
forwardingManager.flatMap(_.controllerApiVersions)
 
     ApiVersionsResponse.createApiVersionsResponse(
-        throttleTimeMs,
-        metadataCache.metadataVersion().highestSupportedRecordVersion,
-        supportedFeatures,
-        finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,
-        finalizedFeatures.epoch,
-        controllerApiVersions.orNull,
-        listenerType)
-  }
-
-  override def enabledApis: collection.Set[ApiKeys] = {
-    ApiKeys.apisForListener(listenerType).asScala
+      throttleTimeMs,
+      metadataCache.metadataVersion().highestSupportedRecordVersion,
+      supportedFeatures,
+      finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,
+      finalizedFeatures.epoch,
+      controllerApiVersions.orNull,
+      listenerType,
+      enableUnstableLastVersion
+    )
   }
 
-  override def isApiEnabled(apiKey: ApiKeys): Boolean = {
-    apiKey.inScope(listenerType)
+  override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = {
+    apiKey != null && apiKey.inScope(listenerType) && 
apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion)

Review Comment:
   We could but we have to add `enableUnstableLastVersion` to the trait as 
well. I guess that it is not an issue. Let me do it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to