[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest

2019-01-29 Thread GitBox
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and 
remove TaskManagerConfigurationTest
URL: https://github.com/apache/flink/pull/7525#discussion_r251869623
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 ##
 @@ -16,125 +16,97 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskmanager;
+package org.apache.flink.runtime.taskexecutor;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import org.junit.rules.TemporaryFolder;
-import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
-import java.util.Iterator;
 
 import static org.junit.Assert.*;
 
 /**
- * Validates that the TaskManager startup properly obeys the configuration
+ * Validates that the TaskManagerRunner startup properly obeys the 
configuration
  * values.
  *
  * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run 
in parallel to other
  * tests in the same JVM as it modifies a static (private) member of the 
{@link FileSystem} class
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerConfigurationTest {
+public class TaskManagerRunnerConfigurationTest {
 
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@Test
-   public void testUsePreconfiguredNetworkInterface() throws Exception {
+   public void testUsePreconfiguredRpcService() throws Exception {
final String TEST_HOST_NAME = "testhostname";
 
Configuration config = new Configuration();
config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
 
-   HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-   config,
-   Executors.directExecutor(),
-   
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-   try {
-
-   Tuple2> address = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices);
-
-   // validate the configured test host name
-   assertEquals(TEST_HOST_NAME, address._1());
-   } finally {
-   highAvailabilityServices.closeAndCleanupAllData();
-   }
-   }
-
-   @Test
-   public void testActorSystemPortConfig() throws Exception {
-   // config with pre-configured hostname to speed up tests (no 
interface selection)
-   Configuration config = new Configuration();
-   config.setString(TaskManagerOptions.HOST, "localhost");
-   config.setString(JobManagerOptions.ADDRESS, "localhost");
-   config.setInteger(JobManagerOptions.PORT, 7891);
-
HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.directExecutor(),

HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
try {
// auto port
-   Iterator portsIter = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices)._2();
-   assertTrue(portsIter.hasNext());
-   assertEquals(0, (int) portsIter.next());
+   RpcService rpcService = 
TaskManagerRunner.createRpcService(config, highAvailabilityServices);
+   assertTrue(rpcService.getPort() >= 0);
+   // pre-defined host name
+   assertEquals(TEST_HOST_NAME, rpcService.getAddress());
 
// pre-defined port
final int testPort = 22551;
config.setString(TaskManagerOptions.RPC_PORT, 
String.valueOf(testPort));
-
-   

[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest

2019-01-29 Thread GitBox
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and 
remove TaskManagerConfigurationTest
URL: https://github.com/apache/flink/pull/7525#discussion_r251788642
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 ##
 @@ -16,125 +16,97 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskmanager;
+package org.apache.flink.runtime.taskexecutor;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import org.junit.rules.TemporaryFolder;
-import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
-import java.util.Iterator;
 
 import static org.junit.Assert.*;
 
 /**
- * Validates that the TaskManager startup properly obeys the configuration
+ * Validates that the TaskManagerRunner startup properly obeys the 
configuration
  * values.
  *
  * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run 
in parallel to other
  * tests in the same JVM as it modifies a static (private) member of the 
{@link FileSystem} class
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerConfigurationTest {
+public class TaskManagerRunnerConfigurationTest {
 
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@Test
-   public void testUsePreconfiguredNetworkInterface() throws Exception {
+   public void testUsePreconfiguredRpcService() throws Exception {
final String TEST_HOST_NAME = "testhostname";
 
Configuration config = new Configuration();
config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
 
-   HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-   config,
-   Executors.directExecutor(),
-   
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-   try {
-
-   Tuple2> address = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices);
-
-   // validate the configured test host name
-   assertEquals(TEST_HOST_NAME, address._1());
-   } finally {
-   highAvailabilityServices.closeAndCleanupAllData();
-   }
-   }
-
-   @Test
-   public void testActorSystemPortConfig() throws Exception {
-   // config with pre-configured hostname to speed up tests (no 
interface selection)
-   Configuration config = new Configuration();
-   config.setString(TaskManagerOptions.HOST, "localhost");
-   config.setString(JobManagerOptions.ADDRESS, "localhost");
-   config.setInteger(JobManagerOptions.PORT, 7891);
-
HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.directExecutor(),

HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
try {
// auto port
-   Iterator portsIter = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices)._2();
-   assertTrue(portsIter.hasNext());
-   assertEquals(0, (int) portsIter.next());
+   RpcService rpcService = 
TaskManagerRunner.createRpcService(config, highAvailabilityServices);
+   assertTrue(rpcService.getPort() >= 0);
+   // pre-defined host name
+   assertEquals(TEST_HOST_NAME, rpcService.getAddress());
 
// pre-defined port
final int testPort = 22551;
config.setString(TaskManagerOptions.RPC_PORT, 
String.valueOf(testPort));
-
-   

[GitHub] GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and remove TaskManagerConfigurationTest

2019-01-29 Thread GitBox
GJL commented on a change in pull request #7525: [FLINK-11363][test] Check and 
remove TaskManagerConfigurationTest
URL: https://github.com/apache/flink/pull/7525#discussion_r251788642
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunnerConfigurationTest.java
 ##
 @@ -16,125 +16,97 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.taskmanager;
+package org.apache.flink.runtime.taskexecutor;
 
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.util.IOUtils;
 import org.junit.Rule;
 import org.junit.Test;
 
 import org.junit.rules.TemporaryFolder;
-import scala.Tuple2;
 
 import java.io.File;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.URI;
-import java.util.Iterator;
 
 import static org.junit.Assert.*;
 
 /**
- * Validates that the TaskManager startup properly obeys the configuration
+ * Validates that the TaskManagerRunner startup properly obeys the 
configuration
  * values.
  *
  * NOTE: at least {@link #testDefaultFsParameterLoading()} should not be run 
in parallel to other
  * tests in the same JVM as it modifies a static (private) member of the 
{@link FileSystem} class
  * and verifies its content.
  */
 @NotThreadSafe
-public class TaskManagerConfigurationTest {
+public class TaskManagerRunnerConfigurationTest {
 
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@Test
-   public void testUsePreconfiguredNetworkInterface() throws Exception {
+   public void testUsePreconfiguredRpcService() throws Exception {
final String TEST_HOST_NAME = "testhostname";
 
Configuration config = new Configuration();
config.setString(TaskManagerOptions.HOST, TEST_HOST_NAME);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(JobManagerOptions.PORT, 7891);
 
-   HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
-   config,
-   Executors.directExecutor(),
-   
HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
-
-   try {
-
-   Tuple2> address = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices);
-
-   // validate the configured test host name
-   assertEquals(TEST_HOST_NAME, address._1());
-   } finally {
-   highAvailabilityServices.closeAndCleanupAllData();
-   }
-   }
-
-   @Test
-   public void testActorSystemPortConfig() throws Exception {
-   // config with pre-configured hostname to speed up tests (no 
interface selection)
-   Configuration config = new Configuration();
-   config.setString(TaskManagerOptions.HOST, "localhost");
-   config.setString(JobManagerOptions.ADDRESS, "localhost");
-   config.setInteger(JobManagerOptions.PORT, 7891);
-
HighAvailabilityServices highAvailabilityServices = 
HighAvailabilityServicesUtils.createHighAvailabilityServices(
config,
Executors.directExecutor(),

HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
 
try {
// auto port
-   Iterator portsIter = 
TaskManager.selectNetworkInterfaceAndPortRange(config, 
highAvailabilityServices)._2();
-   assertTrue(portsIter.hasNext());
-   assertEquals(0, (int) portsIter.next());
+   RpcService rpcService = 
TaskManagerRunner.createRpcService(config, highAvailabilityServices);
+   assertTrue(rpcService.getPort() >= 0);
+   // pre-defined host name
+   assertEquals(TEST_HOST_NAME, rpcService.getAddress());
 
// pre-defined port
final int testPort = 22551;
config.setString(TaskManagerOptions.RPC_PORT, 
String.valueOf(testPort));
-
-