Repository: flink Updated Branches: refs/heads/master 56017a98f -> 3039df809
[FLINK-7540] Apply consistent hostname normalization The hostname normalization is now applied when generating the remote akka config. This ensures that all ActorSystems are bound to a normalized hostname. This closes #4812. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3039df80 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3039df80 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3039df80 Branch: refs/heads/master Commit: 3039df8099bc8d42b645d69deb360c884c94b83b Parents: 56017a9 Author: Till Rohrmann <[email protected]> Authored: Thu Oct 12 01:17:23 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Oct 26 09:49:05 2017 +0200 ---------------------------------------------------------------------- .../flink/client/program/ClusterClient.java | 8 ++-- .../apache/flink/runtime/client/JobClient.java | 46 ++++++++------------ .../clusterframework/BootstrapTools.java | 12 +++-- .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 2 +- .../apache/flink/runtime/akka/AkkaUtils.scala | 15 ++++--- .../flink/runtime/jobmanager/JobManager.scala | 32 +++----------- .../flink/runtime/taskmanager/TaskManager.scala | 29 +++--------- .../taskmanager/TaskManagerStartupTest.java | 11 +++-- .../flink/runtime/akka/AkkaUtilsTest.scala | 15 ++++++- 9 files changed, 72 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index 06fb42a..62efcfa 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++
b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.client.JobClient; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobListeningContext; import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.concurrent.FutureUtils; @@ -83,7 +84,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import scala.Option; -import scala.Tuple2; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -239,9 +239,11 @@ public abstract class ClusterClient { } try { - actorSystem = AkkaUtils.createActorSystem( + actorSystem = BootstrapTools.startActorSystem( configuration, - Option.apply(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0))); + ownHostname.getCanonicalHostName(), + 0, + log); } catch (Exception e) { throw new FlinkException("Could not start the ActorSystem lazily.", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java index ae6576e..82c40e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java @@ -18,14 +18,6 @@ package org.apache.flink.runtime.client; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.actor.Address; -import akka.actor.Identify; 
-import akka.actor.PoisonPill; -import akka.actor.Props; -import akka.pattern.Patterns; -import akka.util.Timeout; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; @@ -34,6 +26,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.PermanentBlobCache; import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -41,21 +34,20 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobClientMessages; import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.NetUtils; +import org.apache.flink.util.SerializedThrowable; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Identify; +import akka.actor.PoisonPill; +import akka.actor.Props; +import akka.pattern.Patterns; +import akka.util.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Some; -import scala.Tuple2; -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; -import scala.concurrent.duration.FiniteDuration; import java.io.IOException; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URL; import java.util.Collection; @@ -64,6 +56,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import 
scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -75,20 +72,11 @@ public class JobClient { private static final Logger LOG = LoggerFactory.getLogger(JobClient.class); public static ActorSystem startJobClientActorSystem(Configuration config, String hostname) - throws IOException { + throws Exception { LOG.info("Starting JobClient actor system"); - Option<Tuple2<String, Object>> remoting = new Some<>(new Tuple2<String, Object>(hostname, 0)); - // start a remote actor system to listen on an arbitrary port - ActorSystem system = AkkaUtils.createActorSystem(config, remoting); - Address address = system.provider().getDefaultAddress(); - - String hostAddress = address.host().isDefined() ? - NetUtils.ipAddressToUrlString(InetAddress.getByName(address.host().get())) : - "(unknown)"; - int port = address.port().isDefined() ? 
((Integer) address.port().get()) : -1; - LOG.info("Started JobClient actor system at " + hostAddress + ':' + port); + ActorSystem system = BootstrapTools.startActorSystem(config, hostname, 0, LOG); return system; } http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 0a56963..5849c21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -34,6 +34,8 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.NetUtils; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; + import akka.actor.ActorSystem; import com.typesafe.config.Config; import org.apache.commons.cli.CommandLine; @@ -53,6 +55,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import scala.Some; +import scala.Tuple2; import scala.concurrent.duration.FiniteDuration; /** @@ -140,13 +144,13 @@ public class BootstrapTools { int listeningPort, Logger logger) throws Exception { - String hostPortUrl = listeningAddress + ':' + listeningPort; + String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort); logger.info("Trying to start actor system at {}", hostPortUrl); try { Config akkaConfig = AkkaUtils.getAkkaConfig( configuration, - new scala.Some<>(new scala.Tuple2<String, Object>(listeningAddress, listeningPort)) + new Some<>(new Tuple2<>(listeningAddress, listeningPort)) ); 
logger.debug("Using akka configuration\n {}", akkaConfig); @@ -157,9 +161,9 @@ public class BootstrapTools { return actorSystem; } catch (Throwable t) { - if (t instanceof org.jboss.netty.channel.ChannelException) { + if (t instanceof ChannelException) { Throwable cause = t.getCause(); - if (cause != null && t.getCause() instanceof java.net.BindException) { + if (cause != null && t.getCause() instanceof BindException) { throw new IOException("Unable to create ActorSystem at address " + hostPortUrl + " : " + cause.getMessage(), t); } http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java index dbf61a2..20e18ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceUtils.java @@ -73,7 +73,7 @@ public class AkkaRpcServiceUtils { * @throws Exception Thrown is some other error occurs while creating akka actor system */ public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception { - LOG.info("Starting AkkaRpcService at {}.", NetUtils.hostAndPortToUrlString(hostname, port)); + LOG.info("Starting AkkaRpcService at {}.", NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port)); final ActorSystem actorSystem; http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 05ffc87..565b5ea 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -268,9 +268,14 @@ object AkkaUtils { * @param externalPort The port to expect for Akka messages * @return Flink's Akka configuration for remote actor systems */ - private def getRemoteAkkaConfig(configuration: Configuration, - bindAddress: String, port: Int, - externalHostname: String, externalPort: Int): Config = { + private def getRemoteAkkaConfig( + configuration: Configuration, + bindAddress: String, + port: Int, + externalHostname: String, + externalPort: Int): Config = { + + val normalizedExternalHostname = NetUtils.unresolvedHostToNormalizedString(externalHostname) val akkaAskTimeout = Duration(configuration.getString(AkkaOptions.ASK_TIMEOUT)) @@ -360,8 +365,8 @@ object AkkaUtils { """.stripMargin val effectiveHostname = - if (externalHostname != null && externalHostname.nonEmpty) { - externalHostname + if (normalizedExternalHostname != null && normalizedExternalHostname.nonEmpty) { + normalizedExternalHostname } else { // if bindAddress is null or empty, then leave bindAddress unspecified. 
Akka will pick // InetAddress.getLocalHost.getHostAddress http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3bb7faa..7ac965a 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -42,7 +42,7 @@ import org.apache.flink.runtime.blob.{BlobServer, BlobStore} import org.apache.flink.runtime.checkpoint._ import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.client._ -import org.apache.flink.runtime.clusterframework.FlinkResourceManager +import org.apache.flink.runtime.clusterframework.{BootstrapTools, FlinkResourceManager} import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID @@ -2167,32 +2167,12 @@ object JobManager { externalHostname: String, port: Int): ActorSystem = { - val hostPort = NetUtils.unresolvedHostAndPortToNormalizedString(externalHostname, port) - // Bring up the job manager actor system first, bind it to the given address. 
- LOG.info(s"Starting JobManager actor system reachable at $hostPort") - - val jobManagerSystem = try { - val akkaConfig = AkkaUtils.getAkkaConfig( - configuration, - Some((externalHostname, port)) - ) - if (LOG.isDebugEnabled) { - LOG.debug("Using akka configuration\n " + akkaConfig) - } - AkkaUtils.createActorSystem(akkaConfig) - } - catch { - case t: Throwable => - if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { - val cause = t.getCause() - if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) { - throw new Exception("Unable to create JobManager at address " + hostPort + - " - " + cause.getMessage(), t) - } - } - throw new Exception("Could not create JobManager actor system", t) - } + val jobManagerSystem = BootstrapTools.startActorSystem( + configuration, + externalHostname, + port, + LOG.logger) val address = AkkaUtils.getAddress(jobManagerSystem) http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 1219fc9..cc01a8d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,8 +35,9 @@ import org.apache.flink.configuration._ import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, QuarantineMonitor} -import org.apache.flink.runtime.blob.{BlobClient, BlobService, BlobCacheService} +import org.apache.flink.runtime.blob.{BlobCacheService, BlobClient, BlobService} import org.apache.flink.runtime.broadcast.BroadcastVariableManager 
+import org.apache.flink.runtime.clusterframework.BootstrapTools import org.apache.flink.runtime.clusterframework.messages.StopCluster import org.apache.flink.runtime.clusterframework.types.ResourceID import org.apache.flink.runtime.concurrent.{Executors, FutureUtils} @@ -1825,27 +1826,11 @@ object TaskManager { LOG.info(s"Starting TaskManager actor system at $taskManagerHostname:$actorSystemPort.") - val taskManagerSystem = try { - val akkaConfig = AkkaUtils.getAkkaConfig( - configuration, - Some((taskManagerHostname, actorSystemPort)) - ) - if (LOG.isDebugEnabled) { - LOG.debug("Using akka configuration\n " + akkaConfig) - } - AkkaUtils.createActorSystem(akkaConfig) - } - catch { - case t: Throwable => - if (t.isInstanceOf[org.jboss.netty.channel.ChannelException]) { - val cause = t.getCause() - if (cause != null && t.getCause().isInstanceOf[java.net.BindException]) { - throw new IOException("Unable to bind TaskManager actor system to address " + - taskManagerHostname + ':' + actorSystemPort + " - " + cause.getMessage(), t) - } - } - throw new Exception("Could not create TaskManager actor system", t) - } + val taskManagerSystem = BootstrapTools.startActorSystem( + configuration, + taskManagerHostname, + actorSystemPort, + LOG.logger) // start all the TaskManager services (network stack, library cache, ...) 
// and the TaskManager actor http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java index 79dd207..67accdb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java @@ -32,6 +32,8 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServic import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.StartupUtils; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -50,7 +52,7 @@ import java.util.UUID; * Tests that check how the TaskManager behaves when encountering startup * problems. */ -public class TaskManagerStartupTest { +public class TaskManagerStartupTest extends TestLogger { private HighAvailabilityServices highAvailabilityServices; @@ -96,7 +98,7 @@ public class TaskManagerStartupTest { fail("This should fail with an IOException"); } - catch (IOException e) { + catch (Exception e) { // expected. 
validate the error message List<Throwable> causes = StartupUtils.getExceptionCauses(e, new ArrayList<Throwable>()); for (Throwable cause : causes) { @@ -104,12 +106,9 @@ public class TaskManagerStartupTest { throw (BindException) cause; } } + fail("This should fail with an exception caused by BindException"); } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } finally { if (blocker != null) { try { http://git-wip-us.apache.org/repos/asf/flink/blob/3039df80/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala index 2404fb9..de8c26c 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala @@ -20,14 +20,14 @@ package org.apache.flink.runtime.akka import java.net.InetSocketAddress +import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution -import org.apache.flink.runtime.jobmanager.JobManager import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.AkkaProtocol import org.apache.flink.util.NetUtils import org.junit.runner.RunWith -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} @RunWith(classOf[JUnitRunner]) class AkkaUtilsTest @@ -136,4 +136,15 @@ class AkkaUtilsTest result should equal(address) } + + test("getAkkaConfig should normalize the hostname") { + val configuration = new Configuration() + val hostname = "AbC123foOBaR" + val port = 1234 + + val akkaConfig = 
AkkaUtils.getAkkaConfig(configuration, hostname, port) + + akkaConfig.getString("akka.remote.netty.tcp.hostname") should + equal(NetUtils.unresolvedHostToNormalizedString(hostname)) + } }
