hachikuji commented on a change in pull request #11667:
URL: https://github.com/apache/kafka/pull/11667#discussion_r803154384



##########
File path: core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
##########
@@ -17,35 +17,45 @@
 package kafka.server
 
 import java.util.Properties
-
 import kafka.test.ClusterInstance
+import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
+import org.apache.kafka.common.message.{ApiMessageType, ApiVersionsResponseData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.RecordVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
 
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 @Tag("integration")
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
 
   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerSocketServers().asScala.head, listenerName)
+    val socket = if (cluster.controlPlaneListenerName().asScala.contains(listenerName) ||

Review comment:
       I think there might still be some confusion about the control plane 
listener. For ZK clusters, it is not the listener that the controller listens 
on. Instead, it is the listener that the controller uses to connect to other 
brokers. So if we are testing `ApiVersions` on the control plane listener, we 
can choose any broker; we do not need to find the active controller. I think 
this logic should just be doing this:
   ```scala
       val socket = if (cluster.controllerListenerName().asScala.contains(listenerName)) {
         cluster.controllerSocketServers().asScala.head
       } else {
         cluster.brokerSocketServers().asScala.head
       }
   ```
   Then, just to avoid confusion, maybe we can name the control plane listener 
`CONTROL_PLANE` in `brokerPropertyOverrides` below. Would that work?
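
To make the intended behavior concrete, here is a rough, self-contained sketch of that selection. The types `FakeCluster` and `FakeSocketServer`, the local `ListenerName` case class, and the `SocketSelection.select` helper are hypothetical stand-ins for illustration only; they are not the real `ClusterInstance` or Kafka APIs.

```scala
// Hypothetical stand-ins for illustration; not the real kafka.test.ClusterInstance API.
final case class ListenerName(value: String)
final case class FakeSocketServer(id: String)

final case class FakeCluster(
  controllerListenerName: Option[ListenerName], // None models a ZK cluster (assumption)
  controllerSocketServers: Seq[FakeSocketServer],
  brokerSocketServers: Seq[FakeSocketServer]
)

object SocketSelection {
  // Pick a controller socket server only when the request targets the
  // controller listener; for every other listener (including the control
  // plane listener) any broker will do, so take the first one.
  def select(cluster: FakeCluster, listenerName: ListenerName): FakeSocketServer =
    if (cluster.controllerListenerName.contains(listenerName))
      cluster.controllerSocketServers.head
    else
      cluster.brokerSocketServers.head
}
```

With this shape, a request on a listener named `CONTROL_PLANE` goes to the first broker, and only a request on the controller listener goes to a controller socket server.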




