frankvicky commented on code in PR #20002:
URL: https://github.com/apache/kafka/pull/20002#discussion_r2160315482

##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
+    val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
       IntStream.range(0, 1001).parallel().mapToObj( _ =>

Review Comment:
   ditto (nit: redundant space)

   ```suggestion
         IntStream.range(0, 1001).parallel().mapToObj(_ =>
   ```

##########
core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala:
##########
@@ -217,7 +217,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
           else
             InetAddress.getByName(entityName)
         var currentServerQuota = 0
-        currentServerQuota = cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
+        currentServerQuota = cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)

Review Comment:
   Hmm, it seems we could add an abstract method to `ClusterInstance`? That way, callers could avoid using `socketServer` directly.

   For example:
   ```java
   @Override
   public int getConnectionQuota(InetAddress address) {
       // Use the first broker's socket server, as the test does with `.head`
       SocketServer socketServer = brokers().values().stream()
           .map(KafkaBroker::socketServer)
           .findFirst()
           .orElseThrow();
       // Same lookup the test performs: connectionQuotas.connectionRateForIp(entityIp)
       return socketServer.connectionQuotas().connectionRateForIp(address);
   }
   ```

##########
test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java:
##########
@@ -189,9 +191,9 @@ public void testClusterTestWithDisksPerBroker() throws ExecutionException, Inter

     @ClusterTest(autoStart = AutoStart.NO)
     public void testNoAutoStart() {
-        Assertions.assertThrows(RuntimeException.class, clusterInstance::anyBrokerSocketServer);
+        Assertions.assertThrows(RuntimeException.class, () -> clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst());
         clusterInstance.start();
-        assertNotNull(clusterInstance.anyBrokerSocketServer());
+        assertTrue(clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst().isPresent());

Review Comment:
   Same as the comment above: we could change `anyBrokerSocketServer` to return a boolean.

   ```java
   public boolean anyBrokerSocketServer() {
       return brokers().values().stream()
           .map(KafkaBroker::socketServer)
           .findAny()
           .isPresent();
   }
   ```

##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker => {
+    val ids = clusterInstance.brokers().values().stream().flatMap( broker => {

Review Comment:
   nit: redundant space

   ```suggestion
       val ids = clusterInstance.brokers().values().stream().flatMap(broker => {
   ```

--
This is an automated message from the Apache Git Service.