Repository: spark Updated Branches: refs/heads/master aeb2ecc0c -> 829cd7b8b
[SPARK-20605][CORE][YARN][MESOS] Deprecate not used AM and executor port configuration ## What changes were proposed in this pull request? After SPARK-10997, client mode Netty RpcEnv doesn't require to start server, so port configurations are not used any more, here propose to remove these two configurations: "spark.executor.port" and "spark.am.port". ## How was this patch tested? Existing UTs. Author: jerryshao <ss...@hortonworks.com> Closes #17866 from jerryshao/SPARK-20605. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/829cd7b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/829cd7b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/829cd7b8 Branch: refs/heads/master Commit: 829cd7b8b70e65a91aa66e6d626bd45f18e0ad97 Parents: aeb2ecc Author: jerryshao <ss...@hortonworks.com> Authored: Mon May 8 14:27:56 2017 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Mon May 8 14:27:56 2017 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/SparkConf.scala | 4 ++- .../main/scala/org/apache/spark/SparkEnv.scala | 14 +++----- .../executor/CoarseGrainedExecutorBackend.scala | 5 ++- docs/running-on-mesos.md | 2 +- docs/running-on-yarn.md | 7 ---- .../spark/executor/MesosExecutorBackend.scala | 3 +- .../cluster/mesos/MesosSchedulerUtils.scala | 2 +- .../mesos/MesosSchedulerUtilsSuite.scala | 34 ++++++-------------- .../spark/deploy/yarn/ApplicationMaster.scala | 3 +- .../org/apache/spark/deploy/yarn/config.scala | 5 --- 10 files changed, 22 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/SparkConf.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 2a2ce05..956724b 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -579,7 +579,9 @@ private[spark] object SparkConf extends Logging { "are no longer accepted. To specify the equivalent now, one may use '64k'."), DeprecatedConfig("spark.rpc", "2.0", "Not used any more."), DeprecatedConfig("spark.scheduler.executorTaskBlacklistTime", "2.1.0", - "Please use the new blacklisting options, spark.blacklist.*") + "Please use the new blacklisting options, spark.blacklist.*"), + DeprecatedConfig("spark.yarn.am.port", "2.0.0", "Not used any more"), + DeprecatedConfig("spark.executor.port", "2.0.0", "Not used any more") ) Map(configs.map { cfg => (cfg.key -> cfg) } : _*) http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index f4a59f0..3196c1e 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -177,7 +177,7 @@ object SparkEnv extends Logging { SparkContext.DRIVER_IDENTIFIER, bindAddress, advertiseAddress, - port, + Option(port), isLocal, numCores, ioEncryptionKey, @@ -194,7 +194,6 @@ object SparkEnv extends Logging { conf: SparkConf, executorId: String, hostname: String, - port: Int, numCores: Int, ioEncryptionKey: Option[Array[Byte]], isLocal: Boolean): SparkEnv = { @@ -203,7 +202,7 @@ object SparkEnv extends Logging { executorId, hostname, hostname, - port, + None, isLocal, numCores, ioEncryptionKey @@ -220,7 +219,7 @@ object SparkEnv extends Logging { executorId: String, bindAddress: String, advertiseAddress: String, - port: Int, + port: Option[Int], isLocal: Boolean, numUsableCores: Int, ioEncryptionKey: Option[Array[Byte]], @@ -243,17 +242,12 @@ object SparkEnv extends Logging { } val systemName = if (isDriver) driverSystemName else executorSystemName - val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf, + val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf, securityManager, clientMode = !isDriver) // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied. - // In the non-driver case, the RPC env's address may be null since it may not be listening - // for incoming connections. if (isDriver) { conf.set("spark.driver.port", rpcEnv.address.port.toString) - } else if (rpcEnv.address != null) { - conf.set("spark.executor.port", rpcEnv.address.port.toString) - logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}") } // Create an instance of the class with the given name, possibly initializing it with our conf http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index b2b26ee..a2f1aa2 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -191,11 +191,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf - val port = executorConf.getInt("spark.executor.port", 0) val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, - port, + -1, executorConf, new SecurityManager(executorConf), clientMode = true) @@ -221,7 +220,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { } val env = SparkEnv.createExecutorEnv( - driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false) + driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/docs/running-on-mesos.md ---------------------------------------------------------------------- diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 314a806..c1344ad 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -209,7 +209,7 @@ provide such guarantees on the offer stream. In this mode spark executors will honor port allocation if such is provided from the user. Specifically if the user defines -`spark.executor.port` or `spark.blockManager.port` in Spark configuration, +`spark.blockManager.port` in Spark configuration, the mesos scheduler will check the available offers for a valid port range containing the port numbers. If no such range is available it will not launch any task. If no restriction is imposed on port numbers by the http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/docs/running-on-yarn.md ---------------------------------------------------------------------- diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index e9ddaa7..2d56123 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -240,13 +240,6 @@ To use a custom metrics.properties for the application master and executors, upd </td> </tr> <tr> - <td><code>spark.yarn.am.port</code></td> - <td>(random)</td> - <td> - Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend. - </td> -</tr> -<tr> <td><code>spark.yarn.queue</code></td> <td><code>default</code></td> <td> http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index a086ec7..61bfa27 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -74,9 +74,8 @@ private[spark] class MesosExecutorBackend val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++ Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) val conf = new SparkConf(loadDefaults = true).setAll(properties) - val port = conf.getInt("spark.executor.port", 0) val env = SparkEnv.createExecutorEnv( - conf, executorId, slaveInfo.getHostname, port, cpusPerTask, None, isLocal = false) + conf, executorId, slaveInfo.getHostname, cpusPerTask, None, isLocal = false) executor = new Executor( executorId, http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 9d81025..062ed1f 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -438,7 +438,7 @@ trait MesosSchedulerUtils extends Logging { } } - val managedPortNames = List("spark.executor.port", BLOCK_MANAGER_PORT.key) + val managedPortNames = List(BLOCK_MANAGER_PORT.key) /** * The values of the non-zero ports to be used by the executor process. http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index ec47ab1..5d4bf6d 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -179,40 +179,25 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Port reservation is done correctly with user specified ports only") { val conf = new SparkConf() - conf.set("spark.executor.port", "3000" ) conf.set(BLOCK_MANAGER_PORT, 4000) val portResource = createTestPortResource((3000, 5000), Some("my_role")) val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(3000, 4000), List(portResource)) - resourcesToBeUsed.length shouldBe 2 + .partitionPortResources(List(4000), List(portResource)) + resourcesToBeUsed.length shouldBe 1 val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray - portsToUse.length shouldBe 2 - arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true + portsToUse.length shouldBe 1 + arePortsEqual(portsToUse, Array(4000L)) shouldBe true val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) - val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) + val expectedUSed = Array((4000L, 4000L)) arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true } - test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { - val conf = new SparkConf() - conf.set("spark.executor.port", "3100" ) - val portResource = createTestPortResource((3000, 5000), Some("my_role")) - - val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(3100), List(portResource)) - - val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - - portsToUse.length shouldBe 1 - portsToUse.contains(3100) shouldBe true - } - test("Port reservation is done correctly with all random ports") { val conf = new SparkConf() val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) @@ -226,21 +211,20 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("Port reservation is done correctly with user specified ports only - multiple ranges") { val conf = new SparkConf() - conf.set("spark.executor.port", "2100" ) conf.set("spark.blockManager.port", "4000") val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), createTestPortResource((2000, 2500), Some("other_role"))) val (resourcesLeft, resourcesToBeUsed) = utils - .partitionPortResources(List(2100, 4000), portResourceList) + .partitionPortResources(List(4000), portResourceList) val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} - portsToUse.length shouldBe 2 + portsToUse.length shouldBe 1 val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) - val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) + val expectedUsed = Array((4000L, 4000L)) - arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true + arePortsEqual(portsToUse.toArray, Array(4000L)) shouldBe true arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true } http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 864c834..6da2c0b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -429,8 +429,7 @@ private[spark] class ApplicationMaster( } private def runExecutorLauncher(securityMgr: SecurityManager): Unit = { - val port = sparkConf.get(AM_PORT) - rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr, + rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, -1, sparkConf, securityMgr, clientMode = true) val driverRef = waitForSparkDriver() addAmIpFilter() http://git-wip-us.apache.org/repos/asf/spark/blob/829cd7b8/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index d8c96c3..d4108ca 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -40,11 +40,6 @@ package object config { .timeConf(TimeUnit.MILLISECONDS) .createOptional - private[spark] val AM_PORT = - ConfigBuilder("spark.yarn.am.port") - .intConf - .createWithDefault(0) - private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS = ConfigBuilder("spark.yarn.executor.failuresValidityInterval") .doc("Interval after which Executor failures will be considered independent and not " + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org