This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 650d463  [FLINK-23362][tests] Remove timeouts
650d463 is described below

commit 650d463f5133b91551a5cbe96963e3fc678aebf0
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Tue Aug 17 19:58:34 2021 +0200

    [FLINK-23362][tests] Remove timeouts
---
 .../flink/queryablestate/network/ClientTest.java   | 77 ++++++++--------------
 1 file changed, 27 insertions(+), 50 deletions(-)

diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index e89a14f..80134fc 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -82,13 +82,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -100,8 +96,6 @@ public class ClientTest extends TestLogger {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class);
 
-    private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS);
-
     // Thread pool for client bootstrap (shared between tests)
     private NioEventLoopGroup nioGroup;
 
@@ -114,14 +108,13 @@ public class ClientTest extends TestLogger {
     public void tearDown() throws Exception {
         if (nioGroup != null) {
             // note: no "quiet period" to not trigger Netty#4357
-            nioGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS);
+            nioGroup.shutdownGracefully();
         }
     }
 
     /** Tests simple queries, of which half succeed and half fail. */
     @Test
     public void testSimpleRequests() throws Exception {
-        Deadline deadline = TEST_TIMEOUT.fromNow();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
         MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -173,7 +166,7 @@ public class ClientTest extends TestLogger {
             Exception testException = new RuntimeException("Expected test Exception");
 
             for (long i = 0L; i < numQueries; i++) {
-                ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                ByteBuf buf = received.take();
                 assertNotNull("Receive timed out", buf);
 
                 Channel ch = channel.get();
@@ -205,14 +198,11 @@ public class ClientTest extends TestLogger {
             for (long i = 0L; i < numQueries; i++) {
 
                 if (i % 2L == 0L) {
-                    KvStateResponse serializedResult =
-                            futures.get((int) i)
-                                    .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                    KvStateResponse serializedResult = futures.get((int) 
i).get();
                     assertArrayEquals(expected, serializedResult.getContent());
                 } else {
                     try {
-                        futures.get((int) i)
-                                .get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                        futures.get((int) i).get();
                         fail("Did not throw expected Exception");
                     } catch (ExecutionException e) {
 
@@ -228,9 +218,8 @@ public class ClientTest extends TestLogger {
             long expectedRequests = numQueries / 2L;
 
             // Counts can take some time to propagate
-            while (deadline.hasTimeLeft()
-                    && (stats.getNumSuccessful() != expectedRequests
-                            || stats.getNumFailed() != expectedRequests)) {
+            while (stats.getNumSuccessful() != expectedRequests
+                    || stats.getNumFailed() != expectedRequests) {
                 Thread.sleep(100L);
             }
 
@@ -246,7 +235,7 @@ public class ClientTest extends TestLogger {
                     // this is why we now simply wait a bit so that everything is
                     // shut down and then we check
 
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     exc = e;
                     LOG.error("An exception occurred while shutting down netty.", e);
@@ -267,7 +256,6 @@ public class ClientTest extends TestLogger {
     /** Tests that a request to an unavailable host is failed with ConnectException. */
     @Test
     public void testRequestUnavailableHost() throws Exception {
-        Deadline deadline = TEST_TIMEOUT.fromNow();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
         MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer =
@@ -290,7 +278,7 @@ public class ClientTest extends TestLogger {
             CompletableFuture<KvStateResponse> future = 
client.sendRequest(serverAddress, request);
 
             try {
-                future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                future.get();
                 fail("Did not throw expected ConnectException");
             } catch (ExecutionException e) {
                 if (!(e.getCause() instanceof ConnectException)) {
@@ -301,7 +289,7 @@ public class ClientTest extends TestLogger {
         } finally {
             if (client != null) {
                 try {
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -315,7 +303,6 @@ public class ClientTest extends TestLogger {
     /** Multiple threads concurrently fire queries. */
     @Test
     public void testConcurrentQueries() throws Exception {
-        Deadline deadline = TEST_TIMEOUT.fromNow();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
         final MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
@@ -389,11 +376,9 @@ public class ClientTest extends TestLogger {
 
             // Verify results
             for (Future<List<CompletableFuture<KvStateResponse>>> future : 
futures) {
-                List<CompletableFuture<KvStateResponse>> results =
-                        future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                List<CompletableFuture<KvStateResponse>> results = 
future.get();
                 for (CompletableFuture<KvStateResponse> result : results) {
-                    KvStateResponse actual =
-                            result.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                    KvStateResponse actual = result.get();
                     assertArrayEquals(serializedResult, actual.getContent());
                 }
             }
@@ -401,7 +386,7 @@ public class ClientTest extends TestLogger {
             int totalQueries = numQueryTasks * numQueriesPerTask;
 
             // Counts can take some time to propagate
-            while (deadline.hasTimeLeft() && stats.getNumSuccessful() != 
totalQueries) {
+            while (stats.getNumSuccessful() != totalQueries) {
                 Thread.sleep(100L);
             }
 
@@ -418,7 +403,7 @@ public class ClientTest extends TestLogger {
 
             if (client != null) {
                 try {
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -435,7 +420,6 @@ public class ClientTest extends TestLogger {
      */
     @Test
     public void testFailureClosesChannel() throws Exception {
-        Deadline deadline = TEST_TIMEOUT.fromNow();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
         final MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
@@ -478,11 +462,11 @@ public class ClientTest extends TestLogger {
             futures.add(client.sendRequest(serverAddress, request));
             futures.add(client.sendRequest(serverAddress, request));
 
-            ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+            ByteBuf buf = received.take();
             assertNotNull("Receive timed out", buf);
             buf.release();
 
-            buf = received.poll(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+            buf = received.take();
             assertNotNull("Receive timed out", buf);
             buf.release();
 
@@ -498,7 +482,7 @@ public class ClientTest extends TestLogger {
                             new RuntimeException("Expected test server 
failure")));
 
             try {
-                futures.remove(0).get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                futures.remove(0).get();
                 fail("Did not throw expected server failure");
             } catch (ExecutionException e) {
 
@@ -509,7 +493,7 @@ public class ClientTest extends TestLogger {
             }
 
             try {
-                futures.remove(0).get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                futures.remove(0).get();
                 fail("Did not throw expected server failure");
             } catch (ExecutionException e) {
 
@@ -522,8 +506,7 @@ public class ClientTest extends TestLogger {
             assertEquals(0L, stats.getNumConnections());
 
             // Counts can take some time to propagate
-            while (deadline.hasTimeLeft()
-                    && (stats.getNumSuccessful() != 0L || stats.getNumFailed() 
!= 2L)) {
+            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 
2L) {
                 Thread.sleep(100L);
             }
 
@@ -533,7 +516,7 @@ public class ClientTest extends TestLogger {
         } finally {
             if (client != null) {
                 try {
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -554,7 +537,6 @@ public class ClientTest extends TestLogger {
      */
     @Test
     public void testServerClosesChannel() throws Exception {
-        Deadline deadline = TEST_TIMEOUT.fromNow();
         AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
 
         final MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
@@ -594,17 +576,17 @@ public class ClientTest extends TestLogger {
                     new KvStateInternalRequest(new KvStateID(), new byte[0]);
             Future<KvStateResponse> future = client.sendRequest(serverAddress, 
request);
 
-            while (!received.get() && deadline.hasTimeLeft()) {
+            while (!received.get()) {
                 Thread.sleep(50L);
             }
             assertTrue("Receive timed out", received.get());
 
             assertEquals(1, stats.getNumConnections());
 
-            channel.get().close().await(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+            channel.get().close().await();
 
             try {
-                future.get(deadline.timeLeft().toMillis(), 
TimeUnit.MILLISECONDS);
+                future.get();
                 fail("Did not throw expected server failure");
             } catch (ExecutionException e) {
                 if (!(e.getCause() instanceof ClosedChannelException)) {
@@ -616,8 +598,7 @@ public class ClientTest extends TestLogger {
             assertEquals(0L, stats.getNumConnections());
 
             // Counts can take some time to propagate
-            while (deadline.hasTimeLeft()
-                    && (stats.getNumSuccessful() != 0L || stats.getNumFailed() 
!= 1L)) {
+            while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 
1L) {
                 Thread.sleep(100L);
             }
 
@@ -627,7 +608,7 @@ public class ClientTest extends TestLogger {
         } finally {
             if (client != null) {
                 try {
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -679,8 +660,6 @@ public class ClientTest extends TestLogger {
                         Collections.emptyList(),
                         new CloseableRegistry());
 
-        final FiniteDuration timeout = new FiniteDuration(10, 
TimeUnit.SECONDS);
-
         AtomicKvStateRequestStats clientStats = new 
AtomicKvStateRequestStats();
 
         final MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
@@ -787,9 +766,7 @@ public class ClientTest extends TestLogger {
                                 int targetServer = random.get(j) % numServers;
 
                                 Future<KvStateResponse> future = 
futures.get(j);
-                                byte[] buf =
-                                        future.get(timeout.toMillis(), 
TimeUnit.MILLISECONDS)
-                                                .getContent();
+                                byte[] buf = future.get().getContent();
                                 int value =
                                         KvStateSerializer.deserializeValue(
                                                 buf, IntSerializer.INSTANCE);
@@ -811,7 +788,7 @@ public class ClientTest extends TestLogger {
             }
 
             try {
-                client.shutdown().get(10L, TimeUnit.SECONDS);
+                client.shutdown().get();
             } catch (Exception e) {
                 e.printStackTrace();
             }
@@ -855,7 +832,7 @@ public class ClientTest extends TestLogger {
         } finally {
             if (client != null) {
                 try {
-                    client.shutdown().get(10L, TimeUnit.SECONDS);
+                    client.shutdown().get();
                 } catch (Exception e) {
                     e.printStackTrace();
                 }

Reply via email to