showuon commented on code in PR #12159:
URL: https://github.com/apache/kafka/pull/12159#discussion_r880005058


##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1138,20 +1130,18 @@ private KafkaChannel createConnectionWithPendingReceives(int pendingReceives) th
     }
 
     /**
-     * Sends the specified number of requests and waits for the requests to be sent. The channel
-     * is muted during polling to ensure that incoming data is not received.
+     * Sends the specified number of requests and waits for the requests to be sent.
+     * The channel is muted during polling to ensure that incoming data is not received.
      */
-    private KafkaChannel sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
-        channel.mute();
+    private void sendNoReceive(KafkaChannel channel, int numRequests) throws Exception {
+        selector.mute(channel.id());
         for (int i = 0; i < numRequests; i++) {
             selector.send(createSend(channel.id(), String.valueOf(i)));
             do {
                 selector.poll(10);
             } while (selector.completedSends().isEmpty());
         }
-        channel.maybeUnmute();
-
-        return channel;
+        selector.unmute(channel.id());

Review Comment:
   Nice cleanup.



##########
clients/src/test/java/org/apache/kafka/common/network/NetworkTestUtils.java:
##########
@@ -87,6 +88,16 @@ public static void checkClientConnection(Selector selector, String node, int min
         }
     }
 
+    public static void waitForChannelConnected(Selector selector, String node) throws IOException {
+        int secondsLeft = 30;
+        while (selector.channel(node) != null
+                && !selector.channel(node).isConnected() && secondsLeft-- > 0) {
+            selector.poll(1000L);
+        }
+        assertNotNull(selector.channel(node));
+        assertTrue(selector.channel(node).isConnected());

Review Comment:
   I'd add an error message to the assertion to indicate what we expected, e.g.:
   `assertTrue(selector.channel(node).isConnected(), "channel " + node + " is not connected after 30 secs");`
   
   The same applies to the `waitForChannelReady` method below. (I know it's not your change, but let's improve it together.)
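   
   A minimal standalone sketch of this suggestion, not the actual Kafka code: `WaitSketch` and `ChannelView` are hypothetical stand-ins for `NetworkTestUtils` and the `Selector`/`KafkaChannel` calls used above, and a plain `AssertionError` replaces the JUnit assertions so the snippet runs without dependencies.

```java
class WaitSketch {
    /** Hypothetical stand-in for the Selector/KafkaChannel calls used in the diff. */
    public interface ChannelView {
        boolean exists();           // models selector.channel(node) != null
        boolean isConnected();      // models selector.channel(node).isConnected()
        void poll(long timeoutMs);  // models selector.poll(timeoutMs)
    }

    /** Polls until the channel is connected, then fails with a message that says what was expected. */
    public static void waitForChannelConnected(ChannelView channel, String node, int maxPolls) {
        int pollsLeft = maxPolls;
        while (channel.exists() && !channel.isConnected() && pollsLeft-- > 0) {
            channel.poll(1000L);
        }
        if (!channel.exists())
            throw new AssertionError("channel " + node + " was closed while waiting to connect");
        if (!channel.isConnected())
            throw new AssertionError("channel " + node + " is not connected after " + maxPolls + " polls");
    }
}
```

   With messages like these, a timeout in CI reports which node failed and in which state, instead of a bare assertion failure.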



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -603,23 +603,18 @@ private void verifyChannelExpiry(KafkaChannel channel) throws Exception {
     public void testCloseOldestConnectionWithMultiplePendingReceives() throws Exception {
         int expectedReceives = 5;
         KafkaChannel channel = createConnectionWithPendingReceives(expectedReceives);
-        String id = channel.id();
-        int completedReceives = 0;
+        int completedReceives = selector.completedReceives().size();
+
         while (selector.disconnected().isEmpty()) {
-            time.sleep(6000); // The max idle time is 5000ms
-            selector.poll(completedReceives == expectedReceives ? 0 : 1000);
+            time.sleep(CONNECTION_MAX_IDLE_MS + 1_000);
+            selector.poll(completedReceives == expectedReceives ? 0 : 1_000);
             completedReceives += selector.completedReceives().size();
-            if (!selector.completedReceives().isEmpty()) {
-                assertEquals(1, selector.completedReceives().size());
-                assertNotNull(selector.channel(id), "Channel should not have been expired");
-                assertTrue(selector.closingChannel(id) != null || selector.channel(id) != null, "Channel not found");
-                assertFalse(selector.disconnected().containsKey(id), "Disconnect notified too early");
-            }

Review Comment:
   Why did we remove these assertions?



##########
clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java:
##########
@@ -1030,10 +1025,7 @@ private void blockingConnect(String node) throws IOException {
 
     protected void blockingConnect(String node, InetSocketAddress serverAddr) throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
-        while (!selector.connected().contains(node))
-            selector.poll(10000L);
-        while (!selector.isChannelReady(node))
-            selector.poll(10000L);
+        NetworkTestUtils.waitForChannelReady(selector, node);

Review Comment:
   I think we should replace the two while loops with
   `waitForChannelConnected` + `waitForChannelReady`.
   That way, if an exception is thrown during `waitForChannelReady`, we know whether the connection had already been established, right?
   
   The same comment applies to the similar replacements below.
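   
   A rough standalone sketch of the two-phase wait, under stated assumptions: `TwoPhaseWaitSketch`, `waitUntil`, and the supplier parameters are hypothetical and only model the `Selector` calls; they are not the real `NetworkTestUtils` API.

```java
import java.util.function.BooleanSupplier;

class TwoPhaseWaitSketch {
    /** Polls until the condition holds or the attempts run out, then fails with the given message. */
    public static void waitUntil(BooleanSupplier condition, Runnable poll, int maxPolls, String failureMessage) {
        for (int i = 0; i < maxPolls && !condition.getAsBoolean(); i++) {
            poll.run();
        }
        if (!condition.getAsBoolean())
            throw new AssertionError(failureMessage);
    }

    /** Hypothetical blockingConnect: phase 1 waits for "connected", phase 2 waits for "ready". */
    public static void blockingConnect(BooleanSupplier connected, BooleanSupplier ready, Runnable poll, String node) {
        waitUntil(connected, poll, 30, "channel " + node + " did not connect");
        waitUntil(ready, poll, 30, "channel " + node + " connected but did not become ready");
    }
}
```

   Because each phase fails with its own message, a failure in the second phase already tells us that the first phase succeeded.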



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
