frankvicky commented on code in PR #20002:
URL: https://github.com/apache/kafka/pull/20002#discussion_r2160315482


##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
 
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates 
two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker 
=> {
+    val ids = clusterInstance.brokers().values().stream().flatMap( broker => {
       IntStream.range(0, 1001).parallel().mapToObj( _ =>

Review Comment:
   ditto
   ```suggestion
         IntStream.range(0, 1001).parallel().mapToObj(_ =>
   ```



##########
core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala:
##########
@@ -217,7 +217,7 @@ class ClientQuotasRequestTest(cluster: ClusterInstance) {
           else
             InetAddress.getByName(entityName)
           var currentServerQuota = 0
-          currentServerQuota = 
cluster.brokerSocketServers().asScala.head.connectionQuotas.connectionRateForIp(entityIp)
+          currentServerQuota = 
cluster.brokers().values().asScala.head.socketServer.connectionQuotas.connectionRateForIp(entityIp)

Review Comment:
   hmm
   It seems that we could have an abstract method at `ClusterInstance`?
   In that way, we could avoid using `socketServer` directly.
   
   For example:
   ```java
   
           @Override
           public int getConnectionQuota(InetAddress address) {
               SocketServer socketServer = brokers().values().stream()
                   .map(KafkaBroker::socketServer)
                   .findFirst()
                   .orElseThrow();
               return socketServer.connectionQuotas().get(address);
           }
   ```



##########
test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java:
##########
@@ -189,9 +191,9 @@ public void testClusterTestWithDisksPerBroker() throws 
ExecutionException, Inter
 
     @ClusterTest(autoStart = AutoStart.NO)
     public void testNoAutoStart() {
-        Assertions.assertThrows(RuntimeException.class, 
clusterInstance::anyBrokerSocketServer);
+        Assertions.assertThrows(RuntimeException.class, () -> 
clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst());
         clusterInstance.start();
-        assertNotNull(clusterInstance.anyBrokerSocketServer());
+        
assertTrue(clusterInstance.brokers().values().stream().map(KafkaBroker::socketServer).findFirst().isPresent());

Review Comment:
   Same as the above comment, we could change `anyBrokerSocketServer` to return 
a boolean.
   ```java
           public boolean anyBrokerSocketServer() {
               return brokers().values().stream()
                   .map(KafkaBroker::socketServer)
                   .findAny()
                   .isPresent();
           }
   ```



##########
core/src/test/scala/integration/kafka/coordinator/transaction/ProducerIntegrationTest.scala:
##########
@@ -182,9 +182,9 @@ class ProducerIntegrationTest {
 
   private def verifyUniqueIds(clusterInstance: ClusterInstance): Unit = {
     // Request enough PIDs from each broker to ensure each broker generates 
two blocks
-    val ids = clusterInstance.brokerSocketServers().stream().flatMap( broker 
=> {
+    val ids = clusterInstance.brokers().values().stream().flatMap( broker => {

Review Comment:
   nit: redundant space
   ```suggestion
       val ids = clusterInstance.brokers().values().stream().flatMap(broker => {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to