dajac commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1098634689
########## core/src/main/scala/kafka/server/ApiVersionManager.scala: ########## @@ -45,55 +45,70 @@ object ApiVersionManager { listenerType, forwardingManager, supportedFeatures, - metadataCache + metadataCache, + config.unstableApiVersionsEnabled ) } } class SimpleApiVersionManager( val listenerType: ListenerType, val enabledApis: collection.Set[ApiKeys], - brokerFeatures: Features[SupportedVersionRange] + brokerFeatures: Features[SupportedVersionRange], + enableUnstableLastVersion: Boolean ) extends ApiVersionManager { - def this(listenerType: ListenerType) = { - this(listenerType, ApiKeys.apisForListener(listenerType).asScala, BrokerFeatures.defaultSupportedFeatures()) + def this( + listenerType: ListenerType, + enableUnstableLastVersion: Boolean + ) = { + this( + listenerType, + ApiKeys.apisForListener(listenerType).asScala, + BrokerFeatures.defaultSupportedFeatures(), + enableUnstableLastVersion + ) } - private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava) + private val apiVersions = ApiVersionsResponse.collectApis(enabledApis.asJava, enableUnstableLastVersion) override def apiVersionResponse(requestThrottleMs: Int): ApiVersionsResponse = { ApiVersionsResponse.createApiVersionsResponse(requestThrottleMs, apiVersions, brokerFeatures) } + + override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { + apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion) + } } class DefaultApiVersionManager( val listenerType: ListenerType, forwardingManager: Option[ForwardingManager], features: BrokerFeatures, - metadataCache: MetadataCache + metadataCache: MetadataCache, + enableUnstableLastVersion: Boolean ) extends ApiVersionManager { + val enabledApis = ApiKeys.apisForListener(listenerType).asScala + override def apiVersionResponse(throttleTimeMs: Int): ApiVersionsResponse = { val supportedFeatures = features.supportedFeatures val finalizedFeatures = metadataCache.features() val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions) ApiVersionsResponse.createApiVersionsResponse( - throttleTimeMs, - metadataCache.metadataVersion().highestSupportedRecordVersion, - supportedFeatures, - finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, - finalizedFeatures.epoch, - controllerApiVersions.orNull, - listenerType) - } - - override def enabledApis: collection.Set[ApiKeys] = { - ApiKeys.apisForListener(listenerType).asScala + throttleTimeMs, + metadataCache.metadataVersion().highestSupportedRecordVersion, + supportedFeatures, + finalizedFeatures.features.map(kv => (kv._1, kv._2.asInstanceOf[java.lang.Short])).asJava, + finalizedFeatures.epoch, + controllerApiVersions.orNull, + listenerType, + enableUnstableLastVersion + ) } - override def isApiEnabled(apiKey: ApiKeys): Boolean = { - apiKey.inScope(listenerType) + override def isApiEnabled(apiKey: ApiKeys, apiVersion: Short): Boolean = { + apiKey != null && apiKey.inScope(listenerType) && apiKey.isVersionEnabled(apiVersion, enableUnstableLastVersion) Review Comment: We could but we have to add `enableUnstableLastVersion` to the trait as well. I guess that it is not an issue. Let me do it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org