This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bf0794  [SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.
7bf0794 is described below

commit 7bf0794651f4d11547325539ebf7131a57ee1ba2
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
AuthorDate: Tue Jan 22 07:44:36 2019 -0600

    [SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.
    
    ## What changes were proposed in this pull request?
    
    The PR makes the hardcoded `spark.dynamicAllocation`, `spark.scheduler`, `spark.rpc`, `spark.task`, `spark.speculation`, and `spark.cleaner` configs use `ConfigEntry`.
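    
    For reference, the pattern applied throughout the diff below is roughly the following (a minimal sketch: the wrapper object `Example` is hypothetical and the snippet only compiles inside Spark's `org.apache.spark.internal.config` package; the entry and call sites are taken from the diff):
    
    ```scala
    package org.apache.spark.internal.config
    
    import java.util.concurrent.TimeUnit
    
    // Hypothetical container object, shown only to illustrate the pattern.
    private[spark] object Example {
    
      // Before this change, call sites parsed a hardcoded key and default, e.g.
      //   conf.getTimeAsSeconds("spark.dynamicAllocation.executorIdleTimeout", "60s")
    
      // After this change, the key, type, and default live in one typed ConfigEntry
      // (this entry is copied from the diff below) ...
      private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
        ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
          .timeConf(TimeUnit.SECONDS)
          .createWithDefault(60)
    
      // ... and call sites read it through the entry:
      //   private val executorIdleTimeoutS = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
    }
    ```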
    
    ## How was this patch tested?
    
    Existing tests
    
    Closes #23416 from kiszk/SPARK-26463.
    
    Authored-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/ContextCleaner.scala    |  11 +-
 .../apache/spark/ExecutorAllocationManager.scala   |  39 +++---
 .../scala/org/apache/spark/HeartbeatReceiver.scala |  16 +--
 .../scala/org/apache/spark/SecurityManager.scala   |   2 +-
 .../main/scala/org/apache/spark/SparkConf.scala    |  26 ++--
 .../main/scala/org/apache/spark/SparkContext.scala |   3 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   4 +-
 .../scala/org/apache/spark/deploy/Client.scala     |   5 +-
 .../apache/spark/deploy/SparkSubmitArguments.scala |   3 +-
 .../scala/org/apache/spark/executor/Executor.scala |  12 +-
 .../org/apache/spark/internal/config/Network.scala |  93 +++++++++++++++
 .../org/apache/spark/internal/config/package.scala | 131 +++++++++++++++++++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |   3 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   5 +-
 .../spark/rdd/ReliableRDDCheckpointData.scala      |   3 +-
 .../org/apache/spark/rpc/netty/Dispatcher.scala    |   5 +-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   |   7 +-
 .../scheduler/BarrierJobAllocationFailed.scala     |   3 +-
 .../apache/spark/scheduler/BlacklistTracker.scala  |   6 +-
 .../spark/scheduler/SchedulableBuilder.scala       |  10 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  13 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  11 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  12 +-
 .../apache/spark/status/AppStatusListener.scala    |   7 +-
 .../org/apache/spark/storage/BlockManager.scala    |   6 +-
 .../org/apache/spark/ui/UIWorkloadGenerator.scala  |   5 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala     |   3 +-
 .../scala/org/apache/spark/ui/jobs/JobsTab.scala   |   3 +-
 .../scala/org/apache/spark/ui/jobs/StagesTab.scala |   3 +-
 .../scala/org/apache/spark/util/RpcUtils.scala     |  13 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |   4 +-
 .../spark/BarrierStageOnSubmittedSuite.scala       |  30 ++---
 .../org/apache/spark/ContextCleanerSuite.scala     |  14 +--
 .../spark/ExecutorAllocationManagerSuite.scala     |  52 ++++----
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |   3 +-
 .../org/apache/spark/JobCancellationSuite.scala    |  23 ++--
 .../org/apache/spark/MapOutputTrackerSuite.scala   |  13 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  17 +--
 .../scala/org/apache/spark/SparkContextSuite.scala |   2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |   4 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   6 +-
 .../netty/NettyBlockTransferSecuritySuite.scala    |   5 +-
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   |  13 +-
 .../scheduler/BlacklistIntegrationSuite.scala      |   4 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala    |   8 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       |   8 +-
 .../org/apache/spark/scheduler/PoolSuite.scala     |  12 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |   3 +-
 .../spark/scheduler/SparkListenerSuite.scala       |   3 +-
 .../spark/scheduler/TaskResultGetterSuite.scala    |   3 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  40 +++----
 .../KryoSerializerDistributedSuite.scala           |   2 +-
 .../spark/status/AppStatusListenerSuite.scala      |   2 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  14 +--
 .../mllib/util/LocalClusterSparkContext.scala      |   3 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   3 +-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |   3 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   6 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   3 +-
 .../deploy/yarn/YarnShuffleIntegrationSuite.scala  |   5 +-
 .../org/apache/spark/sql/RuntimeConfigSuite.scala  |   2 +-
 .../streaming/state/StateStoreSuite.scala          |   3 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |   2 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  |   1 +
 .../SparkExecuteStatementOperation.scala           |   6 +-
 .../spark/sql/hive/execution/HiveFileFormat.scala  |   3 +-
 .../scheduler/ExecutorAllocationManagerSuite.scala |   5 +-
 68 files changed, 522 insertions(+), 281 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4d884de..305ec46 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 
@@ -83,8 +84,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
    * lead to executors running out of disk space after a while.
    */
-  private val periodicGCInterval =
-    sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+  private val periodicGCInterval = sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
 
   /**
    * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
@@ -96,8 +96,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
    * longer in scope.
    */
-  private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", true)
+  private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING)
 
   /**
    * Whether the cleaning thread will block on shuffle cleanup tasks.
@@ -109,8 +108,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
    * resolved.
    */
-  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking.shuffle", false)
+  private val blockOnShuffleCleanupTasks =
+    sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE)
 
   @volatile private var stopped = false
 
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0807e65..c9da30e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -107,28 +107,25 @@ private[spark] class ExecutorAllocationManager(
   private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
-  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
+  private val schedulerBacklogTimeoutS = conf.get(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
 
   // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
-  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
+  private val sustainedSchedulerBacklogTimeoutS =
+    conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
 
   // How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", "60s")
+  private val executorIdleTimeoutS = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
 
-  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
+  private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
 
   // During testing, the methods to actually kill and add executors are mocked out
-  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+  private val testing = conf.get(DYN_ALLOCATION_TESTING)
 
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
   private val tasksPerExecutorForFullParallelism =
-    conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
+    conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)
 
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
@@ -195,27 +192,29 @@ private[spark] class ExecutorAllocationManager(
    */
   private def validateSettings(): Unit = {
     if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
+      throw new SparkException(
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} and ${DYN_ALLOCATION_MAX_EXECUTORS.key} must be " +
+          "positive!")
     }
     if (maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_MAX_EXECUTORS.key} cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
-      throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
-        s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
+      throw new SparkException(s"${DYN_ALLOCATION_MIN_EXECUTORS.key} ($minNumExecutors) must " +
+        s"be less than or equal to ${DYN_ALLOCATION_MAX_EXECUTORS.key} ($maxNumExecutors)!")
     }
     if (schedulerBacklogTimeoutS <= 0) {
-      throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (sustainedSchedulerBacklogTimeoutS <= 0) {
       throw new SparkException(
-        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+        s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (executorIdleTimeoutS < 0) {
-      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be >= 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
     }
     if (cachedExecutorIdleTimeoutS < 0) {
-      throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be >= 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
@@ -224,12 +223,12 @@ private[spark] class ExecutorAllocationManager(
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
     if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
+      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
     }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
       throw new SparkException(
-        "spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
+        s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 67f2c27..55f2ae6 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -24,6 +24,7 @@ import scala.concurrent.Future
 
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Network
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerId
@@ -74,18 +75,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
-  // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
-  // "milliseconds"
-  private val executorTimeoutMs =
-    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-      s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s")
-
-  // "spark.network.timeoutInterval" uses "seconds", while
-  // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
-  private val timeoutIntervalMs =
-    sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL)
-  private val checkTimeoutIntervalMs =
-    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
+  private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+
+  private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c64fdc0..0661b30 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -288,7 +288,7 @@ private[spark] class SecurityManager(
    * @return Whether to enable encryption when connecting to services that support it.
    */
   def isEncryptionEnabled(): Boolean = {
-    sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
+    sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b596be0..7d49f10 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -576,26 +577,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
+    if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
       val executorCores = get(EXECUTOR_CORES)
-      val taskCpus = getInt("spark.task.cpus", 1)
+      val taskCpus = get(CPUS_PER_TASK)
 
       if (executorCores < taskCpus) {
-        throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
+        throw new SparkException(
+          s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
       }
     }
 
-    val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
+    val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThresholdMs =
-      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorTimeoutThresholdMs = get(NETWORK_TIMEOUT) * 1000
     val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
+    val networkTimeout = NETWORK_TIMEOUT.key
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be greater than the value of " +
+      s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
       s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
@@ -680,13 +682,13 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
     IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
       AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
-    "spark.rpc.numRetries" -> Seq(
+    RPC_NUM_RETRIES.key -> Seq(
       AlternateConfig("spark.akka.num.retries", "1.4")),
-    "spark.rpc.retry.wait" -> Seq(
+    RPC_RETRY_WAIT.key -> Seq(
       AlternateConfig("spark.akka.retry.wait", "1.4")),
-    "spark.rpc.askTimeout" -> Seq(
+    RPC_ASK_TIMEOUT.key -> Seq(
       AlternateConfig("spark.akka.askTimeout", "1.4")),
-    "spark.rpc.lookupTimeout" -> Seq(
+    RPC_LOOKUP_TIMEOUT.key -> Seq(
       AlternateConfig("spark.akka.lookupTimeout", "1.4")),
     "spark.streaming.fileStream.minRememberDuration" -> Seq(
       AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
@@ -694,7 +696,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
     MEMORY_OFFHEAP_ENABLED.key -> Seq(
       AlternateConfig("spark.unsafe.offHeap", "1.6")),
-    "spark.rpc.message.maxSize" -> Seq(
+    RPC_MESSAGE_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.akka.frameSize", "1.6")),
     "spark.yarn.jars" -> Seq(
       AlternateConfig("spark.yarn.jar", "2.0")),
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e0c0635..be70d42 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -553,7 +553,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach(_.start())
 
     _cleaner =
-      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
+      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
         Some(new ContextCleaner(this))
       } else {
         None
@@ -2538,6 +2538,7 @@ object SparkContext extends Logging {
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+  private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
   private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
 
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 36998d1..93d5cd7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -417,8 +417,8 @@ object SparkEnv extends Logging {
     // Spark properties
     // This includes the scheduling mode whether or not it is configured (used by SparkUI)
     val schedulerMode =
-      if (!conf.contains("spark.scheduler.mode")) {
-        Seq(("spark.scheduler.mode", schedulingMode))
+      if (!conf.contains(SCHEDULER_MODE)) {
+        Seq((SCHEDULER_MODE.key, schedulingMode))
       } else {
         Seq.empty[(String, String)]
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d94b174..e65a494 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
 
@@ -226,8 +227,8 @@ private[spark] class ClientApp extends SparkApplication {
   override def start(args: Array[String], conf: SparkConf): Unit = {
     val driverArgs = new ClientArguments(args)
 
-    if (!conf.contains("spark.rpc.askTimeout")) {
-      conf.set("spark.rpc.askTimeout", "10s")
+    if (!conf.contains(RPC_ASK_TIMEOUT)) {
+      conf.set(RPC_ASK_TIMEOUT, "10s")
     }
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9692d2a..e23d1f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -32,6 +32,7 @@ import scala.util.Try
 import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
@@ -208,7 +209,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orElse(sparkProperties.get("spark.yarn.principal"))
       .orNull
     dynamicAllocationEnabled =
-      sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)
+      sparkProperties.get(DYN_ALLOCATION_ENABLED.key).exists("true".equalsIgnoreCase)
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a3644e7..fccb1ac 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -124,7 +124,7 @@ private[spark] class Executor(
   private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
 
   // Whether to monitor killed / interrupted tasks
-  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
+  private val taskReaperEnabled = conf.get(TASK_REAPER_ENABLED)
 
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
@@ -163,7 +163,7 @@ private[spark] class Executor(
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.
   private val maxDirectResultSize = Math.min(
-    conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
+    conf.get(TASK_MAX_DIRECT_RESULT_SIZE),
     RpcUtils.maxMessageSizeBytes(conf))
 
   private val maxResultSize = conf.get(MAX_RESULT_SIZE)
@@ -667,13 +667,11 @@ private[spark] class Executor(
 
     private[this] val taskId: Long = taskRunner.taskId
 
-    private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+    private[this] val killPollingIntervalMs: Long = conf.get(TASK_REAPER_POLLING_INTERVAL)
 
-    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
+    private[this] val killTimeoutMs: Long = conf.get(TASK_REAPER_KILL_TIMEOUT)
 
-    private[this] val takeThreadDump: Boolean =
-      conf.getBoolean("spark.task.reaper.threadDump", true)
+    private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
 
     override def run(): Unit = {
       val startTimeMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Network.scala b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
new file mode 100644
index 0000000..129e31a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Network {
+
+  private[spark] val NETWORK_CRYPTO_SASL_FALLBACK =
+    ConfigBuilder("spark.network.crypto.saslFallback")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val NETWORK_CRYPTO_ENABLED =
+    ConfigBuilder("spark.network.crypto.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION =
+    ConfigBuilder("spark.network.remoteReadNioBufferConversion")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val NETWORK_TIMEOUT =
+    ConfigBuilder("spark.network.timeout")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("120s")
+
+  private[spark] val NETWORK_TIMEOUT_INTERVAL =
+    ConfigBuilder("spark.network.timeoutInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+
+  private[spark] val RPC_ASK_TIMEOUT =
+    ConfigBuilder("spark.rpc.askTimeout")
+      .stringConf
+      .createOptional
+
+  private[spark] val RPC_CONNECT_THREADS =
+    ConfigBuilder("spark.rpc.connect.threads")
+      .intConf
+      .createWithDefault(64)
+
+  private[spark] val RPC_IO_NUM_CONNECTIONS_PER_PEER =
+    ConfigBuilder("spark.rpc.io.numConnectionsPerPeer")
+      .intConf
+      .createWithDefault(1)
+
+  private[spark] val RPC_IO_THREADS =
+    ConfigBuilder("spark.rpc.io.threads")
+      .intConf
+      .createOptional
+
+  private[spark] val RPC_LOOKUP_TIMEOUT =
+    ConfigBuilder("spark.rpc.lookupTimeout")
+      .stringConf
+      .createOptional
+
+  private[spark] val RPC_MESSAGE_MAX_SIZE =
+    ConfigBuilder("spark.rpc.message.maxSize")
+      .intConf
+      .createWithDefault(128)
+
+  private[spark] val RPC_NETTY_DISPATCHER_NUM_THREADS =
+    ConfigBuilder("spark.rpc.netty.dispatcher.numThreads")
+      .intConf
+      .createOptional
+
+  private[spark] val RPC_NUM_RETRIES =
+    ConfigBuilder("spark.rpc.numRetries")
+      .intConf
+      .createWithDefault(3)
+
+  private[spark] val RPC_RETRY_WAIT =
+    ConfigBuilder("spark.rpc.retry.wait")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("3s")
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1e72800..b591284 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.scheduler.{EventLoggingListener, SchedulingMode}
 import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
@@ -265,11 +265,22 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")
 
+  private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
+    ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
   private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
 
+  private[spark] val DYN_ALLOCATION_ENABLED =
+    ConfigBuilder("spark.dynamicAllocation.enabled").booleanConf.createWithDefault(false)
+
+  private[spark] val DYN_ALLOCATION_TESTING =
+    ConfigBuilder("spark.dynamicAllocation.testing").booleanConf.createWithDefault(false)
+
   private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)
 
@@ -284,6 +295,22 @@ package object config {
     ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio")
       .doubleConf.createWithDefault(1.0)
 
+  private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
+      .timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
+      .timeConf(TimeUnit.SECONDS).createWithDefault(60)
+
+  private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")
+      .timeConf(TimeUnit.SECONDS).createWithDefault(1)
+
+  private[spark] val DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout")
+      .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
+
   private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("3s")
@@ -316,11 +343,36 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
-  private[spark] val MAX_TASK_FAILURES =
+  private[spark] val TASK_MAX_DIRECT_RESULT_SIZE =
+    ConfigBuilder("spark.task.maxDirectResultSize")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(1L << 20)
+
+  private[spark] val TASK_MAX_FAILURES =
     ConfigBuilder("spark.task.maxFailures")
       .intConf
       .createWithDefault(4)
 
+  private[spark] val TASK_REAPER_ENABLED =
+    ConfigBuilder("spark.task.reaper.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val TASK_REAPER_KILL_TIMEOUT =
+    ConfigBuilder("spark.task.reaper.killTimeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(-1)
+
+  private[spark] val TASK_REAPER_POLLING_INTERVAL =
+    ConfigBuilder("spark.task.reaper.pollingInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("10s")
+
+  private[spark] val TASK_REAPER_THREAD_DUMP =
+    ConfigBuilder("spark.task.reaper.threadDump")
+      .booleanConf
+      .createWithDefault(true)
+
   // Blacklist confs
   private[spark] val BLACKLIST_ENABLED =
     ConfigBuilder("spark.blacklist.enabled")
@@ -574,11 +626,6 @@ package object config {
         "secret keys are only allowed when using Kubernetes.")
       .fallbackConf(AUTH_SECRET_FILE)
 
-  private[spark] val NETWORK_ENCRYPTION_ENABLED =
-    ConfigBuilder("spark.network.crypto.enabled")
-      .booleanConf
-      .createWithDefault(false)
-
   private[spark] val BUFFER_WRITE_CHUNK_SIZE =
     ConfigBuilder("spark.buffer.write.chunkSize")
       .internal()
@@ -930,6 +977,31 @@ package object config {
       .toSequence
       .createWithDefault(Nil)
 
+  private[spark] val CLEANER_PERIODIC_GC_INTERVAL =
+    ConfigBuilder("spark.cleaner.periodicGC.interval")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("30min")
+
+  private[spark] val CLEANER_REFERENCE_TRACKING =
+    ConfigBuilder("spark.cleaner.referenceTracking")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING =
+    ConfigBuilder("spark.cleaner.referenceTracking.blocking")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE =
+    ConfigBuilder("spark.cleaner.referenceTracking.blocking.shuffle")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS =
+    ConfigBuilder("spark.cleaner.referenceTracking.cleanCheckpoints")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY =
     ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("")
 
@@ -1103,4 +1175,49 @@ package object config {
     .stringConf
     .toSequence
     .createWithDefault(Nil)
+
+  private[spark] val SCHEDULER_ALLOCATION_FILE =
+    ConfigBuilder("spark.scheduler.allocation.file")
+      .stringConf
+      .createOptional
+
+  private[spark] val SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO =
+    ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio")
+      .doubleConf
+      .createOptional
+
+  private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME =
+    ConfigBuilder("spark.scheduler.maxRegisteredResourcesWaitingTime")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("30s")
+
+  private[spark] val SCHEDULER_MODE =
+    ConfigBuilder("spark.scheduler.mode")
+      .stringConf
+      .createWithDefault(SchedulingMode.FIFO.toString)
+
+  private[spark] val SCHEDULER_REVIVE_INTERVAL =
+    ConfigBuilder("spark.scheduler.revive.interval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
+
+  private[spark] val SPECULATION_ENABLED =
+    ConfigBuilder("spark.speculation")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val SPECULATION_INTERVAL =
+    ConfigBuilder("spark.speculation.interval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(100)
+
+  private[spark] val SPECULATION_MULTIPLIER =
+    ConfigBuilder("spark.speculation.multiplier")
+      .doubleConf
+      .createWithDefault(1.5)
+
+  private[spark] val SPECULATION_QUANTILE =
+    ConfigBuilder("spark.speculation.quantile")
+      .doubleConf
+      .createWithDefault(0.75)
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 4bf4f08..8b5f9bb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -36,6 +36,7 @@ import org.apache.spark._
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.internal.io._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
@@ -1051,7 +1052,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
 
     // When speculation is on and output committer class name contains "Direct", we should warn
     // users that they may loss data if they are using a direct output committer.
-    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val speculationEnabled = self.conf.get(SPECULATION_ENABLED)
     val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b2ed2d3..954582f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
@@ -1591,8 +1592,8 @@ abstract class RDD[T: ClassTag](
    * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
    */
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
-    if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
-        conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
+    if (conf.get(DYN_ALLOCATION_ENABLED) &&
+        conf.contains(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) {
       logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
         "which removes executors along with their cached blocks. If you must use both " +
         "features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index b6d723c..7a592ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
 
 /**
  * An implementation of checkpointing that writes the RDD data to reliable storage.
@@ -58,7 +59,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
     val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
 
     // Optionally clean our checkpoint files if the reference is out of scope
-    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
+    if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
       rdd.context.cleaner.foreach { cleaner =>
         cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
       }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 904c4d0..ce238a2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.RPC_NETTY_DISPATCHER_NUM_THREADS
 import org.apache.spark.network.client.RpcResponseCallback
 import org.apache.spark.rpc._
 import org.apache.spark.util.ThreadUtils
@@ -197,8 +198,8 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
   private val threadpool: ThreadPoolExecutor = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
-      math.max(2, availableCores))
+    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+      .getOrElse(math.max(2, availableCores))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 4757695..2540196 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -31,6 +31,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.client._
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
@@ -48,9 +49,9 @@ private[netty] class NettyRpcEnv(
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
 
   private[netty] val transportConf = SparkTransportConf.fromSparkConf(
-    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
+    conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
     "rpc",
-    conf.getInt("spark.rpc.io.threads", numUsableCores))
+    conf.get(RPC_IO_THREADS).getOrElse(numUsableCores))
 
   private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
 
@@ -87,7 +88,7 @@ private[netty] class NettyRpcEnv(
   // TODO: a non-blocking TransportClientFactory.createClient in future
   private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
     "netty-rpc-connection",
-    conf.getInt("spark.rpc.connect.threads", 64))
+    conf.get(RPC_CONNECT_THREADS))
 
   @volatile private var server: TransportServer = _
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 803a0a1..64a02b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
 
 /**
  * Exception thrown when submit a job with barrier stage(s) failing a required check.
@@ -51,7 +52,7 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
     "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
       "now. You can disable dynamic resource allocation by setting Spark conf " +
-      "\"spark.dynamicAllocation.enabled\" to \"false\"."
+      s""""${DYN_ALLOCATION_ENABLED.key}" to "false"."""
 
   // Error message when running a barrier stage that requires more slots than current total number.
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index ef6d02d..9e524c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -460,15 +460,15 @@ private[spark] object BlacklistTracker extends Logging {
       }
     }
 
-    val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
+    val maxTaskFailures = conf.get(config.TASK_MAX_FAILURES)
     val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
 
     if (maxNodeAttempts >= maxTaskFailures) {
       throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
-        s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+        s"( = ${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " +
         s"( = ${maxTaskFailures} ).  Though blacklisting is enabled, with this configuration, " +
         s"Spark will not be robust to one bad node.  Decrease " +
-        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " +
+        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.TASK_MAX_FAILURES.key}, " +
         s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 5f3c280..c85c74f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -23,8 +23,9 @@ import java.util.{Locale, NoSuchElementException, Properties}
 import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.util.Utils
 
@@ -56,10 +57,9 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   extends SchedulableBuilder with Logging {
 
-  val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
-  val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
+  val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
-  val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
+  val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
   val DEFAULT_POOL_NAME = "default"
   val MINIMUM_SHARES_PROPERTY = "minShare"
   val SCHEDULING_MODE_PROPERTY = "schedulingMode"
@@ -85,7 +85,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
         } else {
           logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
             s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
-            s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+            s"set ${SCHEDULER_ALLOCATION_FILE.key} to a file that contains the configuration.")
           None
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 61556ea..d551fb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,6 +31,7 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -61,7 +62,7 @@ private[spark] class TaskSchedulerImpl(
   import TaskSchedulerImpl._
 
   def this(sc: SparkContext) = {
-    this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
+    this(sc, sc.conf.get(config.TASK_MAX_FAILURES))
   }
 
   // Lazily initializing blacklistTrackerOpt to avoid getting empty ExecutorAllocationClient,
@@ -71,7 +72,7 @@ private[spark] class TaskSchedulerImpl(
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
+  val SPECULATION_INTERVAL_MS = conf.get(SPECULATION_INTERVAL)
 
   // Duplicate copies of a task will only be launched if the original copy has been running for
   // at least this amount of time. This is to avoid the overhead of launching speculative copies
@@ -85,7 +86,7 @@ private[spark] class TaskSchedulerImpl(
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+  val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -131,7 +132,7 @@ private[spark] class TaskSchedulerImpl(
 
   private var schedulableBuilder: SchedulableBuilder = null
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE)
   val schedulingMode: SchedulingMode =
     try {
       SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
@@ -183,7 +184,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
+    if (!isLocal && conf.get(SPECULATION_ENABLED)) {
       logInfo("Starting speculative execution thread")
       speculationScheduler.scheduleWithFixedDelay(new Runnable {
         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
@@ -857,7 +858,7 @@ private[spark] class TaskSchedulerImpl(
 
 private[spark] object TaskSchedulerImpl {
 
-  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+  val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
   /**
    * Used to balance containers across hosts.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1b42cb4..6f3f77c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -61,12 +62,12 @@ private[spark] class TaskSetManager(
   private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
-  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
+  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
 
   val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
-  val speculationEnabled = conf.getBoolean("spark.speculation", false)
+  val speculationEnabled = conf.get(SPECULATION_ENABLED)
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -1015,13 +1016,13 @@ private[spark] class TaskSetManager(
       return false
     }
     var foundTasks = false
-    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
+    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
-      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
+      val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 98ed2ff..3b05875 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -30,6 +30,8 @@ import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, Tas
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -58,11 +60,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   private val _minRegisteredRatio =
-    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+    math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
+    conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)
   private val createTime = System.currentTimeMillis()
 
   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -118,7 +120,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
 
       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
@@ -301,8 +303,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
-                "spark.rpc.message.maxSize or using broadcast variables for large values."
+                s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
+                s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 12c5d4d..1067cdc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
@@ -151,7 +152,7 @@ private[spark] class AppStatusListener(
       details.getOrElse("System Properties", Nil),
       details.getOrElse("Classpath Entries", Nil))
 
-    coresPerTask = 
envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+    coresPerTask = 
envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
       .getOrElse(coresPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
@@ -434,7 +435,7 @@ private[spark] class AppStatusListener(
     val stage = getOrCreateStage(event.stageInfo)
     stage.status = v1.StageStatus.ACTIVE
     stage.schedulingPool = Option(event.properties).flatMap { p =>
-      Option(p.getProperty("spark.scheduler.pool"))
+      Option(p.getProperty(SparkContext.SPARK_SCHEDULER_POOL))
     }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
 
     // Look at all active jobs to find the ones that mention this stage.
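
Where the value comes from a plain String-keyed map rather than a `SparkConf` (replayed environment info, per-job properties), the patch keeps string lookups but derives the key from the entry, and the pool name now goes through `SparkContext.SPARK_SCHEDULER_POOL`. A small sketch of that pattern, not part of this commit's diff, under the same `org.apache.spark` package assumption:

```scala
package org.apache.spark

import java.util.Properties

import org.apache.spark.internal.config.CPUS_PER_TASK

object KeyLookupSketch {
  def main(args: Array[String]): Unit = {
    // A String-keyed property map, e.g. rehydrated environment info.
    val sparkProperties = Map(CPUS_PER_TASK.key -> "2")
    val coresPerTask = sparkProperties.get(CPUS_PER_TASK.key).map(_.toInt).getOrElse(1)

    // Per-job properties keyed by the SparkContext constant instead of a literal.
    val jobProps = new Properties()
    jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "pool1")
    val pool = Option(jobProps.getProperty(SparkContext.SPARK_SCHEDULER_POOL)).getOrElse("default")

    println(s"${CPUS_PER_TASK.key}=$coresPerTask pool=$pool")
  }
}
```
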
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8f993bf..dd05cb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -36,7 +36,9 @@ import com.codahale.metrics.{MetricRegistry, MetricSet}
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Network
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
@@ -133,7 +135,7 @@ private[spark] class BlockManager(
   private[spark] val externalShuffleServiceEnabled =
     conf.get(config.SHUFFLE_SERVICE_ENABLED)
   private val remoteReadNioBufferConversion =
-    conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
+    conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
 
   val diskBlockManager = {
     // Only perform cleanup if an external service is not serving our shuffle 
files.
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 6229e80..8845dcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler.SchedulingMode
 
 // scalastyle:off
@@ -50,14 +51,14 @@ private[spark] object UIWorkloadGenerator {
 
     val schedulingMode = SchedulingMode.withName(args(1))
     if (schedulingMode == SchedulingMode.FAIR) {
-      conf.set("spark.scheduler.mode", "FAIR")
+      conf.set(SCHEDULER_MODE, "FAIR")
     }
     val nJobSet = args(2).toInt
     val sc = new SparkContext(conf)
 
     def setProperties(s: String): Unit = {
       if (schedulingMode == SchedulingMode.FAIR) {
-        sc.setLocalProperty("spark.scheduler.pool", s)
+        sc.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, s)
       }
       sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index b35ea5b..4523326 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,6 +28,7 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1
@@ -295,7 +296,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: 
AppStatusStore) extends We
     }
 
     val schedulingMode = store.environmentInfo().sparkProperties.toMap
-      .get("spark.scheduler.mode")
+      .get(SCHEDULER_MODE.key)
       .map { mode => SchedulingMode.withName(mode).toString }
       .getOrElse("Unknown")
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 37bb292..1c1915e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.collection.JavaConverters._
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler.SchedulingMode
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.ui._
@@ -37,7 +38,7 @@ private[ui] class JobsTab(parent: SparkUI, store: 
AppStatusStore)
     store
       .environmentInfo()
       .sparkProperties
-      .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
+      .contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString))
   }
 
   def getSparkUser: String = parent.getSparkUser
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index e16c337..b74f3db 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
 
 import javax.servlet.http.HttpServletRequest
 
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler.SchedulingMode
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1.StageStatus
@@ -40,7 +41,7 @@ private[ui] class StagesTab(val parent: SparkUI, val store: 
AppStatusStore)
     store
       .environmentInfo()
       .sparkProperties
-      .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
+      .contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString))
   }
 
   def handleKillRequest(request: HttpServletRequest): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index 902e48f..7272b37 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
 
 private[spark] object RpcUtils {
@@ -35,32 +36,32 @@ private[spark] object RpcUtils {
 
   /** Returns the configured number of times to retry connecting */
   def numRetries(conf: SparkConf): Int = {
-    conf.getInt("spark.rpc.numRetries", 3)
+    conf.get(RPC_NUM_RETRIES)
   }
 
   /** Returns the configured number of milliseconds to wait on each retry */
   def retryWaitMs(conf: SparkConf): Long = {
-    conf.getTimeAsMs("spark.rpc.retry.wait", "3s")
+    conf.get(RPC_RETRY_WAIT)
   }
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askRpcTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), 
"120s")
+    RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
   }
 
   /** Returns the default Spark timeout to use for RPC remote endpoint lookup. 
*/
   def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), 
"120s")
+    RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s")
   }
 
   private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024
 
   /** Returns the configured max message size for messages in bytes. */
   def maxMessageSizeBytes(conf: SparkConf): Int = {
-    val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128)
+    val maxSizeInMB = conf.get(RPC_MESSAGE_MAX_SIZE)
     if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) {
       throw new IllegalArgumentException(
-        s"spark.rpc.message.maxSize should not be greater than 
$MAX_MESSAGE_SIZE_IN_MB MB")
+        s"${RPC_MESSAGE_MAX_SIZE.key} should not be greater than 
$MAX_MESSAGE_SIZE_IN_MB MB")
     }
     maxSizeInMB * 1024 * 1024
   }
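
The RpcUtils change is representative of how string getters with inline defaults (`getInt(..., 3)`, `getTimeAsMs(..., "3s")`) collapse into typed `conf.get(entry)` calls, while `.key` preserves both the multi-key `RpcTimeout` lookup and the wording of the size-limit error. A compact sketch, not part of this commit's diff, assuming `org.apache.spark` package scope:

```scala
package org.apache.spark

import org.apache.spark.internal.config.Network._

object RpcConfigSketch {
  private val MaxMessageSizeInMB = Int.MaxValue / 1024 / 1024

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)

    val retries   = conf.get(RPC_NUM_RETRIES)      // Int, default declared on the entry
    val retryWait = conf.get(RPC_RETRY_WAIT)       // milliseconds, default declared on the entry
    val maxSizeMB = conf.get(RPC_MESSAGE_MAX_SIZE)

    require(maxSizeMB <= MaxMessageSizeInMB,
      s"${RPC_MESSAGE_MAX_SIZE.key} should not be greater than $MaxMessageSizeInMB MB")

    println(s"retries=$retries retryWaitMs=$retryWait maxSizeMB=$maxSizeMB")
  }
}
```
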
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a6dc1f..0ad1ffc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2465,9 +2465,9 @@ private[spark] object Utils extends Logging {
    * Return whether dynamic allocation is enabled in the given conf.
    */
   def isDynamicAllocationEnabled(conf: SparkConf): Boolean = {
-    val dynamicAllocationEnabled = 
conf.getBoolean("spark.dynamicAllocation.enabled", false)
+    val dynamicAllocationEnabled = conf.get(DYN_ALLOCATION_ENABLED)
     dynamicAllocationEnabled &&
-      (!isLocalMaster(conf) || 
conf.getBoolean("spark.dynamicAllocation.testing", false))
+      (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING))
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
index d49ab4a..ca839d1 100644
--- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala
@@ -20,9 +20,9 @@ package org.apache.spark
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 import org.apache.spark.scheduler.BarrierJobAllocationFailed._
-import org.apache.spark.scheduler.DAGScheduler
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -157,8 +157,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
 
   test("submit a barrier ResultStage with dynamic resource allocation 
enabled") {
     val conf = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(DYN_ALLOCATION_ENABLED, true)
+      .set(DYN_ALLOCATION_TESTING, true)
       .setMaster("local[4]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
@@ -172,8 +172,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
 
   test("submit a barrier ShuffleMapStage with dynamic resource allocation 
enabled") {
     val conf = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(DYN_ALLOCATION_ENABLED, true)
+      .set(DYN_ALLOCATION_TESTING, true)
       .setMaster("local[4]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
@@ -191,9 +191,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
       "mode") {
     val conf = new SparkConf()
       // Shorten the time interval between two failed checks to make the test 
fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
       // Reduce max check failures allowed to make the test fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
       .setMaster("local[4]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
@@ -208,9 +208,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
     "local mode") {
     val conf = new SparkConf()
       // Shorten the time interval between two failed checks to make the test 
fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
       // Reduce max check failures allowed to make the test fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
       .setMaster("local[4]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
@@ -226,11 +226,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
   test("submit a barrier ResultStage that requires more slots than current 
total under " +
     "local-cluster mode") {
     val conf = new SparkConf()
-      .set("spark.task.cpus", "2")
+      .set(CPUS_PER_TASK, 2)
       // Shorten the time interval between two failed checks to make the test 
fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
       // Reduce max check failures allowed to make the test fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
       .setMaster("local-cluster[4, 3, 1024]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
@@ -244,11 +244,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite 
with LocalSparkContext
   test("submit a barrier ShuffleMapStage that requires more slots than current 
total under " +
     "local-cluster mode") {
     val conf = new SparkConf()
-      .set("spark.task.cpus", "2")
+      .set(CPUS_PER_TASK, 2)
       // Shorten the time interval between two failed checks to make the test 
fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")
       // Reduce max check failures allowed to make the test fail faster.
-      .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3")
+      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)
       .setMaster("local-cluster[4, 3, 1024]")
       .setAppName("test")
     sc = createSparkContext(Some(conf))
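
In the test changes, two `set` styles appear: a typed `.set(entry, value)` when the value matches the entry's type, and `.set(entry.key, "1s")` when the test wants to pass a duration string to a time entry. A minimal sketch of both styles, not part of this commit's diff, under the same `org.apache.spark` package assumption:

```scala
package org.apache.spark

import org.apache.spark.internal.config._

object TestConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set(CPUS_PER_TASK, 2)                                       // typed Int entry
      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3)     // typed Int entry
      .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s")  // duration string via the key

    println(conf.get(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES))
  }
}
```
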
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 9e28284..b9b47d4 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -29,10 +29,10 @@ import org.scalatest.concurrent.PatienceConfiguration
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage._
-import org.apache.spark.util.Utils
 
 /**
  * An abstract base class for context cleaner tests, which sets up a context 
with a config
@@ -46,9 +46,9 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: 
Class[_] = classOf[So
   val conf = new SparkConf()
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
-    .set("spark.cleaner.referenceTracking.blocking", "true")
-    .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
-    .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
+    .set(CLEANER_REFERENCE_TRACKING_BLOCKING, true)
+    .set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true)
+    .set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, true)
     .set(config.SHUFFLE_MANAGER, shuffleManager.getName)
 
   before {
@@ -234,7 +234,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
       val conf = new SparkConf()
         .setMaster("local[2]")
         .setAppName("cleanupCheckpoint")
-        .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false")
+        .set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, false)
       sc = new SparkContext(conf)
       rdd = newPairRDD()
       sc.setCheckpointDir(checkpointDir.toString)
@@ -317,8 +317,8 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
     val conf2 = new SparkConf()
       .setMaster("local-cluster[2, 1, 1024]")
       .setAppName("ContextCleanerSuite")
-      .set("spark.cleaner.referenceTracking.blocking", "true")
-      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
+      .set(CLEANER_REFERENCE_TRACKING_BLOCKING, true)
+      .set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true)
       .set(config.SHUFFLE_MANAGER, shuffleManager.getName)
     sc = new SparkContext(conf2)
 
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 6b310b9..fdaea28 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -63,19 +63,19 @@ class ExecutorAllocationManagerSuite
     val conf = new SparkConf()
       .setMaster("myDummyLocalExternalClusterManager")
       .setAppName("test-executor-allocation-manager")
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_TESTING, true)
     val sc0 = new SparkContext(conf)
     contexts += sc0
     assert(sc0.executorAllocationManager.isDefined)
     sc0.stop()
 
     // Min < 0
-    val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
+    val conf1 = conf.clone().set(config.DYN_ALLOCATION_MIN_EXECUTORS, -1)
     intercept[SparkException] { contexts += new SparkContext(conf1) }
 
     // Max < 0
-    val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
+    val conf2 = conf.clone().set(config.DYN_ALLOCATION_MAX_EXECUTORS, -1)
     intercept[SparkException] { contexts += new SparkContext(conf2) }
 
     // Both min and max, but min > max
@@ -151,11 +151,11 @@ class ExecutorAllocationManagerSuite
     val conf = new SparkConf()
       .setMaster("myDummyLocalExternalClusterManager")
       .setAppName("test-executor-allocation-manager")
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
-      .set("spark.dynamicAllocation.maxExecutors", "15")
-      .set("spark.dynamicAllocation.minExecutors", "3")
-      .set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(config.DYN_ALLOCATION_MAX_EXECUTORS, 15)
+      .set(config.DYN_ALLOCATION_MIN_EXECUTORS, 3)
+      .set(config.DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO, divisor)
       .set(config.EXECUTOR_CORES, cores)
     val sc = new SparkContext(conf)
     contexts += sc
@@ -1093,14 +1093,14 @@ class ExecutorAllocationManagerSuite
     val initialExecutors = 1
     val maxExecutors = 2
     val conf = new SparkConf()
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
-      .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
-      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
-      .set("spark.dynamicAllocation.initialExecutors", 
initialExecutors.toString)
-      .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms")
-      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", 
"1000ms")
-      .set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms")
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
+      .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
+      .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
+      .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors)
+      .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
+      .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, 
"1000ms")
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3000ms")
     val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
     val mockBMM = mock(classOf[BlockManagerMaster])
     val manager = new ExecutorAllocationManager(
@@ -1155,16 +1155,16 @@ class ExecutorAllocationManagerSuite
     val conf = new SparkConf()
       .setMaster("myDummyLocalExternalClusterManager")
       .setAppName("test-executor-allocation-manager")
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.minExecutors", minExecutors.toString)
-      .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString)
-      .set("spark.dynamicAllocation.initialExecutors", 
initialExecutors.toString)
-      .set("spark.dynamicAllocation.schedulerBacklogTimeout",
-          s"${schedulerBacklogTimeout.toString}s")
-      .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout",
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
+      .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
+      .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors)
+      .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key,
+        s"${schedulerBacklogTimeout.toString}s")
+      .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key,
         s"${sustainedSchedulerBacklogTimeout.toString}s")
-      .set("spark.dynamicAllocation.executorIdleTimeout", 
s"${executorIdleTimeout.toString}s")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, 
s"${executorIdleTimeout.toString}s")
+      .set(config.DYN_ALLOCATION_TESTING, true)
       // SPARK-22864: effectively disable the allocation schedule by setting 
the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 10000L)
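
The same shape repeats for the dynamic allocation suite: boolean and integer knobs become typed sets, while timeouts the test expresses as duration strings go through `.key`. A short sketch, not part of this commit's diff, assuming `org.apache.spark` package scope:

```scala
package org.apache.spark

import org.apache.spark.internal.config._

object DynAllocConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
      .set(DYN_ALLOCATION_ENABLED, true)
      .set(DYN_ALLOCATION_TESTING, true)
      .set(DYN_ALLOCATION_MIN_EXECUTORS, 1)
      .set(DYN_ALLOCATION_MAX_EXECUTORS, 4)
      .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3s")  // duration string via the key

    // Simplified form of the check Utils.isDynamicAllocationEnabled performs
    // after this patch (the real one also looks at the master URL).
    println(conf.get(DYN_ALLOCATION_ENABLED) && conf.get(DYN_ALLOCATION_TESTING))
  }
}
```
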
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index a69e589..dbe187d 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, spy, verify, when}
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -67,7 +68,7 @@ class HeartbeatReceiverSuite
     val conf = new SparkConf()
       .setMaster("local[2]")
       .setAppName("test")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(DYN_ALLOCATION_TESTING, true)
     sc = spy(new SparkContext(conf))
     scheduler = mock(classOf[TaskSchedulerImpl])
     when(sc.taskScheduler).thenReturn(scheduler)
diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
index f8adaf5..0c59259 100644
--- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -27,6 +27,7 @@ import scala.concurrent.duration._
 import org.scalatest.BeforeAndAfter
 import org.scalatest.Matchers
 
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Deploy._
 import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, 
SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.util.ThreadUtils
@@ -52,7 +53,7 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
   }
 
   test("local mode, FIFO scheduler") {
-    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
     sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
@@ -61,9 +62,9 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
   }
 
   test("local mode, fair scheduler") {
-    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    conf.set("spark.scheduler.allocation.file", xmlPath)
+    conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext("local[2]", "test", conf)
     testCount()
     testTake()
@@ -72,7 +73,7 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
   }
 
   test("cluster mode, FIFO scheduler") {
-    val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
+    val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     testCount()
     testTake()
@@ -81,9 +82,9 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
   }
 
   test("cluster mode, fair scheduler") {
-    val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
+    val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR")
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    conf.set("spark.scheduler.allocation.file", xmlPath)
+    conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
     testCount()
     testTake()
@@ -217,8 +218,8 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
 
   test("task reaper kills JVM if killed tasks keep running for too long") {
     val conf = new SparkConf()
-      .set("spark.task.reaper.enabled", "true")
-      .set("spark.task.reaper.killTimeout", "5s")
+      .set(TASK_REAPER_ENABLED, true)
+      .set(TASK_REAPER_KILL_TIMEOUT.key, "5s")
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
     // Add a listener to release the semaphore once any tasks are launched.
@@ -254,9 +255,9 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
 
   test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
     val conf = new SparkConf()
-      .set("spark.task.reaper.enabled", "true")
-      .set("spark.task.reaper.killTimeout", "-1")
-      .set("spark.task.reaper.PollingInterval", "1s")
+      .set(TASK_REAPER_ENABLED, true)
+      .set(TASK_REAPER_KILL_TIMEOUT.key, "-1")
+      .set(TASK_REAPER_POLLING_INTERVAL.key, "1s")
       .set(MAX_EXECUTOR_RETRIES, 1)
     sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
 
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index adaa069..d869759 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -25,6 +25,7 @@ import org.mockito.Mockito._
 import org.apache.spark.LocalSparkContext._
 import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, 
RPC_MESSAGE_MAX_SIZE}
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus}
 import org.apache.spark.shuffle.FetchFailedException
@@ -170,8 +171,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
 
   test("remote fetch below max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.rpc.message.maxSize", "1")
-    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
+    newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
+    newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
     newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 1048576L)
 
     val masterTracker = newTrackerMaster(newConf)
@@ -199,8 +200,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
 
   test("min broadcast size exceeds max RPC message size") {
     val newConf = new SparkConf
-    newConf.set("spark.rpc.message.maxSize", "1")
-    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
+    newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
+    newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
     newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, Int.MaxValue.toLong)
 
     intercept[IllegalArgumentException] { newTrackerMaster(newConf) }
@@ -243,8 +244,8 @@ class MapOutputTrackerSuite extends SparkFunSuite {
 
   test("remote fetch using broadcast") {
     val newConf = new SparkConf
-    newConf.set("spark.rpc.message.maxSize", "1")
-    newConf.set("spark.rpc.askTimeout", "1") // Fail fast
+    newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
+    newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
     newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 
1MiB framesize
 
     // needs TorrentBroadcast so need a SparkContext
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 4071dd4..079170d 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -29,6 +29,7 @@ import com.esotericsoftware.kryo.Kryo
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, 
KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
@@ -142,7 +143,7 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
   test("creating SparkContext with cpus per tasks bigger than cores per 
executors") {
     val conf = new SparkConf(false)
       .set(EXECUTOR_CORES, 1)
-      .set("spark.task.cpus", "2")
+      .set(CPUS_PER_TASK, 2)
     intercept[SparkException] { sc = new SparkContext(conf) }
   }
 
@@ -268,10 +269,10 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
   test("akka deprecated configs") {
     val conf = new SparkConf()
 
-    assert(!conf.contains("spark.rpc.numRetries"))
-    assert(!conf.contains("spark.rpc.retry.wait"))
-    assert(!conf.contains("spark.rpc.askTimeout"))
-    assert(!conf.contains("spark.rpc.lookupTimeout"))
+    assert(!conf.contains(RPC_NUM_RETRIES))
+    assert(!conf.contains(RPC_RETRY_WAIT))
+    assert(!conf.contains(RPC_ASK_TIMEOUT))
+    assert(!conf.contains(RPC_LOOKUP_TIMEOUT))
 
     conf.set("spark.akka.num.retries", "1")
     assert(RpcUtils.numRetries(conf) === 1)
@@ -322,12 +323,12 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     val conf = new SparkConf()
     conf.validateSettings()
 
-    conf.set(NETWORK_ENCRYPTION_ENABLED, true)
+    conf.set(NETWORK_CRYPTO_ENABLED, true)
     intercept[IllegalArgumentException] {
       conf.validateSettings()
     }
 
-    conf.set(NETWORK_ENCRYPTION_ENABLED, false)
+    conf.set(NETWORK_CRYPTO_ENABLED, false)
     conf.set(SASL_ENCRYPTION_ENABLED, true)
     intercept[IllegalArgumentException] {
       conf.validateSettings()
@@ -341,7 +342,7 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     val conf = new SparkConf()
     conf.validateSettings()
 
-    conf.set("spark.network.timeout", "5s")
+    conf.set(NETWORK_TIMEOUT.key, "5s")
     intercept[IllegalArgumentException] {
       conf.validateSettings()
     }
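
SparkConfSuite also exercises `contains` with an entry rather than a raw key, and now references `Network.NETWORK_CRYPTO_ENABLED` and `NETWORK_TIMEOUT.key` instead of string literals. A brief sketch, not part of this commit's diff, under the `org.apache.spark` package assumption:

```scala
package org.apache.spark

import org.apache.spark.internal.config.Network._

object ConfContainsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
    assert(!conf.contains(RPC_ASK_TIMEOUT))      // contains accepts the entry itself

    conf.set(NETWORK_CRYPTO_ENABLED, true)       // typed Boolean entry
    conf.set(NETWORK_TIMEOUT.key, "5s")          // duration string via the key

    println(conf.contains(NETWORK_CRYPTO_ENABLED))
  }
}
```
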
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 41d5dee..7a16f7b 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -423,7 +423,7 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
   test("No exception when both num-executors and dynamic allocation set.") {
     noException should be thrownBy {
       sc = new SparkContext(new 
SparkConf().setAppName("test").setMaster("local")
-        .set("spark.dynamicAllocation.enabled", 
"true").set("spark.executor.instances", "6"))
+        .set(DYN_ALLOCATION_ENABLED, true).set("spark.executor.instances", 
"6"))
       assert(sc.executorAllocationManager.isEmpty)
       assert(sc.getConf.getInt("spark.executor.instances", 0) === 6)
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 30efbb0..3712d1a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -195,7 +195,7 @@ class SparkSubmitSuite
       "--name", "myApp",
       "--class", "Foo",
       "--num-executors", "0",
-      "--conf", "spark.dynamicAllocation.enabled=true",
+      "--conf", s"${DYN_ALLOCATION_ENABLED.key}=true",
       "thejar.jar")
     new SparkSubmitArguments(clArgs1)
 
@@ -203,7 +203,7 @@ class SparkSubmitSuite
       "--name", "myApp",
       "--class", "Foo",
       "--num-executors", "0",
-      "--conf", "spark.dynamicAllocation.enabled=false",
+      "--conf", s"${DYN_ALLOCATION_ENABLED.key}=false",
       "thejar.jar")
 
     val e = intercept[SparkException](new SparkSubmitArguments(clArgs2))
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 8c3c38d..a6d5066 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -457,9 +457,9 @@ class StandaloneDynamicAllocationSuite
   test("initial executor limit") {
     val initialExecutorLimit = 1
     val myConf = appConf
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
-      .set("spark.dynamicAllocation.initialExecutors", 
initialExecutorLimit.toString)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
+      .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutorLimit)
     sc = new SparkContext(myConf)
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index f1cf14d..544d52d 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -33,6 +33,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.shuffle.BlockFetchingListener
@@ -101,8 +102,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite 
with MockitoSugar wi
       .set(NETWORK_AUTH_ENABLED, true)
       .set(AUTH_SECRET, "good")
       .set("spark.app.id", "app-id")
-      .set("spark.network.crypto.enabled", "true")
-      .set("spark.network.crypto.saslFallback", "false")
+      .set(Network.NETWORK_CRYPTO_ENABLED, true)
+      .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false)
     testConnection(conf, conf) match {
       case Success(_) => // expected
       case Failure(t) => fail(t)
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index 51bf5c2..1b7f166 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually._
 import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, 
SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
@@ -172,8 +173,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
     val conf = new SparkConf()
     val shortProp = "spark.rpc.short.timeout"
-    conf.set("spark.rpc.retry.wait", "0")
-    conf.set("spark.rpc.numRetries", "1")
+    conf.set(Network.RPC_RETRY_WAIT, 0L)
+    conf.set(Network.RPC_NUM_RETRIES, 1)
     val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true)
     // Use anotherEnv to find out the RpcEndpointRef
     val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, 
"ask-timeout")
@@ -709,8 +710,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
     testSend(new SparkConf()
       .set(NETWORK_AUTH_ENABLED, true)
       .set(AUTH_SECRET, "good")
-      .set("spark.network.crypto.enabled", "true")
-      .set("spark.network.crypto.saslFallback", "false"))
+      .set(Network.NETWORK_CRYPTO_ENABLED, true)
+      .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false))
   }
 
   test("ask with authentication") {
@@ -730,8 +731,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with 
BeforeAndAfterAll {
     testAsk(new SparkConf()
       .set(NETWORK_AUTH_ENABLED, true)
       .set(AUTH_SECRET, "good")
-      .set("spark.network.crypto.enabled", "true")
-      .set("spark.network.crypto.saslFallback", "false"))
+      .set(Network.NETWORK_CRYPTO_ENABLED, true)
+      .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false))
   }
 
   test("construct RpcTimeout with conf property") {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index aadddca..0fe0e5b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -58,7 +58,7 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
     "With default settings, job can succeed despite multiple bad executors on 
node",
     extraConfs = Seq(
       config.BLACKLIST_ENABLED.key -> "true",
-      config.MAX_TASK_FAILURES.key -> "4",
+      config.TASK_MAX_FAILURES.key -> "4",
       TEST_N_HOSTS.key -> "2",
       TEST_N_EXECUTORS_HOST.key -> "5",
       TEST_N_CORES_EXECUTOR.key -> "10"
@@ -106,7 +106,7 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
       TEST_N_HOSTS.key -> "2",
       TEST_N_EXECUTORS_HOST.key -> "1",
       TEST_N_CORES_EXECUTOR.key -> "1",
-      "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s"
+      config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0s"
     )
   ) {
     def runBackend(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
index aea4c5f..0adfb07 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala
@@ -443,20 +443,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with 
BeforeAndAfterEach with M
       (2, 2),
       (2, 3)
     ).foreach { case (maxTaskFailures, maxNodeAttempts) =>
-      conf.set(config.MAX_TASK_FAILURES, maxTaskFailures)
+      conf.set(config.TASK_MAX_FAILURES, maxTaskFailures)
       conf.set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, maxNodeAttempts.toString)
       val excMsg = intercept[IllegalArgumentException] {
         BlacklistTracker.validateBlacklistConfs(conf)
       }.getMessage()
       assert(excMsg === s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
-        s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+        s"( = ${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " +
         s"( = ${maxTaskFailures} ).  Though blacklisting is enabled, with this 
configuration, " +
         s"Spark will not be robust to one bad node.  Decrease " +
-        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase 
${config.MAX_TASK_FAILURES.key}, " +
+        s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase 
${config.TASK_MAX_FAILURES.key}, " +
         s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}")
     }
 
-    conf.remove(config.MAX_TASK_FAILURES)
+    conf.remove(config.TASK_MAX_FAILURES)
     conf.remove(config.MAX_TASK_ATTEMPTS_PER_NODE)
 
     Seq(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index c5a3966..b31b8cf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -24,6 +24,8 @@ import scala.concurrent.duration._
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkException, SparkFunSuite}
+import org.apache.spark.internal.config.CPUS_PER_TASK
+import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
@@ -34,7 +36,7 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 
   test("serialized task larger than max RPC message size") {
     val conf = new SparkConf
-    conf.set("spark.rpc.message.maxSize", "1")
+    conf.set(RPC_MESSAGE_MAX_SIZE, 1)
     conf.set("spark.default.parallelism", "1")
     sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf)
     val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf)
@@ -62,7 +64,7 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 
   test("compute max number of concurrent tasks can be launched when 
spark.task.cpus > 1") {
     val conf = new SparkConf()
-      .set("spark.task.cpus", "2")
+      .set(CPUS_PER_TASK, 2)
       .setMaster("local-cluster[4, 3, 1024]")
       .setAppName("test")
     sc = new SparkContext(conf)
@@ -76,7 +78,7 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 
   test("compute max number of concurrent tasks can be launched when some 
executors are busy") {
     val conf = new SparkConf()
-      .set("spark.task.cpus", "2")
+      .set(CPUS_PER_TASK, 2)
       .setMaster("local-cluster[4, 3, 1024]")
       .setAppName("test")
     sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 5bd3955..b953add 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException
 import java.util.Properties
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
+import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode._
 
 /**
@@ -31,7 +32,6 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
 
   val LOCAL = "local"
   val APP_NAME = "PoolSuite"
-  val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
   val TEST_POOL = "testPool"
 
   def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: 
TaskSchedulerImpl)
@@ -80,7 +80,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
    */
   test("Fair Scheduler Test") {
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
-    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
+    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext(LOCAL, APP_NAME, conf)
     val taskScheduler = new TaskSchedulerImpl(sc)
 
@@ -182,7 +182,7 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
   test("SPARK-17663: FairSchedulableBuilder sets default values for blank or 
invalid datas") {
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
       .getFile()
-    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
+    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
 
     val rootPool = new Pool("", FAIR, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
@@ -218,7 +218,7 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
     val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, 
taskScheduler)
 
     val properties = new Properties()
-    properties.setProperty("spark.scheduler.pool", TEST_POOL)
+    properties.setProperty(SparkContext.SPARK_SCHEDULER_POOL, TEST_POOL)
 
     // When FIFO Scheduler is used and task sets are submitted, they should be 
added to
     // the root pool, and no additional pools should be created
@@ -296,7 +296,7 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
   test("Fair Scheduler should build fair scheduler when " +
     "valid spark.scheduler.allocation.file property is set") {
     val xmlPath = 
getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile()
-    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
+    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
@@ -326,7 +326,7 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext {
 
   test("Fair Scheduler should throw FileNotFoundException " +
     "when invalid spark.scheduler.allocation.file property is set") {
-    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, 
"INVALID_FILE_PATH")
+    val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, 
"INVALID_FILE_PATH")
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
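
PoolSuite drops its local `SCHEDULER_ALLOCATION_FILE_PROPERTY` string in favour of the shared `SCHEDULER_ALLOCATION_FILE` entry, and names the pool through `SparkContext.SPARK_SCHEDULER_POOL`. A small sketch, not part of this commit's diff, with a placeholder XML path and the usual `org.apache.spark` package assumption:

```scala
package org.apache.spark

import java.util.Properties

import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE

object FairSchedulerConfSketch {
  def main(args: Array[String]): Unit = {
    // "/path/to/fairscheduler.xml" is a placeholder, not a file from the repo.
    val conf = new SparkConf(loadDefaults = false)
      .set(SCHEDULER_ALLOCATION_FILE, "/path/to/fairscheduler.xml")

    val properties = new Properties()
    properties.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "testPool")

    println(conf.get(SCHEDULER_ALLOCATION_FILE.key, "<unset>"))
  }
}
```
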
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index ff0f99b..aa6db8d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.TaskState._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
 
@@ -290,7 +291,7 @@ private[spark] abstract class MockBackend(
   // Periodically revive offers to allow delay scheduling to work
   private val reviveThread =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
-  private val reviveIntervalMs = 
conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms")
+  private val reviveIntervalMs = 
conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(10L)
 
   /**
    * Test backends should call this to get a task that has been assigned to 
them by the scheduler.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 6ffd1e8..2940aef 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.Matchers
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}
 
@@ -358,7 +359,7 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
   }
 
   test("onTaskGettingResult() called when result fetched remotely") {
-    val conf = new SparkConf().set("spark.rpc.message.maxSize", "1")
+    val conf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1)
     sc = new SparkContext("local", "SparkListenerSuite", conf)
     val listener = new SaveTaskEvents
     sc.addSparkListener(listener)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
index ea1439c..5b17dbf 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
 import org.apache.spark.TestUtils.JavaSourceFromString
+import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.storage.TaskResultBlockId
 import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils}
 
@@ -110,7 +111,7 @@ class TaskResultGetterSuite extends SparkFunSuite with 
BeforeAndAfter with Local
 
   // Set the RPC message size to be as small as possible (it must be an 
integer, so 1 is as small
   // as we can make it) so the tests don't take too long.
-  def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1")
+  def conf: SparkConf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1)
 
   test("handling results smaller than max RPC message size") {
     sc = new SparkContext("local", "test", conf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 85c87b9..016adb8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -92,7 +92,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
     sc = new SparkContext(conf)
     taskScheduler =
-      new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) {
+      new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES)) {
         override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): 
TaskSetManager = {
           val tsm = super.createTaskSetManager(taskSet, maxFailures)
           // we need to create a spied tsm just so we can set the 
TaskSetBlacklist
@@ -155,7 +155,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("Scheduler correctly accounts for multiple CPUs per task") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
     // Give zero core offers. Should not generate any tasks
     val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", 
"host0", 0),
       new WorkerOffer("executor1", "host1", 0))
@@ -185,7 +185,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
       Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 
1)), 0, 0, 0, null)
@@ -1208,7 +1208,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("don't schedule for a barrier taskSet if available slots are less than 
pending tasks") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
@@ -1225,7 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("schedule tasks for a barrier taskSet if all tasks can be launched 
together") {
     val taskCpus = 2
-    val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
+    val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> 
taskCpus.toString)
 
     val numFreeCores = 3
     val workerOffers = IndexedSeq(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 502a013..d0f98b5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -680,7 +680,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   }
 
   test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after 
being a zombie") {
-    val conf = new SparkConf().set("spark.speculation", "true")
+    val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
     sc = new SparkContext("local", "test", conf)
 
     sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2"))
@@ -747,12 +747,12 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
 
   test("[SPARK-22074] Task killed by other attempt task should not be 
resubmitted") {
-    val conf = new SparkConf().set("spark.speculation", "true")
+    val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
     sc = new SparkContext("local", "test", conf)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
-    sc.conf.set("spark.speculation.quantile", "0.5")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
 
     var killTaskCalled = false
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
@@ -1013,8 +1013,8 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
     val clock = new ManualClock()
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
@@ -1070,9 +1070,9 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(5)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
-    sc.conf.set("spark.speculation.quantile", "0.6")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.6)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
     val clock = new ManualClock()
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
@@ -1366,12 +1366,12 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
   }
 
   test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
-    val conf = new SparkConf().set("spark.speculation", "true")
+    val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
     sc = new SparkContext("local", "test", conf)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
-    sc.conf.set("spark.speculation.quantile", "0.1")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.1)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
 
     sched = new FakeTaskScheduler(sc)
     sched.initialize(new FakeSchedulerBackend())
@@ -1416,13 +1416,13 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 
 
   test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
-    val conf = new SparkConf().set("spark.speculation", "true")
+    val conf = new SparkConf().set(config.SPECULATION_ENABLED, true)
     sc = new SparkContext("local", "test", conf)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
 
-    sc.conf.set("spark.speculation.quantile", "0.5")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
 
     var killTaskCalled = false
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
@@ -1538,8 +1538,8 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = FakeTask.createTaskSet(4)
     // Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
-    sc.conf.set("spark.speculation.multiplier", "0.0")
-    sc.conf.set("spark.speculation", "true")
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
     val clock = new ManualClock()
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock 
= clock)
     val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
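
The TaskSetManagerSuite hunks above all apply the same migration: string keys such as "spark.speculation" become the typed SPECULATION_* entries from org.apache.spark.internal.config. A minimal sketch of the resulting pattern follows; the ConfigEntry-based SparkConf accessors are private[spark], so the snippet assumes it compiles inside the org.apache.spark package tree, and the package/object names are purely illustrative.

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    object SpeculationConfSketch {
      def build(): SparkConf = {
        val conf = new SparkConf(false)
          .set(config.SPECULATION_ENABLED, true)    // was .set("spark.speculation", "true")
          .set(config.SPECULATION_MULTIPLIER, 0.0)  // was .set("spark.speculation.multiplier", "0.0")
          .set(config.SPECULATION_QUANTILE, 0.5)    // was .set("spark.speculation.quantile", "0.5")
        // Reads come back typed, with no string parsing at the call site.
        assert(conf.get(config.SPECULATION_ENABLED))
        conf
      }
    }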
diff --git 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
index ae87109..57ed1f6 100644
--- 
a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala
@@ -30,7 +30,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite 
with LocalSparkContex
     val conf = new SparkConf(false)
       .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
       .set(config.Kryo.KRYO_USER_REGISTRATORS, 
classOf[AppJarRegistrator].getName)
-      .set(config.MAX_TASK_FAILURES, 1)
+      .set(config.TASK_MAX_FAILURES, 1)
       .set(config.BLACKLIST_ENABLED, false)
 
     val jar = 
TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName))
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 71eeb04..ede9466 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -154,7 +154,7 @@ class AppStatusListenerSuite extends SparkFunSuite with 
BeforeAndAfter {
 
     val jobProps = new Properties()
     jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
-    jobProps.setProperty("spark.scheduler.pool", "schedPool")
+    jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool")
 
     listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 7aca0ad..0086a79 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -833,32 +833,32 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
     conf.set(SUBMIT_DEPLOY_MODE, "client")
     assert(Utils.isDynamicAllocationEnabled(conf) === false)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.dynamicAllocation.enabled", "false")) === false)
+      conf.set(DYN_ALLOCATION_ENABLED, false)) === false)
     assert(Utils.isDynamicAllocationEnabled(
-      conf.set("spark.dynamicAllocation.enabled", "true")) === true)
+      conf.set(DYN_ALLOCATION_ENABLED, true)) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "1")) === true)
     assert(Utils.isDynamicAllocationEnabled(
       conf.set("spark.executor.instances", "0")) === true)
     assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) 
=== false)
-    assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true")))
+    assert(Utils.isDynamicAllocationEnabled(conf.set(DYN_ALLOCATION_TESTING, true)))
   }
 
   test("getDynamicAllocationInitialExecutors") {
     val conf = new SparkConf()
     assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0)
     assert(Utils.getDynamicAllocationInitialExecutors(
-      conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3)
+      conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 3)) === 3)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use 
minExecutors
       conf.set("spark.executor.instances", "2")) === 3)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use 
executor.instances
       conf.set("spark.executor.instances", "4")) === 4)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use 
executor.instances
-      conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4)
+      conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)) === 4)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use 
initialExecutors
-      conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5)
+      conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)) === 5)
     assert(Utils.getDynamicAllocationInitialExecutors( // should use 
minExecutors
-      conf.set("spark.dynamicAllocation.initialExecutors", "2")
+      conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 2)
         .set("spark.executor.instances", "1")) === 3)
   }
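
The UtilsSuite changes show the same pattern for the dynamic-allocation settings: each entry carries its type, so the enabled flags are set as Boolean and the executor counts as Int. A short sketch of how the Utils helpers exercised above consume these entries, assuming org.apache.spark packaging (the entries and Utils are private[spark]; the package/object names are illustrative):

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_MIN_EXECUTORS, DYN_ALLOCATION_TESTING}
    import org.apache.spark.util.Utils

    object DynAllocSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .set(DYN_ALLOCATION_ENABLED, true)       // Boolean, not the string "true"
          .set(DYN_ALLOCATION_TESTING, true)       // lets the check pass without a cluster master
          .set(DYN_ALLOCATION_MIN_EXECUTORS, 3)    // Int, not the string "3"
        // The helpers tested in UtilsSuite read the same typed entries.
        assert(Utils.isDynamicAllocationEnabled(conf))
        assert(Utils.getDynamicAllocationInitialExecutors(conf) == 3)
      }
    }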
 
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
index 95d874b..2853b75 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala
@@ -20,6 +20,7 @@ package org.apache.spark.mllib.util
 import org.scalatest.{BeforeAndAfterAll, Suite}
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 
 trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite =>
   @transient var sc: SparkContext = _
@@ -29,7 +30,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { 
self: Suite =>
     val conf = new SparkConf()
       .setMaster("local-cluster[2, 1, 1024]")
       .setAppName("test-cluster")
-      .set("spark.rpc.message.maxSize", "1") // set to 1MB to detect direct 
serialization of data
+      .set(RPC_MESSAGE_MAX_SIZE, 1) // set to 1MB to detect direct 
serialization of data
     sc = new SparkContext(conf)
   }
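
Entries such as RPC_MESSAGE_MAX_SIZE are declared once with a key, a type, and a default, which is what lets the call above pass the Int 1 (megabytes) instead of the string "1". A rough sketch of such a declaration (ConfigBuilder is private[spark]; the package name, doc string, and default value shown here are illustrative, not copied from the new Network object):

    package org.apache.spark.internal.config.sketch

    import org.apache.spark.internal.config.ConfigBuilder

    object NetworkSketch {
      // An Int entry measured in MB; callers write .set(ENTRY, 1) instead of
      // .set("spark.rpc.message.maxSize", "1").
      val RPC_MESSAGE_MAX_SIZE_SKETCH = ConfigBuilder("spark.rpc.message.maxSize")
        .doc("Maximum message size for RPC, in MiB.")
        .intConf
        .createWithDefault(128)
    }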
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index e285e20..0d2737e 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -26,6 +26,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, 
SchedulerBackendUtils}
@@ -47,7 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     ExecutionContext.fromExecutorService(requestExecutorsService)
 
   protected override val minRegisteredRatio =
-    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) 
{
+    if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
       0.8
     } else {
       super.minRegisteredRatio
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fb23535..be2854f 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -630,8 +630,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .registerDriverWithShuffleService(
             slave.hostname,
             externalShufflePort,
-            sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-              s"${sc.conf.getTimeAsSeconds("spark.network.timeout", 
"120s")}s"),
+            sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
             sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
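
Here the typed entry also absorbs the old fallback logic: spark.storage.blockManagerSlaveTimeoutMs used to be read as a string with a hand-built default derived from spark.network.timeout, and that fallback now lives with the entry itself. A minimal usage sketch, assuming org.apache.spark packaging (the object and method names are illustrative):

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config

    object SlaveTimeoutSketch {
      // One typed read, in milliseconds; the network-timeout fallback is no
      // longer repeated at every call site.
      def slaveTimeoutMs(conf: SparkConf): Long =
        conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
    }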
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 0cfaa0a..f2fd5e6 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -591,7 +591,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
     val expectedCores = 1
     setBackend(Map(
       "spark.cores.max" -> expectedCores.toString,
-      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+      SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO.key -> "1.0"))
 
     val offers = List(Resources(backend.executorMemory(sc), expectedCores))
     offerResources(offers)
@@ -604,8 +604,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite
 
   test("supports data locality with dynamic allocation") {
     setBackend(Map(
-      "spark.dynamicAllocation.enabled" -> "true",
-      "spark.dynamicAllocation.testing" -> "true",
+      DYN_ALLOCATION_ENABLED.key -> "true",
+      DYN_ALLOCATION_TESTING.key -> "true",
       "spark.locality.wait" -> "1s"))
 
     assert(backend.getExecutorIds().isEmpty)
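
Where a plain String key is still required, as in the Map passed to setBackend above or a Hadoop/YARN configuration, the diff uses ConfigEntry.key instead of re-typing the literal, so the spelling stays defined in one place. A tiny sketch (illustrative package/object name, org.apache.spark packaging assumed):

    package org.apache.spark.example

    import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}

    object StringKeySketch {
      // .key yields the canonical "spark.dynamicAllocation.*" strings for APIs
      // that only accept String -> String settings.
      val settings: Map[String, String] = Map(
        DYN_ALLOCATION_ENABLED.key -> "true",
        DYN_ALLOCATION_TESTING.key -> "true")
    }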
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index a7bed75..3dae11e 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -52,7 +53,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val stopped = new AtomicBoolean(false)
 
   override val minRegisteredRatio =
-    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) 
{
+    if (conf.get(config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
       0.8
     } else {
       super.minRegisteredRatio
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 37bccaf..8c62069 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
 import org.apache.spark.tags.ExtendedYarnTest
@@ -94,14 +95,14 @@ class YarnShuffleAuthSuite extends 
YarnShuffleIntegrationSuite {
   override def newYarnConfig(): YarnConfiguration = {
     val yarnConfig = super.newYarnConfig()
     yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
-    yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true")
+    yarnConfig.set(NETWORK_CRYPTO_ENABLED.key, "true")
     yarnConfig
   }
 
   override protected def extraSparkConf(): Map[String, String] = {
     super.extraSparkConf() ++ Map(
       NETWORK_AUTH_ENABLED.key -> "true",
-      NETWORK_ENCRYPTION_ENABLED.key -> "true"
+      NETWORK_CRYPTO_ENABLED.key -> "true"
     )
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 6196757..3284231 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -63,7 +63,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
     assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
     assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
     // Core configs
-    assert(!conf.isModifiable("spark.task.cpus"))
+    assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
     assert(!conf.isModifiable("spark.executor.cores"))
     // Invalid config parameters
     assert(!conf.isModifiable(""))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 5e97314..12bc68c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
+import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, 
UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -393,7 +394,7 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
       // Make sure that when SparkContext stops, the StateStore maintenance 
thread 'quickly'
       // fails to talk to the StateStoreCoordinator and unloads all the 
StateStores
-      .set("spark.rpc.numRetries", "1")
+      .set(RPC_NUM_RETRIES, 1)
     val opId = 0
     val dir = newDir()
     val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), 
UUID.randomUUID)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index d79c0cf..4c008c4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -603,7 +603,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends 
SparkFunSuite {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
-      .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
+      .set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly
       .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run 
this test quickly
       .set(ASYNC_TRACKING_ENABLED, false)
     withSpark(new SparkContext(conf)) { sc =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 6e4f2bb..daca65f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader._
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e68c601..83dfa74 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -32,6 +32,7 @@ import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
 import org.apache.spark.sql.execution.HiveResult
@@ -226,7 +227,7 @@ private[hive] class SparkExecuteStatementOperation(
     sqlContext.sparkContext.setJobGroup(statementId, statement)
     val pool = sessionToActivePool.get(parentSession.getSessionHandle)
     if (pool != null) {
-      sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
     }
     try {
       result = sqlContext.sql(statement)
@@ -234,7 +235,8 @@ private[hive] class SparkExecuteStatementOperation(
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
           sessionToActivePool.put(parentSession.getSessionHandle, value)
-          logInfo(s"Setting spark.scheduler.pool=$value for future statements 
in this session.")
+          logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for 
future statements " +
+            "in this session.")
         case _ =>
       }
       HiveThriftServer2.listener.onStatementParsed(statementId, 
result.queryExecution.toString())
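
The Thrift server change swaps the literal "spark.scheduler.pool" for the SparkContext.SPARK_SCHEDULER_POOL constant in both the setLocalProperty call and the log message. A sketch of the local-property side, assuming org.apache.spark packaging (the constant is not public API); the local master and names here are only illustrative, to keep the snippet self-contained:

    package org.apache.spark.example

    import org.apache.spark.{SparkConf, SparkContext}

    object SchedulerPoolSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf(false).setMaster("local").setAppName("pool-sketch"))
        try {
          // Route jobs submitted from this thread to the named scheduler pool.
          sc.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool")
          assert(sc.getLocalProperty(SparkContext.SPARK_SCHEDULER_POOL) == "schedPool")
        } finally {
          sc.stop()
        }
      }
    }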
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index d8d2a80..2707107 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, 
OutputWriterFactory}
@@ -69,7 +70,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
 
     // When speculation is on and output committer class name contains 
"Direct", we should warn
     // users that they may loss data if they are using a direct output 
committer.
-    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
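
The same entry also simplifies reads: SPECULATION_ENABLED carries its default, so the getBoolean call with a repeated false disappears. A one-method sketch, assuming org.apache.spark packaging (illustrative package/object name):

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.SPECULATION_ENABLED

    object SpeculationReadSketch {
      // was: conf.getBoolean("spark.speculation", false)
      def speculationEnabled(conf: SparkConf): Boolean = conf.get(SPECULATION_ENABLED)
    }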
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7ec02c4..4707d6e 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, 
StreamingContext}
 import org.apache.spark.util.{ManualClock, Utils}
 
@@ -332,8 +333,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
 
     val confWithBothDynamicAllocationEnabled = new SparkConf()
       .set("spark.streaming.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(DYN_ALLOCATION_ENABLED, true)
+      .set(DYN_ALLOCATION_TESTING, true)
     
     require(Utils.isDynamicAllocationEnabled(confWithBothDynamicAllocationEnabled) === true)
     withStreamingContext(confWithBothDynamicAllocationEnabled) { ssc =>
       intercept[IllegalArgumentException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
