Repository: flink
Updated Branches:
  refs/heads/master 56017a98f -> 3039df809


[FLINK-7540] Apply consistent hostname normalization

The hostname normalization is now applied when generating the remote Akka
config. This ensures that all ActorSystems are bound to a normalized
hostname.

This closes #4812.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3039df80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3039df80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3039df80

Branch: refs/heads/master
Commit: 3039df8099bc8d42b645d69deb360c884c94b83b
Parents: 56017a9
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 12 01:17:23 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Oct 26 09:49:05 2017 +0200

----------------------------------------------------------------------
 .../flink/client/program/ClusterClient.java     |  8 ++--
 .../apache/flink/runtime/client/JobClient.java  | 46 ++++++++------------
 .../clusterframework/BootstrapTools.java        | 12 +++--
 .../runtime/rpc/akka/AkkaRpcServiceUtils.java   |  2 +-
 .../apache/flink/runtime/akka/AkkaUtils.scala   | 15 ++++---
 .../flink/runtime/jobmanager/JobManager.scala   | 32 +++-----------
 .../flink/runtime/taskmanager/TaskManager.scala | 29 +++---------
 .../taskmanager/TaskManagerStartupTest.java     | 11 +++--
 .../flink/runtime/akka/AkkaUtilsTest.scala      | 15 ++++++-
 9 files changed, 72 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 06fb42a..62efcfa 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobListeningContext;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -83,7 +84,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
-import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -239,9 +239,11 @@ public abstract class ClusterClient {
                                }
 
                                try {
-                                       actorSystem = 
AkkaUtils.createActorSystem(
+                                       actorSystem = 
BootstrapTools.startActorSystem(
                                                configuration,
-                                               Option.apply(new Tuple2<String, 
Object>(ownHostname.getCanonicalHostName(), 0)));
+                                               
ownHostname.getCanonicalHostName(),
+                                               0,
+                                               log);
                                } catch (Exception e) {
                                        throw new FlinkException("Could not 
start the ActorSystem lazily.", e);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index ae6576e..82c40e0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.client;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.Address;
-import akka.actor.Identify;
-import akka.actor.PoisonPill;
-import akka.actor.Props;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -34,6 +26,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -41,21 +34,20 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobClientMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.SerializedThrowable;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Identify;
+import akka.actor.PoisonPill;
+import akka.actor.Props;
+import akka.pattern.Patterns;
+import akka.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URL;
 import java.util.Collection;
@@ -64,6 +56,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -75,20 +72,11 @@ public class JobClient {
        private static final Logger LOG = 
LoggerFactory.getLogger(JobClient.class);
 
        public static ActorSystem startJobClientActorSystem(Configuration 
config, String hostname)
-                       throws IOException {
+                       throws Exception {
                LOG.info("Starting JobClient actor system");
 
-               Option<Tuple2<String, Object>> remoting = new Some<>(new 
Tuple2<String, Object>(hostname, 0));
-
                // start a remote actor system to listen on an arbitrary port
-               ActorSystem system = AkkaUtils.createActorSystem(config, 
remoting);
-               Address address = system.provider().getDefaultAddress();
-
-               String hostAddress = address.host().isDefined() ?
-                               
NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) :
-                               "(unknown)";
-               int port = address.port().isDefined() ? ((Integer) 
address.port().get()) : -1;
-               LOG.info("Started JobClient actor system at " + hostAddress + 
':' + port);
+               ActorSystem system = BootstrapTools.startActorSystem(config, 
hostname, 0, LOG);
 
                return system;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 0a56963..5849c21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -34,6 +34,8 @@ import 
org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.NetUtils;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
+
 import akka.actor.ActorSystem;
 import com.typesafe.config.Config;
 import org.apache.commons.cli.CommandLine;
@@ -53,6 +55,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 
+import scala.Some;
+import scala.Tuple2;
 import scala.concurrent.duration.FiniteDuration;
 
 /**
@@ -140,13 +144,13 @@ public class BootstrapTools {
                                int listeningPort,
                                Logger logger) throws Exception {
 
-               String hostPortUrl = listeningAddress + ':' + listeningPort;
+               String hostPortUrl = 
NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, 
listeningPort);
                logger.info("Trying to start actor system at {}", hostPortUrl);
 
                try {
                        Config akkaConfig = AkkaUtils.getAkkaConfig(
                                configuration,
-                               new scala.Some<>(new scala.Tuple2<String, 
Object>(listeningAddress, listeningPort))
+                               new Some<>(new Tuple2<>(listeningAddress, 
listeningPort))
                        );
 
                        logger.debug("Using akka configuration\n {}", 
akkaConfig);
@@ -157,9 +161,9 @@ public class BootstrapTools {
                        return actorSystem;
                }
                catch (Throwable t) {
-                       if (t instanceof 
org.jboss.netty.channel.ChannelException) {
+                       if (t instanceof ChannelException) {
                                Throwable cause = t.getCause();
-                               if (cause != null && t.getCause() instanceof 
java.net.BindException) {
+                               if (cause != null && t.getCause() instanceof 
BindException) {
                                        throw new IOException("Unable to create 
ActorSystem at address " + hostPortUrl +
                                                        " : " + 
cause.getMessage(), t);
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
index dbf61a2..20e18ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java
@@ -73,7 +73,7 @@ public class AkkaRpcServiceUtils {
         * @throws Exception      Thrown is some other error occurs while 
creating akka actor system
         */
        public static RpcService createRpcService(String hostname, int port, 
Configuration configuration) throws Exception {
-               LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.hostAndPortToUrlString(hostname, port));
+               LOG.info("Starting AkkaRpcService at {}.", 
NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port));
 
                final ActorSystem actorSystem;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 05ffc87..565b5ea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -268,9 +268,14 @@ object AkkaUtils {
    * @param externalPort The port to expect for Akka messages
    * @return Flink's Akka configuration for remote actor systems
    */
-  private def getRemoteAkkaConfig(configuration: Configuration,
-                                  bindAddress: String, port: Int,
-                                  externalHostname: String, externalPort: 
Int): Config = {
+  private def getRemoteAkkaConfig(
+      configuration: Configuration,
+      bindAddress: String,
+      port: Int,
+      externalHostname: String,
+      externalPort: Int): Config = {
+
+    val normalizedExternalHostname = 
NetUtils.unresolvedHostToNormalizedString(externalHostname)
 
     val akkaAskTimeout = 
Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT))
 
@@ -360,8 +365,8 @@ object AkkaUtils {
        """.stripMargin
 
     val effectiveHostname =
-      if (externalHostname != null && externalHostname.nonEmpty) {
-        externalHostname
+      if (normalizedExternalHostname != null && 
normalizedExternalHostname.nonEmpty) {
+        normalizedExternalHostname
       } else {
         // if bindAddress is null or empty, then leave bindAddress 
unspecified. Akka will pick
         // InetAddress.getLocalHost.getHostAddress

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3bb7faa..7ac965a 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.blob.{BlobServer, BlobStore}
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager
+import org.apache.flink.runtime.clusterframework.{BootstrapTools, 
FlinkResourceManager}
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
@@ -2167,32 +2167,12 @@ object JobManager {
       externalHostname: String,
       port: Int): ActorSystem = {
 
-    val hostPort = 
NetUtils.unresolvedHostAndPortToNormalizedString(externalHostname, port)
-
     // Bring up the job manager actor system first, bind it to the given 
address.
-    LOG.info(s"Starting JobManager actor system reachable at $hostPort")
-
-    val jobManagerSystem = try {
-      val akkaConfig = AkkaUtils.getAkkaConfig(
-        configuration,
-        Some((externalHostname, port))
-      )
-      if (LOG.isDebugEnabled) {
-        LOG.debug("Using akka configuration\n " + akkaConfig)
-      }
-      AkkaUtils.createActorSystem(akkaConfig)
-    }
-    catch {
-      case t: Throwable =>
-        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
-          val cause = t.getCause()
-          if (cause != null && 
t.getCause().isInstanceOf[java.net.BindException]) {
-            throw new Exception("Unable to create JobManager at address " + 
hostPort +
-                                  " - " + cause.getMessage(), t)
-          }
-        }
-        throw new Exception("Could not create JobManager actor system", t)
-    }
+    val jobManagerSystem = BootstrapTools.startActorSystem(
+      configuration,
+      externalHostname,
+      port,
+      LOG.logger)
 
     val address = AkkaUtils.getAddress(jobManagerSystem)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 1219fc9..cc01a8d 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -35,8 +35,9 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, 
QuarantineMonitor}
-import org.apache.flink.runtime.blob.{BlobClient, BlobService, 
BlobCacheService}
+import org.apache.flink.runtime.blob.{BlobCacheService, BlobClient, 
BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
+import org.apache.flink.runtime.clusterframework.BootstrapTools
 import org.apache.flink.runtime.clusterframework.messages.StopCluster
 import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.concurrent.{Executors, FutureUtils}
@@ -1825,27 +1826,11 @@ object TaskManager {
 
     LOG.info(s"Starting TaskManager actor system at 
$taskManagerHostname:$actorSystemPort.")
 
-    val taskManagerSystem = try {
-      val akkaConfig = AkkaUtils.getAkkaConfig(
-        configuration,
-        Some((taskManagerHostname, actorSystemPort))
-      )
-      if (LOG.isDebugEnabled) {
-        LOG.debug("Using akka configuration\n " + akkaConfig)
-      }
-      AkkaUtils.createActorSystem(akkaConfig)
-    }
-    catch {
-      case t: Throwable =>
-        if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) {
-          val cause = t.getCause()
-          if (cause != null && 
t.getCause().isInstanceOf[java.net.BindException]) {
-            throw new IOException("Unable to bind TaskManager actor system to 
address " +
-              taskManagerHostname + ':' + actorSystemPort + " - " + 
cause.getMessage(), t)
-          }
-        }
-        throw new Exception("Could not create TaskManager actor system", t)
-    }
+    val taskManagerSystem = BootstrapTools.startActorSystem(
+      configuration,
+      taskManagerHostname,
+      actorSystemPort,
+      LOG.logger)
 
     // start all the TaskManager services (network stack,  library cache, ...)
     // and the TaskManager actor

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index 79dd207..67accdb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.StartupUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -50,7 +52,7 @@ import java.util.UUID;
  * Tests that check how the TaskManager behaves when encountering startup
  * problems.
  */
-public class TaskManagerStartupTest {
+public class TaskManagerStartupTest extends TestLogger {
 
        private HighAvailabilityServices highAvailabilityServices;
 
@@ -96,7 +98,7 @@ public class TaskManagerStartupTest {
                        fail("This should fail with an IOException");
 
                }
-               catch (IOException e) {
+               catch (Exception e) {
                        // expected. validate the error message
                        List<Throwable> causes = 
StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>());
                        for (Throwable cause : causes) {
@@ -104,12 +106,9 @@ public class TaskManagerStartupTest {
                                        throw (BindException) cause;
                                }
                        }
+
                        fail("This should fail with an exception caused by 
BindException");
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
                finally {
                        if (blocker != null) {
                                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index 2404fb9..de8c26c 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -20,14 +20,14 @@ package org.apache.flink.runtime.akka
 
 import java.net.InetSocketAddress
 
+import org.apache.flink.configuration.Configuration
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
-import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol
 import org.apache.flink.util.NetUtils
 import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 @RunWith(classOf[JUnitRunner])
 class AkkaUtilsTest
@@ -136,4 +136,15 @@ class AkkaUtilsTest
 
     result should equal(address)
   }
+
+  test("getAkkaConfig should normalize the hostname") {
+    val configuration = new Configuration()
+    val hostname = "AbC123foOBaR"
+    val port = 1234
+
+    val akkaConfig = AkkaUtils.getAkkaConfig(configuration, hostname, port)
+
+    akkaConfig.getString("akka.remote.netty.tcp.hostname") should
+      equal(NetUtils.unresolvedHostToNormalizedString(hostname))
+  }
 }

Reply via email to