This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f2cb1d24728 [FLINK-32848][tests][JUnit5 migration] Migrate 
flink-runtime/rpc tests to JUnit5 (#23301)
f2cb1d24728 is described below

commit f2cb1d247283344e9194e63931a2948e09f73c93
Author: Zhanghao Chen <m...@outlook.com>
AuthorDate: Thu Sep 21 08:45:59 2023 +0800

    [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to 
JUnit5 (#23301)
    
    * [FLINK-32848][tests][JUnit5 migration] Migrate flink-runtime/rpc tests to 
JUnit5
---
 .../apache/flink/runtime/rpc/AsyncCallsTest.java   |  44 ++++----
 .../flink/runtime/rpc/FencedRpcEndpointTest.java   |  49 ++++-----
 .../flink/runtime/rpc/RpcConnectionTest.java       |   7 +-
 .../apache/flink/runtime/rpc/RpcEndpointTest.java  | 112 +++++++++------------
 .../apache/flink/runtime/rpc/RpcSSLAuthITCase.java |  29 +++---
 5 files changed, 109 insertions(+), 132 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 0ffad0f098e..4be039e1340 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -20,11 +20,10 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.UUID;
@@ -35,11 +34,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class AsyncCallsTest extends TestLogger {
+class AsyncCallsTest {
 
     // ------------------------------------------------------------------------
     //  shared test members
@@ -49,14 +46,13 @@ public class AsyncCallsTest extends TestLogger {
 
     private static RpcService rpcService;
 
-    @BeforeClass
-    public static void setup() throws Exception {
+    @BeforeAll
+    static void setup() throws Exception {
         rpcService = RpcSystem.load().localServiceBuilder(new 
Configuration()).createAndStart();
     }
 
-    @AfterClass
-    public static void shutdown()
-            throws InterruptedException, ExecutionException, TimeoutException {
+    @AfterAll
+    static void shutdown() throws InterruptedException, ExecutionException, 
TimeoutException {
         rpcService.closeAsync().get();
     }
 
@@ -65,12 +61,12 @@ public class AsyncCallsTest extends TestLogger {
     // ------------------------------------------------------------------------
 
     @Test
-    public void testScheduleWithNoDelay() throws Exception {
+    void testScheduleWithNoDelay() throws Exception {
         runScheduleWithNoDelayTest(TestEndpoint::new);
     }
 
     @Test
-    public void testFencedScheduleWithNoDelay() throws Exception {
+    void testFencedScheduleWithNoDelay() throws Exception {
         runScheduleWithNoDelayTest(FencedTestEndpoint::new);
     }
 
@@ -117,22 +113,24 @@ public class AsyncCallsTest extends TestLogger {
                             Duration.ofSeconds(30L));
 
             String str = result.get(30, TimeUnit.SECONDS);
-            assertEquals("test", str);
+            assertThat(str).isEqualTo("test");
 
             // validate that no concurrent access happened
-            assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+            assertThat(concurrentAccess)
+                    .withFailMessage("Rpc Endpoint had concurrent access")
+                    .isFalse();
         } finally {
             RpcUtils.terminateRpcEndpoint(rpcEndpoint);
         }
     }
 
     @Test
-    public void testScheduleWithDelay() throws Exception {
+    void testScheduleWithDelay() throws Exception {
         runScheduleWithDelayTest(TestEndpoint::new);
     }
 
     @Test
-    public void testFencedScheduleWithDelay() throws Exception {
+    void testFencedScheduleWithDelay() throws Exception {
         runScheduleWithDelayTest(FencedTestEndpoint::new);
     }
 
@@ -178,9 +176,13 @@ public class AsyncCallsTest extends TestLogger {
             final long stop = System.nanoTime();
 
             // validate that no concurrent access happened
-            assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
+            assertThat(concurrentAccess)
+                    .withFailMessage("Rpc Endpoint had concurrent access")
+                    .isFalse();
 
-            assertTrue("call was not properly delayed", ((stop - start) / 
1_000_000) >= delay);
+            assertThat(delay)
+                    .withFailMessage("call was not properly delayed")
+                    .isLessThanOrEqualTo((stop - start) / 1_000_000);
         } finally {
             RpcUtils.terminateRpcEndpoint(rpcEndpoint);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
index c7326340e8d..7c1788beb71 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/FencedRpcEndpointTest.java
@@ -24,12 +24,10 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
 import org.apache.flink.runtime.rpc.exceptions.RpcRuntimeException;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -37,25 +35,23 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Fail.fail;
 
 /** Tests for the FencedRpcEndpoint. */
-@ExtendWith(TestLoggerExtension.class)
-public class FencedRpcEndpointTest {
+class FencedRpcEndpointTest {
 
     private static final Time timeout = Time.seconds(10L);
     private static RpcService rpcService;
 
     @BeforeAll
-    public static void setup() {
+    static void setup() {
         rpcService = new TestingRpcService();
     }
 
     @AfterAll
-    public static void teardown()
-            throws ExecutionException, InterruptedException, TimeoutException {
+    static void teardown() throws ExecutionException, InterruptedException, 
TimeoutException {
         if (rpcService != null) {
             RpcUtils.terminateRpcService(rpcService);
         }
@@ -63,7 +59,7 @@ public class FencedRpcEndpointTest {
 
     /** Tests that messages with the wrong fencing token are filtered out. */
     @Test
-    public void testFencing() throws Exception {
+    void testFencing() throws Exception {
         final UUID fencingToken = UUID.randomUUID();
         final UUID wrongFencingToken = UUID.randomUUID();
         final String value = "barfoo";
@@ -88,11 +84,11 @@ public class FencedRpcEndpointTest {
                                     FencedTestingGateway.class)
                             .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
-            assertEquals(
-                    value,
-                    properFencedGateway
-                            .foobar(timeout)
-                            .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+            assertThat(
+                            properFencedGateway
+                                    .foobar(timeout)
+                                    .get(timeout.toMilliseconds(), 
TimeUnit.MILLISECONDS))
+                    .isEqualTo(value);
 
             try {
                 wronglyFencedGateway
@@ -100,8 +96,8 @@ public class FencedRpcEndpointTest {
                         .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                 fail("This should fail since we have the wrong fencing 
token.");
             } catch (ExecutionException e) {
-                assertTrue(
-                        ExceptionUtils.stripExecutionException(e) instanceof 
FencingTokenException);
+                assertThat(ExceptionUtils.stripExecutionException(e))
+                        .isInstanceOf(FencingTokenException.class);
             }
 
         } finally {
@@ -115,7 +111,7 @@ public class FencedRpcEndpointTest {
      * the fencing token from such a gateway.
      */
     @Test
-    public void testUnfencedRemoteGateway() throws Exception {
+    void testUnfencedRemoteGateway() throws Exception {
         final UUID initialFencingToken = UUID.randomUUID();
         final String value = "foobar";
 
@@ -136,16 +132,15 @@ public class FencedRpcEndpointTest {
                         .get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                 fail("This should have failed because we have an unfenced 
gateway.");
             } catch (ExecutionException e) {
-                assertTrue(
-                        ExceptionUtils.stripExecutionException(e) instanceof 
RpcRuntimeException);
+                assertThat(ExceptionUtils.stripExecutionException(e))
+                        .isInstanceOf(RpcRuntimeException.class);
             }
 
-            try {
-                unfencedGateway.getFencingToken();
-                fail("We should not be able to call getFencingToken on an 
unfenced gateway.");
-            } catch (UnsupportedOperationException ignored) {
-                // we should not be able to call getFencingToken on an 
unfenced gateway
-            }
+            // we should not be able to call getFencingToken on an unfenced 
gateway
+            assertThatThrownBy(unfencedGateway::getFencingToken)
+                    .withFailMessage(
+                            "We should not be able to call getFencingToken on 
an unfenced gateway.")
+                    .isInstanceOf(UnsupportedOperationException.class);
         } finally {
             RpcUtils.terminateRpcEndpoint(fencedTestingEndpoint);
             fencedTestingEndpoint.validateResourceClosed();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
index 99ad7c208de..c9863579819 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -22,9 +22,8 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
@@ -35,10 +34,10 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
  * This test validates that the RPC service gives a good message when it 
cannot connect to an
  * RpcEndpoint.
  */
-public class RpcConnectionTest extends TestLogger {
+class RpcConnectionTest {
 
     @Test
-    public void testConnectFailure() throws Exception {
+    void testConnectFailure() throws Exception {
         // we start the RPC service with a very long timeout to ensure that 
the test
         // can only pass if the connection problem is not recognized merely 
via a timeout
         Configuration configuration = new Configuration();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
index 57071ab3d67..b1a97e175d3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcEndpointTest.java
@@ -20,18 +20,17 @@ package org.apache.flink.runtime.rpc;
 
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
-import org.apache.flink.util.TestLoggerExtension;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -39,26 +38,21 @@ import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for the RpcEndpoint, its self gateways and MainThreadExecutor 
scheduling command. */
-@ExtendWith(TestLoggerExtension.class)
-public class RpcEndpointTest {
+class RpcEndpointTest {
 
     private static RpcService rpcService = null;
 
     @BeforeAll
-    public static void setup() throws Exception {
+    static void setup() throws Exception {
         rpcService = RpcSystem.load().localServiceBuilder(new 
Configuration()).createAndStart();
     }
 
     @AfterAll
-    public static void teardown() throws Exception {
+    static void teardown() throws Exception {
         rpcService.closeAsync().get();
     }
 
@@ -67,7 +61,7 @@ public class RpcEndpointTest {
      * self gateway.
      */
     @Test
-    public void testSelfGateway() throws Exception {
+    void testSelfGateway() throws Exception {
         int expectedValue = 1337;
         BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 
expectedValue);
 
@@ -78,7 +72,7 @@ public class RpcEndpointTest {
 
             CompletableFuture<Integer> foobar = baseGateway.foobar();
 
-            assertEquals(Integer.valueOf(expectedValue), foobar.get());
+            assertThat(foobar.get()).isEqualTo(expectedValue);
         } finally {
             RpcUtils.terminateRpcEndpoint(baseEndpoint);
 
@@ -91,27 +85,19 @@ public class RpcEndpointTest {
      * by the RpcEndpoint.
      */
     @Test
-    public void testWrongSelfGateway() {
-        assertThrows(
-                RuntimeException.class,
-                () -> {
-                    int expectedValue = 1337;
-                    BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 
expectedValue);
-
-                    try {
-                        baseEndpoint.start();
-
-                        DifferentGateway differentGateway =
-                                
baseEndpoint.getSelfGateway(DifferentGateway.class);
-
-                        fail(
-                                "Expected to fail with a RuntimeException 
since we requested the wrong gateway type.");
-                    } finally {
-                        RpcUtils.terminateRpcEndpoint(baseEndpoint);
-
-                        baseEndpoint.validateResourceClosed();
-                    }
-                });
+    void testWrongSelfGateway() throws ExecutionException, 
InterruptedException {
+        int expectedValue = 1337;
+        BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, 
expectedValue);
+        try {
+            baseEndpoint.start();
+            assertThatThrownBy(() -> 
baseEndpoint.getSelfGateway(DifferentGateway.class))
+                    .withFailMessage(
+                            "Expected to fail with a RuntimeException since we 
requested the wrong gateway type.")
+                    .isInstanceOf(RuntimeException.class);
+        } finally {
+            RpcUtils.terminateRpcEndpoint(baseEndpoint);
+            baseEndpoint.validateResourceClosed();
+        }
     }
 
     /**
@@ -119,7 +105,7 @@ public class RpcEndpointTest {
      * gateways.
      */
     @Test
-    public void testEndpointInheritance() throws Exception {
+    void testEndpointInheritance() throws Exception {
         int foobar = 1;
         int barfoo = 2;
         String foo = "foobar";
@@ -133,11 +119,11 @@ public class RpcEndpointTest {
             ExtendedGateway extendedGateway = 
endpoint.getSelfGateway(ExtendedGateway.class);
             DifferentGateway differentGateway = 
endpoint.getSelfGateway(DifferentGateway.class);
 
-            assertEquals(Integer.valueOf(foobar), baseGateway.foobar().get());
-            assertEquals(Integer.valueOf(foobar), 
extendedGateway.foobar().get());
+            assertThat(baseGateway.foobar().get()).isEqualTo(foobar);
+            assertThat(extendedGateway.foobar().get()).isEqualTo(foobar);
 
-            assertEquals(Integer.valueOf(barfoo), 
extendedGateway.barfoo().get());
-            assertEquals(foo, differentGateway.foo().get());
+            assertThat(extendedGateway.barfoo().get()).isEqualTo(barfoo);
+            assertThat(differentGateway.foo().get()).isEqualTo(foo);
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint);
             endpoint.validateResourceClosed();
@@ -146,8 +132,7 @@ public class RpcEndpointTest {
 
     /** Tests that the RPC is running after it has been started. */
     @Test
-    public void testRunningState()
-            throws InterruptedException, ExecutionException, TimeoutException {
+    void testRunningState() throws InterruptedException, ExecutionException, 
TimeoutException {
         RunningStateTestingEndpoint endpoint =
                 new RunningStateTestingEndpoint(
                         rpcService, CompletableFuture.completedFuture(null));
@@ -156,7 +141,7 @@ public class RpcEndpointTest {
 
         try {
             endpoint.start();
-            assertTrue(gateway.queryIsRunningFlag().get());
+            assertThat(gateway.queryIsRunningFlag().get()).isTrue();
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint);
             endpoint.validateResourceClosed();
@@ -165,8 +150,7 @@ public class RpcEndpointTest {
 
     /** Tests that the RPC is not running if it is being stopped. */
     @Test
-    public void testNotRunningState()
-            throws InterruptedException, ExecutionException, TimeoutException {
+    void testNotRunningState() throws InterruptedException, 
ExecutionException, TimeoutException {
         CompletableFuture<Void> stopFuture = new CompletableFuture<>();
         RunningStateTestingEndpoint endpoint =
                 new RunningStateTestingEndpoint(rpcService, stopFuture);
@@ -176,7 +160,7 @@ public class RpcEndpointTest {
         endpoint.start();
         CompletableFuture<Void> terminationFuture = 
endpoint.closeAndWaitUntilOnStopCalled();
 
-        assertFalse(gateway.queryIsRunningFlag().get());
+        assertThat(gateway.queryIsRunningFlag().get()).isFalse();
 
         stopFuture.complete(null);
         terminationFuture.get();
@@ -276,7 +260,7 @@ public class RpcEndpointTest {
 
     /** Tests executing the runnable in the main thread of the underlying RPC 
endpoint. */
     @Test
-    public void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
+    void testExecute() throws InterruptedException, ExecutionException, 
TimeoutException {
         final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
         final CompletableFuture<Void> asyncExecutionFuture = new 
CompletableFuture<>();
         try {
@@ -295,7 +279,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleRunnableWithDelayInMilliseconds() throws Exception 
{
+    void testScheduleRunnableWithDelayInMilliseconds() throws Exception {
         testScheduleWithDelay(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -303,7 +287,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleRunnableWithDelayInSeconds() throws Exception {
+    void testScheduleRunnableWithDelayInSeconds() throws Exception {
         testScheduleWithDelay(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -311,7 +295,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleRunnableAfterClose() throws Exception {
+    void testScheduleRunnableAfterClose() throws Exception {
         testScheduleAfterClose(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -319,7 +303,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testCancelScheduledRunnable() throws Exception {
+    void testCancelScheduledRunnable() throws Exception {
         testCancelScheduledTask(
                 (mainThreadExecutor, future) -> {
                     final Duration delayDuration = Duration.ofMillis(2);
@@ -333,7 +317,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleCallableWithDelayInMilliseconds() throws Exception 
{
+    void testScheduleCallableWithDelayInMilliseconds() throws Exception {
         testScheduleWithDelay(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -341,7 +325,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleCallableWithDelayInSeconds() throws Exception {
+    void testScheduleCallableWithDelayInSeconds() throws Exception {
         testScheduleWithDelay(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -349,7 +333,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testScheduleCallableAfterClose() throws Exception {
+    void testScheduleCallableAfterClose() throws Exception {
         testScheduleAfterClose(
                 (mainThreadExecutor, expectedDelay) ->
                         mainThreadExecutor.schedule(
@@ -357,7 +341,7 @@ public class RpcEndpointTest {
     }
 
     @Test
-    public void testCancelScheduledCallable() {
+    void testCancelScheduledCallable() {
         testCancelScheduledTask(
                 (mainThreadExecutor, future) -> {
                     final Duration delayDuration = Duration.ofMillis(2);
@@ -405,8 +389,8 @@ public class RpcEndpointTest {
 
         final Duration expectedDelay = Duration.ofSeconds(0);
         ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor, 
expectedDelay);
-        assertFalse(taskCompletedFuture.isDone());
-        assertFalse(future.isDone());
+        assertThat(taskCompletedFuture).isNotDone();
+        assertThat((Future<?>) future).isNotDone();
     }
 
     private static void testCancelScheduledTask(
@@ -427,8 +411,8 @@ public class RpcEndpointTest {
         scheduledFuture.cancel(true);
         manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();
 
-        assertTrue(scheduledFuture.isCancelled());
-        assertFalse(actionFuture.isDone());
+        assertThat((Future<?>) scheduledFuture).isCancelled();
+        assertThat(actionFuture).isNotDone();
         mainThreadExecutor.close();
     }
 
@@ -439,7 +423,7 @@ public class RpcEndpointTest {
      * called directly from RpcEndpoint, MainThreadExecutor do not support 
this method.
      */
     @Test
-    public void testCallAsync() throws InterruptedException, 
ExecutionException, TimeoutException {
+    void testCallAsync() throws InterruptedException, ExecutionException, 
TimeoutException {
         final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
         final Integer expectedInteger = 12345;
         try {
@@ -451,7 +435,7 @@ public class RpcEndpointTest {
                                 return expectedInteger;
                             },
                             Duration.ofSeconds(10L));
-            assertEquals(expectedInteger, integerFuture.get());
+            assertThat(integerFuture.get()).isEqualTo(expectedInteger);
         } finally {
             RpcUtils.terminateRpcEndpoint(endpoint);
             endpoint.validateResourceClosed();
@@ -463,8 +447,7 @@ public class RpcEndpointTest {
      * expected.
      */
     @Test
-    public void testCallAsyncTimeout()
-            throws InterruptedException, ExecutionException, TimeoutException {
+    void testCallAsyncTimeout() throws InterruptedException, 
ExecutionException, TimeoutException {
         final RpcEndpoint endpoint = new BaseEndpoint(rpcService);
         final Duration timeout = Duration.ofMillis(100);
         CountDownLatch latch = new CountDownLatch(1);
@@ -481,8 +464,7 @@ public class RpcEndpointTest {
                             .handle((ignore, throwable) -> throwable);
             final Throwable throwable = throwableFuture.get();
 
-            assertNotNull(throwable);
-            assertTrue(throwable instanceof TimeoutException);
+            
assertThat(throwable).isNotNull().isInstanceOf(TimeoutException.class);
         } finally {
             latch.countDown();
             RpcUtils.terminateRpcEndpoint(endpoint);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
index f5854c1b83d..153781ec033 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcSSLAuthITCase.java
@@ -22,10 +22,9 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -33,14 +32,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * This test validates that the RPC service gives a good message when it 
cannot connect to an
  * RpcEndpoint.
  */
-public class RpcSSLAuthITCase extends TestLogger {
+class RpcSSLAuthITCase {
 
     private static final String KEY_STORE_FILE =
             RpcSSLAuthITCase.class.getResource("/local127.keystore").getFile();
@@ -50,7 +48,7 @@ public class RpcSSLAuthITCase extends TestLogger {
             
RpcSSLAuthITCase.class.getResource("/untrusted.keystore").getFile();
 
     @Test
-    public void testConnectFailure() throws Exception {
+    void testConnectFailure() throws Exception {
         final Configuration baseConfig = new Configuration();
         baseConfig.setString(AkkaOptions.TCP_TIMEOUT, "1 s");
         // we start the RPC service with a very long timeout to ensure that 
the test
@@ -105,15 +103,16 @@ public class RpcSSLAuthITCase extends TestLogger {
 
             CompletableFuture<TestGateway> future =
                     rpcService2.connect(endpoint.getAddress(), 
TestGateway.class);
-            TestGateway gateway = future.get(10000000, TimeUnit.SECONDS);
-
-            CompletableFuture<String> fooFuture = gateway.foo();
-            fooFuture.get();
-
-            fail("should never complete normally");
-        } catch (ExecutionException e) {
-            // that is what we want
-            assertTrue(e.getCause() instanceof RpcConnectionException);
+            assertThatThrownBy(
+                            () -> {
+                                TestGateway gateway = future.get(10000000, 
TimeUnit.SECONDS);
+
+                                CompletableFuture<String> fooFuture = 
gateway.foo();
+                                fooFuture.get();
+                            })
+                    .withFailMessage("should never complete normally")
+                    .isInstanceOf(ExecutionException.class)
+                    .hasCauseInstanceOf(RpcConnectionException.class);
         } finally {
             final CompletableFuture<Void> rpcTerminationFuture1 =
                     rpcService1 != null

Reply via email to