hachikuji commented on a change in pull request #11667: URL: https://github.com/apache/kafka/pull/11667#discussion_r803154384
##########
File path: core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
##########
@@ -17,35 +17,45 @@
 package kafka.server
 import java.util.Properties
-
 import kafka.test.ClusterInstance
+import org.apache.kafka.clients.NodeApiVersions
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion
+import org.apache.kafka.common.message.{ApiMessageType, ApiVersionsResponseData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.RecordVersion
 import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse, RequestUtils}
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Tag
+import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 @Tag("integration")
 abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) {
   def sendApiVersionsRequest(request: ApiVersionsRequest, listenerName: ListenerName): ApiVersionsResponse = {
-    IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, cluster.brokerSocketServers().asScala.head, listenerName)
+    val socket = if (cluster.controlPlaneListenerName().asScala.contains(listenerName) ||

Review comment:
I think there might still be some confusion about the control plane listener. In zk clusters, it is not the listener that the controller listens on. Instead, it is the listener that the controller uses to connect to other brokers. If we are testing ApiVersions on the control plane listener, then we can choose any broker. We do not need to find the active controller.
So I think this logic should just be doing this:

```scala
val socket = if (cluster.controllerListenerName().asScala.contains(listenerName)) {
  cluster.controllerSocketServers().asScala.head
} else {
  cluster.brokerSocketServers().asScala.head
}
```

Then just to avoid confusion, maybe we can call it `CONTROL_PLANE` in `brokerPropertyOverrides` below. Would that work?
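
To make the naming suggestion concrete, here is a rough sketch of what the overrides might look like. It assumes the overrides are plain `java.util.Properties` entries and that the data plane uses a `PLAINTEXT` listener; the object name, the `securityProtocol` parameter, and the exact set of properties are hypothetical and may not match what the PR actually sets.

```scala
import java.util.Properties

// Hypothetical stand-alone sketch; the real brokerPropertyOverrides in the test may differ.
object ControlPlaneOverridesSketch {
  def brokerPropertyOverrides(properties: Properties, securityProtocol: String = "PLAINTEXT"): Unit = {
    // Name the extra listener after its role so it is not mistaken for a controller listener.
    val controlPlaneListenerName = "CONTROL_PLANE"
    properties.setProperty("control.plane.listener.name", controlPlaneListenerName)
    // Map both listeners to the same security protocol (assumed here for simplicity).
    properties.setProperty("listener.security.protocol.map",
      s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
    // Port 0 lets the OS pick free ports for the data-plane and control-plane listeners.
    properties.setProperty("listeners",
      s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
  }
}
```

With a name like this, a reader of the test can tell that the listener is the broker-side control plane endpoint, so any broker's socket server is a valid target for it.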