This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c1e39b6aca7fc0af5ae6370f6dcb644c91881dd1 Author: Lijie Wang <wangdachui9...@gmail.com> AuthorDate: Wed Jun 22 20:05:19 2022 +0800 [hotfix][runtime][tests] Migrate TaskManagerLocationTest, TaskExecutorToResourceManagerConnectionTest and TaskManagerRunnerConfigurationTest to JUnit5 --- ...askExecutorToResourceManagerConnectionTest.java | 39 ++++---- .../TaskManagerRunnerConfigurationTest.java | 103 +++++++++------------ .../taskmanager/TaskManagerLocationTest.java | 78 ++++++++-------- 3 files changed, 97 insertions(+), 123 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java index a14752fde1e..6e4616f7a09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnectionTest.java @@ -32,24 +32,21 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration; import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.TestingRpcService; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link TaskExecutorToResourceManagerConnection}. */ -public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { +class TaskExecutorToResourceManagerConnectionTest { private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutorToResourceManagerConnectionTest.class); @@ -83,7 +80,7 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { private CompletableFuture<Void> registrationRejectionFuture; @Test - public void testResourceManagerRegistration() throws Exception { + void testResourceManagerRegistration() throws Exception { final TaskExecutorToResourceManagerConnection resourceManagerRegistration = createTaskExecutorToResourceManagerConnection(); @@ -97,13 +94,13 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { final TaskExecutorMemoryConfiguration actualMemoryConfiguration = taskExecutorRegistration.getMemoryConfiguration(); - assertThat(actualAddress, is(equalTo(TASK_MANAGER_ADDRESS))); - assertThat(actualResourceId, is(equalTo(TASK_MANAGER_RESOURCE_ID))); - assertThat(actualDataPort, is(equalTo(TASK_MANAGER_DATA_PORT))); - assertThat( - actualHardwareDescription, - is(equalTo(TASK_MANAGER_HARDWARE_DESCRIPTION))); - assertThat(actualMemoryConfiguration, is(TASK_MANAGER_MEMORY_CONFIGURATION)); + assertThat(actualAddress).isEqualTo(TASK_MANAGER_ADDRESS); + assertThat(actualResourceId).isEqualTo(TASK_MANAGER_RESOURCE_ID); + assertThat(actualDataPort).isEqualTo(TASK_MANAGER_DATA_PORT); + assertThat(actualHardwareDescription) + .isEqualTo(TASK_MANAGER_HARDWARE_DESCRIPTION); + assertThat(actualMemoryConfiguration) + .isEqualTo(TASK_MANAGER_MEMORY_CONFIGURATION); return CompletableFuture.completedFuture(successfulRegistration()); }); @@ -113,7 +110,7 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { } @Test - public void testResourceManagerRegistrationIsRejected() { + void testResourceManagerRegistrationIsRejected() { final TaskExecutorToResourceManagerConnection resourceManagerRegistration = createTaskExecutorToResourceManagerConnection(); @@ -157,8 +154,8 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { new ClusterInformation("blobServerHost", 55555)); } - @Before - public void setUp() { + @BeforeEach + void setUp() { rpcService = new TestingRpcService(); testingResourceManagerGateway = new TestingResourceManagerGateway(); @@ -168,8 +165,8 @@ public class TaskExecutorToResourceManagerConnectionTest extends TestLogger { registrationRejectionFuture = new CompletableFuture<>(); } - @After - public void tearDown() throws Exception { + @AfterEach + void tearDown() throws Exception { rpcService.stopService().get(TEST_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java index 77a8ab234ca..0f021a08c0d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java @@ -34,14 +34,11 @@ import org.apache.flink.runtime.rpc.AddressResolution; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.RpcSystem; import org.apache.flink.util.IOUtils; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; -import org.hamcrest.Description; -import org.hamcrest.TypeSafeMatcher; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.opentest4j.TestAbortedException; import sun.net.util.IPAddressUtil; import javax.annotation.Nullable; @@ -52,19 +49,14 @@ import java.io.IOException; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Duration; +import java.util.UUID; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.isEmptyOrNullString; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import static org.junit.Assume.assumeNoException; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Validates that the TaskManagerRunner startup properly obeys the configuration values. @@ -74,17 +66,16 @@ import static org.junit.Assume.assumeNoException; * and verifies its content. */ @NotThreadSafe -public class TaskManagerRunnerConfigurationTest extends TestLogger { +class TaskManagerRunnerConfigurationTest { private static final RpcSystem RPC_SYSTEM = RpcSystem.load(); private static final int TEST_TIMEOUT_SECONDS = 10; - @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private Path temporaryFolder; @Test - public void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() - throws Exception { + void testTaskManagerRpcServiceShouldBindToConfiguredTaskManagerHostname() throws Exception { final String taskmanagerHost = "testhostname"; final Configuration config = createFlinkConfigWithPredefinedTaskManagerHostname(taskmanagerHost); @@ -97,8 +88,8 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { TaskManagerRunner.createRpcService( config, highAvailabilityServices, RPC_SYSTEM); - assertThat(taskManagerRpcService.getPort(), is(greaterThanOrEqualTo(0))); - assertThat(taskManagerRpcService.getAddress(), is(equalTo(taskmanagerHost))); + assertThat(taskManagerRpcService.getPort()).isGreaterThanOrEqualTo(0); + assertThat(taskManagerRpcService.getAddress()).isEqualTo(taskmanagerHost); } finally { maybeCloseRpcService(taskManagerRpcService); highAvailabilityServices.closeAndCleanupAllData(); @@ -106,7 +97,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { } @Test - public void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception { + void testTaskManagerRpcServiceShouldBindToHostnameAddress() throws Exception { final Configuration config = createFlinkConfigWithHostBindPolicy(HostBindPolicy.NAME); final HighAvailabilityServices highAvailabilityServices = createHighAvailabilityServices(config); @@ -116,7 +107,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { taskManagerRpcService = TaskManagerRunner.createRpcService( config, highAvailabilityServices, RPC_SYSTEM); - assertThat(taskManagerRpcService.getAddress(), not(isEmptyOrNullString())); + assertThat(taskManagerRpcService.getAddress()).isNotNull().isNotEmpty(); } finally { maybeCloseRpcService(taskManagerRpcService); highAvailabilityServices.closeAndCleanupAllData(); @@ -124,9 +115,8 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { } @Test - public void - testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager() - throws Exception { + void testTaskManagerRpcServiceShouldBindToIpAddressDeterminedByConnectingToResourceManager() + throws Exception { final ServerSocket testJobManagerSocket = openServerSocket(); final Configuration config = createFlinkConfigWithJobManagerPort(testJobManagerSocket.getLocalPort()); @@ -138,7 +128,11 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { taskManagerRpcService = TaskManagerRunner.createRpcService( config, highAvailabilityServices, RPC_SYSTEM); - assertThat(taskManagerRpcService.getAddress(), is(ipAddress())); + assertThat(taskManagerRpcService.getAddress()) + .matches( + value -> + (IPAddressUtil.isIPv4LiteralAddress(value) + || IPAddressUtil.isIPv6LiteralAddress(value))); } finally { maybeCloseRpcService(taskManagerRpcService); highAvailabilityServices.closeAndCleanupAllData(); @@ -147,8 +141,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { } @Test - public void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() - throws Exception { + void testCreatingTaskManagerRpcServiceShouldFailIfRpcPortRangeIsInvalid() throws Exception { final Configuration config = new Configuration( createFlinkConfigWithPredefinedTaskManagerHostname("example.org")); @@ -158,19 +151,23 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { createHighAvailabilityServices(config); try { - TaskManagerRunner.createRpcService(config, highAvailabilityServices, RPC_SYSTEM); - fail("Should fail because -1 is not a valid port range"); - } catch (final IllegalArgumentException e) { - assertThat(e.getMessage(), containsString("Invalid port range definition: -1")); + assertThatThrownBy( + () -> + TaskManagerRunner.createRpcService( + config, highAvailabilityServices, RPC_SYSTEM)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid port range definition: -1"); } finally { highAvailabilityServices.closeAndCleanupAllData(); } } @Test - public void testDefaultFsParameterLoading() throws Exception { + void testDefaultFsParameterLoading() throws Exception { try { - final File tmpDir = temporaryFolder.newFolder(); + final File tmpDir = + Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()) + .toFile(); final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null); @@ -183,7 +180,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { Configuration configuration = TaskManagerRunner.loadConfiguration(args); FileSystem.initialize(configuration); - assertEquals(defaultFS, FileSystem.getDefaultFsUri()); + assertThat(defaultFS).isEqualTo(FileSystem.getDefaultFsUri()); } finally { // reset FS settings FileSystem.initialize(new Configuration()); @@ -191,8 +188,9 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { } @Test - public void testLoadDynamicalProperties() throws IOException, FlinkParseException { - final File tmpDir = temporaryFolder.newFolder(); + void testLoadDynamicalProperties() throws IOException, FlinkParseException { + final File tmpDir = + Files.createTempDirectory(temporaryFolder, UUID.randomUUID().toString()).toFile(); final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME); final PrintWriter pw1 = new PrintWriter(confFile); final long managedMemory = 1024 * 1024 * 256; @@ -210,11 +208,10 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { "-D" + JobManagerOptions.PORT.key() + "=" + jmPort }; Configuration configuration = TaskManagerRunner.loadConfiguration(args); - assertEquals( - MemorySize.parse(managedMemory + "b"), - configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)); - assertEquals(jmHost, configuration.get(JobManagerOptions.ADDRESS)); - assertEquals(jmPort, configuration.getInteger(JobManagerOptions.PORT)); + assertThat(MemorySize.parse(managedMemory + "b")) + .isEqualTo(configuration.get(TaskManagerOptions.MANAGED_MEMORY_SIZE)); + assertThat(jmHost).isEqualTo(configuration.get(JobManagerOptions.ADDRESS)); + assertThat(jmPort).isEqualTo(configuration.getInteger(JobManagerOptions.PORT)); } private static Configuration createFlinkConfigWithPredefinedTaskManagerHostname( @@ -255,8 +252,7 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { try { return new ServerSocket(0); } catch (IOException e) { - assumeNoException("Skip test because could not open a server socket", e); - throw new RuntimeException("satisfy compiler"); + throw new TestAbortedException("Skip test because could not open a server socket"); } } @@ -266,19 +262,4 @@ public class TaskManagerRunnerConfigurationTest extends TestLogger { rpcService.stopService().get(TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); } } - - private static TypeSafeMatcher<String> ipAddress() { - return new TypeSafeMatcher<String>() { - @Override - protected boolean matchesSafely(String value) { - return IPAddressUtil.isIPv4LiteralAddress(value) - || IPAddressUtil.isIPv6LiteralAddress(value); - } - - @Override - public void describeTo(Description description) { - description.appendText("Is an ip address."); - } - }; - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java index 4d7350b2e73..3c9fa646e1e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerLocationTest.java @@ -21,16 +21,12 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.util.InstantiationUtil; -import org.junit.Assert; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.net.InetAddress; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -38,10 +34,10 @@ import static org.mockito.Mockito.when; * Tests for the TaskManagerLocation, which identifies the location and connection information of a * TaskManager. */ -public class TaskManagerLocationTest { +class TaskManagerLocationTest { @Test - public void testEqualsHashAndCompareTo() { + void testEqualsHashAndCompareTo() { try { ResourceID resourceID1 = new ResourceID("a"); ResourceID resourceID2 = new ResourceID("b"); @@ -72,22 +68,22 @@ public class TaskManagerLocationTest { TaskManagerLocation three = new TaskManagerLocation(resourceID3, address3, 10871); TaskManagerLocation four = new TaskManagerLocation(resourceID1, address1, 19871); - assertTrue(one.equals(four)); - assertTrue(!one.equals(two)); - assertTrue(!one.equals(three)); - assertTrue(!two.equals(three)); - assertTrue(!three.equals(four)); + assertThat(one).isEqualTo(four); + assertThat(one).isNotEqualTo(two); + assertThat(one).isNotEqualTo(three); + assertThat(two).isNotEqualTo(three); + assertThat(three).isNotEqualTo(four); - assertTrue(one.compareTo(four) == 0); - assertTrue(four.compareTo(one) == 0); - assertTrue(one.compareTo(two) != 0); - assertTrue(one.compareTo(three) != 0); - assertTrue(two.compareTo(three) != 0); - assertTrue(three.compareTo(four) != 0); + assertThat(one.compareTo(four)).isEqualTo(0); + assertThat(four.compareTo(one)).isEqualTo(0); + assertThat(one.compareTo(two)).isNotEqualTo(0); + assertThat(one.compareTo(three)).isNotEqualTo(0); + assertThat(two.compareTo(three)).isNotEqualTo(0); + assertThat(three.compareTo(four)).isNotEqualTo(0); { int val = one.compareTo(two); - assertTrue(two.compareTo(one) == -val); + assertThat(two.compareTo(one)).isEqualTo(-val); } } catch (Exception e) { e.printStackTrace(); @@ -96,7 +92,7 @@ public class TaskManagerLocationTest { } @Test - public void testSerialization() { + void testSerialization() { try { // without resolved hostname { @@ -105,7 +101,7 @@ public class TaskManagerLocationTest { ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888); TaskManagerLocation serCopy = InstantiationUtil.clone(original); - assertEquals(original, serCopy); + assertThat(original).isEqualTo(serCopy); } // with resolved hostname @@ -116,7 +112,7 @@ public class TaskManagerLocationTest { original.getFQDNHostname(); TaskManagerLocation serCopy = InstantiationUtil.clone(original); - assertEquals(original, serCopy); + assertThat(original).isEqualTo(serCopy); } } catch (Exception e) { e.printStackTrace(); @@ -125,17 +121,17 @@ public class TaskManagerLocationTest { } @Test - public void testGetFQDNHostname() { + void testGetFQDNHostname() { try { TaskManagerLocation info1 = new TaskManagerLocation( ResourceID.generate(), InetAddress.getByName("127.0.0.1"), 19871); - assertNotNull(info1.getFQDNHostname()); + assertThat(info1.getFQDNHostname()).isNotNull(); TaskManagerLocation info2 = new TaskManagerLocation( ResourceID.generate(), InetAddress.getByName("1.2.3.4"), 8888); - assertNotNull(info2.getFQDNHostname()); + assertThat(info2.getFQDNHostname()).isNotNull(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -143,7 +139,7 @@ public class TaskManagerLocationTest { } @Test - public void testGetHostname0() { + void testGetHostname0() { try { InetAddress address = mock(InetAddress.class); when(address.getCanonicalHostName()).thenReturn("worker2.cluster.mycompany.com"); @@ -152,7 +148,7 @@ public class TaskManagerLocationTest { final TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871); - Assert.assertEquals("worker2", info.getHostname()); + assertThat("worker2").isEqualTo(info.getHostname()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -160,7 +156,7 @@ public class TaskManagerLocationTest { } @Test - public void testGetHostname1() { + void testGetHostname1() { try { InetAddress address = mock(InetAddress.class); when(address.getCanonicalHostName()).thenReturn("worker10"); @@ -169,7 +165,7 @@ public class TaskManagerLocationTest { TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 19871); - Assert.assertEquals("worker10", info.getHostname()); + assertThat("worker10").isEqualTo(info.getHostname()); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -177,7 +173,7 @@ public class TaskManagerLocationTest { } @Test - public void testGetHostname2() { + void testGetHostname2() { try { final String addressString = "192.168.254.254"; @@ -192,11 +188,11 @@ public class TaskManagerLocationTest { TaskManagerLocation info = new TaskManagerLocation(ResourceID.generate(), address, 54152); - assertNotNull(info.getFQDNHostname()); - assertTrue(info.getFQDNHostname().equals(addressString)); + assertThat(info.getFQDNHostname()).isNotNull(); + assertThat(info.getFQDNHostname()).isEqualTo(addressString); - assertNotNull(info.getHostname()); - assertTrue(info.getHostname().equals(addressString)); + assertThat(info.getHostname()).isNotNull(); + assertThat(info.getHostname()).isEqualTo(addressString); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -204,7 +200,7 @@ public class TaskManagerLocationTest { } @Test - public void testNotRetrieveHostName() { + void testNotRetrieveHostName() { InetAddress address = mock(InetAddress.class); when(address.getCanonicalHostName()).thenReturn("worker10"); when(address.getHostName()).thenReturn("worker10"); @@ -217,9 +213,9 @@ public class TaskManagerLocationTest { 19871, new TaskManagerLocation.IpOnlyHostNameSupplier(address)); - assertNotEquals("worker10", info.getHostname()); - assertNotEquals("worker10", info.getFQDNHostname()); - assertEquals("127.0.0.1", info.getHostname()); - assertEquals("127.0.0.1", info.getFQDNHostname()); + assertThat("worker10").isNotEqualTo(info.getHostname()); + assertThat("worker10").isNotEqualTo(info.getFQDNHostname()); + assertThat("127.0.0.1").isEqualTo(info.getHostname()); + assertThat("127.0.0.1").isEqualTo(info.getFQDNHostname()); } }