chia7712 commented on code in PR #19776: URL: https://github.com/apache/kafka/pull/19776#discussion_r2116362068
########## core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala: ########## @@ -205,11 +205,12 @@ class ProducerIntegrationTest { .setProducerId(RecordBatch.NO_PRODUCER_ID) .setTransactionalId(null) .setTransactionTimeoutMs(10) - val request = new InitProducerIdRequest.Builder(data).build() + val request = new InitProducerIdRequest.Builder(data).build() Review Comment: ```scala val request = new InitProducerIdRequest.Builder(data).build() val port = broker.boundPort(listener) response = IntegrationTestUtils.connectAndReceive[InitProducerIdResponse](request, port) shouldRetry = response.data.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code ``` ########## core/src/test/scala/unit/kafka/server/AllocateProducerIdsRequestTest.scala: ########## @@ -79,10 +80,13 @@ class AllocateProducerIdsRequestTest(cluster: ClusterInstance) { controllerSocketServer: SocketServer, request: AllocateProducerIdsRequest ): AllocateProducerIdsResponse = { + + val listenerName = cluster.controllerListenerName + val port = controllerSocketServer.boundPort(listenerName) + IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse]( Review Comment: ```scala IntegrationTestUtils.connectAndReceive[AllocateProducerIdsResponse]( request, controllerSocketServer.boundPort(cluster.controllerListenerName) ) ``` ########## core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala: ########## @@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { } else { cluster.brokerSocketServers().asScala.head } - IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket, listenerName) + + val port = socket.boundPort(listenerName) + + IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, port) } def sendUnsupportedApiVersionRequest(request: ApiVersionsRequest): ApiVersionsResponse = { val overrideHeader = IntegrationTestUtils.nextRequestHeader(ApiKeys.API_VERSIONS, Short.MaxValue) - val socket = IntegrationTestUtils.connect(cluster.brokerSocketServers().asScala.head, cluster.clientListener()) + val socketServer = cluster.brokerSocketServers().asScala.head Review Comment: with that helper, we can use `val socket = IntegrationTestUtils.connect(cluster.boundPorts().asScala.head)` to streamline code, right? ########## core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala: ########## @@ -41,12 +42,18 @@ abstract class AbstractApiVersionsRequestTest(cluster: ClusterInstance) { } else { cluster.brokerSocketServers().asScala.head } - IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket, listenerName) + + val port = socket.boundPort(listenerName) Review Comment: ```scala IntegrationTestUtils.connectAndReceive[ApiVersionsResponse](request, socket.boundPort(listenerName)) ``` ########## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala: ########## @@ -492,12 +492,16 @@ class KRaftClusterTest { } private def sendDescribeClusterRequestToBoundPort(destination: SocketServer, - listenerName: ListenerName): DescribeClusterResponse = - connectAndReceive[DescribeClusterResponse]( - request = new DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(), - destination = destination, - listenerName = listenerName - ) + listenerName: ListenerName): DescribeClusterResponse = { + + val port = destination.boundPort(listenerName) Review Comment: ```scala connectAndReceive[DescribeClusterResponse](new DescribeClusterRequest.Builder(new DescribeClusterRequestData()).build(), destination.boundPort(listenerName)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org