Repository: flink
Updated Branches:
  refs/heads/master 1190f3b1d -> 74b535d56

[FLINK-3074] Add config option to start YARN AM on port range

This closes #1416


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b535d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b535d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b535d5

Branch: refs/heads/master
Commit: 74b535d56a81fdf7460b9ce632e14e3c3d119355
Parents: 1190f3b
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Nov 26 17:38:45 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Dec 9 17:41:05 2015 +0100

----------------------------------------------------------------------
 docs/setup/config.md                           | 13 +++-
 docs/setup/yarn_setup.md                       | 25 +++++++
 .../flink/configuration/ConfigConstants.java   | 22 ++++++
 .../java/org/apache/flink/util/NetUtils.java   | 73 ++++++++++++++++---
 .../org/apache/flink/util/NetUtilsTest.java    | 33 +++++++--
 .../apache/flink/runtime/blob/BlobServer.java  | 26 +++----
 flink-yarn/src/main/resources/log4j.properties | 24 +++++++
 .../flink/yarn/ApplicationMasterBase.scala     | 74 ++++++++++++++++++--
 8 files changed, 251 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index bc70e1d..a8aba49 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -224,7 +224,7 @@ Note: State backend must be accessible from the JobManager, use file:// only for
 - `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers.
 By default the port is set to 0, which means that the operating system is picking an ephemeral port.
 Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both.
-It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running
+It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running
 on the same machine.
 
 - `execution-retries.delay`: Delay between execution retries. Default value "5 s". Note that values
@@ -428,6 +428,17 @@ For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationM
 - `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom
 environment variables for the TaskManager processes.
 
+
+- `yarn.application-master.port` (Default: 0, which lets the OS choose an ephemeral port)
+With this configuration option, users can specify a port, a range of ports or a list of ports for the
+Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to
+let the operating system choose an appropriate port. In particular when multiple AMs are running on the
+same physical host, fixed port assignments prevent the AM from starting.
+
+For example when running Flink on YARN on an environment with a restrictive firewall, this
+option allows specifying a range of allowed ports.
+
+
 ## High Availability Mode
 
 - `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently,


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index b95b8a5..a7309e4 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -257,6 +257,31 @@ It allows to access log files for running YARN applications and shows diagnostic
 
 Users using Hadoop distributions from companies like Hortonworks, Cloudera or MapR might have to build Flink against their specific versions of Hadoop (HDFS) and YARN. Please read the [build instructions](building.html) for more details.
 
+## Running Flink on YARN behind Firewalls
+
+Some YARN clusters use firewalls for controlling the network traffic between the cluster and the rest of the network.
+In those setups, Flink jobs can only be submitted to a YARN session from within the cluster's network (behind the firewall).
+If this is not feasible for production use, Flink allows to configure a port range for all relevant services. With these
+ranges configured, users can also submit jobs to Flink crossing the firewall.
+
+Currently, two services are needed to submit a job:
+
+ * The JobManager (ApplicatonMaster in YARN)
+ * The BlobServer running within the JobManager.
+
+When submitting a job to Flink, the BlobServer will distribute the jars with the user code to all worker nodes (TaskManagers).
+The JobManager receives the job itself and triggers the execution.
+
+The two configuration parameters for specifying the ports are the following:
+
+ * `yarn.application-master.port`
+ * `blob.server.port`
+
+These two configuration options accept single ports (for example: "50010"), ranges ("50000-50025"), or a combination of
+both ("50010,50011,50020-50025,50050-50075").
+
+(Hadoop is using a similar mechanism, there the configuration parameter is called `yarn.app.mapreduce.am.job.client.port-range`.)
+
 ## Background / Internals
 
 This section briefly describes how Flink and YARN interact.
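
The port specification documented above accepts a single port such as "50010", an inclusive range such as "50000-50025", or a comma-separated mix of both. A small standalone parser makes the format concrete. This is a minimal sketch under assumptions, not the commit's code. The class `PortSpec` and its method `expand` are hypothetical names. The sketch builds an eager `List` in written order, whereas `NetUtils.getPortRangeFromString` in this commit returns a lazy `Iterator<Integer>`. It also tolerates spaces around the dash, which is an assumption of the sketch; the commit's parser only trims whole entries.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): expands a port specification into a list. */
public final class PortSpec {

    private PortSpec() {}

    /**
     * Expands a specification such as "50010,50011,50020-50025" into individual ports,
     * preserving the order in which they are written. Ranges are inclusive on both ends.
     *
     * @throws NumberFormatException if an entry is neither a number nor a "start-end" pair
     */
    public static List<Integer> expand(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String raw : spec.trim().split(",")) {
            String entry = raw.trim();
            int dash = entry.indexOf('-');
            if (dash == -1) {
                // a single port, e.g. "50010"
                ports.add(Integer.parseInt(entry));
            } else {
                // an inclusive range, e.g. "50020-50025"; a reversed range adds nothing
                int start = Integer.parseInt(entry.substring(0, dash).trim());
                int end = Integer.parseInt(entry.substring(dash + 1).trim());
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }
}
```

For instance, `PortSpec.expand("50010,50011,50020-50025")` yields eight ports: 50010, 50011, then 50020 through 50025, in that order. Keeping the written order matters because callers try the candidates front to back. A reversed range such as "50025-50020" contributes no ports, which mirrors the `i <= end` condition of the iterator in the commit.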

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 11a9478..c8dccb8 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -86,6 +86,8 @@ public final class ConfigConstants {
 	 * The port can either be a port, such as "9123",
 	 * a range of ports: "50100-50200"
 	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+	 *
+	 * Setting the port to 0 will let the OS choose an available port.
 	 */
 	public static final String BLOB_SERVER_PORT = "blob.server.port";
 
@@ -264,6 +266,20 @@ public final class ConfigConstants {
 
 	public static final String YARN_TASK_MANAGER_ENV_PREFIX = "yarn.taskmanager.env.";
 
+
+	/**
+	 * The config parameter defining the Akka actor system port for the ApplicationMaster and
+	 * JobManager
+	 *
+	 * The port can either be a port, such as "9123",
+	 * a range of ports: "50100-50200"
+	 * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+	 *
+	 * Setting the port to 0 will let the OS choose an available port.
+	 */
+	public static final String YARN_APPLICATION_MASTER_PORT = "yarn.application-master.port";
+
+
 	// ------------------------ Hadoop Configuration ------------------------
 
 	/**
@@ -628,6 +644,12 @@ public final class ConfigConstants {
 	 * Relative amount of memory to subtract from the requested memory.
 	 */
 	public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
+
+	/**
+	 * Default port for the application master is 0, which means
+	 * the operating system assigns an ephemeral port
+	 */
+	public static final String DEFAULT_YARN_APPLICATION_MASTER_PORT = "0";
 
 	// ------------------------ File System Behavior ------------------------
 


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 0ba8820..cdb54ed 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.util;
 
+import com.google.common.collect.Iterators;
 import com.google.common.net.InetAddresses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.Inet4Address;
@@ -29,10 +32,13 @@ import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 public class NetUtils {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
 
 	/**
 	 * Turn a fully qualified domain name (fqdn) into a hostname. If the fqdn has multiple subparts
@@ -170,30 +176,75 @@ public class NetUtils {
 	}
 
 	/**
-	 * Returns a set of available ports defined by the range definition.
+	 * Returns an iterator over available ports defined by the range definition.
 	 *
 	 * @param rangeDefinition String describing a single port, a range of ports or multiple ranges.
 	 * @return Set of ports from the range definition
 	 * @throws NumberFormatException If an invalid string is passed.
 	 */
-	public static Set<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
-		Set<Integer> finalSet = new HashSet<>();
+
+	public static Iterator<Integer> getPortRangeFromString(String rangeDefinition) throws NumberFormatException {
 		final String[] ranges = rangeDefinition.trim().split(",");
+		List<Iterator<Integer>> iterators = new ArrayList<>(ranges.length);
 		for(String rawRange: ranges) {
+			Iterator<Integer> rangeIterator = null;
 			String range = rawRange.trim();
 			int dashIdx = range.indexOf('-');
 			if (dashIdx == -1) {
 				// only one port in range:
-				finalSet.add(Integer.valueOf(range));
+				rangeIterator = Iterators.singletonIterator(Integer.valueOf(range));
 			} else {
 				// evaluate range
-				int start = Integer.valueOf(range.substring(0, dashIdx));
-				int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
-				for(int i = start; i <= end; i++) {
-					finalSet.add(i);
+				final int start = Integer.valueOf(range.substring(0, dashIdx));
+				final int end = Integer.valueOf(range.substring(dashIdx+1, range.length()));
+				rangeIterator = new Iterator<Integer>() {
+					int i = start;
+					@Override
+					public boolean hasNext() {
+						return i <= end;
+					}
+
+					@Override
+					public Integer next() {
+						return i++;
+					}
+
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException("Remove not supported");
+					}
+				};
+			}
+			iterators.add(rangeIterator);
+		}
+		return Iterators.concat(iterators.iterator());
+	}
+
+	/**
+	 * Tries to allocate a socket from the given sets of ports.
+	 *
+	 * @param portsIterator A set of ports to choose from.
+	 * @param factory A factory for creating the SocketServer
+	 * @return null if no port was available or an allocated socket.
+	 */
+	public static ServerSocket createSocketFromPorts(Iterator<Integer> portsIterator, SocketFactory factory) throws IOException {
+		while (portsIterator.hasNext()) {
+			int port = portsIterator.next();
+			LOG.debug("Trying to open socket on port {}", port);
+			try {
+				return factory.createSocket(port);
+			} catch (IOException | IllegalArgumentException e) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Unable to allocate socket on port", e);
+				} else {
+					LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
 				}
 			}
 		}
-		return finalSet;
+		return null;
+	}
+
+	public interface SocketFactory {
+		ServerSocket createSocket(int port) throws IOException;
 	}
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 13a59fa..e367e8b 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.util;
 
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Iterators;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
@@ -99,11 +105,18 @@ public class NetUtilsTest {
 		}
 	}
 
+
+
 	@Test
 	public void testFreePortRangeUtility() {
 		// inspired by Hadoop's example for "yarn.app.mapreduce.am.job.client.port-range"
 		String rangeDefinition = "50000-50050, 50100-50200,51234 "; // this also contains some whitespaces
-		Set<Integer> ports = NetUtils.getPortRangeFromString(rangeDefinition);
+		Iterator<Integer> portsIter = NetUtils.getPortRangeFromString(rangeDefinition);
+		Set<Integer> ports = new HashSet<>();
+		while(portsIter.hasNext()) {
+			Assert.assertTrue("Duplicate element", ports.add(portsIter.next()));
+		}
+
 		Assert.assertEquals(51+101+1, ports.size());
 		// check first range
 		Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
@@ -114,14 +127,20 @@ public class NetUtilsTest {
 
 
 		// test single port "range":
-		ports = NetUtils.getPortRangeFromString(" 51234");
-		Assert.assertEquals(1, ports.size());
-		Assert.assertEquals(51234, (int)ports.iterator().next());
+		portsIter = NetUtils.getPortRangeFromString(" 51234");
+		Assert.assertTrue(portsIter.hasNext());
+		Assert.assertEquals(51234, (int)portsIter.next());
+		Assert.assertFalse(portsIter.hasNext());
 
 		// test port list
-		ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
-		Assert.assertEquals(5, ports.size());
-		Assert.assertThat(ports, hasItems(1,2,3,4,5));
+		portsIter = NetUtils.getPortRangeFromString("5,1,2,3,4");
+		Assert.assertTrue(portsIter.hasNext());
+		Assert.assertEquals(5, (int)portsIter.next());
+		Assert.assertEquals(1, (int)portsIter.next());
+		Assert.assertEquals(2, (int)portsIter.next());
+		Assert.assertEquals(3, (int)portsIter.next());
+		Assert.assertEquals(4, (int)portsIter.next());
+		Assert.assertFalse(portsIter.hasNext());
 
 
 		Throwable error = null;


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index f4b6c00..910bd23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -68,7 +68,7 @@ public class BlobServer extends Thread implements BlobService {
 	private final BlobStore blobStore;
 
 	/** Set of currently running threads */
-	private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>();
+	private final Set<BlobServerConnection> activeConnections = new HashSet<>();
 
 	/** The maximum number of concurrent connections */
 	private final int maxConnections;
@@ -142,23 +142,17 @@ public class BlobServer extends Thread implements BlobService {
 		// ----------------------- start the server -------------------
 
 		String serverPortRange = config.getString(ConfigConstants.BLOB_SERVER_PORT, ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
-		Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange).iterator();
 
-		ServerSocket socketAttempt = null;
-		while(ports.hasNext()) {
-			int port = ports.next();
-			LOG.debug("Trying to open socket on port {}", port);
-			try {
-				socketAttempt = new ServerSocket(port, backlog);
-				break; // we were able to use the port.
-			} catch (IOException | IllegalArgumentException e) {
-				if(LOG.isDebugEnabled()) {
-					LOG.debug("Unable to allocate socket on port", e);
-				} else {
-					LOG.info("Unable to allocate on port {}, due to error: {}", port, e.getMessage());
-				}
+		Iterator<Integer> ports = NetUtils.getPortRangeFromString(serverPortRange);
+
+		final int finalBacklog = backlog;
+		ServerSocket socketAttempt = NetUtils.createSocketFromPorts(ports, new NetUtils.SocketFactory() {
+			@Override
+			public ServerSocket createSocket(int port) throws IOException {
+				return new ServerSocket(port, finalBacklog);
 			}
-		}
+		});
+
 		if(socketAttempt == null) {
 			throw new IOException("Unable to allocate socket for blob server in specified port range: "+serverPortRange);
 		} else {


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-yarn/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/resources/log4j.properties b/flink-yarn/src/main/resources/log4j.properties
new file mode 100644
index 0000000..749796f
--- /dev/null
+++ b/flink-yarn/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index dcddad8..dcb75e1 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -19,22 +19,27 @@
 package org.apache.flink.yarn
 
 import java.io.{FileWriter, BufferedWriter, PrintWriter}
+import java.net.{BindException, ServerSocket}
 import java.security.PrivilegedAction
 
-import akka.actor.ActorSystem
+import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, JobManager}
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.webmonitor.WebMonitor
+import org.apache.flink.util.NetUtils
 import org.apache.flink.yarn.YarnMessages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.jboss.netty.channel.ChannelException
 import org.slf4j.LoggerFactory
 
+import scala.annotation.tailrec
 import scala.io.Source
+import scala.util.{Success, Failure, Try}
 
 /** Base class for all application masters. This base class provides functionality to start a
   * [[JobManager]] implementation in a Yarn container.
@@ -111,17 +116,78 @@ abstract class ApplicationMasterBase {
         config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
       }
 
-      val (actorSystem, jmActor, archiveActor, webMonitor) =
+      // we try to start the JobManager actor system using the port definition
+      // from the config.
+      // first, we check if the port is available by opening a socket
+      // if the actor system fails to start on the port, we try further
+      val amPortRange: String = config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+        ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
+      val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
+
+      // method to start the actor system.
+      def startActorSystem(
+          portsIterator: java.util.Iterator[Integer])
+        : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
+        val availableSocket = NetUtils.createSocketFromPorts(
+          portsIterator,
+          new NetUtils.SocketFactory {
+            override def createSocket(port: Int): ServerSocket = new ServerSocket(port)
+          })
+
+        // get port as integer and close socket
+        val tryPort = if (availableSocket == null) {
+          throw new BindException(s"Unable to allocate port for ApplicationMaster in " +
+            s"specified port range: $amPortRange ")
+        } else {
+          val port = availableSocket.getLocalPort
+          availableSocket.close()
+          port // return for if
+        }
         JobManager.startActorSystemAndJobManagerActors(
           config,
           JobManagerMode.CLUSTER,
           ownHostname,
-          0,
+          tryPort,
           getJobManagerClass,
           getArchivistClass
         )
+      }
+
+      @tailrec
+      def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
+        Try {
+          fn
+        } match {
+          case Failure(x: BindException) =>
+            if (stopCond) {
+              Failure(new RuntimeException("Unable to do further retries starting the actor " +
+                "system"))
+            } else {
+              retry(fn, stopCond)
+            }
+          case Failure(x: Exception) => x.getCause match {
+            case c: ChannelException =>
+              if (stopCond) {
+                Failure(new RuntimeException("Unable to do further retries starting the actor " +
+                  "system"))
+              } else {
+                retry(fn, stopCond)
+              }
+            case _ => Failure(x)
+          }
+          case f => f
+        }
+      }
+
+      // try starting the actor system
+      val result = retry(startActorSystem(portsIterator), {portsIterator.hasNext})
+
+      val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
+        case Success(r) => r
+        case Failure(failure) => throw new RuntimeException("Unable to start actor system", failure)
+      }
 
-      actorSystemOption = Option(actorSystem)
       webMonitorOption = webMonitor
 
       val address = AkkaUtils.getAddress(actorSystem)
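
The `ApplicationMasterBase` change above follows a probe-then-start pattern. It takes the next candidate port and checks that the port can be bound by opening and closing a `ServerSocket`. It then starts the actor system on that port, and moves on to the next candidate if binding fails. The sketch below restates the pattern in plain Java under stated assumptions:

* `PortRetry`, `Starter` and `startOnFirstFreePort` are hypothetical names.
* `Starter` stands in for `JobManager.startActorSystemAndJobManagerActors`.
* Only `java.net.BindException` triggers a retry. The Scala code also retries when the cause is a Netty `ChannelException`.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.ServerSocket;
import java.util.Iterator;

/** Hypothetical sketch of a probe-then-start retry loop; not Flink's actual code. */
public final class PortRetry {

    /** Stand-in for whatever service should be started on the chosen port. */
    public interface Starter<T> {
        T start(int port) throws IOException;
    }

    private PortRetry() {}

    /**
     * Tries the candidate ports in order. Each candidate is first probed by opening and
     * closing a ServerSocket; the starter is then asked to bind the probed port. A
     * BindException from either step moves on to the next candidate. When the candidates
     * run out, the last BindException is rethrown.
     */
    public static <T> T startOnFirstFreePort(Iterator<Integer> ports, Starter<T> starter)
            throws IOException {
        BindException lastFailure = null;
        while (ports.hasNext()) {
            int candidate = ports.next();
            try {
                int port;
                try (ServerSocket probe = new ServerSocket(candidate)) {
                    // For candidate 0 this is the ephemeral port chosen by the OS.
                    port = probe.getLocalPort();
                }
                // The probe is closed at this point, so another process may take the port
                // before the starter binds it; a BindException here also leads to a retry.
                return starter.start(port);
            } catch (BindException e) {
                lastFailure = e;
            }
        }
        throw lastFailure != null ? lastFailure : new BindException("No candidate ports given");
    }
}
```

Closing the probe socket before starting the real service leaves a short window in which another process can take the port. That race is why the loop treats a failed start like a failed probe and continues with the next candidate instead of giving up. For example, given the candidates `[busyPort, 0]`, the call skips the occupied port and then lets the operating system pick an ephemeral one.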