asfgit closed pull request #6845: [FLINK-10511][Cluster Management] Reuse the port selection and RPC se… URL: https://github.com/apache/flink/pull/6845
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java index f528bc4bcc4..cc12ff5ee5c 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobCache; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore; @@ -50,7 +49,7 @@ import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; -import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityContext; import org.apache.flink.runtime.security.SecurityUtils; @@ -83,10 +82,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import scala.concurrent.duration.FiniteDuration; - -import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR; - /** * Base class for the Flink cluster entry points. * @@ -252,7 +247,7 @@ protected void initializeServices(Configuration configuration) throws Exception final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS); final String portRange = getRPCPortRange(configuration); - commonRpcService = createRpcService(configuration, bindAddress, portRange); + commonRpcService = AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration); // update the configuration used to create the high availability services configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); @@ -293,15 +288,6 @@ protected String getRPCPortRange(Configuration configuration) { } } - protected RpcService createRpcService( - Configuration configuration, - String bindAddress, - String portRange) throws Exception { - ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR); - FiniteDuration duration = AkkaUtils.getTimeout(configuration); - return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit())); - } - protected HighAvailabilityServices createHaServices( Configuration configuration, Executor executor) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index 3ee7641f717..28f04f7677d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution; import org.apache.flink.runtime.net.SSLUtils; @@ -30,8 +31,6 @@ import org.apache.flink.util.Preconditions; import akka.actor.ActorSystem; -import com.typesafe.config.Config; -import org.jboss.netty.channel.ChannelException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +59,25 @@ // RPC instantiation // ------------------------------------------------------------------------ + /** + * Utility method to create RPC service from configuration and hostname, port. + * + * @param hostname The hostname/address that describes the TaskManager's data location. + * @param portRangeDefinition The port range to start TaskManager on. + * @param configuration The configuration for the TaskManager. + * @return The rpc service which is used to start and connect to the TaskManager RpcEndpoint . + * @throws IOException Thrown, if the actor system can not bind to the address + * @throws Exception Thrown is some other error occurs while creating akka actor system + */ + public static RpcService createRpcService( + String hostname, + String portRangeDefinition, + Configuration configuration) throws Exception { + final ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, portRangeDefinition, LOG); + final Time timeout = AkkaUtils.getTimeoutAsTime(configuration); + return new AkkaRpcService(actorSystem, timeout); + } + /** * Utility method to create RPC service from configuration and hostname, port. * @@ -75,35 +93,7 @@ public static RpcService createRpcService( int port, Configuration configuration) throws Exception { LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)); - - final ActorSystem actorSystem; - - try { - Config akkaConfig; - - if (hostname != null && !hostname.isEmpty()) { - // remote akka config - akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port); - } else { - // local akka config - akkaConfig = AkkaUtils.getAkkaConfig(configuration); - } - - LOG.debug("Using akka configuration \n {}.", akkaConfig); - - actorSystem = AkkaUtils.createActorSystem(akkaConfig); - } catch (Throwable t) { - if (t instanceof ChannelException) { - Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { - String address = NetUtils.hostAndPortToUrlString(hostname, port); - throw new IOException("Unable to bind AkkaRpcService actor system to address " + - address + " - " + cause.getMessage(), t); - } - } - throw new Exception("Could not create TaskManager actor system", t); - } - + final ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, hostname, port, LOG); final Time timeout = AkkaUtils.getTimeoutAsTime(configuration); return new AkkaRpcService(actorSystem, timeout); } @@ -144,7 +134,6 @@ public static String getRpcUrl( } /** - * * @param hostname The hostname or address where the target RPC service is listening. * @param port The port where the target RPC service is listening. * @param endpointName The name of the RPC endpoint. @@ -204,6 +193,6 @@ public static String createRandomName(String prefix) { // ------------------------------------------------------------------------ - /** This class is not meant to be instantiated */ + /** This class is not meant to be instantiated. */ private AkkaRpcServiceUtils() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index 15fa4150b1f..2f54172bf6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -56,7 +56,6 @@ import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.ExecutorUtils; -import org.apache.flink.util.NetUtils; import akka.actor.ActorSystem; import org.slf4j.Logger; @@ -68,7 +67,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; -import java.util.Iterator; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -426,39 +424,14 @@ public static RpcService createRpcService( } final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT); - - return bindWithPort(configuration, taskManagerHostname, portRangeDefinition); - } - - private static RpcService bindWithPort( - Configuration configuration, - String taskManagerHostname, - String portRangeDefinition) throws Exception{ - - // parse port range definition and create port iterator - Iterator<Integer> portsIterator; try { - portsIterator = NetUtils.getPortRangeFromString(portRangeDefinition); + return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration); } catch (Exception e) { - throw new IllegalArgumentException("Invalid port range definition: " + portRangeDefinition); - } - - while (portsIterator.hasNext()) { - try { - return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portsIterator.next(), configuration); - } - catch (Exception e) { - // we can continue to try if this contains a netty channel exception - Throwable cause = e.getCause(); - if (!(cause instanceof org.jboss.netty.channel.ChannelException || - cause instanceof java.net.BindException)) { - throw e; - } // else fall through the loop and try the next port + if (e instanceof BindException) { + throw new BindException("Could not start task manager on any port in port range " + + portRangeDefinition); } + throw e; } - - // if we come here, we have exhausted the port range - throw new BindException("Could not start task manager on any port in port range " - + portRangeDefinition); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services