This is an automated email from the ASF dual-hosted git repository.

nielsbasjes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f4cdc0cb AVRO-3718: [Java] Fix flaky NettyServer test (#2110)
0f4cdc0cb is described below

commit 0f4cdc0cb986452c158f8d8a651ea92512ff8282
Author: Niels Basjes <[email protected]>
AuthorDate: Sun Feb 19 15:48:47 2023 +0100

    AVRO-3718: [Java] Fix flaky NettyServer test (#2110)
---
 .../org/apache/avro/ipc/netty/TestNettyServer.java | 51 ++++++++++++++++++----
 1 file changed, 43 insertions(+), 8 deletions(-)

diff --git 
a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
 
b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
index 31799312b..1735dfcf0 100644
--- 
a/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
+++ 
b/lang/java/ipc-netty/src/test/java/org/apache/avro/ipc/netty/TestNettyServer.java
@@ -39,10 +39,14 @@ import org.apache.avro.test.Message;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class TestNettyServer {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestNettyServer.class.getName());
+
   static final int CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
   protected static Server server;
   protected static Transceiver transceiver;
@@ -83,7 +87,7 @@ public class TestNettyServer {
 
   public static void initializeConnections(Consumer<SocketChannel> 
serverInitializer,
       Consumer<SocketChannel> transceiverInitializer) throws Exception {
-    System.out.println("starting server...");
+    LOG.info("starting server...");
     channelInitializer = transceiverInitializer;
     mailService = new MailImpl();
     Responder responder = new SpecificResponder(Mail.class, mailService);
@@ -91,7 +95,7 @@ public class TestNettyServer {
     server.start();
 
     int serverPort = server.getPort();
-    System.out.println("server port : " + serverPort);
+    LOG.info("server port : {}", serverPort);
 
     transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), 
CONNECT_TIMEOUT_MILLIS,
         transceiverInitializer, null);
@@ -144,23 +148,54 @@ public class TestNettyServer {
 
   @Test
   void connectionsCount() throws Exception {
+    // It happens on a regular basis that the server still has a connection
+    // that is in the process of being terminated (previous test?).
+    // We wait for that to happen because otherwise this test will fail.
+    assertNumberOfConnectionsOnServer(1, 1000);
+
     Transceiver transceiver2 = new NettyTransceiver(new 
InetSocketAddress(server.getPort()), CONNECT_TIMEOUT_MILLIS,
         channelInitializer);
     Mail proxy2 = SpecificRequestor.getClient(Mail.class, transceiver2);
     proxy.fireandforget(createMessage());
     proxy2.fireandforget(createMessage());
-    assertEquals(2, ((NettyServer) server).getNumActiveConnections());
+    assertNumberOfConnectionsOnServer(2, 0);
     transceiver2.close();
 
     // Check the active connections with some retries as closing at the client
     // side might not take effect on the server side immediately
+    assertNumberOfConnectionsOnServer(1, 5000);
+  }
+
+  /**
+   * Assert for the number of server connections. This does repeated checks 
(with
+   * timeout) if it not matches at first because closing at the client side 
might
+   * not take effect on the server side immediately.
+   *
+   * @param wantedNumberOfConnections How many do we want to have
+   * @param maxWaitMs                 Within how much time (0= immediately)
+   */
+  private static void assertNumberOfConnectionsOnServer(int 
wantedNumberOfConnections, long maxWaitMs)
+      throws InterruptedException {
     int numActiveConnections = ((NettyServer) 
server).getNumActiveConnections();
-    for (int i = 0; i < 50 && numActiveConnections == 2; ++i) {
-      System.out.println("Server still has 2 active connections; retrying...");
-      Thread.sleep(100);
-      numActiveConnections = ((NettyServer) server).getNumActiveConnections();
+    if (numActiveConnections == wantedNumberOfConnections) {
+      return; // We're good.
+    }
+    long startMs = System.currentTimeMillis();
+    long waited = 0;
+    if (maxWaitMs > 0) {
+      boolean timeOut = false;
+      while (numActiveConnections != wantedNumberOfConnections && !timeOut) {
+        LOG.info("Server still has {} active connections (want {}, waiting for 
{}ms); retrying...",
+            numActiveConnections, wantedNumberOfConnections, waited);
+        Thread.sleep(100);
+        numActiveConnections = ((NettyServer) 
server).getNumActiveConnections();
+        waited = System.currentTimeMillis() - startMs;
+        timeOut = waited > maxWaitMs;
+      }
+      LOG.info("Server has {} active connections", numActiveConnections);
     }
-    assertEquals(1, numActiveConnections);
+    assertEquals(wantedNumberOfConnections, numActiveConnections,
+        "Not the expected number of connections after a wait of " + waited + " 
ms");
   }
 
   private Message createMessage() {

Reply via email to