dengziming commented on code in PR #14175:
URL: https://github.com/apache/kafka/pull/14175#discussion_r1288503732


##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
     val nodeApiVersions = NodeApiVersions.create

Review Comment:
   We should change how we construct `NodeApiVersions` here,  such as `val 
nodeApiVersions = new 
NodeApiVersions(clientApis.map(ApiVersionsResponse.toApiVersion).asJava, 
Collections.emptyList(), false)`



##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -17,37 +17,47 @@
 
 package kafka.admin
 
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-import scala.collection.Seq
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.message.ApiMessageType
 import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertNotNull, assertTrue}
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
 
   def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnect).map(props => {
-      // Configure control plane listener to make sure we have separate 
listeners from client,
-      // in order to avoid returning Envelope API version.
-      props.setProperty(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")
-      props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
"CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
-      props.setProperty("listeners", 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-      props.setProperty(KafkaConfig.AdvertisedListenersProp, 
"PLAINTEXT://localhost:0,CONTROLLER://localhost:0")
-      props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")
-      props
-    }).map(KafkaConfig.fromProps)
+    if (isKRaftTest()) {
+      TestUtils.createBrokerConfigs(1, null).map(props => {
+        props.setProperty(KafkaConfig.UnstableApiVersionsEnableProp, "true")

Review Comment:
   Please add a comment about this change, I guess this is related to KIP-848, 
and we can remove this after KIP-848 is all set.



##########
core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala:
##########
@@ -56,21 +66,30 @@ class BrokerApiVersionsCommandTest extends 
KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"${bootstrapServers()} (id: 0 rack: null) -> (", 
lineIter.next())
     val nodeApiVersions = NodeApiVersions.create
-    val enabledApis = ApiKeys.zkBrokerApis.asScala
-    for (apiKey <- enabledApis) {
-      val apiVersion = nodeApiVersions.apiVersion(apiKey)
-      assertNotNull(apiVersion)
+    val listenerType = if (isKRaftTest()) {
+      ApiMessageType.ListenerType.BROKER
+    } else {
+      ApiMessageType.ListenerType.ZK_BROKER
+    }
+    val clientApis = ApiKeys.clientApis().asScala
+    for (apiKey <- clientApis) {
+      assertTrue(lineIter.hasNext)
+      val actual = lineIter.next()
+      if (apiKey.inScope(listenerType)) {
+        val apiVersion = nodeApiVersions.apiVersion(apiKey)
+        assertNotNull(apiVersion)
 
-      val versionRangeStr =
-        if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
-        else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
-      val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
+        val versionRangeStr =
+          if (apiVersion.minVersion == apiVersion.maxVersion) 
apiVersion.minVersion.toString
+          else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+        val usableVersion = nodeApiVersions.latestUsableVersion(apiKey)
 
-      val terminator = if (apiKey == enabledApis.last) "" else ","
+        val terminator = if (apiKey == clientApis.last) "" else ","
 
-      val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
-      assertTrue(lineIter.hasNext)
-      assertEquals(line, lineIter.next())
+        val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: 
$usableVersion]$terminator"
+        assertTrue(lineIter.hasNext)
+        assertEquals(line, actual)
+      }

Review Comment:
   We should add a else branch to do similar assertions, such as 
   ```
           val line = s"\t${apiKey.name}(${apiKey.id}): UNSUPPORTED,"
           assertTrue(lineIter.hasNext)
           assertEquals(line, actual)
   ```



##########
clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java:
##########
@@ -68,7 +68,7 @@ public static NodeApiVersions create() {
      */
     public static NodeApiVersions create(Collection<ApiVersion> overrides) {
         List<ApiVersion> apiVersions = new LinkedList<>(overrides);
-        for (ApiKeys apiKey : ApiKeys.zkBrokerApis()) {
+        for (ApiKeys apiKey : ApiKeys.clientApis()) {

Review Comment:
   This method is used in too many places so we should be careful about this 
change, we'd better keep it unchanged in this PR, and do this change in another 
patch.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to