This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 650d463 [FLINK-23362][tests] Remove timeouts 650d463 is described below commit 650d463f5133b91551a5cbe96963e3fc678aebf0 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Tue Aug 17 19:58:34 2021 +0200 [FLINK-23362][tests] Remove timeouts --- .../flink/queryablestate/network/ClientTest.java | 77 ++++++++-------------- 1 file changed, 27 insertions(+), 50 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index e89a14f..80134fc 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -82,13 +82,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import scala.concurrent.duration.Deadline; -import scala.concurrent.duration.FiniteDuration; - import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -100,8 +96,6 @@ public class ClientTest extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(ClientTest.class); - private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(20L, TimeUnit.SECONDS); - // Thread pool for client bootstrap (shared between tests) private NioEventLoopGroup nioGroup; @@ -114,14 +108,13 @@ public class ClientTest extends TestLogger { public void tearDown() throws 
Exception { if (nioGroup != null) { // note: no "quiet period" to not trigger Netty#4357 - nioGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS); + nioGroup.shutdownGracefully(); } } /** Tests simple queries, of which half succeed and half fail. */ @Test public void testSimpleRequests() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -173,7 +166,7 @@ public class ClientTest extends TestLogger { Exception testException = new RuntimeException("Expected test Exception"); for (long i = 0L; i < numQueries; i++) { - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + ByteBuf buf = received.take(); assertNotNull("Receive timed out", buf); Channel ch = channel.get(); @@ -205,14 +198,11 @@ public class ClientTest extends TestLogger { for (long i = 0L; i < numQueries; i++) { if (i % 2L == 0L) { - KvStateResponse serializedResult = - futures.get((int) i) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + KvStateResponse serializedResult = futures.get((int) i).get(); assertArrayEquals(expected, serializedResult.getContent()); } else { try { - futures.get((int) i) - .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + futures.get((int) i).get(); fail("Did not throw expected Exception"); } catch (ExecutionException e) { @@ -228,9 +218,8 @@ public class ClientTest extends TestLogger { long expectedRequests = numQueries / 2L; // Counts can take some time to propagate - while (deadline.hasTimeLeft() - && (stats.getNumSuccessful() != expectedRequests - || stats.getNumFailed() != expectedRequests)) { + while (stats.getNumSuccessful() != expectedRequests + || stats.getNumFailed() != expectedRequests) { Thread.sleep(100L); } @@ -246,7 +235,7 @@ public class ClientTest extends TestLogger { // this is why we now simply wait a bit so that everything is // shut down and then we 
check - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { exc = e; LOG.error("An exception occurred while shutting down netty.", e); @@ -267,7 +256,6 @@ public class ClientTest extends TestLogger { /** Tests that a request to an unavailable host is failed with ConnectException. */ @Test public void testRequestUnavailableHost() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -290,7 +278,7 @@ public class ClientTest extends TestLogger { CompletableFuture<KvStateResponse> future = client.sendRequest(serverAddress, request); try { - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + future.get(); fail("Did not throw expected ConnectException"); } catch (ExecutionException e) { if (!(e.getCause() instanceof ConnectException)) { @@ -301,7 +289,7 @@ public class ClientTest extends TestLogger { } finally { if (client != null) { try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); } @@ -315,7 +303,6 @@ public class ClientTest extends TestLogger { /** Multiple threads concurrently fire queries. 
*/ @Test public void testConcurrentQueries() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -389,11 +376,9 @@ public class ClientTest extends TestLogger { // Verify results for (Future<List<CompletableFuture<KvStateResponse>>> future : futures) { - List<CompletableFuture<KvStateResponse>> results = - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + List<CompletableFuture<KvStateResponse>> results = future.get(); for (CompletableFuture<KvStateResponse> result : results) { - KvStateResponse actual = - result.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + KvStateResponse actual = result.get(); assertArrayEquals(serializedResult, actual.getContent()); } } @@ -401,7 +386,7 @@ public class ClientTest extends TestLogger { int totalQueries = numQueryTasks * numQueriesPerTask; // Counts can take some time to propagate - while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) { + while (stats.getNumSuccessful() != totalQueries) { Thread.sleep(100L); } @@ -418,7 +403,7 @@ public class ClientTest extends TestLogger { if (client != null) { try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); } @@ -435,7 +420,6 @@ public class ClientTest extends TestLogger { */ @Test public void testFailureClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -478,11 +462,11 @@ public class ClientTest extends TestLogger { futures.add(client.sendRequest(serverAddress, request)); futures.add(client.sendRequest(serverAddress, request)); - ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + ByteBuf buf = 
received.take(); assertNotNull("Receive timed out", buf); buf.release(); - buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + buf = received.take(); assertNotNull("Receive timed out", buf); buf.release(); @@ -498,7 +482,7 @@ public class ClientTest extends TestLogger { new RuntimeException("Expected test server failure"))); try { - futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + futures.remove(0).get(); fail("Did not throw expected server failure"); } catch (ExecutionException e) { @@ -509,7 +493,7 @@ public class ClientTest extends TestLogger { } try { - futures.remove(0).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + futures.remove(0).get(); fail("Did not throw expected server failure"); } catch (ExecutionException e) { @@ -522,8 +506,7 @@ public class ClientTest extends TestLogger { assertEquals(0L, stats.getNumConnections()); // Counts can take some time to propagate - while (deadline.hasTimeLeft() - && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L)) { + while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 2L) { Thread.sleep(100L); } @@ -533,7 +516,7 @@ public class ClientTest extends TestLogger { } finally { if (client != null) { try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); } @@ -554,7 +537,6 @@ public class ClientTest extends TestLogger { */ @Test public void testServerClosesChannel() throws Exception { - Deadline deadline = TEST_TIMEOUT.fromNow(); AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats(); final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -594,17 +576,17 @@ public class ClientTest extends TestLogger { new KvStateInternalRequest(new KvStateID(), new byte[0]); Future<KvStateResponse> future = client.sendRequest(serverAddress, request); - while (!received.get() && deadline.hasTimeLeft()) { + while (!received.get()) { 
Thread.sleep(50L); } assertTrue("Receive timed out", received.get()); assertEquals(1, stats.getNumConnections()); - channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + channel.get().close().await(); try { - future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + future.get(); fail("Did not throw expected server failure"); } catch (ExecutionException e) { if (!(e.getCause() instanceof ClosedChannelException)) { @@ -616,8 +598,7 @@ public class ClientTest extends TestLogger { assertEquals(0L, stats.getNumConnections()); // Counts can take some time to propagate - while (deadline.hasTimeLeft() - && (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L)) { + while (stats.getNumSuccessful() != 0L || stats.getNumFailed() != 1L) { Thread.sleep(100L); } @@ -627,7 +608,7 @@ public class ClientTest extends TestLogger { } finally { if (client != null) { try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); } @@ -679,8 +660,6 @@ public class ClientTest extends TestLogger { Collections.emptyList(), new CloseableRegistry()); - final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS); - AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats(); final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer = @@ -787,9 +766,7 @@ public class ClientTest extends TestLogger { int targetServer = random.get(j) % numServers; Future<KvStateResponse> future = futures.get(j); - byte[] buf = - future.get(timeout.toMillis(), TimeUnit.MILLISECONDS) - .getContent(); + byte[] buf = future.get().getContent(); int value = KvStateSerializer.deserializeValue( buf, IntSerializer.INSTANCE); @@ -811,7 +788,7 @@ public class ClientTest extends TestLogger { } try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); } @@ -855,7 +832,7 @@ public class ClientTest extends 
TestLogger { } finally { if (client != null) { try { - client.shutdown().get(10L, TimeUnit.SECONDS); + client.shutdown().get(); } catch (Exception e) { e.printStackTrace(); }