This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4419e1d  [SPARK-26445][CORE] Use ConfigEntry for hardcoded configs for driver/executor categories.
4419e1d is described below

commit 4419e1daca6c5de373d5f3f13c417b791d768c96
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Fri Jan 4 22:12:35 2019 +0800

    [SPARK-26445][CORE] Use ConfigEntry for hardcoded configs for driver/executor categories.
    
    ## What changes were proposed in this pull request?
    
    This PR makes the hardcoded spark.driver, spark.executor, and spark.cores.max configs use `ConfigEntry`.
    
    Note that some config keys reuse the constants from `SparkLauncher` instead of being defined in the config package object, because the strings are already defined there and `SparkLauncher` does not depend on the core module.
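
    For illustration, a minimal sketch of the pattern applied here, mirroring the `EXECUTOR_CORES` entry this diff adds to core's `org.apache.spark.internal.config` package object (the builder calls shown are only the ones used there; the imports are shown for context and are not needed inside the package object itself):

    ```scala
    import org.apache.spark.internal.config.ConfigBuilder
    import org.apache.spark.launcher.SparkLauncher

    // Excerpt from inside the config package object: the key string is reused
    // from SparkLauncher ("spark.executor.cores") rather than hardcoded again.
    private[spark] val EXECUTOR_CORES = ConfigBuilder(SparkLauncher.EXECUTOR_CORES)
      .intConf
      .createWithDefault(1)

    // Call sites then read the typed value instead of doing a raw string lookup:
    //   before: conf.getInt("spark.executor.cores", 1)
    //   after:  conf.get(EXECUTOR_CORES)
    ```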
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23415 from ueshin/issues/SPARK-26445/hardcoded_driver_executor_configs.
    
    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  4 +-
 .../main/scala/org/apache/spark/SparkConf.scala    | 25 ++++------
 .../main/scala/org/apache/spark/SparkContext.scala |  8 ++--
 .../src/main/scala/org/apache/spark/SparkEnv.scala |  8 ++--
 .../org/apache/spark/api/python/PythonRunner.scala |  4 +-
 .../scala/org/apache/spark/deploy/Client.scala     |  8 ++--
 .../apache/spark/deploy/FaultToleranceTest.scala   |  6 +--
 .../org/apache/spark/deploy/SparkSubmit.scala      | 24 +++++-----
 .../apache/spark/deploy/SparkSubmitArguments.scala | 20 ++++----
 .../spark/deploy/rest/StandaloneRestServer.scala   | 13 ++---
 .../deploy/rest/SubmitRestProtocolRequest.scala    | 11 +++--
 .../apache/spark/deploy/worker/DriverWrapper.scala |  6 +--
 .../org/apache/spark/internal/config/package.scala | 55 +++++++++++++++++++++-
 .../apache/spark/memory/StaticMemoryManager.scala  |  9 ++--
 .../apache/spark/memory/UnifiedMemoryManager.scala |  9 ++--
 .../org/apache/spark/metrics/MetricsSystem.scala   |  2 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala       | 16 +++----
 .../scheduler/local/LocalSchedulerBackend.scala    |  4 +-
 .../scala/org/apache/spark/util/RpcUtils.scala     |  5 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  2 +-
 .../apache/spark/util/logging/FileAppender.scala   | 10 ++--
 .../spark/util/logging/RollingFileAppender.scala   | 19 ++------
 .../spark/ExecutorAllocationManagerSuite.scala     |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  2 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  6 +--
 .../spark/memory/UnifiedMemoryManagerSuite.scala   |  2 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  2 +-
 .../storage/BlockManagerReplicationSuite.scala     |  4 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  2 +-
 .../org/apache/spark/util/FileAppenderSuite.scala  | 13 +++--
 .../k8s/features/BasicDriverFeatureStep.scala      |  4 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |  6 +--
 .../k8s/features/DriverServiceFeatureStep.scala    | 15 +++---
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  2 +-
 .../features/BasicExecutorFeatureStepSuite.scala   | 16 +++----
 .../features/DriverServiceFeatureStepSuite.scala   |  6 +--
 .../org/apache/spark/deploy/mesos/config.scala     |  3 ++
 .../spark/deploy/rest/mesos/MesosRestServer.scala  | 13 ++---
 .../cluster/mesos/MesosClusterScheduler.scala      | 10 ++--
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 20 ++++----
 .../mesos/MesosFineGrainedSchedulerBackend.scala   | 12 ++---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  6 +--
 .../org/apache/spark/deploy/yarn/config.scala      | 10 ++--
 .../cluster/YarnClientSchedulerBackend.scala       |  8 ++--
 .../deploy/yarn/ResourceRequestHelperSuite.scala   |  2 +-
 46 files changed, 236 insertions(+), 200 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 3f0b71b..d966582 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -127,7 +127,7 @@ private[spark] class ExecutorAllocationManager(
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
   private val tasksPerExecutorForFullParallelism =
-    conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
+    conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
 
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@@ -223,7 +223,7 @@ private[spark] class ExecutorAllocationManager(
         "shuffle service. You may enable this through 
spark.shuffle.service.enabled.")
     }
     if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
+      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
     }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0b47da1..681e437 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -503,12 +503,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       logWarning(msg)
     }
 
-    val executorOptsKey = "spark.executor.extraJavaOptions"
-    val executorClasspathKey = "spark.executor.extraClassPath"
-    val driverOptsKey = "spark.driver.extraJavaOptions"
-    val driverClassPathKey = "spark.driver.extraClassPath"
-    val driverLibraryPathKey = "spark.driver.extraLibraryPath"
-    val sparkExecutorInstances = "spark.executor.instances"
+    val executorOptsKey = EXECUTOR_JAVA_OPTIONS.key
 
     // Used by Yarn in 1.1 and before
     sys.props.get("spark.driver.libraryPath").foreach { value =>
@@ -517,7 +512,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
           |spark.driver.libraryPath was detected (set to '$value').
           |This is deprecated in Spark 1.2+.
           |
-          |Please instead use: $driverLibraryPathKey
+          |Please instead use: ${DRIVER_LIBRARY_PATH.key}
         """.stripMargin
       logWarning(warning)
     }
@@ -594,9 +589,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains("spark.cores.max") && contains("spark.executor.cores")) {
-      val totalCores = getInt("spark.cores.max", 1)
-      val executorCores = getInt("spark.executor.cores", 1)
+    if (contains(CORES_MAX) && contains(EXECUTOR_CORES)) {
+      val totalCores = getInt(CORES_MAX.key, 1)
+      val executorCores = get(EXECUTOR_CORES)
       val leftCores = totalCores % executorCores
       if (leftCores != 0) {
         logWarning(s"Total executor cores: ${totalCores} is not " +
@@ -605,12 +600,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains("spark.executor.cores") && contains("spark.task.cpus")) {
-      val executorCores = getInt("spark.executor.cores", 1)
+    if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
+      val executorCores = get(EXECUTOR_CORES)
       val taskCpus = getInt("spark.task.cpus", 1)
 
       if (executorCores < taskCpus) {
-        throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
+        throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
       }
     }
 
@@ -680,7 +675,7 @@ private[spark] object SparkConf extends Logging {
    * TODO: consolidate it with `ConfigBuilder.withAlternative`.
    */
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
-    "spark.executor.userClassPathFirst" -> Seq(
+    EXECUTOR_USER_CLASS_PATH_FIRST.key -> Seq(
       AlternateConfig("spark.files.userClassPathFirst", "1.3")),
     UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
@@ -703,7 +698,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
     "spark.shuffle.file.buffer" -> Seq(
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
-    "spark.executor.logs.rolling.maxSize" -> Seq(
+    EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
     "spark.io.compression.snappy.blockSize" -> Seq(
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3475859..89be9de 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -386,9 +386,9 @@ class SparkContext(config: SparkConf) extends Logging {
     // Set Spark driver host and port system properties. This explicitly sets the configuration
     // instead of relying on the default value of the config constant.
     _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
-    _conf.setIfMissing("spark.driver.port", "0")
+    _conf.setIfMissing(DRIVER_PORT, 0)
 
-    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
+    _conf.set(EXECUTOR_ID, SparkContext.DRIVER_IDENTIFIER)
 
     _jars = Utils.getUserJars(_conf)
    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
@@ -461,7 +461,7 @@ class SparkContext(config: SparkConf) extends Logging {
       files.foreach(addFile)
     }
 
-    _executorMemory = _conf.getOption("spark.executor.memory")
+    _executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
       .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
       .orElse(Option(System.getenv("SPARK_MEM"))
       .map(warnSparkMem))
@@ -2639,7 +2639,7 @@ object SparkContext extends Logging {
      case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
       case "yarn" =>
        if (conf != null && conf.getOption("spark.submit.deployMode").contains("cluster")) {
-          conf.getInt("spark.driver.cores", 0)
+          conf.getInt(DRIVER_CORES.key, 0)
         } else {
           0
         }
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index de0c857..9222781 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -163,10 +163,10 @@ object SparkEnv extends Logging {
      mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
     assert(conf.contains(DRIVER_HOST_ADDRESS),
       s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
-    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set 
on the driver!")
+    assert(conf.contains(DRIVER_PORT), s"${DRIVER_PORT.key} is not set on the 
driver!")
     val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
     val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
-    val port = conf.get("spark.driver.port").toInt
+    val port = conf.get(DRIVER_PORT)
     val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
       Some(CryptoStreamUtils.createKey(conf))
     } else {
@@ -251,7 +251,7 @@ object SparkEnv extends Logging {
 
     // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
     if (isDriver) {
-      conf.set("spark.driver.port", rpcEnv.address.port.toString)
+      conf.set(DRIVER_PORT, rpcEnv.address.port)
     }
 
     // Create an instance of the class with the given name, possibly initializing it with our conf
@@ -359,7 +359,7 @@ object SparkEnv extends Logging {
       // We need to set the executor ID before the MetricsSystem is created because sources and
       // sinks specified in the metrics configuration file will want to incorporate this executor's
       // ID into the metrics they report.
-      conf.set("spark.executor.id", executorId)
+      conf.set(EXECUTOR_ID, executorId)
       val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
       ms.start()
       ms
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6b748c8..5168e93 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,6 +27,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.EXECUTOR_CORES
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
@@ -74,8 +75,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool will grow to the
   // number of concurrent tasks, which is determined by the number of cores in this executor.
-  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY)
-      .map(_ / conf.getInt("spark.executor.cores", 1))
+  private val memoryMb = conf.get(PYSPARK_EXECUTOR_MEMORY).map(_ / conf.get(EXECUTOR_CORES))
 
   // All the Python functions should have the same exec, version and envvars.
   protected val envVars = funcs.head.funcs.head.envVars
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d514509..d94b174 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -27,7 +27,7 @@ import org.apache.log4j.Logger
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
 
@@ -68,17 +68,17 @@ private class ClientEndpoint(
        //       people call `addJar` assuming the jar is in the same directory.
         val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
 
-        val classPathConf = "spark.driver.extraClassPath"
+        val classPathConf = config.DRIVER_CLASS_PATH.key
        val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
           cp.split(java.io.File.pathSeparator)
         }
 
-        val libraryPathConf = "spark.driver.extraLibraryPath"
+        val libraryPathConf = config.DRIVER_LIBRARY_PATH.key
        val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
           cp.split(java.io.File.pathSeparator)
         }
 
-        val extraJavaOptsConf = "spark.driver.extraJavaOptions"
+        val extraJavaOptsConf = config.DRIVER_JAVA_OPTIONS.key
         val extraJavaOpts = sys.props.get(extraJavaOptsConf)
           .map(Utils.splitCommandString).getOrElse(Seq.empty)
         val sparkJavaOpts = Utils.sparkJavaOpts(conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
index c6307da..0679bdf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -34,7 +34,7 @@ import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.master.RecoveryState
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -77,7 +77,7 @@ private object FaultToleranceTest extends App with Logging {
   private val containerSparkHome = "/opt/spark"
   private val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
 
-  System.setProperty("spark.driver.host", "172.17.42.1") // default docker 
host ip
+  System.setProperty(config.DRIVER_HOST_ADDRESS.key, "172.17.42.1") // default 
docker host ip
 
   private def afterEach() {
     if (sc != null) {
@@ -216,7 +216,7 @@ private object FaultToleranceTest extends App with Logging {
     if (sc != null) { sc.stop() }
     // Counter-hack: Because of a hack in SparkEnv#create() that changes this
     // property, we need to reset it.
-    System.setProperty("spark.driver.port", "0")
+    System.setProperty(config.DRIVER_PORT.key, "0")
     sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 763bd0a..a4c65ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -514,13 +514,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
-        confKey = "spark.driver.memory"),
+        confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        confKey = "spark.driver.extraClassPath"),
+        confKey = DRIVER_CLASS_PATH.key),
       OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        confKey = "spark.driver.extraJavaOptions"),
+        confKey = DRIVER_JAVA_OPTIONS.key),
       OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        confKey = "spark.driver.extraLibraryPath"),
+        confKey = DRIVER_LIBRARY_PATH.key),
       OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = PRINCIPAL.key),
       OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -537,7 +537,7 @@ private[spark] class SparkSubmit extends Logging {
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
       OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
-        confKey = "spark.executor.instances"),
+        confKey = EXECUTOR_INSTANCES.key),
       OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
       OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
       OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
@@ -545,22 +545,22 @@ private[spark] class SparkSubmit extends Logging {
 
       // Other options
       OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
-        confKey = "spark.executor.cores"),
+        confKey = EXECUTOR_CORES.key),
       OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
-        confKey = "spark.executor.memory"),
+        confKey = EXECUTOR_MEMORY.key),
       OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
-        confKey = "spark.cores.max"),
+        confKey = CORES_MAX.key),
       OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.files"),
       OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
       OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
         confKey = "spark.jars"),
       OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
-        confKey = "spark.driver.memory"),
+        confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
-        confKey = "spark.driver.cores"),
+        confKey = DRIVER_CORES.key),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
-        confKey = "spark.driver.supervise"),
+        confKey = DRIVER_SUPERVISE.key),
       OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
 
       // An internal option used only for spark-shell to add user jars to repl's classloader,
@@ -727,7 +727,7 @@ private[spark] class SparkSubmit extends Logging {
 
     // Ignore invalid spark.driver.host in cluster modes.
     if (deployMode == CLUSTER) {
-      sparkConf.remove("spark.driver.host")
+      sparkConf.remove(DRIVER_HOST_ADDRESS)
     }
 
     // Resolve paths in certain spark properties
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 4cf08a7..34facd5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -31,7 +31,7 @@ import scala.util.Try
 
 import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -155,31 +155,31 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orElse(env.get("MASTER"))
       .orNull
     driverExtraClassPath = Option(driverExtraClassPath)
-      .orElse(sparkProperties.get("spark.driver.extraClassPath"))
+      .orElse(sparkProperties.get(config.DRIVER_CLASS_PATH.key))
       .orNull
     driverExtraJavaOptions = Option(driverExtraJavaOptions)
-      .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
+      .orElse(sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key))
       .orNull
     driverExtraLibraryPath = Option(driverExtraLibraryPath)
-      .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
+      .orElse(sparkProperties.get(config.DRIVER_LIBRARY_PATH.key))
       .orNull
     driverMemory = Option(driverMemory)
-      .orElse(sparkProperties.get("spark.driver.memory"))
+      .orElse(sparkProperties.get(config.DRIVER_MEMORY.key))
       .orElse(env.get("SPARK_DRIVER_MEMORY"))
       .orNull
     driverCores = Option(driverCores)
-      .orElse(sparkProperties.get("spark.driver.cores"))
+      .orElse(sparkProperties.get(config.DRIVER_CORES.key))
       .orNull
     executorMemory = Option(executorMemory)
-      .orElse(sparkProperties.get("spark.executor.memory"))
+      .orElse(sparkProperties.get(config.EXECUTOR_MEMORY.key))
       .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
       .orNull
     executorCores = Option(executorCores)
-      .orElse(sparkProperties.get("spark.executor.cores"))
+      .orElse(sparkProperties.get(config.EXECUTOR_CORES.key))
       .orElse(env.get("SPARK_EXECUTOR_CORES"))
       .orNull
     totalExecutorCores = Option(totalExecutorCores)
-      .orElse(sparkProperties.get("spark.cores.max"))
+      .orElse(sparkProperties.get(config.CORES_MAX.key))
       .orNull
     name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
     jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
@@ -197,7 +197,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orElse(env.get("DEPLOY_MODE"))
       .orNull
     numExecutors = Option(numExecutors)
-      .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
+      .getOrElse(sparkProperties.get(config.EXECUTOR_INSTANCES.key).orNull)
     queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
     keytab = Option(keytab)
       .orElse(sparkProperties.get("spark.kerberos.keytab"))
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
index afa1a5f..c75e684 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletResponse
 import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
 import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
 import org.apache.spark.deploy.ClientArguments._
+import org.apache.spark.internal.config
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -132,12 +133,12 @@ private[rest] class StandaloneSubmitRequestServlet(
 
     // Optional fields
     val sparkProperties = request.sparkProperties
-    val driverMemory = sparkProperties.get("spark.driver.memory")
-    val driverCores = sparkProperties.get("spark.driver.cores")
-    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
-    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
-    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
-    val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+    val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
+    val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
+    val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
+    val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
+    val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
     // The semantics of "spark.master" and the masterUrl are different. While the
     // property "spark.master" could contain all registered masters, masterUrl
     // contains only the active master. To make sure a Spark driver can recover
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
index 86ddf95..7f46214 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/SubmitRestProtocolRequest.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.rest
 
 import scala.util.Try
 
+import org.apache.spark.internal.config
 import org.apache.spark.util.Utils
 
 /**
@@ -49,11 +50,11 @@ private[rest] class CreateSubmissionRequest extends SubmitRestProtocolRequest {
     assertFieldIsSet(appArgs, "appArgs")
     assertFieldIsSet(environmentVariables, "environmentVariables")
     assertPropertyIsSet("spark.app.name")
-    assertPropertyIsBoolean("spark.driver.supervise")
-    assertPropertyIsNumeric("spark.driver.cores")
-    assertPropertyIsNumeric("spark.cores.max")
-    assertPropertyIsMemory("spark.driver.memory")
-    assertPropertyIsMemory("spark.executor.memory")
+    assertPropertyIsBoolean(config.DRIVER_SUPERVISE.key)
+    assertPropertyIsNumeric(config.DRIVER_CORES.key)
+    assertPropertyIsNumeric(config.CORES_MAX.key)
+    assertPropertyIsMemory(config.DRIVER_MEMORY.key)
+    assertPropertyIsMemory(config.EXECUTOR_MEMORY.key)
   }
 
   private def assertPropertyIsSet(key: String): Unit =
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 8d6a2b8..1e8ad0b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.{DependencyUtils, SparkHadoopUtil, SparkSubmit}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.util._
 
@@ -43,7 +43,7 @@ object DriverWrapper extends Logging {
       case workerUrl :: userJar :: mainClass :: extraArgs =>
         val conf = new SparkConf()
         val host: String = Utils.localHostName()
-        val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
+        val port: Int = sys.props.getOrElse(config.DRIVER_PORT.key, "0").toInt
         val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
         logInfo(s"Driver address: ${rpcEnv.address}")
         rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
@@ -51,7 +51,7 @@ object DriverWrapper extends Logging {
         val currentLoader = Thread.currentThread.getContextClassLoader
         val userJarUrl = new File(userJar).toURI().toURL()
         val loader =
-          if (sys.props.getOrElse("spark.driver.userClassPathFirst", 
"false").toBoolean) {
+          if (sys.props.getOrElse(config.DRIVER_USER_CLASS_PATH_FIRST.key, 
"false").toBoolean) {
             new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
           } else {
             new MutableURLClassLoader(Array(userJarUrl), currentLoader)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index da80604..8caaa73 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -39,7 +39,12 @@ package object config {
   private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)
 
-  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
+  private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
+    .doc("Number of cores to use for the driver process, only in cluster 
mode.")
+    .intConf
+    .createWithDefault(1)
+
+  private[spark] val DRIVER_MEMORY = ConfigBuilder(SparkLauncher.DRIVER_MEMORY)
     .doc("Amount of memory to use for the driver process, in MiB unless 
otherwise specified.")
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
@@ -113,6 +118,9 @@ package object config {
   private[spark] val EVENT_LOG_CALLSITE_LONG_FORM =
     ConfigBuilder("spark.eventLog.longForm.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val EXECUTOR_ID =
+    ConfigBuilder("spark.executor.id").stringConf.createOptional
+
   private[spark] val EXECUTOR_CLASS_PATH =
     ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional
 
@@ -139,7 +147,11 @@ package object config {
   private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
     ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)
 
-  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
+  private[spark] val EXECUTOR_CORES = ConfigBuilder(SparkLauncher.EXECUTOR_CORES)
+    .intConf
+    .createWithDefault(1)
+
+  private[spark] val EXECUTOR_MEMORY = ConfigBuilder(SparkLauncher.EXECUTOR_MEMORY)
     .doc("Amount of memory to use per executor process, in MiB unless 
otherwise specified.")
     .bytesConf(ByteUnit.MiB)
     .createWithDefaultString("1g")
@@ -150,6 +162,15 @@ package object config {
     .bytesConf(ByteUnit.MiB)
     .createOptional
 
+  private[spark] val CORES_MAX = ConfigBuilder("spark.cores.max")
+    .doc("When running on a standalone deploy cluster or a Mesos cluster in 
coarse-grained " +
+      "sharing mode, the maximum amount of CPU cores to request for the 
application from across " +
+      "the cluster (not from each machine). If not set, the default will be " +
+      "`spark.deploy.defaultCores` on Spark's standalone cluster manager, or 
infinite " +
+      "(all available cores) on Mesos.")
+    .intConf
+    .createOptional
+
   private[spark] val MEMORY_OFFHEAP_ENABLED = ConfigBuilder("spark.memory.offHeap.enabled")
     .doc("If true, Spark will attempt to use off-heap memory for certain operations. " +
       "If off-heap memory use is enabled, then spark.memory.offHeap.size must be positive.")
@@ -347,6 +368,17 @@ package object config {
     .stringConf
     .createWithDefault(Utils.localCanonicalHostName())
 
+  private[spark] val DRIVER_PORT = ConfigBuilder("spark.driver.port")
+    .doc("Port of driver endpoints.")
+    .intConf
+    .createWithDefault(0)
+
+  private[spark] val DRIVER_SUPERVISE = ConfigBuilder("spark.driver.supervise")
+    .doc("If true, restarts the driver automatically if it fails with a 
non-zero exit status. " +
+      "Only has effect in Spark standalone mode or Mesos cluster deploy mode.")
+    .booleanConf
+    .createWithDefault(false)
+
   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
     .fallbackConf(DRIVER_HOST_ADDRESS)
@@ -729,4 +761,23 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault(Nil)
+
+  private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
+    ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("")
+
+  private[spark] val EXECUTOR_LOGS_ROLLING_TIME_INTERVAL =
+    ConfigBuilder("spark.executor.logs.rolling.time.interval").stringConf.createWithDefault("daily")
+
+  private[spark] val EXECUTOR_LOGS_ROLLING_MAX_SIZE =
+    ConfigBuilder("spark.executor.logs.rolling.maxSize")
+      .stringConf
+      .createWithDefault((1024 * 1024).toString)
+
+  private[spark] val EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES =
+    ConfigBuilder("spark.executor.logs.rolling.maxRetainedFiles").intConf.createWithDefault(-1)
+
+  private[spark] val EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION =
+    ConfigBuilder("spark.executor.logs.rolling.enableCompression")
+      .booleanConf
+      .createWithDefault(false)
 }
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index a6f7db0..8286087 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockId
 
 /**
@@ -127,14 +128,14 @@ private[spark] object StaticMemoryManager {
     if (systemMaxMemory < MIN_MEMORY_BYTES) {
       throw new IllegalArgumentException(s"System memory $systemMaxMemory must 
" +
         s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the 
--driver-memory " +
-        s"option or spark.driver.memory in Spark configuration.")
+        s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
     }
-    if (conf.contains("spark.executor.memory")) {
-      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+    if (conf.contains(config.EXECUTOR_MEMORY)) {
+      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
       if (executorMemory < MIN_MEMORY_BYTES) {
         throw new IllegalArgumentException(s"Executor memory $executorMemory 
must be at least " +
           s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
-          s"--executor-memory option or spark.executor.memory in Spark 
configuration.")
+          s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark 
configuration.")
       }
     }
     val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 78edd2c..9260fd3 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.memory
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockId
 
 /**
@@ -216,15 +217,15 @@ object UnifiedMemoryManager {
     if (systemMemory < minSystemMemory) {
       throw new IllegalArgumentException(s"System memory $systemMemory must " +
         s"be at least $minSystemMemory. Please increase heap size using the 
--driver-memory " +
-        s"option or spark.driver.memory in Spark configuration.")
+        s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
     }
     // SPARK-12759 Check executor memory to fail fast if memory is insufficient
-    if (conf.contains("spark.executor.memory")) {
-      val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
+    if (conf.contains(config.EXECUTOR_MEMORY)) {
+      val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
       if (executorMemory < minSystemMemory) {
         throw new IllegalArgumentException(s"Executor memory $executorMemory 
must be at least " +
           s"$minSystemMemory. Please increase executor memory using the " +
-          s"--executor-memory option or spark.executor.memory in Spark 
configuration.")
+          s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark 
configuration.")
       }
     }
     val usableMemory = systemMemory - reservedMemory
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index 301317a7..b1e311a 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -130,7 +130,7 @@ private[spark] class MetricsSystem private (
   private[spark] def buildRegistryName(source: Source): String = {
     val metricsNamespace = conf.get(METRICS_NAMESPACE).orElse(conf.getOption("spark.app.id"))
 
-    val executorId = conf.getOption("spark.executor.id")
+    val executorId = conf.get(EXECUTOR_ID)
     val defaultName = MetricRegistry.name(source.sourceName)
 
     if (instance == "driver" || instance == "executor") {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6bf60dd..41f032c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -717,7 +717,7 @@ private[spark] class TaskSetManager(
     calculatedTasks += 1
     if (maxResultSize > 0 && totalResultSize > maxResultSize) {
       val msg = s"Total size of serialized results of ${calculatedTasks} tasks 
" +
-        s"(${Utils.bytesToString(totalResultSize)}) is bigger than 
spark.driver.maxResultSize " +
+        s"(${Utils.bytesToString(totalResultSize)}) is bigger than 
${config.MAX_RESULT_SIZE.key} " +
         s"(${Utils.bytesToString(maxResultSize)})"
       logError(msg)
       abort(msg)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index f73a58f..adef20d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -25,7 +25,7 @@ import scala.concurrent.Future
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{StandaloneAppClient, StandaloneAppClientListener}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler._
@@ -54,7 +54,7 @@ private[spark] class StandaloneSchedulerBackend(
 
   private val registrationBarrier = new Semaphore(0)
 
-  private val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
+  private val maxCores = conf.get(config.CORES_MAX)
   private val totalExpectedCores = maxCores.getOrElse(0)
 
   override def start() {
@@ -69,8 +69,8 @@ private[spark] class StandaloneSchedulerBackend(
 
     // The endpoint for executors to talk to us
     val driverUrl = RpcEndpointAddress(
-      sc.conf.get("spark.driver.host"),
-      sc.conf.get("spark.driver.port").toInt,
+      sc.conf.get(config.DRIVER_HOST_ADDRESS),
+      sc.conf.get(config.DRIVER_PORT),
       CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
     val args = Seq(
       "--driver-url", driverUrl,
@@ -79,11 +79,11 @@ private[spark] class StandaloneSchedulerBackend(
       "--cores", "{{CORES}}",
       "--app-id", "{{APP_ID}}",
       "--worker-url", "{{WORKER_URL}}")
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+    val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS)
       .map(Utils.splitCommandString).getOrElse(Seq.empty)
-    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
+    val classPathEntries = sc.conf.get(config.EXECUTOR_CLASS_PATH)
       .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
-    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
+    val libraryPathEntries = sc.conf.get(config.EXECUTOR_LIBRARY_PATH)
       .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
 
     // When testing, expose the parent class path to the child. This is processed by
@@ -102,7 +102,7 @@ private[spark] class StandaloneSchedulerBackend(
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
     val webUrl = sc.ui.map(_.webUrl).getOrElse("")
-    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+    val coresPerExecutor = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
     // If we're using dynamic allocation, set our initial executor limit to 0 for now.
     // ExecutorAllocationManager will send the real initial limit to the Master later.
     val initialExecutorLimit =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 0de57fb..6ff8bf2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
@@ -116,7 +116,7 @@ private[spark] class LocalSchedulerBackend(
    * @param conf Spark configuration.
    */
   def getUserClasspath(conf: SparkConf): Seq[URL] = {
-    val userClassPathStr = conf.getOption("spark.executor.extraClassPath")
+    val userClassPathStr = conf.get(config.EXECUTOR_CLASS_PATH)
     userClassPathStr.map(_.split(File.pathSeparator)).toSeq.flatten.map(new File(_).toURI.toURL)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index e5cccf3..902e48f 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
 
 private[spark] object RpcUtils {
@@ -26,8 +27,8 @@ private[spark] object RpcUtils {
    * Retrieve a `RpcEndpointRef` which is located in the driver via its name.
    */
   def makeDriverRef(name: String, conf: SparkConf, rpcEnv: RpcEnv): RpcEndpointRef = {
-    val driverHost: String = conf.get("spark.driver.host", "localhost")
-    val driverPort: Int = conf.getInt("spark.driver.port", 7077)
+    val driverHost: String = conf.get(config.DRIVER_HOST_ADDRESS.key, "localhost")
+    val driverPort: Int = conf.getInt(config.DRIVER_PORT.key, 7077)
     Utils.checkHost(driverHost)
     rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), name)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 22f074c..3527fee 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2231,7 +2231,7 @@ private[spark] object Utils extends Logging {
               s"${e.getMessage}: Service$serviceString failed after " +
                 s"$maxRetries retries (on a random free port)! " +
                 s"Consider explicitly setting the appropriate binding address 
for " +
-                s"the service$serviceString (for example 
spark.driver.bindAddress " +
+                s"the service$serviceString (for example 
${DRIVER_BIND_ADDRESS.key} " +
                 s"for SparkDriver) to the correct binding address."
             } else {
               s"${e.getMessage}: Service$serviceString failed after " +
diff --git a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
index 2f9ad4c..3188e0b 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/FileAppender.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util.logging
 import java.io.{File, FileOutputStream, InputStream, IOException}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{IntParam, Utils}
 
 /**
@@ -115,11 +115,9 @@ private[spark] object FileAppender extends Logging {
   /** Create the right appender based on Spark configuration */
   def apply(inputStream: InputStream, file: File, conf: SparkConf): FileAppender = {
 
-    import RollingFileAppender._
-
-    val rollingStrategy = conf.get(STRATEGY_PROPERTY, STRATEGY_DEFAULT)
-    val rollingSizeBytes = conf.get(SIZE_PROPERTY, STRATEGY_DEFAULT)
-    val rollingInterval = conf.get(INTERVAL_PROPERTY, INTERVAL_DEFAULT)
+    val rollingStrategy = conf.get(config.EXECUTOR_LOGS_ROLLING_STRATEGY)
+    val rollingSizeBytes = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE)
+    val rollingInterval = conf.get(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL)
 
     def createTimeBasedAppender(): FileAppender = {
       val validatedParams: Option[(Long, String)] = rollingInterval match {
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 5d8cec8..59439b6 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -24,6 +24,7 @@ import com.google.common.io.Files
 import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -44,10 +45,8 @@ private[spark] class RollingFileAppender(
     bufferSize: Int = RollingFileAppender.DEFAULT_BUFFER_SIZE
   ) extends FileAppender(inputStream, activeFile, bufferSize) {
 
-  import RollingFileAppender._
-
-  private val maxRetainedFiles = conf.getInt(RETAINED_FILES_PROPERTY, -1)
-  private val enableCompression = conf.getBoolean(ENABLE_COMPRESSION, false)
+  private val maxRetainedFiles = conf.get(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES)
+  private val enableCompression = conf.get(config.EXECUTOR_LOGS_ROLLING_ENABLE_COMPRESSION)
 
   /** Stop the appender */
   override def stop() {
@@ -82,7 +81,7 @@ private[spark] class RollingFileAppender(
   // Roll the log file and compress if enableCompression is true.
   private def rotateFile(activeFile: File, rolloverFile: File): Unit = {
     if (enableCompression) {
-      val gzFile = new File(rolloverFile.getAbsolutePath + GZIP_LOG_SUFFIX)
+      val gzFile = new File(rolloverFile.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX)
       var gzOutputStream: GZIPOutputStream = null
       var inputStream: InputStream = null
       try {
@@ -103,7 +102,7 @@ private[spark] class RollingFileAppender(
 
   // Check if the rollover file already exists.
   private def rolloverFileExist(file: File): Boolean = {
-    file.exists || new File(file.getAbsolutePath + GZIP_LOG_SUFFIX).exists
+    file.exists || new File(file.getAbsolutePath + RollingFileAppender.GZIP_LOG_SUFFIX).exists
   }
 
   /** Move the active log file to a new rollover file */
@@ -164,15 +163,7 @@ private[spark] class RollingFileAppender(
  * names of configurations that configure rolling file appenders.
  */
 private[spark] object RollingFileAppender {
-  val STRATEGY_PROPERTY = "spark.executor.logs.rolling.strategy"
-  val STRATEGY_DEFAULT = ""
-  val INTERVAL_PROPERTY = "spark.executor.logs.rolling.time.interval"
-  val INTERVAL_DEFAULT = "daily"
-  val SIZE_PROPERTY = "spark.executor.logs.rolling.maxSize"
-  val SIZE_DEFAULT = (1024 * 1024).toString
-  val RETAINED_FILES_PROPERTY = "spark.executor.logs.rolling.maxRetainedFiles"
   val DEFAULT_BUFFER_SIZE = 8192
-  val ENABLE_COMPRESSION = "spark.executor.logs.rolling.enableCompression"
 
   val GZIP_LOG_SUFFIX = ".gz"
 
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5c718cb..d038923 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -155,7 +155,7 @@ class ExecutorAllocationManagerSuite
       .set("spark.dynamicAllocation.maxExecutors", "15")
       .set("spark.dynamicAllocation.minExecutors", "3")
       .set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
-      .set("spark.executor.cores", cores.toString)
+      .set(config.EXECUTOR_CORES, cores)
     val sc = new SparkContext(conf)
     contexts += sc
     var manager = sc.executorAllocationManager.get
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index e14a5dc..9a6abbd 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -140,7 +140,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 
   test("creating SparkContext with cpus per tasks bigger than cores per 
executors") {
     val conf = new SparkConf(false)
-      .set("spark.executor.cores", "1")
+      .set(EXECUTOR_CORES, 1)
       .set("spark.task.cpus", "2")
     intercept[SparkException] { sc = new SparkContext(conf) }
   }
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index a1d2a12..8567dd1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -243,7 +243,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("dynamic allocation with cores per executor") {
-    sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
+    sc = new SparkContext(appConf.set(config.EXECUTOR_CORES, 2))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -296,7 +296,7 @@ class StandaloneDynamicAllocationSuite
 
   test("dynamic allocation with cores per executor AND max cores") {
     sc = new SparkContext(appConf
-      .set("spark.executor.cores", "2")
+      .set(config.EXECUTOR_CORES, 2)
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
@@ -526,7 +526,7 @@ class StandaloneDynamicAllocationSuite
     new SparkConf()
       .setMaster(masterRpcEnv.address.toSparkURL)
       .setAppName("test")
-      .set("spark.executor.memory", "256m")
+      .set(config.EXECUTOR_MEMORY.key, "256m")
   }
 
   /** Make a master to which our application will send executor requests. */
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index d56cfc1..5ce3453 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -248,7 +248,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     val mm = UnifiedMemoryManager(conf, numCores = 1)
 
     // Try using an executor memory that's too small
-    val conf2 = conf.clone().set("spark.executor.memory", (reservedMemory / 
2).toString)
+    val conf2 = conf.clone().set(EXECUTOR_MEMORY.key, (reservedMemory / 
2).toString)
     val exception = intercept[IllegalArgumentException] {
       UnifiedMemoryManager(conf2, numCores = 1)
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index d264ada..f73ff67 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -655,7 +655,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
   }
 
   test("abort the job if total size of results is too large") {
-    val conf = new SparkConf().set("spark.driver.maxResultSize", "2m")
+    val conf = new SparkConf().set(config.MAX_RESULT_SIZE.key, "2m")
     sc = new SparkContext("local", "test", conf)
 
     def genBytes(size: Int): (Int) => Array[Byte] = { (x: Int) =>
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 3962bdc..19116cf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -31,7 +31,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
+import org.apache.spark.internal.config.{DRIVER_PORT, MEMORY_OFFHEAP_SIZE}
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -86,7 +86,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
 
     conf.set("spark.authenticate", "false")
-    conf.set("spark.driver.port", rpcEnv.address.port.toString)
+    conf.set(DRIVER_PORT, rpcEnv.address.port)
     conf.set("spark.testing", "true")
     conf.set("spark.memory.fraction", "1")
     conf.set("spark.memory.storageFraction", "1")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index cf00c1c..e866342 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -124,7 +124,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set("spark.storage.unrollMemoryThreshold", "512")
 
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-    conf.set("spark.driver.port", rpcEnv.address.port.toString)
+    conf.set(DRIVER_PORT, rpcEnv.address.port)
 
     // Mock SparkContext to reduce the memory usage of tests. It's fine since the only reason we
     // need to create a SparkContext is to initialize LiveListenerBus.
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 52cd537..2421639 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -34,7 +34,7 @@ import org.mockito.Mockito.{atLeast, mock, verify}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -136,7 +136,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     // setup input stream and appender
     val testOutputStream = new PipedOutputStream()
     val testInputStream = new PipedInputStream(testOutputStream, 100 * 1000)
-    val conf = new SparkConf().set(RollingFileAppender.RETAINED_FILES_PROPERTY, "10")
+    val conf = new SparkConf().set(config.EXECUTOR_LOGS_ROLLING_MAX_RETAINED_FILES, 10)
     val appender = new RollingFileAppender(testInputStream, testFile,
       new SizeBasedRollingPolicy(1000, false), conf, 10)
 
@@ -200,13 +200,12 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       appender.awaitTermination()
     }
 
-    import RollingFileAppender._
-
     def rollingStrategy(strategy: String): Seq[(String, String)] =
-      Seq(STRATEGY_PROPERTY -> strategy)
-    def rollingSize(size: String): Seq[(String, String)] = Seq(SIZE_PROPERTY -> size)
+      Seq(config.EXECUTOR_LOGS_ROLLING_STRATEGY.key -> strategy)
+    def rollingSize(size: String): Seq[(String, String)] =
+      Seq(config.EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> size)
     def rollingInterval(interval: String): Seq[(String, String)] =
-      Seq(INTERVAL_PROPERTY -> interval)
+      Seq(config.EXECUTOR_LOGS_ROLLING_TIME_INTERVAL.key -> interval)
 
     val msInDay = 24 * 60 * 60 * 1000L
     val msInHour = 60 * 60 * 1000L
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 8362c14..d52988d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -42,7 +42,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     .getOrElse(throw new SparkException("Must specify the driver container image"))
 
   // CPU settings
-  private val driverCpuCores = conf.get("spark.driver.cores", "1")
+  private val driverCpuCores = conf.get(DRIVER_CORES.key, "1")
   private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)
 
   // Memory settings
@@ -85,7 +85,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       ("cpu", new QuantityBuilder(false).withAmount(limitCores).build())
     }
 
-    val driverPort = conf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT)
+    val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
     val driverBlockManagerPort = conf.sparkConf.getInt(
       DRIVER_BLOCK_MANAGER_PORT.key,
       DEFAULT_BLOCKMANAGER_PORT
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index dd73a5e..6c3a6b3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -46,8 +46,8 @@ private[spark] class BasicExecutorFeatureStep(
   private val executorPodNamePrefix = kubernetesConf.resourceNamePrefix
 
   private val driverUrl = RpcEndpointAddress(
-    kubernetesConf.get("spark.driver.host"),
-    kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT),
+    kubernetesConf.get(DRIVER_HOST_ADDRESS),
+    kubernetesConf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT),
     CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
   private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY)
   private val executorMemoryString = kubernetesConf.get(
@@ -67,7 +67,7 @@ private[spark] class BasicExecutorFeatureStep(
       executorMemoryWithOverhead
     }
 
-  private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1)
+  private val executorCores = kubernetesConf.sparkConf.get(EXECUTOR_CORES)
   private val executorCoresRequest =
     if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) {
       kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index 4230545..1567117 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
 import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.{Clock, SystemClock}
 
 private[spark] class DriverServiceFeatureStep(
@@ -51,18 +51,17 @@ private[spark] class DriverServiceFeatureStep(
   }
 
   private val driverPort = kubernetesConf.sparkConf.getInt(
-    "spark.driver.port", DEFAULT_DRIVER_PORT)
+    config.DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
   private val driverBlockManagerPort = kubernetesConf.sparkConf.getInt(
-    org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
+    config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT)
 
   override def configurePod(pod: SparkPod): SparkPod = pod
 
   override def getAdditionalPodSystemProperties(): Map[String, String] = {
     val driverHostname = s"$resolvedServiceName.${kubernetesConf.namespace}.svc"
     Map(DRIVER_HOST_KEY -> driverHostname,
-      "spark.driver.port" -> driverPort.toString,
-      org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key ->
-        driverBlockManagerPort.toString)
+      config.DRIVER_PORT.key -> driverPort.toString,
+      config.DRIVER_BLOCK_MANAGER_PORT.key -> driverBlockManagerPort.toString)
   }
 
   override def getAdditionalKubernetesResources(): Seq[HasMetadata] = {
@@ -90,8 +89,8 @@ private[spark] class DriverServiceFeatureStep(
 }
 
 private[spark] object DriverServiceFeatureStep {
-  val DRIVER_BIND_ADDRESS_KEY = org.apache.spark.internal.config.DRIVER_BIND_ADDRESS.key
-  val DRIVER_HOST_KEY = org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key
+  val DRIVER_BIND_ADDRESS_KEY = config.DRIVER_BIND_ADDRESS.key
+  val DRIVER_HOST_KEY = config.DRIVER_HOST_ADDRESS.key
   val DRIVER_SVC_POSTFIX = "-driver-svc"
   val MAX_SERVICE_NAME_LENGTH = 63
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index 5ceb9d6..27d59dd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -46,7 +46,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
   test("Check the pod respects all configurations from the user.") {
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
-      .set("spark.driver.cores", "2")
+      .set(DRIVER_CORES, 2)
       .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
       .set(DRIVER_MEMORY.key, "256M")
       .set(DRIVER_MEMORY_OVERHEAD, 200L)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index e28c650..36bfb7d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.config._
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -74,8 +74,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
       .set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, RESOURCE_NAME_PREFIX)
       .set(CONTAINER_IMAGE, EXECUTOR_IMAGE)
       .set(KUBERNETES_DRIVER_SUBMIT_CHECK, true)
-      .set(DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
-      .set("spark.driver.port", DRIVER_PORT.toString)
+      .set(config.DRIVER_HOST_ADDRESS, DRIVER_HOSTNAME)
+      .set(config.DRIVER_PORT, DRIVER_PORT)
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
       .set("spark.kubernetes.resource.type", "java")
   }
@@ -125,8 +125,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("classpath and extra java options get translated into environment variables") {
-    baseConf.set(EXECUTOR_JAVA_OPTIONS, "foo=bar")
-    baseConf.set(EXECUTOR_CLASS_PATH, "bar=baz")
+    baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
+    baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
     val kconf = newExecutorConf(environment = Map("qux" -> "quux"))
     val step = new BasicExecutorFeatureStep(kconf, new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
@@ -150,7 +150,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("auth secret propagation") {
     val conf = baseConf.clone()
-      .set(NETWORK_AUTH_ENABLED, true)
+      .set(config.NETWORK_AUTH_ENABLED, true)
       .set("spark.master", "k8s://127.0.0.1")
 
     val secMgr = new SecurityManager(conf)
@@ -168,8 +168,8 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
     val secretFile = new File(secretDir, "secret-file.txt")
     Files.write(secretFile.toPath, "some-secret".getBytes(StandardCharsets.UTF_8))
     val conf = baseConf.clone()
-      .set(NETWORK_AUTH_ENABLED, true)
-      .set(AUTH_SECRET_FILE, secretFile.getAbsolutePath)
+      .set(config.NETWORK_AUTH_ENABLED, true)
+      .set(config.AUTH_SECRET_FILE, secretFile.getAbsolutePath)
       .set("spark.master", "k8s://127.0.0.1")
     val secMgr = new SecurityManager(conf)
     secMgr.initializeAuth()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 0452789..822f1e3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -39,7 +39,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
   test("Headless service has a port for the driver RPC and the block manager.") {
     val sparkConf = new SparkConf(false)
-      .set("spark.driver.port", "9000")
+      .set(DRIVER_PORT, 9000)
       .set(DRIVER_BLOCK_MANAGER_PORT, 8080)
     val kconf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
@@ -61,7 +61,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
 
   test("Hostname and ports are set according to the service name.") {
     val sparkConf = new SparkConf(false)
-      .set("spark.driver.port", "9000")
+      .set(DRIVER_PORT, 9000)
       .set(DRIVER_BLOCK_MANAGER_PORT, 8080)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
     val kconf = KubernetesTestConf.createDriverConf(
@@ -87,7 +87,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite {
       s"${kconf.resourceNamePrefix}${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}",
       resolvedService)
     val additionalProps = configurationStep.getAdditionalPodSystemProperties()
-    assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString)
+    assert(additionalProps(DRIVER_PORT.key) === DEFAULT_DRIVER_PORT.toString)
     assert(additionalProps(DRIVER_BLOCK_MANAGER_PORT.key) === DEFAULT_BLOCKMANAGER_PORT.toString)
   }
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index d134847..dd0b2ba 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -129,4 +129,7 @@ package object config {
         "when launching drivers. Default is to accept all offers with sufficient resources.")
       .stringConf
       .createWithDefault("")
+
+  private[spark] val EXECUTOR_URI =
+    ConfigBuilder("spark.executor.uri").stringConf.createOptional
 }
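
The `EXECUTOR_URI` entry added above uses `createOptional`, so reading it yields an `Option` and replaces the old `conf.getOption("spark.executor.uri")` calls in the scheduler code further down. A minimal sketch of that pattern, with a made-up object name and written as if inside Spark's own tree (`ConfigBuilder` and the typed `get` are `private[spark]`):

    package org.apache.spark.deploy.mesos

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    object OptionalEntrySketch {
      // Same shape as the entry defined in the hunk above.
      val EXECUTOR_URI = ConfigBuilder("spark.executor.uri").stringConf.createOptional

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set("spark.executor.uri", "hdfs:///dist/spark.tgz")
        val uri: Option[String] = conf.get(EXECUTOR_URI)  // Some(hdfs:///dist/spark.tgz); None if unset
        println(uri)
      }
    }
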
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index 68f6921..a4aba3e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
 import org.apache.spark.deploy.Command
 import org.apache.spark.deploy.mesos.MesosDriverDescription
 import org.apache.spark.deploy.rest._
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
 import org.apache.spark.util.Utils
 
@@ -92,12 +93,12 @@ private[mesos] class MesosSubmitRequestServlet(
 
     // Optional fields
     val sparkProperties = request.sparkProperties
-    val driverExtraJavaOptions = sparkProperties.get("spark.driver.extraJavaOptions")
-    val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
-    val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
-    val superviseDriver = sparkProperties.get("spark.driver.supervise")
-    val driverMemory = sparkProperties.get("spark.driver.memory")
-    val driverCores = sparkProperties.get("spark.driver.cores")
+    val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
+    val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
+    val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
+    val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
+    val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+    val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
     // Construct driver description
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index cb1bcba..021b1ac 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -32,6 +32,7 @@ import org.apache.mesos.Protos.TaskStatus.Reason
 import org.apache.spark.{SecurityManager, SparkConf, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
 import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
+import org.apache.spark.internal.config.{CORES_MAX, EXECUTOR_LIBRARY_PATH, EXECUTOR_MEMORY}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.Utils
 
@@ -365,8 +366,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverExecutorURI(desc: MesosDriverDescription): Option[String] = {
-    desc.conf.getOption("spark.executor.uri")
-      .orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
+    desc.conf.get(config.EXECUTOR_URI).orElse(desc.command.environment.get("SPARK_EXECUTOR_URI"))
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
@@ -474,7 +474,7 @@ private[spark] class MesosClusterScheduler(
     } else if (executorUri.isDefined) {
       val folderBasename = executorUri.get.split('/').last.split('.').head
 
-      val entries = conf.getOption("spark.executor.extraLibraryPath")
+      val entries = conf.get(EXECUTOR_LIBRARY_PATH)
         .map(path => Seq(path) ++ desc.command.libraryPathEntries)
         .getOrElse(desc.command.libraryPathEntries)
 
@@ -528,10 +528,10 @@ private[spark] class MesosClusterScheduler(
       options ++= Seq("--class", desc.command.mainClass)
     }
 
-    desc.conf.getOption("spark.executor.memory").foreach { v =>
+    desc.conf.getOption(EXECUTOR_MEMORY.key).foreach { v =>
       options ++= Seq("--executor-memory", v)
     }
-    desc.conf.getOption("spark.cores.max").foreach { v =>
+    desc.conf.getOption(CORES_MAX.key).foreach { v =>
       options ++= Seq("--total-executor-cores", v)
     }
     desc.conf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f586665..d017451 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -33,7 +33,6 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkExceptio
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config
-import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_INTERVAL
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -63,9 +62,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   // Blacklist a slave after this many failures
   private val MAX_SLAVE_FAILURES = 2
 
-  private val maxCoresOption = conf.getOption("spark.cores.max").map(_.toInt)
+  private val maxCoresOption = conf.get(config.CORES_MAX)
 
-  private val executorCoresOption = conf.getOption("spark.executor.cores").map(_.toInt)
+  private val executorCoresOption = conf.getOption(config.EXECUTOR_CORES.key).map(_.toInt)
 
   private val minCoresPerExecutor = executorCoresOption.getOrElse(1)
 
@@ -220,18 +219,18 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
   def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
     val environment = Environment.newBuilder()
-    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
+    val extraClassPath = conf.get(config.EXECUTOR_CLASS_PATH)
     extraClassPath.foreach { cp =>
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions").map {
+    val extraJavaOpts = conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
       Utils.substituteAppNExecIds(_, appId, taskId)
     }.getOrElse("")
 
     // Set the environment variable through a command prefix
     // to append to the existing value of the variable
-    val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p =>
+    val prefixEnv = conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
@@ -261,8 +260,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
 
-    val uri = conf.getOption("spark.executor.uri")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+    val uri = conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
 
     if (uri.isEmpty) {
       val executorSparkHome = conf.getOption("spark.mesos.executor.home")
@@ -304,8 +302,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
       "driverURL"
     } else {
       RpcEndpointAddress(
-        conf.get("spark.driver.host"),
-        conf.get("spark.driver.port").toInt,
+        conf.get(config.DRIVER_HOST_ADDRESS),
+        conf.get(config.DRIVER_PORT),
         CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
     }
   }
@@ -633,7 +631,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
             externalShufflePort,
             sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
               s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
-            sc.conf.get(EXECUTOR_HEARTBEAT_INTERVAL))
+            sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
 
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 0bb6fe0..192f940 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -28,8 +28,9 @@ import org.apache.mesos.SchedulerDriver
 import org.apache.mesos.protobuf.ByteString
 
 import org.apache.spark.{SparkContext, SparkException, TaskState}
-import org.apache.spark.deploy.mesos.config
+import org.apache.spark.deploy.mesos.config.EXECUTOR_URI
 import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
@@ -107,15 +108,15 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
     }
     val environment = Environment.newBuilder()
-    sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
+    sc.conf.get(config.EXECUTOR_CLASS_PATH).foreach { cp =>
       environment.addVariables(
         Environment.Variable.newBuilder().setName("SPARK_EXECUTOR_CLASSPATH").setValue(cp).build())
     }
-    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map {
+    val extraJavaOpts = sc.conf.get(config.EXECUTOR_JAVA_OPTIONS).map {
       Utils.substituteAppNExecIds(_, appId, execId)
     }.getOrElse("")
 
-    val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
+    val prefixEnv = sc.conf.get(config.EXECUTOR_LIBRARY_PATH).map { p =>
       Utils.libraryPathEnvPrefix(Seq(p))
     }.getOrElse("")
 
@@ -132,8 +133,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
     }
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
-    val uri = sc.conf.getOption("spark.executor.uri")
-      .orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
+    val uri = sc.conf.get(EXECUTOR_URI).orElse(Option(System.getenv("SPARK_EXECUTOR_URI")))
 
     val executorBackendName = classOf[MesosExecutorBackend].getName
     if (uri.isEmpty) {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e46c4f9..8dbdac1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -470,8 +470,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
         rpcEnv = sc.env.rpcEnv
 
         val userConf = sc.getConf
-        val host = userConf.get("spark.driver.host")
-        val port = userConf.get("spark.driver.port").toInt
+        val host = userConf.get(DRIVER_HOST_ADDRESS)
+        val port = userConf.get(DRIVER_PORT)
         registerAM(host, port, userConf, sc.ui.map(_.webUrl))
 
         val driverRef = rpcEnv.setupEndpointRef(
@@ -505,7 +505,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
       amCores, true)
 
     // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
-    registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
+    registerAM(hostname, -1, sparkConf, sparkConf.get(DRIVER_APP_UI_ADDRESS))
 
     // The driver should be up and listening, so unlike cluster mode, just try to connect to it
     // with no waiting or retrying.
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index b257d8f..7e9cd40 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -224,16 +224,12 @@ package object config {
 
   /* Driver configuration. */
 
-  private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
-    .intConf
-    .createWithDefault(1)
+  private[spark] val DRIVER_APP_UI_ADDRESS = ConfigBuilder("spark.driver.appUIAddress")
+    .stringConf
+    .createOptional
 
   /* Executor configuration. */
 
-  private[spark] val EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
-    .intConf
-    .createWithDefault(1)
-
   private[spark] val EXECUTOR_NODE_LABEL_EXPRESSION =
     ConfigBuilder("spark.yarn.executor.nodeLabelExpression")
       .doc("Node label expression for executors.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 9397a1e..167eef1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnAppReport}
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.SparkAppHandle
 import org.apache.spark.scheduler.TaskSchedulerImpl
 
@@ -42,10 +42,10 @@ private[spark] class YarnClientSchedulerBackend(
    * This waits until the application is running.
    */
   override def start() {
-    val driverHost = conf.get("spark.driver.host")
-    val driverPort = conf.get("spark.driver.port")
+    val driverHost = conf.get(config.DRIVER_HOST_ADDRESS)
+    val driverPort = conf.get(config.DRIVER_PORT)
     val hostport = driverHost + ":" + driverPort
-    sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }
+    sc.ui.foreach { ui => conf.set(DRIVER_APP_UI_ADDRESS, ui.webUrl) }
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += ("--arg", hostport)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 8032213..9e3cc6e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.Matchers
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
 import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.config.{DRIVER_MEMORY, EXECUTOR_MEMORY}
+import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY}
 
 class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
 


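To summarize the read-side pattern this change standardizes on: a typed `ConfigEntry` carries its key, type, and default, so call sites drop the manual `.toInt` conversions and hand-written fallbacks seen in the removed lines. A minimal sketch, with a made-up object name and an arbitrary port value, written as if inside Spark's own tree because the typed `get`/`set` overloads are `private[spark]`:

    package org.apache.spark

    import org.apache.spark.internal.config.{DRIVER_HOST_ADDRESS, DRIVER_PORT}

    object TypedReadSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false).set(DRIVER_PORT, 40000)  // arbitrary example value
        // Before: conf.get("spark.driver.host") and conf.get("spark.driver.port").toInt
        val host: String = conf.get(DRIVER_HOST_ADDRESS)  // falls back to the entry's default
        val port: Int = conf.get(DRIVER_PORT)             // 40000, already an Int
        println(s"driver endpoint: $host:$port")
      }
    }
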
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
