asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the 
port selection and RPC se…
URL: https://github.com/apache/flink/pull/6845
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
index f528bc4bcc4..cc12ff5ee5c 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
@@ -33,7 +33,6 @@
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
@@ -50,7 +49,7 @@
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
@@ -83,10 +82,6 @@
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import scala.concurrent.duration.FiniteDuration;
-
-import static 
org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;
-
 /**
  * Base class for the Flink cluster entry points.
  *
@@ -252,7 +247,7 @@ protected void initializeServices(Configuration 
configuration) throws Exception
                        final String bindAddress = 
configuration.getString(JobManagerOptions.ADDRESS);
                        final String portRange = getRPCPortRange(configuration);
 
-                       commonRpcService = createRpcService(configuration, 
bindAddress, portRange);
+                       commonRpcService = 
AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
 
                        // update the configuration used to create the high 
availability services
                        configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
@@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration 
configuration) {
                }
        }
 
-       protected RpcService createRpcService(
-                       Configuration configuration,
-                       String bindAddress,
-                       String portRange) throws Exception {
-               ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, 
FORK_JOIN_EXECUTOR);
-               FiniteDuration duration = AkkaUtils.getTimeout(configuration);
-               return new AkkaRpcService(actorSystem, 
Time.of(duration.length(), duration.unit()));
-       }
-
        protected HighAvailabilityServices createHaServices(
                Configuration configuration,
                Executor executor) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index 3ee7641f717..28f04f7677d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -22,6 +22,7 @@
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution;
 import org.apache.flink.runtime.net.SSLUtils;
@@ -30,8 +31,6 @@
 import org.apache.flink.util.Preconditions;
 
 import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-import org.jboss.netty.channel.ChannelException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -60,6 +59,25 @@
        //  RPC instantiation
        // 
------------------------------------------------------------------------
 
+       /**
+        * Utility method to create RPC service from configuration and 
hostname, port.
+        *
+        * @param hostname   The hostname/address that describes the 
TaskManager's data location.
+        * @param portRangeDefinition   The port range to start TaskManager on.
+        * @param configuration                 The configuration for the 
TaskManager.
+        * @return   The rpc service which is used to start and connect to the 
TaskManager RpcEndpoint .
+        * @throws IOException      Thrown, if the actor system can not bind to 
the address
+        * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
+        */
+       public static RpcService createRpcService(
+               String hostname,
+               String portRangeDefinition,
+               Configuration configuration) throws Exception {
+               final ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, 
LOG);
+               final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
+               return new AkkaRpcService(actorSystem, timeout);
+       }
+
        /**
         * Utility method to create RPC service from configuration and 
hostname, port.
         *
@@ -75,35 +93,7 @@ public static RpcService createRpcService(
                int port,
                Configuration configuration) throws Exception {
                LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
-
-               final ActorSystem actorSystem;
-
-               try {
-                       Config akkaConfig;
-
-                       if (hostname != null && !hostname.isEmpty()) {
-                               // remote akka config
-                               akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, hostname, port);
-                       } else {
-                               // local akka config
-                               akkaConfig = 
AkkaUtils.getAkkaConfig(configuration);
-                       }
-
-                       LOG.debug("Using akka configuration \n {}.", 
akkaConfig);
-
-                       actorSystem = AkkaUtils.createActorSystem(akkaConfig);
-               } catch (Throwable t) {
-                       if (t instanceof ChannelException) {
-                               Throwable cause = t.getCause();
-                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
-                                       String address = 
NetUtils.hostAndPortToUrlString(hostname, port);
-                                       throw new IOException("Unable to bind 
AkkaRpcService actor system to address " +
-                                               address + " - " + 
cause.getMessage(), t);
-                               }
-                       }
-                       throw new Exception("Could not create TaskManager actor 
system", t);
-               }
-
+               final ActorSystem actorSystem = 
BootstrapTools.startActorSystem(configuration, hostname, port, LOG);
                final Time timeout = AkkaUtils.getTimeoutAsTime(configuration);
                return new AkkaRpcService(actorSystem, timeout);
        }
@@ -144,7 +134,6 @@ public static String getRpcUrl(
        }
 
        /**
-        * 
         * @param hostname The hostname or address where the target RPC service 
is listening.
         * @param port The port where the target RPC service is listening.
         * @param endpointName The name of the RPC endpoint.
@@ -204,6 +193,6 @@ public static String createRandomName(String prefix) {
 
        // 
------------------------------------------------------------------------
 
-       /** This class is not meant to be instantiated */
+       /** This class is not meant to be instantiated. */
        private AkkaRpcServiceUtils() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index 15fa4150b1f..2f54172bf6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -56,7 +56,6 @@
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.NetUtils;
 
 import akka.actor.ActorSystem;
 import org.slf4j.Logger;
@@ -68,7 +67,6 @@
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -426,39 +424,14 @@ public static RpcService createRpcService(
                }
 
                final String portRangeDefinition = 
configuration.getString(TaskManagerOptions.RPC_PORT);
-
-               return bindWithPort(configuration, taskManagerHostname, 
portRangeDefinition);
-       }
-
-       private static RpcService bindWithPort(
-               Configuration configuration,
-               String taskManagerHostname,
-               String portRangeDefinition) throws Exception{
-
-               // parse port range definition and create port iterator
-               Iterator<Integer> portsIterator;
                try {
-                       portsIterator = 
NetUtils.getPortRangeFromString(portRangeDefinition);
+                       return 
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, 
configuration);
                } catch (Exception e) {
-                       throw new IllegalArgumentException("Invalid port range 
definition: " + portRangeDefinition);
-               }
-
-               while (portsIterator.hasNext()) {
-                       try {
-                               return 
AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), 
configuration);
-                       }
-                       catch (Exception e) {
-                               // we can continue to try if this contains a 
netty channel exception
-                               Throwable cause = e.getCause();
-                               if (!(cause instanceof 
org.jboss.netty.channel.ChannelException ||
-                                       cause instanceof 
java.net.BindException)) {
-                                       throw e;
-                               } // else fall through the loop and try the 
next port
+                       if (e instanceof BindException) {
+                               throw new BindException("Could not start task 
manager on any port in port range "
+                                       + portRangeDefinition);
                        }
+                       throw e;
                }
-
-               // if we come here, we have exhausted the port range
-               throw new BindException("Could not start task manager on any 
port in port range "
-                       + portRangeDefinition);
        }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to