Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e12ab57a1 -> 9b5dc5c48


[SPARK-11830][CORE] Make NettyRpcEnv bind to the specified host

This PR includes the following change:

1. Bind NettyRpcEnv to the specified host
2. Fix the port information in the log for NettyRpcEnv.
3. Fix the service name of NettyRpcEnv.

Author: zsxwing <zsxw...@gmail.com>
Author: Shixiong Zhu <shixi...@databricks.com>

Closes #9821 from zsxwing/SPARK-11830.

(cherry picked from commit 72d150c271d2b206148fd0917a0def263445121b)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b5dc5c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b5dc5c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b5dc5c4

Branch: refs/heads/branch-1.6
Commit: 9b5dc5c4864e0e58b15f9215a8993fcfcbfb222d
Parents: e12ab57
Author: zsxwing <zsxw...@gmail.com>
Authored: Thu Nov 19 11:57:50 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 19 11:58:03 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala   |  9 ++++++++-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala      |  7 +++----
 .../org/apache/spark/network/TransportContext.java    |  8 +++++++-
 .../apache/spark/network/server/TransportServer.java  | 14 ++++++++++----
 4 files changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9b5dc5c4/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4474a83..88df27f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -258,8 +258,15 @@ object SparkEnv extends Logging {
       if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
         rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
       } else {
+        val actorSystemPort = if (port == 0) 0 else rpcEnv.address.port + 1
         // Create a ActorSystem for legacy codes
-        AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, 
securityManager)._1
+        AkkaUtils.createActorSystem(
+          actorSystemName + "ActorSystem",
+          hostname,
+          actorSystemPort,
+          conf,
+          securityManager
+        )._1
       }
 
     // Figure out which port Akka actually bound to in case the original port 
is 0 or occupied.

http://git-wip-us.apache.org/repos/asf/spark/blob/9b5dc5c4/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 3e0c497..3ce3598 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -102,7 +102,7 @@ private[netty] class NettyRpcEnv(
       } else {
         java.util.Collections.emptyList()
       }
-    server = transportContext.createServer(port, bootstraps)
+    server = transportContext.createServer(host, port, bootstraps)
     dispatcher.registerRpcEndpoint(
       RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
   }
@@ -337,10 +337,10 @@ private[netty] class NettyRpcEnvFactory extends 
RpcEnvFactory with Logging {
     if (!config.clientMode) {
       val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
         nettyEnv.startServer(actualPort)
-        (nettyEnv, actualPort)
+        (nettyEnv, nettyEnv.address.port)
       }
       try {
-        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, 
"NettyRpcEnv")._1
+        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, 
config.name)._1
       } catch {
         case NonFatal(e) =>
           nettyEnv.shutdown()
@@ -370,7 +370,6 @@ private[netty] class NettyRpcEnvFactory extends 
RpcEnvFactory with Logging {
  * @param conf Spark configuration.
  * @param endpointAddress The address where the endpoint is listening.
  * @param nettyEnv The RpcEnv associated with this ref.
- * @param local Whether the referenced endpoint lives in the same process.
  */
 private[netty] class NettyRpcEndpointRef(
     @transient private val conf: SparkConf,

http://git-wip-us.apache.org/repos/asf/spark/blob/9b5dc5c4/network/common/src/main/java/org/apache/spark/network/TransportContext.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/TransportContext.java 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 1b64b86..238710d 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -94,7 +94,13 @@ public class TransportContext {
 
   /** Create a server which will attempt to bind to a specific port. */
   public TransportServer createServer(int port, List<TransportServerBootstrap> 
bootstraps) {
-    return new TransportServer(this, port, rpcHandler, bootstraps);
+    return new TransportServer(this, null, port, rpcHandler, bootstraps);
+  }
+
+  /** Create a server which will attempt to bind to a specific host and port. 
*/
+  public TransportServer createServer(
+      String host, int port, List<TransportServerBootstrap> bootstraps) {
+    return new TransportServer(this, host, port, rpcHandler, bootstraps);
   }
 
   /** Creates a new server, binding to any available ephemeral port. */

http://git-wip-us.apache.org/repos/asf/spark/blob/9b5dc5c4/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
 
b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index f4fadb1..baae235 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -55,9 +55,13 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
-  /** Creates a TransportServer that binds to the given port, or to any 
available if 0. */
+  /**
+   * Creates a TransportServer that binds to the given host and the given 
port, or to any available
+   * if 0. If you don't want to bind to any special host, set "hostToBind" to 
null.
+   * */
   public TransportServer(
       TransportContext context,
+      String hostToBind,
       int portToBind,
       RpcHandler appRpcHandler,
       List<TransportServerBootstrap> bootstraps) {
@@ -67,7 +71,7 @@ public class TransportServer implements Closeable {
     this.bootstraps = 
Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
 
     try {
-      init(portToBind);
+      init(hostToBind, portToBind);
     } catch (RuntimeException e) {
       JavaUtils.closeQuietly(this);
       throw e;
@@ -81,7 +85,7 @@ public class TransportServer implements Closeable {
     return port;
   }
 
-  private void init(int portToBind) {
+  private void init(String hostToBind, int portToBind) {
 
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
@@ -120,7 +124,9 @@ public class TransportServer implements Closeable {
       }
     });
 
-    channelFuture = bootstrap.bind(new InetSocketAddress(portToBind));
+    InetSocketAddress address = hostToBind == null ?
+        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, 
portToBind);
+    channelFuture = bootstrap.bind(address);
     channelFuture.syncUninterruptibly();
 
     port = ((InetSocketAddress) 
channelFuture.channel().localAddress()).getPort();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to