dengziming commented on code in PR #14175: URL: https://github.com/apache/kafka/pull/14175#discussion_r1288503732
########## core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala: ########## @@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { assertTrue(lineIter.hasNext) assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", lineIter.next()) val nodeApiVersions = NodeApiVersions.create Review Comment: We should change how we construct `NodeApiVersions` here, such as `val nodeApiVersions = new NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava, Collections.emptyList(), false)` ########## core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala: ########## @@ -17,37 +17,47 @@ package kafka.admin -import java.io.{ByteArrayOutputStream, PrintStream} -import java.nio.charset.StandardCharsets -import scala.collection.Seq import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.clients.NodeApiVersions +import org.apache.kafka.common.message.ApiMessageType import org.apache.kafka.common.protocol.ApiKeys import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotNull, assertTrue} -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Timeout +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.ValueSource +import java.io.{ByteArrayOutputStream, PrintStream} +import java.nio.charset.StandardCharsets +import scala.collection.Seq import scala.jdk.CollectionConverters._ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { def generateConfigs: Seq[KafkaConfig] = - TestUtils.createBrokerConfigs(1, zkConnect).map(props => { - // Configure control plane listener to make sure we have separate listeners from client, - // in order to avoid returning Envelope API version. - props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER") - props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT") - props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") - props.setProperty(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") - props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") - props - }).map(KafkaConfig.fromProps) + if (isKRaftTest()) { + TestUtils.createBrokerConfigs(1, null).map(props => { + props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true") Review Comment: Please add a comment about this change, I guess this is related to KIP-848, and we can remove this after KIP-848 is all set. ########## core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala: ########## @@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness { assertTrue(lineIter.hasNext) assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", lineIter.next()) val nodeApiVersions = NodeApiVersions.create - val enabledApis = ApiKeys.zkBrokerApis.asScala - for (apiKey <- enabledApis) { - val apiVersion = nodeApiVersions.apiVersion(apiKey) - assertNotNull(apiVersion) + val listenerType = if (isKRaftTest()) { + ApiMessageType.ListenerType.BROKER + } else { + ApiMessageType.ListenerType.ZK_BROKER + } + val clientApis = ApiKeys.clientApis().asScala + for (apiKey <- clientApis) { + assertTrue(lineIter.hasNext) + val actual = lineIter.next() + if (apiKey.inScope(listenerType)) { + val apiVersion = nodeApiVersions.apiVersion(apiKey) + assertNotNull(apiVersion) - val versionRangeStr = - if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString - else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}" - val usableVersion = nodeApiVersions.latestUsableVersion(apiKey) + val versionRangeStr = + if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString + else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}" + val usableVersion = nodeApiVersions.latestUsableVersion(apiKey) - val terminator = if (apiKey == enabledApis.last) "" else "," + val terminator = if (apiKey == clientApis.last) "" else "," - val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" - assertTrue(lineIter.hasNext) - assertEquals(line, lineIter.next()) + val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator" + assertTrue(lineIter.hasNext) + assertEquals(line, actual) + } Review Comment: We should add a else branch to do similar assertions, such as ``` val line = s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED," assertTrue(lineIter.hasNext) assertEquals(line, actual) ``` ########## clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java: ########## @@ -68,7 +68,7 @@ public static NodeApiVersions create() { */ public static NodeApiVersions create(Collection<ApiVersion> overrides) { List<ApiVersion> apiVersions = new LinkedList<>(overrides); - for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) { + for (ApiKeys apiKey : ApiKeys.clientApis()) { Review Comment: This method is used in too many places so we should be careful about this change, we'd better keep it unchanged in this PR, and do this change in another patch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org