Repository: flink
Updated Branches:
  refs/heads/master 1190f3b1d -> 74b535d56


[FLINK-3074] Add config option to start YARN AM on port range

This closes #1416


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/74b535d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/74b535d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/74b535d5

Branch: refs/heads/master
Commit: 74b535d56a81fdf7460b9ce632e14e3c3d119355
Parents: 1190f3b
Author: Robert Metzger <rmetz...@apache.org>
Authored: Thu Nov 26 17:38:45 2015 +0100
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed Dec 9 17:41:05 2015 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            | 13 +++-
 docs/setup/yarn_setup.md                        | 25 +++++++
 .../flink/configuration/ConfigConstants.java    | 22 ++++++
 .../java/org/apache/flink/util/NetUtils.java    | 73 ++++++++++++++++---
 .../org/apache/flink/util/NetUtilsTest.java     | 33 +++++++--
 .../apache/flink/runtime/blob/BlobServer.java   | 26 +++----
 flink-yarn/src/main/resources/log4j.properties  | 24 +++++++
 .../flink/yarn/ApplicationMasterBase.scala      | 74 ++++++++++++++++++--
 8 files changed, 251 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index bc70e1d..a8aba49 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -224,7 +224,7 @@ Note: State backend must be accessible from the JobManager, 
use file:// only for
 - `blob.server.port`: Port definition for the blob server (serving user jars) 
on the Taskmanagers.
 By default the port is set to 0, which means that the operating system is 
picking an ephemeral port.
 Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or 
a combination of both.
-It is recommended to set a range of ports to avoid collisions when multiple 
TaskManagers are running
+It is recommended to set a range of ports to avoid collisions when multiple 
JobManagers are running
 on the same machine.
 
 - `execution-retries.delay`: Delay between execution retries. Default value "5 
s". Note that values
@@ -428,6 +428,17 @@ For example for passing `LD_LIBRARY_PATH` as an env 
variable to the ApplicationM
 - `yarn.taskmanager.env.` Similar to the configuration prefix above, this 
prefix allows setting custom
 environment variables for the TaskManager processes.
 
+
+- `yarn.application-master.port` (Default: 0, which lets the OS choose an 
ephemeral port)
+With this configuration option, users can specify a port, a range of ports or 
a list of ports for the 
+Application Master (and JobManager) RPC port. We recommend keeping 
the default value (0) to
+let the operating system choose an appropriate port. In particular, when 
multiple AMs are running on the 
+same physical host, fixed port assignments can prevent an AM from starting.
+
+For example, when running Flink on YARN in an environment with a restrictive 
firewall, this
+option allows specifying a range of allowed ports.
+
+
 ## High Availability Mode
 
 - `recovery.mode`: (Default 'standalone') Defines the recovery mode used for 
the cluster execution. Currently,

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/docs/setup/yarn_setup.md
----------------------------------------------------------------------
diff --git a/docs/setup/yarn_setup.md b/docs/setup/yarn_setup.md
index b95b8a5..a7309e4 100644
--- a/docs/setup/yarn_setup.md
+++ b/docs/setup/yarn_setup.md
@@ -257,6 +257,31 @@ It allows to access log files for running YARN 
applications and shows diagnostic
 Users using Hadoop distributions from companies like Hortonworks, Cloudera or 
MapR might have to build Flink against their specific versions of Hadoop (HDFS) 
and YARN. Please read the [build instructions](building.html) for more details.
 
 
+## Running Flink on YARN behind Firewalls
+
+Some YARN clusters use firewalls for controlling the network traffic between 
the cluster and the rest of the network.
+In those setups, Flink jobs can only be submitted to a YARN session from 
within the cluster's network (behind the firewall).
+If this is not feasible for production use, Flink allows configuring a port 
range for all relevant services. With these 
+ranges configured, users can also submit jobs to Flink crossing the firewall.
+
+Currently, two services are needed to submit a job:
+
+ * The JobManager (ApplicationMaster in YARN)
+ * The BlobServer running within the JobManager.
+ 
+When submitting a job to Flink, the BlobServer will distribute the jars with 
the user code to all worker nodes (TaskManagers).
+The JobManager receives the job itself and triggers the execution.
+
+The two configuration parameters for specifying the ports are the following:
+
+ * `yarn.application-master.port`
+ * `blob.server.port`
+
+These two configuration options accept single ports (for example: "50010"), 
ranges ("50000-50025"), or a combination of
+both ("50010,50011,50020-50025,50050-50075").
+
+(Hadoop uses a similar mechanism; there, the configuration parameter is 
called `yarn.app.mapreduce.am.job.client.port-range`.)
+
 ## Background / Internals
 
 This section briefly describes how Flink and YARN interact.

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 11a9478..c8dccb8 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -86,6 +86,8 @@ public final class ConfigConstants {
         * The port can either be a port, such as "9123",
         * a range of ports: "50100-50200"
         * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+        *
+        * Setting the port to 0 will let the OS choose an available port.
         */
        public static final String BLOB_SERVER_PORT = "blob.server.port";
 
@@ -264,6 +266,20 @@ public final class ConfigConstants {
        public static final String YARN_TASK_MANAGER_ENV_PREFIX = 
"yarn.taskmanager.env.";
 
 
+
+        /**
+        * The config parameter defining the Akka actor system port for the 
ApplicationMaster and
+        * JobManager
+        *
+        * The port can either be a port, such as "9123",
+        * a range of ports: "50100-50200"
+        * or a list of ranges and or points: "50100-50200,50300-50400,51234"
+        *
+        * Setting the port to 0 will let the OS choose an available port.
+        */
+       public static final String YARN_APPLICATION_MASTER_PORT = 
"yarn.application-master.port";
+
+
        // ------------------------ Hadoop Configuration 
------------------------
 
        /**
@@ -628,6 +644,12 @@ public final class ConfigConstants {
         * Relative amount of memory to subtract from the requested memory.
         */
        public static final float DEFAULT_YARN_HEAP_CUTOFF_RATIO = 0.25f;
+
+       /**
+        * Default port for the application master is 0, which means
+        * the operating system assigns an ephemeral port
+        */
+       public static final String DEFAULT_YARN_APPLICATION_MASTER_PORT = "0";
        
        
        // ------------------------ File System Behavior 
------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 0ba8820..cdb54ed 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.util;
 
+import com.google.common.collect.Iterators;
 import com.google.common.net.InetAddresses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.Inet4Address;
@@ -29,10 +32,13 @@ import java.net.MalformedURLException;
 import java.net.ServerSocket;
 import java.net.URL;
 import java.net.UnknownHostException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 public class NetUtils {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(NetUtils.class);
        
        /**
         * Turn a fully qualified domain name (fqdn) into a hostname. If the 
fqdn has multiple subparts
@@ -170,30 +176,75 @@ public class NetUtils {
        }
 
        /**
-        * Returns a set of available ports defined by the range definition.
+        * Returns an iterator over available ports defined by the range 
definition.
         *
         * @param rangeDefinition String describing a single port, a range of 
ports or multiple ranges.
         * @return Iterator over the ports from the range definition
         * @throws NumberFormatException If an invalid string is passed.
         */
-       public static Set<Integer> getPortRangeFromString(String 
rangeDefinition) throws NumberFormatException {
-               Set<Integer> finalSet = new HashSet<>();
+
+       public static Iterator<Integer> getPortRangeFromString(String 
rangeDefinition) throws NumberFormatException {
                final String[] ranges = rangeDefinition.trim().split(",");
+               List<Iterator<Integer>> iterators = new 
ArrayList<>(ranges.length);
                for(String rawRange: ranges) {
+                       Iterator<Integer> rangeIterator = null;
                        String range = rawRange.trim();
                        int dashIdx = range.indexOf('-');
                        if (dashIdx == -1) {
                                // only one port in range:
-                               finalSet.add(Integer.valueOf(range));
+                               rangeIterator = 
Iterators.singletonIterator(Integer.valueOf(range));
                        } else {
                                // evaluate range
-                               int start = Integer.valueOf(range.substring(0, 
dashIdx));
-                               int end = 
Integer.valueOf(range.substring(dashIdx+1, range.length()));
-                               for(int i = start; i <= end; i++) {
-                                       finalSet.add(i);
+                               final int start = 
Integer.valueOf(range.substring(0, dashIdx));
+                               final int end = 
Integer.valueOf(range.substring(dashIdx+1, range.length()));
+                               rangeIterator = new Iterator<Integer>() {
+                                       int i = start;
+                                       @Override
+                                       public boolean hasNext() {
+                                               return i <= end;
+                                       }
+
+                                       @Override
+                                       public Integer next() {
+                                               return i++;
+                                       }
+
+                                       @Override
+                                       public void remove() {
+                                               throw new 
UnsupportedOperationException("Remove not supported");
+                                       }
+                               };
+                       }
+                       iterators.add(rangeIterator);
+               }
+               return Iterators.concat(iterators.iterator());
+       }
+
+       /**
+        * Tries to allocate a socket on one of the given ports.
+        *
+        * @param portsIterator An iterator over the ports to choose from.
+        * @param factory A factory for creating the ServerSocket
+        * @return null if no port was available or an allocated socket.
+        */
+       public static ServerSocket createSocketFromPorts(Iterator<Integer> 
portsIterator, SocketFactory factory) throws IOException {
+               while (portsIterator.hasNext()) {
+                       int port = portsIterator.next();
+                       LOG.debug("Trying to open socket on port {}", port);
+                       try {
+                               return factory.createSocket(port);
+                       } catch (IOException | IllegalArgumentException e) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Unable to allocate socket on 
port", e);
+                               } else {
+                                       LOG.info("Unable to allocate on port 
{}, due to error: {}", port, e.getMessage());
                                }
                        }
                }
-               return finalSet;
+               return null;
+       }
+
+       public interface SocketFactory {
+               ServerSocket createSocket(int port) throws IOException;
        }
 }
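
Editor's note: the port definition syntax handled by getPortRangeFromString
above can be tried out in isolation. The following is a minimal sketch, not
the committed code: the class name PortRangeSketch and the method parsePorts
are hypothetical, and the sketch expands the definition eagerly into a List
instead of concatenating lazy Guava iterators as the commit does. Like the
iterator version, it preserves the order in which ports were written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration; not part of Flink.
final class PortRangeSketch {

    /**
     * Expands a definition such as "50010,50020-50022" into the ports it
     * names, in the order written. Throws NumberFormatException on bad input.
     */
    static List<Integer> parsePorts(String rangeDefinition) {
        List<Integer> ports = new ArrayList<>();
        for (String rawRange : rangeDefinition.trim().split(",")) {
            String range = rawRange.trim();
            int dashIdx = range.indexOf('-');
            if (dashIdx == -1) {
                // a single port
                ports.add(Integer.parseInt(range));
            } else {
                // an inclusive range "start-end"
                int start = Integer.parseInt(range.substring(0, dashIdx));
                int end = Integer.parseInt(range.substring(dashIdx + 1));
                for (int port = start; port <= end; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        // prints [50010, 50020, 50021, 50022]
        System.out.println(parsePorts("50010,50020-50022"));
    }
}
```

Keeping the written order matters because the caller tries the ports one
after another; the HashSet used before this commit gave no such guarantee.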

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java 
b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
index 13a59fa..e367e8b 100644
--- a/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
+++ b/flink-core/src/test/java/org/apache/flink/util/NetUtilsTest.java
@@ -18,11 +18,17 @@
 
 package org.apache.flink.util;
 
+import com.google.common.collect.DiscreteDomain;
+import com.google.common.collect.Iterators;
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 
 import static org.hamcrest.core.IsCollectionContaining.hasItems;
@@ -99,11 +105,18 @@ public class NetUtilsTest {
                }
        }
 
+
+
        @Test
        public void testFreePortRangeUtility() {
                // inspired by Hadoop's example for 
"yarn.app.mapreduce.am.job.client.port-range"
                String rangeDefinition = "50000-50050, 50100-50200,51234 "; // 
this also contains some whitespaces
-               Set<Integer> ports = 
NetUtils.getPortRangeFromString(rangeDefinition);
+               Iterator<Integer> portsIter = 
NetUtils.getPortRangeFromString(rangeDefinition);
+               Set<Integer> ports = new HashSet<>();
+               while(portsIter.hasNext()) {
+                       Assert.assertTrue("Duplicate element", 
ports.add(portsIter.next()));
+               }
+
                Assert.assertEquals(51+101+1, ports.size());
                // check first range
                Assert.assertThat(ports, hasItems(50000, 50001, 50002, 50050));
@@ -114,14 +127,20 @@ public class NetUtilsTest {
 
 
                // test single port "range":
-               ports = NetUtils.getPortRangeFromString(" 51234");
-               Assert.assertEquals(1, ports.size());
-               Assert.assertEquals(51234, (int)ports.iterator().next());
+               portsIter = NetUtils.getPortRangeFromString(" 51234");
+               Assert.assertTrue(portsIter.hasNext());
+               Assert.assertEquals(51234, (int)portsIter.next());
+               Assert.assertFalse(portsIter.hasNext());
 
                // test port list
-               ports = NetUtils.getPortRangeFromString("5,1,2,3,4");
-               Assert.assertEquals(5, ports.size());
-               Assert.assertThat(ports, hasItems(1,2,3,4,5));
+               portsIter = NetUtils.getPortRangeFromString("5,1,2,3,4");
+               Assert.assertTrue(portsIter.hasNext());
+               Assert.assertEquals(5, (int)portsIter.next());
+               Assert.assertEquals(1, (int)portsIter.next());
+               Assert.assertEquals(2, (int)portsIter.next());
+               Assert.assertEquals(3, (int)portsIter.next());
+               Assert.assertEquals(4, (int)portsIter.next());
+               Assert.assertFalse(portsIter.hasNext());
 
 
                Throwable error = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index f4b6c00..910bd23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -68,7 +68,7 @@ public class BlobServer extends Thread implements BlobService 
{
        private final BlobStore blobStore;
 
        /** Set of currently running threads */
-       private final Set<BlobServerConnection> activeConnections = new 
HashSet<BlobServerConnection>();
+       private final Set<BlobServerConnection> activeConnections = new 
HashSet<>();
 
        /** The maximum number of concurrent connections */
        private final int maxConnections;
@@ -142,23 +142,17 @@ public class BlobServer extends Thread implements 
BlobService {
                //  ----------------------- start the server -------------------
 
                String serverPortRange = 
config.getString(ConfigConstants.BLOB_SERVER_PORT, 
ConfigConstants.DEFAULT_BLOB_SERVER_PORT);
-               Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(serverPortRange).iterator();
 
-               ServerSocket socketAttempt = null;
-               while(ports.hasNext()) {
-                       int port = ports.next();
-                       LOG.debug("Trying to open socket on port {}", port);
-                       try {
-                               socketAttempt = new ServerSocket(port, backlog);
-                               break; // we were able to use the port.
-                       } catch (IOException | IllegalArgumentException e) {
-                               if(LOG.isDebugEnabled()) {
-                                       LOG.debug("Unable to allocate socket on 
port", e);
-                               } else {
-                                       LOG.info("Unable to allocate on port 
{}, due to error: {}", port, e.getMessage());
-                               }
+               Iterator<Integer> ports = 
NetUtils.getPortRangeFromString(serverPortRange);
+
+               final int finalBacklog = backlog;
+               ServerSocket socketAttempt = 
NetUtils.createSocketFromPorts(ports, new NetUtils.SocketFactory() {
+                       @Override
+                       public ServerSocket createSocket(int port) throws 
IOException {
+                               return new ServerSocket(port, finalBacklog);
                        }
-               }
+               });
+
                if(socketAttempt == null) {
                        throw new IOException("Unable to allocate socket for 
blob server in specified port range: "+serverPortRange);
                } else {
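
Editor's note: the BlobServer change above hands the port loop to
NetUtils.createSocketFromPorts and only supplies a factory that knows about
the backlog. The sketch below mirrors that shape in a self-contained form.
The class SocketFromPortsSketch is hypothetical, logging is omitted, the
backlog value is arbitrary, and a Java 8 lambda stands in for the anonymous
class used in the commit.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-alone illustration; not part of Flink.
final class SocketFromPortsSketch {

    /** Same shape as NetUtils.SocketFactory in the diff above. */
    interface SocketFactory {
        ServerSocket createSocket(int port) throws IOException;
    }

    /** Returns a socket on the first usable port, or null if none worked. */
    static ServerSocket createSocketFromPorts(Iterator<Integer> ports, SocketFactory factory) {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return factory.createSocket(port);
            } catch (IOException | IllegalArgumentException e) {
                // port is busy or out of range; fall through to the next candidate
            }
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        final int backlog = 50; // arbitrary value chosen for this sketch
        // -1 is rejected with IllegalArgumentException; 0 asks the OS for an ephemeral port
        Iterator<Integer> candidates = Arrays.asList(-1, 0).iterator();
        ServerSocket socket = createSocketFromPorts(candidates, port -> new ServerSocket(port, backlog));
        if (socket == null) {
            throw new IOException("Unable to allocate socket in the given ports");
        }
        System.out.println("Bound to port " + socket.getLocalPort());
        socket.close();
    }
}
```

Passing a factory keeps the iteration and error handling in one place while
each caller decides how the ServerSocket is constructed.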

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-yarn/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/resources/log4j.properties 
b/flink-yarn/src/main/resources/log4j.properties
new file mode 100644
index 0000000..749796f
--- /dev/null
+++ b/flink-yarn/src/main/resources/log4j.properties
@@ -0,0 +1,24 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+
+# Convenience file for local debugging of the JobManager/TaskManager.
+log4j.rootLogger=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x 
- %m%n

http://git-wip-us.apache.org/repos/asf/flink/blob/74b535d5/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index dcddad8..dcb75e1 100644
--- 
a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ 
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -19,22 +19,27 @@
 package org.apache.flink.yarn
 
 import java.io.{FileWriter, BufferedWriter, PrintWriter}
+import java.net.{BindException, ServerSocket}
 import java.security.PrivilegedAction
 
-import akka.actor.ActorSystem
+import akka.actor.{ActorRef, ActorSystem}
 import org.apache.flink.client.CliFrontend
 import org.apache.flink.configuration.{GlobalConfiguration, Configuration, 
ConfigConstants}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.jobmanager.{MemoryArchivist, JobManagerMode, 
JobManager}
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.webmonitor.WebMonitor
+import org.apache.flink.util.NetUtils
 import org.apache.flink.yarn.YarnMessages.StartYarnSession
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.jboss.netty.channel.ChannelException
 import org.slf4j.LoggerFactory
 
+import scala.annotation.tailrec
 import scala.io.Source
+import scala.util.{Success, Failure, Try}
 
 /** Base class for all application masters. This base class provides 
functionality to start a
   * [[JobManager]] implementation in a Yarn container.
@@ -111,17 +116,78 @@ abstract class ApplicationMasterBase {
         config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
       }
 
-      val (actorSystem, jmActor, archiveActor, webMonitor) =
+      // we try to start the JobManager actor system using the port definition
+      // from the config.
+      // first, we check if the port is available by opening a socket
+      // if the actor system fails to start on the port, we try further
+      val amPortRange: String = 
config.getString(ConfigConstants.YARN_APPLICATION_MASTER_PORT,
+        ConfigConstants.DEFAULT_YARN_APPLICATION_MASTER_PORT)
+      val portsIterator = NetUtils.getPortRangeFromString(amPortRange)
+
+      // method to start the actor system.
+      def startActorSystem(
+          portsIterator: java.util.Iterator[Integer])
+        : (ActorSystem, ActorRef, ActorRef, Option[WebMonitor]) = {
+        val availableSocket = NetUtils.createSocketFromPorts(
+          portsIterator,
+          new NetUtils.SocketFactory {
+            override def createSocket(port: Int): ServerSocket = new 
ServerSocket(port)
+          })
+
+        // get port as integer and close socket
+       val tryPort = if (availableSocket == null) {
+          throw new BindException(s"Unable to allocate port for 
ApplicationMaster in " +
+            s"specified port range: $amPortRange ")
+        } else {
+          val port = availableSocket.getLocalPort
+          availableSocket.close()
+          port // return for if
+        }
+
         JobManager.startActorSystemAndJobManagerActors(
           config,
           JobManagerMode.CLUSTER,
           ownHostname,
-          0,
+          tryPort,
           getJobManagerClass,
           getArchivistClass
         )
+      }
+
+      @tailrec
+      def retry[T](fn: => T, stopCond: => Boolean): Try[T] = {
+        Try {
+          fn
+        } match {
+          case Failure(x: BindException) =>
+            if (stopCond) {
+              Failure(new RuntimeException("Unable to do further retries 
starting the actor " +
+                "system"))
+            } else {
+              retry(fn, stopCond)
+            }
+          case Failure(x: Exception) => x.getCause match {
+            case c: ChannelException =>
+              if (stopCond) {
+                Failure(new RuntimeException("Unable to do further retries 
starting the actor " +
+                  "system"))
+              } else {
+                retry(fn, stopCond)
+              }
+            case _ => Failure(x)
+          }
+          case f => f
+        }
+      }
+
+      // try starting the actor system
+      val result = retry(startActorSystem(portsIterator), 
{!portsIterator.hasNext})
+
+      val (actorSystem, jmActor, archiveActor, webMonitor) = result match {
+        case Success(r) => r
+        case Failure(failure) => throw new RuntimeException("Unable to start 
actor system", failure)
+      }
 
-      actorSystemOption = Option(actorSystem)
       webMonitorOption = webMonitor
 
       val address = AkkaUtils.getAddress(actorSystem)
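
Editor's note: the ApplicationMaster change probes a port by opening and
closing a socket and only then starts the actor system on it. Another
process can take the port in between, which is why the start is retried on
the next candidate. The sketch below shows that retry idea in plain Java
under simplifying assumptions: the names RetryOnBindSketch, Starter and
startOnFirstUsablePort are hypothetical, and only BindException is treated
as retryable, whereas the Scala code above also retries on a Netty
ChannelException cause.

```java
import java.net.BindException;
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-alone illustration; not part of Flink.
final class RetryOnBindSketch {

    /** Starts some service on the given port, or fails with BindException. */
    interface Starter<T> {
        T start(int port) throws BindException;
    }

    /**
     * Tries each candidate port in order and returns the first successful
     * result. Gives up once the candidates are exhausted.
     */
    static <T> T startOnFirstUsablePort(Iterator<Integer> ports, Starter<T> starter) {
        BindException last = null;
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return starter.start(port);
            } catch (BindException e) {
                last = e; // remember the failure and move on to the next port
            }
        }
        throw new RuntimeException("Unable to start on any configured port", last);
    }

    public static void main(String[] args) {
        // Pretend that ports 50010 and 50011 are already taken.
        String result = startOnFirstUsablePort(
            Arrays.asList(50010, 50011, 50012).iterator(),
            port -> {
                if (port < 50012) {
                    throw new BindException("Port " + port + " already in use");
                }
                return "started on " + port;
            });
        System.out.println(result); // prints "started on 50012"
    }
}
```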
