[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest URL: https://github.com/apache/flink/pull/7525#discussion_r251869623 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java ## @@ -16,125 +16,97 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskmanager; +package org.apache.flink.runtime.taskexecutor; import net.jcip.annotations.NotThreadSafe; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.IOUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.Tuple2; import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; -import java.util.Iterator; import static org.junit.Assert.*; /** - * Validates that the TaskManager startup properly obeys the configuration + * Validates that the TaskManagerRunner startup properly obeys the configuration * values. * * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class * and verifies its content. */ @NotThreadSafe -public class TaskManagerConfigurationTest { +public class TaskManagerRunnerConfigurationTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testUsePreconfiguredNetworkInterface() throws Exception { + public void testUsePreconfiguredRpcService() throws Exception { final String TEST_HOST_NAME = "testhostname"; Configuration config = new Configuration(); config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.directExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - try { - - Tuple2> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices); - - // validate the configured test host name - assertEquals(TEST_HOST_NAME, address._1()); - } finally { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - public void testActorSystemPortConfig() throws Exception { - // config with pre-configured hostname to speed up tests (no interface selection) - Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST, "localhost"); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); try { // auto port - Iterator portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2(); - assertTrue(portsIter.hasNext()); - assertEquals(0, (int) portsIter.next()); + RpcService rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices); + assertTrue(rpcService.getPort() >= 0); + // pre-defined host name + assertEquals(TEST_HOST_NAME, rpcService.getAddress()); // pre-defined port final int testPort = 22551; config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort)); - -
[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest URL: https://github.com/apache/flink/pull/7525#discussion_r251788642 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java ## @@ -16,125 +16,97 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskmanager; +package org.apache.flink.runtime.taskexecutor; import net.jcip.annotations.NotThreadSafe; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.IOUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.Tuple2; import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; -import java.util.Iterator; import static org.junit.Assert.*; /** - * Validates that the TaskManager startup properly obeys the configuration + * Validates that the TaskManagerRunner startup properly obeys the configuration * values. * * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class * and verifies its content. */ @NotThreadSafe -public class TaskManagerConfigurationTest { +public class TaskManagerRunnerConfigurationTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testUsePreconfiguredNetworkInterface() throws Exception { + public void testUsePreconfiguredRpcService() throws Exception { final String TEST_HOST_NAME = "testhostname"; Configuration config = new Configuration(); config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.directExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - try { - - Tuple2> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices); - - // validate the configured test host name - assertEquals(TEST_HOST_NAME, address._1()); - } finally { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - public void testActorSystemPortConfig() throws Exception { - // config with pre-configured hostname to speed up tests (no interface selection) - Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST, "localhost"); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); try { // auto port - Iterator portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2(); - assertTrue(portsIter.hasNext()); - assertEquals(0, (int) portsIter.next()); + RpcService rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices); + assertTrue(rpcService.getPort() >= 0); + // pre-defined host name + assertEquals(TEST_HOST_NAME, rpcService.getAddress()); // pre-defined port final int testPort = 22551; config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort)); - -
[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest URL: https://github.com/apache/flink/pull/7525#discussion_r251788642 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java ## @@ -16,125 +16,97 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskmanager; +package org.apache.flink.runtime.taskexecutor; import net.jcip.annotations.NotThreadSafe; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.IOUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; -import scala.Tuple2; import java.io.File; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URI; -import java.util.Iterator; import static org.junit.Assert.*; /** - * Validates that the TaskManager startup properly obeys the configuration + * Validates that the TaskManagerRunner startup properly obeys the configuration * values. * * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run in parallel to other * tests in the same JVM as it modifies a static (private) member of the {@link FileSystem} class * and verifies its content. */ @NotThreadSafe -public class TaskManagerConfigurationTest { +public class TaskManagerRunnerConfigurationTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testUsePreconfiguredNetworkInterface() throws Exception { + public void testUsePreconfiguredRpcService() throws Exception { final String TEST_HOST_NAME = "testhostname"; Configuration config = new Configuration(); config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME); config.setString(JobManagerOptions.ADDRESS, "localhost"); config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( - config, - Executors.directExecutor(), - HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); - - try { - - Tuple2> address = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices); - - // validate the configured test host name - assertEquals(TEST_HOST_NAME, address._1()); - } finally { - highAvailabilityServices.closeAndCleanupAllData(); - } - } - - @Test - public void testActorSystemPortConfig() throws Exception { - // config with pre-configured hostname to speed up tests (no interface selection) - Configuration config = new Configuration(); - config.setString(TaskManagerOptions.HOST, "localhost"); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(JobManagerOptions.PORT, 7891); - HighAvailabilityServices highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices( config, Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION); try { // auto port - Iterator portsIter = TaskManager.selectNetworkInterfaceAndPortRange(config, highAvailabilityServices)._2(); - assertTrue(portsIter.hasNext()); - assertEquals(0, (int) portsIter.next()); + RpcService rpcService = TaskManagerRunner.createRpcService(config, highAvailabilityServices); + assertTrue(rpcService.getPort() >= 0); + // pre-defined host name + assertEquals(TEST_HOST_NAME, rpcService.getAddress()); // pre-defined port final int testPort = 22551; config.setString(TaskManagerOptions.RPC_PORT, String.valueOf(testPort)); - -