Repository: spark
Updated Branches:
  refs/heads/master b4a4421b6 -> 2cd1bfa4f


[SPARK-4563][CORE] Allow driver to advertise a different network address.

The goal of this feature is to allow the Spark driver to run in an
isolated environment, such as a docker container, and use the host's
port forwarding mechanism to accept connections from the outside world.

The change is restricted to the driver: there is no support for achieving
the same thing on executors (or the YARN AM for that matter). Those still
need full access to the outside world so that, for example, connections
can be made to an executor's block manager.

The core of the change is simple: add a new configuration that specifies
the address the driver should bind to, which can be different from the
address it advertises to executors (spark.driver.host). Everything else is
plumbing the new configuration through to where it's needed.
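
A rough sketch (hypothetical addresses, not part of this change) of how the
two settings relate when building a SparkConf:

    import org.apache.spark.SparkConf

    // Hypothetical values: bind inside the container, advertise the host.
    val conf = new SparkConf()
      .set("spark.driver.bindAddress", "0.0.0.0")   // where the driver's sockets listen
      .set("spark.driver.host", "host.example.com") // what executors are told to connect to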

To use the feature, the host starting the container needs to set up the
driver's ports to fall within a range that is being forwarded; this
required a driver-specific configuration for the block manager port, which
falls back to the existing spark.blockManager.port when not set. This way,
users can modify the driver settings without affecting the executors; it
would also be nice to have different retry counts for the driver and
executors, but given that docker (at least) allows forwarding port ranges,
we can probably live without that for now.
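
As an illustration of the fallback (made-up values, and using SparkConf's
plain getters instead of the config constants added by this change):

    import org.apache.spark.SparkConf

    // spark.driver.blockManager.port falls back to spark.blockManager.port;
    // a value of 0 means "pick any free port".
    val conf = new SparkConf().set("spark.blockManager.port", "38020")
    val driverBlockManagerPort = conf.getInt("spark.driver.blockManager.port",
      conf.getInt("spark.blockManager.port", 0))  // 38020 here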

Because of the nature of the feature it's kinda hard to add unit tests;
I just added a simple one to make sure the configuration works.

This was tested with a docker image running spark-shell with the following
command:

 docker blah blah blah \
   -p 38000-38100:38000-38100 \
   [image] \
   spark-shell \
     --num-executors 3 \
     --conf spark.shuffle.service.enabled=false \
     --conf spark.dynamicAllocation.enabled=false \
     --conf spark.driver.host=[host's address] \
     --conf spark.driver.port=38000 \
     --conf spark.driver.blockManager.port=38020 \
     --conf spark.ui.port=38040

Ran on YARN and verified that the driver works, that executors start up
and listen on ephemeral ports (instead of using the driver's config), and
that caching and shuffling (without the shuffle service) work. Clicked
through the UI to make sure all pages (including executor thread dumps)
worked. Also tested apps without docker, and ran unit tests.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #15120 from vanzin/SPARK-4563.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2cd1bfa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2cd1bfa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2cd1bfa4

Branch: refs/heads/master
Commit: 2cd1bfa4f0c6625b0ab1dbeba2b9586b9a6a9f42
Parents: b4a4421
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Wed Sep 21 14:42:41 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Wed Sep 21 14:42:41 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  2 ++
 .../scala/org/apache/spark/SparkContext.scala   |  5 ++--
 .../main/scala/org/apache/spark/SparkEnv.scala  | 27 +++++++++++++++-----
 .../spark/internal/config/ConfigProvider.scala  |  2 +-
 .../apache/spark/internal/config/package.scala  | 20 +++++++++++++++
 .../netty/NettyBlockTransferService.scala       |  7 ++---
 .../scala/org/apache/spark/rpc/RpcEnv.scala     | 17 ++++++++++--
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  9 ++++---
 .../main/scala/org/apache/spark/ui/WebUI.scala  |  5 ++--
 .../scala/org/apache/spark/util/Utils.scala     |  6 ++---
 .../netty/NettyBlockTransferSecuritySuite.scala |  6 +++--
 .../netty/NettyBlockTransferServiceSuite.scala  |  5 ++--
 .../spark/rpc/netty/NettyRpcEnvSuite.scala      | 16 ++++++++++--
 .../storage/BlockManagerReplicationSuite.scala  |  2 +-
 .../spark/storage/BlockManagerSuite.scala       |  4 +--
 docs/configuration.md                           | 23 ++++++++++++++++-
 .../cluster/mesos/MesosSchedulerUtils.scala     |  3 ++-
 ...esosCoarseGrainedSchedulerBackendSuite.scala |  5 ++--
 .../mesos/MesosSchedulerUtilsSuite.scala        |  3 ++-
 .../spark/streaming/CheckpointSuite.scala       |  4 ++-
 .../streaming/ReceivedBlockHandlerSuite.scala   |  2 +-
 21 files changed, 133 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e85e5aa..51a699f4 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -422,6 +422,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
   }
 
+  private[spark] def contains(entry: ConfigEntry[_]): Boolean = contains(entry.key)
+
   /** Copy this object */
   override def clone: SparkConf = {
     val cloned = new SparkConf(false)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35b6334..db84172 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -383,8 +383,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       logInfo("Spark configuration:\n" + _conf.toDebugString)
     }
 
-    // Set Spark driver host and port system properties
-    _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+    // Set Spark driver host and port system properties. This explicitly sets the configuration
+    // instead of relying on the default value of the config constant.
+    _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
     _conf.setIfMissing("spark.driver.port", "0")
 
     _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index cc8e3fd..1ffeb12 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,6 +29,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -158,14 +159,17 @@ object SparkEnv extends Logging {
       listenerBus: LiveListenerBus,
       numCores: Int,
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
-    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+    assert(conf.contains(DRIVER_HOST_ADDRESS),
+      s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
     assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
-    val hostname = conf.get("spark.driver.host")
+    val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
+    val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
     val port = conf.get("spark.driver.port").toInt
     create(
       conf,
       SparkContext.DRIVER_IDENTIFIER,
-      hostname,
+      bindAddress,
+      advertiseAddress,
       port,
       isDriver = true,
       isLocal = isLocal,
@@ -190,6 +194,7 @@ object SparkEnv extends Logging {
       conf,
       executorId,
       hostname,
+      hostname,
       port,
       isDriver = false,
       isLocal = isLocal,
@@ -205,7 +210,8 @@ object SparkEnv extends Logging {
   private def create(
       conf: SparkConf,
       executorId: String,
-      hostname: String,
+      bindAddress: String,
+      advertiseAddress: String,
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
@@ -221,8 +227,8 @@ object SparkEnv extends Logging {
     val securityManager = new SecurityManager(conf)
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
-    val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager,
-      clientMode = !isDriver)
+    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+      securityManager, clientMode = !isDriver)
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     // In the non-driver case, the RPC env's address may be null since it may not be listening
@@ -309,8 +315,15 @@ object SparkEnv extends Logging {
         UnifiedMemoryManager(conf, numUsableCores)
       }
 
+    val blockManagerPort = if (isDriver) {
+      conf.get(DRIVER_BLOCK_MANAGER_PORT)
+    } else {
+      conf.get(BLOCK_MANAGER_PORT)
+    }
+
     val blockTransferService =
-      new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores)
+      new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
+        blockManagerPort, numUsableCores)
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
       BlockManagerMaster.DRIVER_ENDPOINT_NAME,

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
index 4b546c8..97f56a6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/ConfigProvider.scala
@@ -66,7 +66,7 @@ private[spark] class SparkConfigProvider(conf: JMap[String, String]) extends Con
     findEntry(key) match {
       case e: ConfigEntryWithDefault[_] => Option(e.defaultValueString)
       case e: ConfigEntryWithDefaultString[_] => Option(e.defaultValueString)
-      case e: FallbackConfigEntry[_] => defaultValueString(e.fallback.key)
+      case e: FallbackConfigEntry[_] => get(e.fallback.key)
       case _ => None
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 02d7d18..d536cc5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -19,6 +19,7 @@ package org.apache.spark.internal
 
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.util.Utils
 
 package object config {
 
@@ -143,4 +144,23 @@ package object config {
       .internal()
       .stringConf
       .createWithDefaultString("AES/CTR/NoPadding")
+
+  private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
+    .doc("Address of driver endpoints.")
+    .stringConf
+    .createWithDefault(Utils.localHostName())
+
+  private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
+    .doc("Address where to bind network listen sockets on the driver.")
+    .fallbackConf(DRIVER_HOST_ADDRESS)
+
+  private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
+    .doc("Port to use for the block manager when a more specific setting is not provided.")
+    .intConf
+    .createWithDefault(0)
+
+  private[spark] val DRIVER_BLOCK_MANAGER_PORT = ConfigBuilder("spark.driver.blockManager.port")
+    .doc("Port to use for the block manager on the driver.")
+    .fallbackConf(BLOCK_MANAGER_PORT)
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 33a3219..dc70eb8 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -42,7 +42,9 @@ import org.apache.spark.util.Utils
 private[spark] class NettyBlockTransferService(
     conf: SparkConf,
     securityManager: SecurityManager,
+    bindAddress: String,
     override val hostName: String,
+    _port: Int,
     numCores: Int)
   extends BlockTransferService {
 
@@ -75,12 +77,11 @@ private[spark] class NettyBlockTransferService(
   /** Creates and binds the TransportServer, possibly trying multiple ports. */
   private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
     def startService(port: Int): (TransportServer, Int) = {
-      val server = transportContext.createServer(hostName, port, bootstraps.asJava)
+      val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
       (server, server.getPort)
     }
 
-    val portToTry = conf.getInt("spark.blockManager.port", 0)
-    Utils.startServiceOnPort(portToTry, startService, conf, getClass.getName)._1
+    Utils.startServiceOnPort(_port, startService, conf, getClass.getName)._1
   }
 
   override def fetchBlocks(

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 5668377..5791228 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -40,7 +40,19 @@ private[spark] object RpcEnv {
       conf: SparkConf,
       securityManager: SecurityManager,
       clientMode: Boolean = false): RpcEnv = {
-    val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
+    create(name, host, host, port, conf, securityManager, clientMode)
+  }
+
+  def create(
+      name: String,
+      bindAddress: String,
+      advertiseAddress: String,
+      port: Int,
+      conf: SparkConf,
+      securityManager: SecurityManager,
+      clientMode: Boolean): RpcEnv = {
+    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
+      clientMode)
     new NettyRpcEnvFactory().create(config)
   }
 }
@@ -186,7 +198,8 @@ private[spark] trait RpcEnvFileServer {
 private[spark] case class RpcEnvConfig(
     conf: SparkConf,
     name: String,
-    host: String,
+    bindAddress: String,
+    advertiseAddress: String,
     port: Int,
     securityManager: SecurityManager,
     clientMode: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 89d2fb9..e51649a 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -108,14 +108,14 @@ private[netty] class NettyRpcEnv(
     }
   }
 
-  def startServer(port: Int): Unit = {
+  def startServer(bindAddress: String, port: Int): Unit = {
     val bootstraps: java.util.List[TransportServerBootstrap] =
       if (securityManager.isAuthenticationEnabled()) {
         java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
       } else {
         java.util.Collections.emptyList()
       }
-    server = transportContext.createServer(host, port, bootstraps)
+    server = transportContext.createServer(bindAddress, port, bootstraps)
     dispatcher.registerRpcEndpoint(
       RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
   }
@@ -441,10 +441,11 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
     val javaSerializerInstance =
       new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
     val nettyEnv =
-      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
+      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
+        config.securityManager)
     if (!config.clientMode) {
       val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
-        nettyEnv.startServer(actualPort)
+        nettyEnv.startServer(config.bindAddress, actualPort)
         (nettyEnv, nettyEnv.address.port)
       }
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/ui/WebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index 3836380..4118fcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -28,6 +28,7 @@ import org.json4s.JsonAST.{JNothing, JValue}
 
 import org.apache.spark.{SecurityManager, SparkConf, SSLOptions}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.Utils
 
@@ -50,8 +51,8 @@ private[spark] abstract class WebUI(
   protected val handlers = ArrayBuffer[ServletContextHandler]()
   protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
   protected var serverInfo: Option[ServerInfo] = None
-  protected val localHostName = Utils.localHostNameForURI()
-  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
+  protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(
+    conf.get(DRIVER_HOST_ADDRESS))
   private val className = Utils.getFormattedClassName(this)
 
   def getBasePath: String = basePath

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9b4274a..09896c4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2079,9 +2079,9 @@ private[spark] object Utils extends Logging {
         case e: Exception if isBindCollision(e) =>
           if (offset >= maxRetries) {
            val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
-              s"$maxRetries retries! Consider explicitly setting the appropriate port for the " +
-              s"service$serviceString (for example spark.ui.port for SparkUI) to an available " +
-              "port or increasing spark.port.maxRetries."
+              s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
+              s"the appropriate port for the service$serviceString (for example spark.ui.port " +
+              s"for SparkUI) to an available port or increasing spark.port.maxRetries."
             val exception = new BindException(exceptionMessage)
             // restore original stack trace
             exception.setStackTrace(e.getStackTrace)

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index ed15e77..022fe91 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -108,11 +108,13 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)
 
     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", numCores = 1)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0,
+      1)
     exec0.init(blockManager)
 
     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", numCores = 1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0,
+      1)
     exec1.init(blockManager)
 
     val result = fetchBlock(exec0, exec1, "1", blockId) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index e7df7cb..121447a 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -23,6 +23,7 @@ import org.mockito.Mockito.mock
 import org.scalatest._
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
 import org.apache.spark.network.BlockDataManager
 
 class NettyBlockTransferServiceSuite
@@ -86,10 +87,10 @@ class NettyBlockTransferServiceSuite
   private def createService(port: Int): NettyBlockTransferService = {
     val conf = new SparkConf()
       .set("spark.app.id", s"test-${getClass.getName}")
-      .set("spark.blockManager.port", port.toString)
     val securityManager = new SecurityManager(conf)
     val blockDataManager = mock(classOf[BlockDataManager])
-    val service = new NettyBlockTransferService(conf, securityManager, "localhost", numCores = 1)
+    val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost",
+      port, 1)
     service.init(blockDataManager)
     service
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
index 2d6543d..0409aa3 100644
--- a/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/netty/NettyRpcEnvSuite.scala
@@ -27,8 +27,8 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
       name: String,
       port: Int,
       clientMode: Boolean = false): RpcEnv = {
-    val config = RpcEnvConfig(conf, "test", "localhost", port, new SecurityManager(conf),
-      clientMode)
+    val config = RpcEnvConfig(conf, "test", "localhost", "localhost", port,
+      new SecurityManager(conf), clientMode)
     new NettyRpcEnvFactory().create(config)
   }
 
@@ -41,4 +41,16 @@ class NettyRpcEnvSuite extends RpcEnvSuite {
     assert(e.getCause.getMessage.contains(uri))
   }
 
+  test("advertise address different from bind address") {
+    val sparkConf = new SparkConf()
+    val config = RpcEnvConfig(sparkConf, "test", "localhost", "example.com", 0,
+      new SecurityManager(sparkConf), false)
+    val env = new NettyRpcEnvFactory().create(config)
+    try {
+      assert(env.address.hostPort.startsWith("example.com:"))
+    } finally {
+      env.shutdown()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index b9e3a36..e1c1787 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -67,7 +67,7 @@ class BlockManagerReplicationSuite extends SparkFunSuite
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     conf.set("spark.testing.memory", maxMem.toString)
     conf.set("spark.memory.offHeap.size", maxMem.toString)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 6d53d2e..1652fcd 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -80,7 +80,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf.set("spark.memory.offHeap.size", maxMem.toString)
     val serializer = new KryoSerializer(conf)
     val transfer = transferService
-      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1))
+      .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1))
     val memManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(serializer, conf)
     val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
@@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   test("block store put failure") {
     // Use Java serializer so we can create an unserializable error.
     conf.set("spark.testing.memory", "1200")
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
     val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index b505653..82ce232 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1069,10 +1069,31 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.driver.blockManager.port</code></td>
+  <td>(value of spark.blockManager.port)</td>
+  <td>
+    Driver-specific port for the block manager to listen on, for cases where it cannot use the same
+    configuration as executors.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.driver.bindAddress</code></td>
+  <td>(value of spark.driver.host)</td>
+  <td>
+    <p>Hostname or IP address where to bind listening sockets. This config overrides the SPARK_LOCAL_IP
+    environment variable (see below).</p>
+
+    <p>It also allows a different address from the local one to be advertised to executors or external systems.
+    This is useful, for example, when running containers with bridged networking. For this to properly work,
+    the different ports used by the driver (RPC, block manager and UI) need to be forwarded from the
+    container's host.</p>
+  </td>
+</tr>
+<tr>
   <td><code>spark.driver.host</code></td>
   <td>(local hostname)</td>
   <td>
-    Hostname or IP address for the driver to listen on.
+    Hostname or IP address for the driver.
     This is used for communicating with the executors and the standalone Master.
   </td>
 </tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index e19d445..2963d16 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -32,6 +32,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessage}
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.TaskState
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 
@@ -424,7 +425,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
+  val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
 
   /**
    * The values of the non-zero ports to be used by the executor process.

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index bbc79dd..c3ab488 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
@@ -221,7 +222,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
   }
 
   test("Port offer decline when there is no appropriate range") {
-    setBackend(Map("spark.blockManager.port" -> "30100"))
+    setBackend(Map(BLOCK_MANAGER_PORT.key -> "30100"))
     val offeredPorts = (31100L, 31200L)
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 
@@ -242,7 +243,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
   test("Port offer accepted with user defined port numbers") {
     val port = 30100
-    setBackend(Map("spark.blockManager.port" -> s"$port"))
+    setBackend(Map(BLOCK_MANAGER_PORT.key -> s"$port"))
     val offeredPorts = (30000L, 31000L)
     val (mem, cpu) = (backend.executorMemory(sc), 4)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index e3d7949..ec47ab1 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest._
 import org.scalatest.mock.MockitoSugar
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config._
 
 class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
 
@@ -179,7 +180,7 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
   test("Port reservation is done correctly with user specified ports only") {
     val conf = new SparkConf()
     conf.set("spark.executor.port", "3000" )
-    conf.set("spark.blockManager.port", "4000")
+    conf.set(BLOCK_MANAGER_PORT, 4000)
     val portResource = createTestPortResource((3000, 5000), Some("my_role"))
 
     val (resourcesLeft, resourcesToBeUsed) = utils

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index bd8f995..b79cc65 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.scheduler._
@@ -406,7 +407,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
     // explicitly.
     ssc = new StreamingContext(null, newCp, null)
     val restoredConf1 = ssc.conf
-    assert(restoredConf1.get("spark.driver.host") === "localhost")
+    val defaultConf = new SparkConf()
+    assert(restoredConf1.get("spark.driver.host") === defaultConf.get(DRIVER_HOST_ADDRESS))
     assert(restoredConf1.get("spark.driver.port") !== "9999")
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2cd1bfa4/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 7e66545..f224193 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -272,7 +272,7 @@ class ReceivedBlockHandlerSuite
       conf: SparkConf,
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
-    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", numCores = 1)
+    val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
     val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf,
       memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     memManager.setMemoryStore(blockManager.memoryStore)


