Repository: spark
Updated Branches:
  refs/heads/master aeb2ecc0c -> 829cd7b8b


[SPARK-20605][CORE][YARN][MESOS] Deprecate unused AM and executor port configurations

## What changes were proposed in this pull request?

After SPARK-10997, the client-mode Netty RpcEnv no longer needs to start a 
server, so these port configurations are no longer used. This patch deprecates 
the two configurations "spark.executor.port" and "spark.yarn.am.port" and 
removes their remaining uses.

## How was this patch tested?

Existing unit tests.

Author: jerryshao <ss...@hortonworks.com>

Closes #17866 from jerryshao/SPARK-20605.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/829cd7b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/829cd7b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/829cd7b8

Branch: refs/heads/master
Commit: 829cd7b8b70e65a91aa66e6d626bd45f18e0ad97
Parents: aeb2ecc
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon May 8 14:27:56 2017 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Mon May 8 14:27:56 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/SparkConf.scala |  4 ++-
 .../main/scala/org/apache/spark/SparkEnv.scala  | 14 +++-----
 .../executor/CoarseGrainedExecutorBackend.scala |  5 ++-
 docs/running-on-mesos.md                        |  2 +-
 docs/running-on-yarn.md                         |  7 ----
 .../spark/executor/MesosExecutorBackend.scala   |  3 +-
 .../cluster/mesos/MesosSchedulerUtils.scala     |  2 +-
 .../mesos/MesosSchedulerUtilsSuite.scala        | 34 ++++++--------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  3 +-
 .../org/apache/spark/deploy/yarn/config.scala   |  5 ---
 10 files changed, 22 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 2a2ce05..956724b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging {
           "are no longer accepted. To specify the equivalent now, one may use 
'64k'."),
       DeprecatedConfig("spark.rpc", "2.0", "Not used any more."),
       DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0",
-        "Please use the new blacklisting options, spark.blacklist.*")
+        "Please use the new blacklisting options, spark.blacklist.*"),
+      DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"),
+      DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
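
A hedged sketch of what these entries buy (assuming the standard 
`SparkConf.set` path, which consults `deprecatedConfigs`; the port values 
below are hypothetical):

```scala
// Setting a deprecated key still succeeds for backwards compatibility, but
// SparkConf logs a warning along the lines of:
//   "The configuration key 'spark.executor.port' has been deprecated as of
//    Spark 2.0.0 and may be removed in the future. Not used any more"
val conf = new SparkConf()
conf.set("spark.executor.port", "7070")  // warns; value no longer consulted
conf.set("spark.yarn.am.port", "8030")   // warns; value no longer consulted
```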

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index f4a59f0..3196c1e 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -177,7 +177,7 @@ object SparkEnv extends Logging {
       SparkContext.DRIVER_IDENTIFIER,
       bindAddress,
       advertiseAddress,
-      port,
+      Option(port),
       isLocal,
       numCores,
       ioEncryptionKey,
@@ -194,7 +194,6 @@ object SparkEnv extends Logging {
       conf: SparkConf,
       executorId: String,
       hostname: String,
-      port: Int,
       numCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
       isLocal: Boolean): SparkEnv = {
@@ -203,7 +202,7 @@ object SparkEnv extends Logging {
       executorId,
       hostname,
       hostname,
-      port,
+      None,
       isLocal,
       numCores,
       ioEncryptionKey
@@ -220,7 +219,7 @@ object SparkEnv extends Logging {
       executorId: String,
       bindAddress: String,
       advertiseAddress: String,
-      port: Int,
+      port: Option[Int],
       isLocal: Boolean,
       numUsableCores: Int,
       ioEncryptionKey: Option[Array[Byte]],
@@ -243,17 +242,12 @@ object SparkEnv extends Logging {
     }
 
     val systemName = if (isDriver) driverSystemName else executorSystemName
-    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
+    val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
       securityManager, clientMode = !isDriver)
 
    // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
-    // In the non-driver case, the RPC env's address may be null since it may not be listening
-    // for incoming connections.
     if (isDriver) {
       conf.set("spark.driver.port", rpcEnv.address.port.toString)
-    } else if (rpcEnv.address != null) {
-      conf.set("spark.executor.port", rpcEnv.address.port.toString)
-      logInfo(s"Setting spark.executor.port to: 
${rpcEnv.address.port.toString}")
     }
 
    // Create an instance of the class with the given name, possibly initializing it with our conf
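
A hedged sketch of the new flow (`driverPort` is a stand-in for the driver's 
configured port; other names as in this file): the driver passes `Some(port)`, 
executors pass `None`, and `create` collapses `None` to `-1` because a 
client-mode RpcEnv needs no listening port:

```scala
// Driver: binds a real server port. Executors: pure RPC clients, no port.
val port: Option[Int] = if (isDriver) Some(driverPort) else None
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress,
  port.getOrElse(-1), conf, securityManager, clientMode = !isDriver)
```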

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b2b26ee..a2f1aa2 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Bootstrap to fetch the driver's Spark properties.
       val executorConf = new SparkConf
-      val port = executorConf.getInt("spark.executor.port", 0)
       val fetcher = RpcEnv.create(
         "driverPropsFetcher",
         hostname,
-        port,
+        -1,
         executorConf,
         new SecurityManager(executorConf),
         clientMode = true)
@@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
 
       val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
+        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
 
       env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/docs/running-on-mesos.md
----------------------------------------------------------------------
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 314a806..c1344ad 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -209,7 +209,7 @@ provide such guarantees on the offer stream.
 
 In this mode spark executors will honor port allocation if such is
 provided from the user. Specifically if the user defines
-`spark.executor.port` or `spark.blockManager.port` in Spark configuration,
+`spark.blockManager.port` in Spark configuration,
 the mesos scheduler will check the available offers for a valid port
 range containing the port numbers. If no such range is available it will
 not launch any task. If no restriction is imposed on port numbers by the
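
With `spark.executor.port` gone, only the block manager port participates in 
this matching. A hedged sketch (the port value is hypothetical):

```scala
// Only spark.blockManager.port is now checked against Mesos port offers;
// the chosen value must fall inside an offered port range.
val conf = new SparkConf()
  .set("spark.blockManager.port", "31000")
```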

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/docs/running-on-yarn.md
----------------------------------------------------------------------
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index e9ddaa7..2d56123 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -240,13 +240,6 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
 </tr>
 <tr>
-  <td><code>spark.yarn.am.port</code></td>
-  <td>(random)</td>
-  <td>
-    Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
-  </td>
-</tr>
-<tr>
   <td><code>spark.yarn.queue</code></td>
   <td><code>default</code></td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index a086ec7..61bfa27 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend
    val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     val conf = new SparkConf(loadDefaults = true).setAll(properties)
-    val port = conf.getInt("spark.executor.port", 0)
     val env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false)
+      conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false)
 
     executor = new Executor(
       executorId,

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9d81025..062ed1f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key)
+  val managedPortNames = List(BLOCK_MANAGER_PORT.key)
 
   /**
    * The values of the non-zero ports to be used by the executor process.

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index ec47ab1..5d4bf6d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "3000" )
     conf.set(BLOCK_MANAGER_PORT, 4000)
     val portResource = createTestPortResource((3000, 5000), Some("my_role"))
 
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3000, 4000), List(portResource))
-    resourcesToBeUsed.length shouldBe 2
+      .partitionPortResources(List(4000), List(portResource))
+    resourcesToBeUsed.length shouldBe 1
 
    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
 
-    portsToUse.length shouldBe 2
-    arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+    portsToUse.length shouldBe 1
+    arePortsEqual(portsToUse, Array(4000L)) shouldBe true
 
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+    val expectedUSed = Array((4000L, 4000L))
 
     arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
   }
 
-  test("Port reservation is done correctly with some user specified ports 
(spark.executor.port)") {
-    val conf = new SparkConf()
-    conf.set("spark.executor.port", "3100" )
-    val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
-    val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(3100), List(portResource))
-
-    val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
-    portsToUse.length shouldBe 1
-    portsToUse.contains(3100) shouldBe true
-  }
-
   test("Port reservation is done correctly with all random ports") {
     val conf = new SparkConf()
     val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
@@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
 
   test("Port reservation is done correctly with user specified ports only - 
multiple ranges") {
     val conf = new SparkConf()
-    conf.set("spark.executor.port", "2100" )
     conf.set("spark.blockManager.port", "4000")
    val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
       createTestPortResource((2000, 2500), Some("other_role")))
     val (resourcesLeft, resourcesToBeUsed) = utils
-      .partitionPortResources(List(2100, 4000), portResourceList)
+      .partitionPortResources(List(4000), portResourceList)
     val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
 
-    portsToUse.length shouldBe 2
+    portsToUse.length shouldBe 1
     val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
     val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
 
-    val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+    val expectedUsed = Array((4000L, 4000L))
 
-    arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+    arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true
     arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 864c834..6da2c0b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -429,8 +429,7 @@ private[spark] class ApplicationMaster(
   }
 
   private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
-    val port = sparkConf.get(AM_PORT)
-    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
+    rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr,
       clientMode = true)
     val driverRef = waitForSparkDriver()
     addAmIpFilter()

http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index d8c96c3..d4108ca 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -40,11 +40,6 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
-  private[spark] val AM_PORT =
-    ConfigBuilder("spark.yarn.am.port")
-      .intConf
-      .createWithDefault(0)
-
   private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
     ConfigBuilder("spark.yarn.executor.failuresValidityInterval")
       .doc("Interval after which Executor failures will be considered 
independent and not " +
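
For contrast, a hedged sketch of the builder pattern the surviving entries in 
this file use (the key below is hypothetical): `createOptional` yields an 
entry read back as `Option[Int]`, unlike the `createWithDefault(0)` that the 
removed AM_PORT relied on:

```scala
private[spark] val EXAMPLE_PORT =
  ConfigBuilder("spark.yarn.example.port")  // hypothetical key
    .intConf
    .createOptional                         // read as Option[Int]; no default
```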

