This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7296999  [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
7296999 is described below

commit 7296999c4751cfddcca5b77e3348354cff65d069
Author: Pralabh Kumar <pkum...@linkedin.com>
AuthorDate: Tue Jan 15 12:50:07 2019 -0600

    [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
    
    ## What changes were proposed in this pull request?
    
    Make the following hardcoded configs use ConfigEntry (a short sketch of the pattern follows the list):
    spark.memory
    spark.storage
    spark.io
    spark.buffer
    spark.rdd
    spark.locality
    spark.broadcast
    spark.reducer
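    
    A minimal sketch of the resulting pattern, using the BUFFER_SIZE entry added by
    this patch; the value passed to set() at the end is illustrative only:
    
        // Old style: the raw key and its default are repeated at every call site.
        //   val bufferSize = conf.getInt("spark.buffer.size", 65536)
    
        // New style: key, type and default are declared once as a ConfigEntry in
        // core/src/main/scala/org/apache/spark/internal/config/package.scala.
        private[spark] val BUFFER_SIZE =
          ConfigBuilder("spark.buffer.size")
            .intConf
            .createWithDefault(65536)
    
        // Call sites (given a SparkConf `conf`) then read or set the typed entry
        // instead of a raw string key.
        val bufferSize = SparkEnv.get.conf.get(BUFFER_SIZE)
        conf.set(BUFFER_SIZE, 131072)  // illustrative value, not a recommendation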
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23447 from pralabhkumar/execution_categories.
    
    Authored-by: Pralabh Kumar <pkum...@linkedin.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala |   4 +-
 .../main/scala/org/apache/spark/SparkConf.scala    |   8 +-
 .../org/apache/spark/api/python/PythonRDD.scala    |   3 +-
 .../org/apache/spark/api/python/PythonRunner.scala |   4 +-
 .../scala/org/apache/spark/api/r/RRunner.scala     |   4 +-
 .../apache/spark/broadcast/TorrentBroadcast.scala  |   8 +-
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  |   3 +-
 .../scala/org/apache/spark/executor/Executor.scala |   2 +-
 .../org/apache/spark/internal/config/package.scala | 191 ++++++++++++++++++++-
 .../org/apache/spark/io/CompressionCodec.scala     |  13 +-
 .../org/apache/spark/memory/MemoryManager.scala    |   2 +-
 .../apache/spark/memory/UnifiedMemoryManager.scala |   4 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   3 +-
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   |  10 +-
 .../main/scala/org/apache/spark/rdd/UnionRDD.scala |   3 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  13 +-
 .../spark/serializer/SerializerManager.scala       |   5 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala    |   4 +-
 .../org/apache/spark/storage/BlockManager.scala    |   7 +-
 .../spark/storage/BlockManagerMasterEndpoint.scala |   6 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |   2 +-
 .../org/apache/spark/storage/TopologyMapper.scala  |   4 +-
 .../apache/spark/storage/memory/MemoryStore.scala  |   4 +-
 .../scala/org/apache/spark/DistributedSuite.scala  |   8 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |   4 +-
 .../apache/spark/broadcast/BroadcastSuite.scala    |   7 +-
 .../spark/memory/UnifiedMemoryManagerSuite.scala   |  20 ++-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |   5 +-
 .../scheduler/BlacklistIntegrationSuite.scala      |   2 +-
 .../scheduler/EventLoggingListenerSuite.scala      |   2 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |   4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |   4 +-
 .../shuffle/sort/ShuffleExternalSorterSuite.scala  |   4 +-
 .../storage/BlockManagerReplicationSuite.scala     |  17 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  18 +-
 .../org/apache/spark/storage/DiskStoreSuite.scala  |   2 +-
 .../apache/spark/storage/MemoryStoreSuite.scala    |   3 +-
 .../apache/spark/storage/TopologyMapperSuite.scala |   3 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    |   2 +-
 39 files changed, 309 insertions(+), 103 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala 
b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index ab0ae55..67f2c27 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.concurrent.Future
 
 import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerId
@@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, 
clock: Clock)
   // "spark.network.timeoutInterval" uses "seconds", while
   // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
   private val timeoutIntervalMs =
-    sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s")
+    sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL)
   private val checkTimeoutIntervalMs =
     sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", 
s"${timeoutIntervalMs}ms") * 1000
 
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala 
b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 1100868..22bcb81 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -532,7 +532,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Seria
     }
 
     // Validate memory fractions
-    for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) {
+    for (key <- Seq(MEMORY_FRACTION.key, MEMORY_STORAGE_FRACTION.key)) {
       val value = getDouble(key, 0.5)
       if (value > 1 || value < 0) {
         throw new IllegalArgumentException(s"$key should be between 0 and 1 
(was '$value').")
@@ -664,7 +664,7 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3",
         // Translate old value to a duration, with 10s wait time per try.
         translation = s => s"${s.toLong * 10}s")),
-    "spark.reducer.maxSizeInFlight" -> Seq(
+    REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq(
       AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
     "spark.kryoserializer.buffer" -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
@@ -675,9 +675,9 @@ private[spark] object SparkConf extends Logging {
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
     EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
-    "spark.io.compression.snappy.blockSize" -> Seq(
+    IO_COMPRESSION_SNAPPY_BLOCKSIZE.key -> Seq(
       AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
-    "spark.io.compression.lz4.blockSize" -> Seq(
+    IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
       AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
     "spark.rpc.numRetries" -> Seq(
       AlternateConfig("spark.akka.num.retries", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 5ed5070..14ea289 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -39,6 +39,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, 
JavaSparkContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.input.PortableDataStream
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.security.SocketAuthHelper
@@ -604,7 +605,7 @@ private[spark] class PythonAccumulatorV2(
 
   Utils.checkHost(serverHost)
 
-  val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536)
+  val bufferSize = SparkEnv.get.conf.get(BUFFER_SIZE)
 
   /**
    * We try to reuse a single Socket to transfer accumulator updates, as they 
are all added
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 5168e93..b7f14e0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -27,7 +27,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.EXECUTOR_CORES
+import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES}
 import org.apache.spark.internal.config.Python._
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util._
@@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
   require(funcs.length == argOffsets.length, "argOffsets should have the same 
length as funcs")
 
   private val conf = SparkEnv.get.conf
-  private val bufferSize = conf.getInt("spark.buffer.size", 65536)
+  private val bufferSize = conf.get(BUFFER_SIZE)
   private val reuseWorker = conf.get(PYTHON_WORKER_REUSE)
   // each python worker gets an equal part of the allocation. the worker pool 
will grow to the
   // number of concurrent tasks, which is determined by the number of cores in 
this executor.
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala 
b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
index 3fdea04..b367c7f 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala
@@ -27,6 +27,7 @@ import scala.util.Try
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.R._
 import org.apache.spark.util.Utils
 
@@ -124,7 +125,8 @@ private[spark] class RRunner[U](
       partitionIndex: Int): Unit = {
     val env = SparkEnv.get
     val taskContext = TaskContext.get()
-    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+    val bufferSize = System.getProperty(BUFFER_SIZE.key,
+      BUFFER_SIZE.defaultValueString).toInt
     val stream = new BufferedOutputStream(output, bufferSize)
 
     new Thread("writer for R") {
diff --git 
a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala 
b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 26ead57..6410866 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -27,7 +27,7 @@ import scala.reflect.ClassTag
 import scala.util.Random
 
 import org.apache.spark._
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage._
@@ -74,14 +74,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, 
id: Long)
   @transient private var blockSize: Int = _
 
   private def setConf(conf: SparkConf) {
-    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
+    compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {
       Some(CompressionCodec.createCodec(conf))
     } else {
       None
     }
     // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units 
are provided
-    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 
1024
-    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
+    blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024
+    checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)
   }
   setConf(SparkEnv.get.conf)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 9371992..a97d072 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.util.Utils
 
 /**
@@ -442,7 +443,7 @@ private[spark] object SparkHadoopUtil {
         }
       }
       appendSparkHadoopConfigs(conf, hadoopConf)
-      val bufferSize = conf.get("spark.buffer.size", "65536")
+      val bufferSize = conf.get(BUFFER_SIZE).toString
       hadoopConf.set("io.file.buffer.size", bufferSize)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a30a501..a6759c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -442,7 +442,7 @@ private[spark] class Executor(
             val errMsg =
               s"${releasedLocks.size} block locks were not released by TID = 
$taskId:\n" +
                 releasedLocks.mkString("[", ", ", "]")
-            if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) {
+            if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) {
               throw new SparkException(errMsg)
             } else {
               logInfo(errMsg)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c942c27..95becfa 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.storage.{DefaultTopologyMapper, 
RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
 
@@ -187,6 +188,82 @@ package object config {
     .checkValue(_ >= 0, "The off-heap memory size must not be negative")
     .createWithDefault(0)
 
+  private[spark] val MEMORY_STORAGE_FRACTION = 
ConfigBuilder("spark.memory.storageFraction")
+    .doc("Amount of storage memory immune to eviction, expressed as a fraction 
of the " +
+      "size of the region set aside by spark.memory.fraction. The higher this 
is, the " +
+      "less working memory may be available to execution and tasks may spill 
to disk more " +
+      "often. Leaving this at the default value is recommended. ")
+    .doubleConf
+    .createWithDefault(0.5)
+
+  private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction")
+    .doc("Fraction of (heap space - 300MB) used for execution and storage. The 
" +
+      "lower this is, the more frequently spills and cached data eviction 
occur. " +
+      "The purpose of this config is to set aside memory for internal 
metadata, " +
+      "user data structures, and imprecise size estimation in the case of 
sparse, " +
+      "unusually large records. Leaving this at the default value is 
recommended.  ")
+    .doubleConf
+    .createWithDefault(0.6)
+
+  private[spark] val STORAGE_SAFETY_FRACTION = 
ConfigBuilder("spark.storage.safetyFraction")
+    .doubleConf
+    .createWithDefault(0.9)
+
+  private[spark] val STORAGE_UNROLL_MEMORY_THRESHOLD =
+    ConfigBuilder("spark.storage.unrollMemoryThreshold")
+      .doc("Initial memory to request before unrolling any block")
+      .longConf
+      .createWithDefault(1024 * 1024)
+
+  private[spark] val STORAGE_REPLICATION_PROACTIVE =
+    ConfigBuilder("spark.storage.replication.proactive")
+      .doc("Enables proactive block replication for RDD blocks. " +
+        "Cached RDD block replicas lost due to executor failures are 
replenished " +
+        "if there are any existing available replicas. This tries to " +
+        "get the replication level of the block to the initial number")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STORAGE_MEMORY_MAP_THRESHOLD =
+    ConfigBuilder("spark.storage.memoryMapThreshold")
+      .doc("Size in bytes of a block above which Spark memory maps when " +
+        "reading a block from disk. " +
+        "This prevents Spark from memory mapping very small blocks. " +
+        "In general, memory mapping has high overhead for blocks close to or 
below " +
+        "the page size of the operating system.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("2m")
+
+  private[spark] val STORAGE_REPLICATION_POLICY =
+    ConfigBuilder("spark.storage.replication.policy")
+      .stringConf
+      .createWithDefaultString(classOf[RandomBlockReplicationPolicy].getName)
+
+  private[spark] val STORAGE_REPLICATION_TOPOLOGY_MAPPER =
+    ConfigBuilder("spark.storage.replication.topologyMapper")
+      .stringConf
+      .createWithDefaultString(classOf[DefaultTopologyMapper].getName)
+
+  private[spark] val STORAGE_CACHED_PEERS_TTL = 
ConfigBuilder("spark.storage.cachedPeersTtl")
+    .intConf.createWithDefault(60 * 1000)
+
+  private[spark] val STORAGE_MAX_REPLICATION_FAILURE =
+    ConfigBuilder("spark.storage.maxReplicationFailures")
+      .intConf.createWithDefault(1)
+
+  private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
+    ConfigBuilder("spark.storage.replication.topologyFile").stringConf.createOptional
+
+  private[spark] val STORAGE_EXCEPTION_PIN_LEAK =
+    ConfigBuilder("spark.storage.exceptionOnPinLeak")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL =
+    ConfigBuilder("spark.storage.blockManagerTimeoutIntervalMs")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("60s")
+
   private[spark] val IS_PYTHON_APP = 
ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
@@ -400,7 +477,7 @@ package object config {
 
   private[spark] val IGNORE_MISSING_FILES = 
ConfigBuilder("spark.files.ignoreMissingFiles")
     .doc("Whether to ignore missing files. If true, the Spark jobs will 
continue to run when " +
-        "encountering missing files and the contents that have been read will 
still be returned.")
+      "encountering missing files and the contents that have been read will 
still be returned.")
     .booleanConf
     .createWithDefault(false)
 
@@ -772,4 +849,116 @@ package object config {
   private[spark] val MASTER_UI_PORT = ConfigBuilder("spark.master.ui.port")
     .intConf
     .createWithDefault(8080)
+
+  private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE =
+    ConfigBuilder("spark.io.compression.snappy.blockSize")
+      .doc("Block size in bytes used in Snappy compression, in the case when " 
+
+        "Snappy compression codec is used. Lowering this block size " +
+        "will also lower shuffle memory usage when Snappy is used")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("32k")
+
+  private[spark] val IO_COMPRESSION_LZ4_BLOCKSIZE =
+    ConfigBuilder("spark.io.compression.lz4.blockSize")
+      .doc("Block size in bytes used in LZ4 compression, in the case when LZ4 
compression" +
+        "codec is used. Lowering this block size will also lower shuffle 
memory " +
+        "usage when LZ4 is used.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("32k")
+
+  private[spark] val IO_COMPRESSION_CODEC =
+    ConfigBuilder("spark.io.compression.codec")
+      .doc("The codec used to compress internal data such as RDD partitions, 
event log, " +
+        "broadcast variables and shuffle outputs. By default, Spark provides 
four codecs: " +
+        "lz4, lzf, snappy, and zstd. You can also use fully qualified class 
names to specify " +
+        "the codec")
+      .stringConf
+      .createWithDefaultString("lz4")
+
+  private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE =
+    ConfigBuilder("spark.io.compression.zstd.bufferSize")
+      .doc("Buffer size in bytes used in Zstd compression, in the case when 
Zstd " +
+        "compression codec is used. Lowering this size will lower the shuffle 
" +
+        "memory usage when Zstd is used, but it might increase the compression 
" +
+        "cost because of excessive JNI call overhead")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("32k")
+
+  private[spark] val IO_COMPRESSION_ZSTD_LEVEL =
+    ConfigBuilder("spark.io.compression.zstd.level")
+      .doc("Compression level for Zstd compression codec. Increasing the 
compression " +
+        "level will result in better compression at the expense of more CPU 
and memory")
+      .intConf
+      .createWithDefault(1)
+
+  private[spark] val BUFFER_SIZE =
+    ConfigBuilder("spark.buffer.size")
+      .intConf
+      .createWithDefault(65536)
+
+  private[spark] val LOCALITY_WAIT_PROCESS = 
ConfigBuilder("spark.locality.wait.process")
+    .fallbackConf(LOCALITY_WAIT)
+
+  private[spark] val LOCALITY_WAIT_NODE = 
ConfigBuilder("spark.locality.wait.node")
+    .fallbackConf(LOCALITY_WAIT)
+
+  private[spark] val LOCALITY_WAIT_RACK = 
ConfigBuilder("spark.locality.wait.rack")
+    .fallbackConf(LOCALITY_WAIT)
+
+  private[spark] val REDUCER_MAX_SIZE_IN_FLIGHT = ConfigBuilder("spark.reducer.maxSizeInFlight")
+    .doc("Maximum size of map outputs to fetch simultaneously from each reduce 
task, " +
+      "in MiB unless otherwise specified. Since each output requires us to 
create a " +
+      "buffer to receive it, this represents a fixed memory overhead per 
reduce task, " +
+      "so keep it small unless you have a large amount of memory")
+    .bytesConf(ByteUnit.MiB)
+    .createWithDefaultString("48m")
+
+  private[spark] val REDUCER_MAX_REQS_IN_FLIGHT = 
ConfigBuilder("spark.reducer.maxReqsInFlight")
+    .doc("This configuration limits the number of remote requests to fetch 
blocks at " +
+      "any given point. When the number of hosts in the cluster increase, " +
+      "it might lead to very large number of inbound connections to one or 
more nodes, " +
+      "causing the workers to fail under load. By allowing it to limit the 
number of " +
+      "fetch requests, this scenario can be mitigated")
+    .intConf
+    .createWithDefault(Int.MaxValue)
+
+  private[spark] val BROADCAST_COMPRESS = 
ConfigBuilder("spark.broadcast.compress")
+    .doc("Whether to compress broadcast variables before sending them. " +
+      "Generally a good idea. Compression will use spark.io.compression.codec")
+    .booleanConf.createWithDefault(true)
+
+  private[spark] val BROADCAST_BLOCKSIZE = 
ConfigBuilder("spark.broadcast.blockSize")
+    .doc("Size of each piece of a block for TorrentBroadcastFactory, in " +
+      "KiB unless otherwise specified. Too large a value decreases " +
+      "parallelism during broadcast (makes it slower); however, " +
+      "if it is too small, BlockManager might take a performance hit")
+    .bytesConf(ByteUnit.KiB)
+    .createWithDefaultString("4m")
+
+  private[spark] val BROADCAST_CHECKSUM = 
ConfigBuilder("spark.broadcast.checksum")
+    .doc("Whether to enable checksum for broadcast. If enabled, " +
+      "broadcasts will include a checksum, which can help detect " +
+      "corrupted blocks, at the cost of computing and sending a little " +
+      "more data. It's possible to disable it if the network has other " +
+      "mechanisms to guarantee data won't be corrupted during broadcast")
+    .booleanConf.createWithDefault(true)
+
+  private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress")
+    .doc("Whether to compress serialized RDD partitions " +
+      "(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " +
+      "or StorageLevel.MEMORY_ONLY in Python). Can save substantial " +
+      "space at the cost of some extra CPU time. " +
+      "Compression will use spark.io.compression.codec")
+    .booleanConf.createWithDefault(false)
+
+  private[spark] val RDD_PARALLEL_LISTING_THRESHOLD =
+    ConfigBuilder("spark.rdd.parallelListingThreshold")
+      .intConf
+      .createWithDefault(10)
+
+  private[spark] val RDD_LIMIT_SCALE_UP_FACTOR =
+    ConfigBuilder("spark.rdd.limit.scaleUpFactor")
+      .intConf
+      .createWithDefault(4)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala 
b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index c4f4b18..288c0d1 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -27,6 +27,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, 
SnappyOutputStream}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -51,7 +52,7 @@ trait CompressionCodec {
 
 private[spark] object CompressionCodec {
 
-  private val configKey = "spark.io.compression.codec"
+  private val configKey = IO_COMPRESSION_CODEC.key
 
   private[spark] def supportsConcatenationOfSerializedStreams(codec: 
CompressionCodec): Boolean = {
     (codec.isInstanceOf[SnappyCompressionCodec] || 
codec.isInstanceOf[LZFCompressionCodec]
@@ -65,7 +66,7 @@ private[spark] object CompressionCodec {
     "zstd" -> classOf[ZStdCompressionCodec].getName)
 
   def getCodecName(conf: SparkConf): String = {
-    conf.get(configKey, DEFAULT_COMPRESSION_CODEC)
+    conf.get(IO_COMPRESSION_CODEC)
   }
 
   def createCodec(conf: SparkConf): CompressionCodec = {
@@ -117,7 +118,7 @@ private[spark] object CompressionCodec {
 class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec {
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", 
"32k").toInt
+    val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
     new LZ4BlockOutputStream(s, blockSize)
   }
 
@@ -166,7 +167,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
   }
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
-    val blockSize = 
conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
+    val blockSize = conf.get(IO_COMPRESSION_SNAPPY_BLOCKSIZE).toInt
     new SnappyOutputStream(s, blockSize)
   }
 
@@ -185,10 +186,10 @@ class SnappyCompressionCodec(conf: SparkConf) extends 
CompressionCodec {
 @DeveloperApi
 class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec {
 
-  private val bufferSize = 
conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt
+  private val bufferSize = conf.get(IO_COMPRESSION_ZSTD_BUFFERSIZE).toInt
   // Default compression level for zstd compression to 1 because it is
   // fastest of all with reasonably high compression ratio.
-  private val level = conf.getInt("spark.io.compression.zstd.level", 1)
+  private val level = conf.get(IO_COMPRESSION_ZSTD_LEVEL)
 
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     // Wrap the zstd output stream in a buffered output stream, so that we can
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 4fde2d0..4ba4107 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -57,7 +57,7 @@ private[spark] abstract class MemoryManager(
 
   protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
   protected[this] val offHeapStorageMemory =
-    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 
0.5)).toLong
+    (maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong
 
   offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - 
offHeapStorageMemory)
   offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
diff --git 
a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala 
b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a0fbbbd..7282a83 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -203,7 +203,7 @@ object UnifiedMemoryManager {
       conf,
       maxHeapMemory = maxMemory,
       onHeapStorageRegionSize =
-        (maxMemory * conf.getDouble("spark.memory.storageFraction", 
0.5)).toLong,
+        (maxMemory * conf.get(config.MEMORY_STORAGE_FRACTION)).toLong,
       numCores = numCores)
   }
 
@@ -230,7 +230,7 @@ object UnifiedMemoryManager {
       }
     }
     val usableMemory = systemMemory - reservedMemory
-    val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
+    val memoryFraction = conf.get(config.MEMORY_FRACTION)
     (usableMemory * memoryFraction).toLong
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 6a25ee2..b2ed2d3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
@@ -1349,7 +1350,7 @@ abstract class RDD[T: ClassTag](
    * an exception if called on an RDD of `Nothing` or `Null`.
    */
   def take(num: Int): Array[T] = withScope {
-    val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 
4), 2)
+    val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2)
     if (num == 0) {
       new Array[T](0)
     } else {
diff --git 
a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 8273d8a..d165610 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.CHECKPOINT_COMPRESS
+import org.apache.spark.internal.config.{BUFFER_SIZE, CHECKPOINT_COMPRESS}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 
@@ -176,7 +176,7 @@ private[spark] object ReliableCheckpointRDD extends Logging 
{
     val tempOutputPath =
       new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
 
-    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
+    val bufferSize = env.conf.get(BUFFER_SIZE)
 
     val fileOutputStream = if (blockSize < 0) {
       val fileStream = fs.create(tempOutputPath, false, bufferSize)
@@ -222,7 +222,7 @@ private[spark] object ReliableCheckpointRDD extends Logging 
{
     sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit 
= {
     try {
       val partitionerFilePath = new Path(checkpointDirPath, 
checkpointPartitionerFileName)
-      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val bufferSize = sc.conf.get(BUFFER_SIZE)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
       val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize)
       val serializer = SparkEnv.get.serializer.newInstance()
@@ -249,7 +249,7 @@ private[spark] object ReliableCheckpointRDD extends Logging 
{
       sc: SparkContext,
       checkpointDirPath: String): Option[Partitioner] = {
     try {
-      val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
+      val bufferSize = sc.conf.get(BUFFER_SIZE)
       val partitionerFilePath = new Path(checkpointDirPath, 
checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
       val fileInputStream = fs.open(partitionerFilePath, bufferSize)
@@ -287,7 +287,7 @@ private[spark] object ReliableCheckpointRDD extends Logging 
{
       context: TaskContext): Iterator[T] = {
     val env = SparkEnv.get
     val fs = path.getFileSystem(broadcastedConf.value.value)
-    val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
+    val bufferSize = env.conf.get(BUFFER_SIZE)
     val fileInputStream = {
       val fileStream = fs.open(path, bufferSize)
       if (env.conf.get(CHECKPOINT_COMPRESS)) {
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 60e383a..6480e87 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -26,6 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, 
TaskContext}
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.util.Utils
 
 /**
@@ -71,7 +72,7 @@ class UnionRDD[T: ClassTag](
 
   // visible for testing
   private[spark] val isPartitionListingParallel: Boolean =
-    rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
+    rdds.length > conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
 
   override def getPartitions: Array[Partition] = {
     val parRDDs = if (isPartitionListingParallel) {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 41f032c..1b42cb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1043,16 +1043,15 @@ private[spark] class TaskSetManager(
   }
 
   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
-    val defaultWait = conf.get(config.LOCALITY_WAIT)
-    val localityWaitKey = level match {
-      case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process"
-      case TaskLocality.NODE_LOCAL => "spark.locality.wait.node"
-      case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack"
+    val localityWait = level match {
+      case TaskLocality.PROCESS_LOCAL => config.LOCALITY_WAIT_PROCESS
+      case TaskLocality.NODE_LOCAL => config.LOCALITY_WAIT_NODE
+      case TaskLocality.RACK_LOCAL => config.LOCALITY_WAIT_RACK
       case _ => null
     }
 
-    if (localityWaitKey != null) {
-      conf.getTimeAsMs(localityWaitKey, defaultWait.toString)
+    if (localityWait != null) {
+      conf.get(localityWait)
     } else {
       0L
     }
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 1d4b05c..1e233ca 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkConf
+import org.apache.spark.internal.config
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.storage._
@@ -63,11 +64,11 @@ private[spark] class SerializerManager(
   }
 
   // Whether to compress broadcast variables that are stored
-  private[this] val compressBroadcast = 
conf.getBoolean("spark.broadcast.compress", true)
+  private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS)
   // Whether to compress shuffle output that are stored
   private[this] val compressShuffle = 
conf.getBoolean("spark.shuffle.compress", true)
   // Whether to compress RDD partitions that are stored serialized
-  private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
+  private[this] val compressRdds = conf.get(config.RDD_COMPRESS)
   // Whether to compress shuffle output temporarily spilled to disk
   private[this] val compressShuffleSpill = 
conf.getBoolean("spark.shuffle.spill.compress", true)
 
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala 
b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 27e2f98..37f5169 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -50,8 +50,8 @@ private[spark] class BlockStoreShuffleReader[K, C](
       mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, 
startPartition, endPartition),
       serializerManager.wrapStream,
       // Note: we use getSizeAsMb when no suffix is provided for backwards 
compatibility
-      SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 
1024 * 1024,
-      SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
+      SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
+      SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
       SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
       SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
       SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true),
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1dfbc6e..5bfe778 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -233,8 +233,7 @@ private[spark] class BlockManager(
     shuffleClient.init(appId)
 
     blockReplicationPolicy = {
-      val priorityClass = conf.get(
-        "spark.storage.replication.policy", 
classOf[RandomBlockReplicationPolicy].getName)
+      val priorityClass = conf.get(config.STORAGE_REPLICATION_POLICY)
       val clazz = Utils.classForName(priorityClass)
       val ret = 
clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy]
       logInfo(s"Using $priorityClass for block replication policy")
@@ -1339,7 +1338,7 @@ private[spark] class BlockManager(
    */
   private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
     peerFetchLock.synchronized {
-      val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 
1000) // milliseconds
+      val cachedPeersTtl = conf.get(config.STORAGE_CACHED_PEERS_TTL) // 
milliseconds
       val timeout = System.currentTimeMillis - lastPeerFetchTime > 
cachedPeersTtl
       if (cachedPeers == null || forceFetch || timeout) {
         cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
@@ -1393,7 +1392,7 @@ private[spark] class BlockManager(
       classTag: ClassTag[_],
       existingReplicas: Set[BlockManagerId] = Set.empty): Unit = {
 
-    val maxReplicationFailures = 
conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val maxReplicationFailures = 
conf.get(config.STORAGE_MAX_REPLICATION_FAILURE)
     val tLevel = StorageLevel(
       useDisk = level.useDisk,
       useMemory = level.useMemory,
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index f984cf7..f5d6029 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -27,7 +27,7 @@ import scala.util.Random
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
@@ -60,7 +60,7 @@ class BlockManagerMasterEndpoint(
 
   private val topologyMapper = {
     val topologyMapperClassName = conf.get(
-      "spark.storage.replication.topologyMapper", 
classOf[DefaultTopologyMapper].getName)
+      config.STORAGE_REPLICATION_TOPOLOGY_MAPPER)
     val clazz = Utils.classForName(topologyMapperClassName)
     val mapper =
       
clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
@@ -68,7 +68,7 @@ class BlockManagerMasterEndpoint(
     mapper
   }
 
-  val proactivelyReplicate = conf.get("spark.storage.replication.proactive", 
"false").toBoolean
+  val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE)
 
   logInfo("BlockManagerMasterEndpoint up")
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 29963a9..36cbaeb 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,7 +45,7 @@ private[spark] class DiskStore(
     diskManager: DiskBlockManager,
     securityManager: SecurityManager) extends Logging {
 
-  private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val minMemoryMapBytes = conf.get(config.STORAGE_MEMORY_MAP_THRESHOLD)
   private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS)
   private val blockSizes = new ConcurrentHashMap[BlockId, Long]()
 
diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala 
b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
index a150a8e..3c2c4b4 100644
--- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import org.apache.spark.SparkConf
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.util.Utils
 
 /**
@@ -68,7 +68,7 @@ class DefaultTopologyMapper(conf: SparkConf) extends 
TopologyMapper(conf) with L
  */
 @DeveloperApi
 class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) 
with Logging {
-  val topologyFile = conf.getOption("spark.storage.replication.topologyFile")
+  val topologyFile = conf.get(config.STORAGE_REPLICATION_TOPOLOGY_FILE)
   require(topologyFile.isDefined, "Please specify topology file via " +
     "spark.storage.replication.topologyFile for FileBasedTopologyMapper.")
   val topologyMap = Utils.getPropertiesFromFile(topologyFile.get)
diff --git 
a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala 
b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 8513359..375d05b 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams
 
 import org.apache.spark.{SparkConf, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, 
UNROLL_MEMORY_GROWTH_FACTOR}
+import org.apache.spark.internal.config.{STORAGE_UNROLL_MEMORY_THRESHOLD, 
UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage._
@@ -100,7 +100,7 @@ private[spark] class MemoryStore(
 
   // Initial memory to request before unrolling any block
   private val unrollMemoryThreshold: Long =
-    conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+    conf.get(STORAGE_UNROLL_MEMORY_THRESHOLD)
 
   /** Total amount of memory available for storage, in bytes. */
   private def maxMemory: Long = {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala 
b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 21050e4..17ffc7b 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -87,7 +87,7 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
   }
 
   test("groupByKey where map output sizes exceed maxMbInFlight") {
-    val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m")
+    val conf = new SparkConf().set(config.REDUCER_MAX_SIZE_IN_FLIGHT.key, "1m")
     sc = new SparkContext(clusterUrl, "test", conf)
     // This data should be around 20 MB, so even with 4 mappers and 2 
reducers, each map output
     // file should be about 2.5 MB
@@ -217,8 +217,9 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
   test("compute without caching when no partitions fit in memory") {
     val size = 10000
     val conf = new SparkConf()
-      .set("spark.storage.unrollMemoryThreshold", "1024")
+      .set(config.STORAGE_UNROLL_MEMORY_THRESHOLD, 1024L)
       .set(TEST_MEMORY, size.toLong / 2)
+
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
@@ -233,8 +234,9 @@ class DistributedSuite extends SparkFunSuite with Matchers 
with LocalSparkContex
     val size = 10000
     val numPartitions = 20
     val conf = new SparkConf()
-      .set("spark.storage.unrollMemoryThreshold", "1024")
+      .set(config.STORAGE_UNROLL_MEMORY_THRESHOLD, 1024L)
       .set(TEST_MEMORY, size.toLong)
+
     sc = new SparkContext(clusterUrl, "test", conf)
     val data = sc.parallelize(1 to size, 
numPartitions).persist(StorageLevel.MEMORY_ONLY)
     assert(data.count() === size)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 9a6abbd..a8849ab 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -290,12 +290,12 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     // set the conf in the deprecated way
     conf.set("spark.io.compression.lz4.block.size", "12345")
     // get the conf in the recommended way
-    assert(conf.get("spark.io.compression.lz4.blockSize") === "12345")
+    assert(conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE.key) === "12345")
     // we can still get the conf in the deprecated way
     assert(conf.get("spark.io.compression.lz4.block.size") === "12345")
     // the contains() also works as expected
     assert(conf.contains("spark.io.compression.lz4.block.size"))
-    assert(conf.contains("spark.io.compression.lz4.blockSize"))
+    assert(conf.contains(IO_COMPRESSION_LZ4_BLOCKSIZE.key))
     assert(conf.contains("spark.io.unknown") === false)
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala 
b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 6976464..6d74812 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -24,6 +24,7 @@ import scala.util.Random
 import org.scalatest.Assertions
 
 import org.apache.spark._
+import org.apache.spark.internal.config
 import org.apache.spark.io.SnappyCompressionCodec
 import org.apache.spark.rdd.RDD
 import org.apache.spark.security.EncryptionFunSuite
@@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with 
LocalSparkContext with Encryptio
   encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { 
conf =>
     val numSlaves = 4
     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-    conf.set("spark.broadcast.compress", "true")
+    conf.set(config.BROADCAST_COMPRESS, true)
     sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), 
"test", conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
@@ -145,7 +146,7 @@ class BroadcastSuite extends SparkFunSuite with 
LocalSparkContext with Encryptio
   encryptionTest("Cache broadcast to disk") { conf =>
     conf.setMaster("local")
       .setAppName("test")
-      .set("spark.memory.storageFraction", "0.0")
+      .set(config.MEMORY_STORAGE_FRACTION, 0.0)
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
     val broadcast = sc.broadcast(list)
@@ -172,7 +173,7 @@ class BroadcastSuite extends SparkFunSuite with 
LocalSparkContext with Encryptio
     val conf = new SparkConf()
       .setMaster("local[4]")
       .setAppName("test")
-      .set("spark.memory.storageFraction", "0.0")
+      .set(config.MEMORY_STORAGE_FRACTION, 0.0)
 
     sc = new SparkContext(conf)
     val list = List[Int](1, 2, 3, 4)
diff --git 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 8556e92..0a689f8 100644
--- 
a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -43,10 +43,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
       maxOnHeapExecutionMemory: Long,
       maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
     val conf = new SparkConf()
-      .set("spark.memory.fraction", "1")
+      .set(MEMORY_FRACTION, 1.0)
       .set(TEST_MEMORY, maxOnHeapExecutionMemory)
       .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory)
-      .set("spark.memory.storageFraction", storageFraction.toString)
+      .set(MEMORY_STORAGE_FRACTION, storageFraction)
     UnifiedMemoryManager(conf, numCores = 1)
   }
 
@@ -223,9 +223,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     val reservedMemory = 300L * 1024
     val memoryFraction = 0.8
     val conf = new SparkConf()
-      .set("spark.memory.fraction", memoryFraction.toString)
+      .set(MEMORY_FRACTION, memoryFraction)
       .set(TEST_MEMORY, systemMemory)
       .set(TEST_RESERVED_MEMORY, reservedMemory)
+
     val mm = UnifiedMemoryManager(conf, numCores = 1)
     val expectedMaxMemory = ((systemMemory - reservedMemory) * 
memoryFraction).toLong
     assert(mm.maxHeapMemory === expectedMaxMemory)
@@ -243,9 +244,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
     val reservedMemory = 300L * 1024
     val memoryFraction = 0.8
     val conf = new SparkConf()
-      .set("spark.memory.fraction", memoryFraction.toString)
+      .set(MEMORY_FRACTION, memoryFraction)
       .set(TEST_MEMORY, systemMemory)
       .set(TEST_RESERVED_MEMORY, reservedMemory)
+
     val mm = UnifiedMemoryManager(conf, numCores = 1)
 
     // Try using an executor memory that's too small
@@ -258,9 +260,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
 
   test("execution can evict cached blocks when there are multiple active tasks 
(SPARK-12155)") {
     val conf = new SparkConf()
-      .set("spark.memory.fraction", "1")
-      .set("spark.memory.storageFraction", "0")
+      .set(MEMORY_FRACTION, 1.0)
+      .set(MEMORY_STORAGE_FRACTION, 0.0)
       .set(TEST_MEMORY, 1000L)
+
     val mm = UnifiedMemoryManager(conf, numCores = 2)
     val ms = makeMemoryStore(mm)
     val memoryMode = MemoryMode.ON_HEAP
@@ -284,9 +287,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite 
with PrivateMethodTes
 
   test("SPARK-15260: atomically resize memory pools") {
     val conf = new SparkConf()
-      .set("spark.memory.fraction", "1")
-      .set("spark.memory.storageFraction", "0")
+      .set(MEMORY_FRACTION, 1.0)
+      .set(MEMORY_STORAGE_FRACTION, 0.0)
       .set(TEST_MEMORY, 1000L)
+
     val mm = UnifiedMemoryManager(conf, numCores = 2)
     makeBadMemoryStore(mm)
     val memoryMode = MemoryMode.ON_HEAP
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 2227698..e957340 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
 
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
 import org.apache.spark.rdd.RDDSuiteUtils._
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -136,10 +137,10 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext {
 
     assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel 
=== false)
 
-    sc.conf.set("spark.rdd.parallelListingThreshold", "1")
+    sc.conf.set(RDD_PARALLEL_LISTING_THRESHOLD, 1)
     val parallelUnion = sc.union(nums1, nums2)
     val actual = parallelUnion.collect().toList
-    sc.conf.remove("spark.rdd.parallelListingThreshold")
+    sc.conf.remove(RDD_PARALLEL_LISTING_THRESHOLD.key)
 
     
assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === 
true)
     assert(expected === actual)
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
index 2215f7f..aadddca 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala
@@ -85,7 +85,7 @@ class BlacklistIntegrationSuite extends 
SchedulerIntegrationSuite[MultiExecutorM
     extraConfs = Seq(
        config.BLACKLIST_ENABLED.key -> "true",
       // just to avoid this test taking too long
-      "spark.locality.wait" -> "10ms"
+      config.LOCALITY_WAIT.key -> "10ms"
     )
   ) {
     val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost)
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 04987e6..c0cb158 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -533,7 +533,7 @@ object EventLoggingListenerSuite {
     conf.set(EVENT_LOG_DIR, logDir.toString)
     compressionCodec.foreach { codec =>
       conf.set(EVENT_LOG_COMPRESS, true)
-      conf.set("spark.io.compression.codec", codec)
+      conf.set(IO_COMPRESSION_CODEC, codec)
     }
     conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true)
     conf
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9c555a9..85c87b9 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1011,7 +1011,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("Locality should be used for bulk offers even with delay scheduling 
off") {
     val conf = new SparkConf()
-      .set("spark.locality.wait", "0")
+      .set(config.LOCALITY_WAIT.key, "0")
     sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
     // we create a manual clock just so we can be sure the clock doesn't 
advance at all in this test
     val clock = new ManualClock()
@@ -1058,7 +1058,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
 
   test("With delay scheduling off, tasks can be run at any locality level 
immediately") {
     val conf = new SparkConf()
-      .set("spark.locality.wait", "0")
+      .set(config.LOCALITY_WAIT.key, "0")
     sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
 
     // we create a manual clock just so we can be sure the clock doesn't 
advance at all in this test
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index f9dfd2c..502a013 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -166,7 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
 
   private val conf = new SparkConf
 
-  val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s")
+  val LOCALITY_WAIT_MS = conf.get(config.LOCALITY_WAIT)
   val MAX_TASK_FAILURES = 4
 
   var sched: FakeTaskScheduler = null
@@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
       set(config.BLACKLIST_ENABLED, true).
       set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay).
       // don't wait to jump locality levels in this test
-      set("spark.locality.wait", "0")
+      set(config.LOCALITY_WAIT.key, "0")
 
     sc = new SparkContext("local", "test", conf)
     // two executors on same host, one on different.
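
Besides dropping the literal key, the TaskSetManagerSuite change above also stops repeating the default: conf.get(config.LOCALITY_WAIT) returns the wait in milliseconds and falls back to the default declared on the entry, where the old getTimeAsMs call had to restate "3s" at every call site. A sketch of the equivalence using a hypothetical entry (the real builder chain for LOCALITY_WAIT is not shown in this diff):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.ConfigBuilder

    // Hypothetical time-typed entry, declared once with its key and default.
    val EXAMPLE_LOCALITY_WAIT =
      ConfigBuilder("spark.locality.wait.example")  // suffixed key so it cannot clash with the real one
        .timeConf(TimeUnit.MILLISECONDS)
        .createWithDefaultString("3s")

    val conf = new SparkConf(false)
    val newStyle: Long = conf.get(EXAMPLE_LOCALITY_WAIT)                        // 3000 when unset
    val oldStyle: Long = conf.getTimeAsMs("spark.locality.wait.example", "3s")  // same value, default repeated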
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index 43621cb..9d71647 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config.MEMORY_FRACTION
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory._
 import org.apache.spark.unsafe.Platform
@@ -36,7 +37,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
       .setAppName("ShuffleExternalSorterSuite")
       .set(IS_TESTING, true)
       .set(TEST_MEMORY, 1600L)
-      .set("spark.memory.fraction", "1")
+      .set(MEMORY_FRACTION, 1.0)
+
     sc = new SparkContext(conf)
 
     val memoryManager = UnifiedMemoryManager(conf, 1)
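
Note that the ShuffleExternalSorterSuite change also switches the literal from the string "1" to the Double 1.0: MEMORY_FRACTION is a Double-typed entry, so a mistyped value is rejected at compile time rather than failing when the string is parsed. A minimal sketch, with the same private[spark] package assumption as above:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.MEMORY_FRACTION

    val conf = new SparkConf(false)
      .set(MEMORY_FRACTION, 1.0)       // ConfigEntry[Double], so a Double is required
    // conf.set(MEMORY_FRACTION, "1")  // rejected by the compiler: expected Double, found String

    val fraction: Double = conf.get(MEMORY_FRACTION)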
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1070e87..10855bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -85,18 +85,17 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
 
   before {
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-
     conf.set(NETWORK_AUTH_ENABLED, false)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
     conf.set(IS_TESTING, true)
-    conf.set("spark.memory.fraction", "1")
-    conf.set("spark.memory.storageFraction", "1")
-    conf.set("spark.storage.unrollMemoryThreshold", "512")
+    conf.set(MEMORY_FRACTION, 1.0)
+    conf.set(MEMORY_STORAGE_FRACTION, 1.0)
+    conf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 
     // to make a replication attempt to inactive store fail fast
     conf.set("spark.core.connection.ack.wait.timeout", "1s")
     // to make cached peers refresh frequently
-    conf.set("spark.storage.cachedPeersTtl", "10")
+    conf.set(STORAGE_CACHED_PEERS_TTL, 10)
 
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
@@ -424,8 +423,8 @@ class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
 class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
   val conf = new SparkConf(false).set("spark.app.id", "test")
   conf.set("spark.kryoserializer.buffer", "1m")
-  conf.set("spark.storage.replication.proactive", "true")
-  conf.set("spark.storage.exceptionOnPinLeak", "true")
+  conf.set(STORAGE_REPLICATION_PROACTIVE, true)
+  conf.set(STORAGE_EXCEPTION_PIN_LEAK, true)
 
   (2 to 5).foreach { i =>
     test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
@@ -500,10 +499,10 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
   val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
   conf.set("spark.kryoserializer.buffer", "1m")
   conf.set(
-    "spark.storage.replication.policy",
+    STORAGE_REPLICATION_POLICY,
     classOf[BasicBlockReplicationPolicy].getName)
   conf.set(
-    "spark.storage.replication.topologyMapper",
+    STORAGE_REPLICATION_TOPOLOGY_MAPPER,
     classOf[DummyTopologyMapper].getName)
 }
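
The replication suites above end up mixing both styles: spark.storage.* and spark.memory.* keys go through typed entries (including the class-name-valued ones, which still take classOf[...].getName as their String value), while keys outside those families, such as spark.kryoserializer.buffer, stay as raw strings in this diff. A hedged sketch of a test SparkConf assembled that way, again assuming org.apache.spark package scope for the private[spark] overloads:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.{
      MEMORY_FRACTION, MEMORY_STORAGE_FRACTION, STORAGE_REPLICATION_PROACTIVE}

    val conf = new SparkConf(false)
      .set("spark.app.id", "test")                 // not converted by this commit
      .set("spark.kryoserializer.buffer", "1m")    // still a raw string key here
      .set(MEMORY_FRACTION, 1.0)
      .set(MEMORY_STORAGE_FRACTION, 1.0)
      .set(STORAGE_REPLICATION_PROACTIVE, true)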
 
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c232641..cafd980 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -117,10 +117,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf = new SparkConf(false)
       .set("spark.app.id", "test")
       .set(IS_TESTING, true)
-      .set("spark.memory.fraction", "1")
-      .set("spark.memory.storageFraction", "1")
+      .set(MEMORY_FRACTION, 1.0)
+      .set(MEMORY_STORAGE_FRACTION, 1.0)
       .set("spark.kryoserializer.buffer", "1m")
-      .set("spark.storage.unrollMemoryThreshold", "512")
+      .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
@@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store.stop()
       store = null
 
-      conf.set("spark.broadcast.compress", "true")
+      conf.set(BROADCAST_COMPRESS, true)
       store = makeBlockManager(20000, "exec3")
       store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
@@ -863,7 +863,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store.stop()
       store = null
 
-      conf.set("spark.broadcast.compress", "false")
+      conf.set(BROADCAST_COMPRESS, false)
       store = makeBlockManager(20000, "exec4")
       store.putSingle(
         BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
@@ -871,14 +871,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store.stop()
       store = null
 
-      conf.set("spark.rdd.compress", "true")
+      conf.set(RDD_COMPRESS, true)
       store = makeBlockManager(20000, "exec5")
       store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
       store.stop()
       store = null
 
-      conf.set("spark.rdd.compress", "false")
+      conf.set(RDD_COMPRESS, false)
       store = makeBlockManager(20000, "exec6")
       store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
       assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
@@ -893,8 +893,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       store = null
     } finally {
       System.clearProperty("spark.shuffle.compress")
-      System.clearProperty("spark.broadcast.compress")
-      System.clearProperty("spark.rdd.compress")
+      System.clearProperty(BROADCAST_COMPRESS.key)
+      System.clearProperty(RDD_COMPRESS.key)
     }
   }
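
One detail in the cleanup block above: System.clearProperty is a plain JVM API that only accepts a String, so the entries contribute their .key rather than a typed call. A short illustrative sketch:

    import org.apache.spark.internal.config.{BROADCAST_COMPRESS, RDD_COMPRESS}

    // Where a string-only API is unavoidable, the entry still supplies the
    // canonical key, so the literal lives in exactly one place.
    System.setProperty(BROADCAST_COMPRESS.key, "true")
    // ... test body ...
    System.clearProperty(BROADCAST_COMPRESS.key)   // same effect as clearing "spark.broadcast.compress"
    System.clearProperty(RDD_COMPRESS.key)         // same effect as clearing "spark.rdd.compress"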
 
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 6f60b08..97b9c97 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -39,7 +39,7 @@ class DiskStoreSuite extends SparkFunSuite {
     // It will cause error when we tried to re-open the filestore and the
     // memory-mapped byte buffer tot he file has not been GC on Windows.
     assume(!Utils.isWindows)
-    val confKey = "spark.storage.memoryMapThreshold"
+    val confKey = config.STORAGE_MEMORY_MAP_THRESHOLD.key
 
     // Create a non-trivial (not all zeros) byte array
     val bytes = Array.tabulate[Byte](1000)(_.toByte)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7cdcd0f..5595ce4 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -26,6 +26,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
+import org.apache.spark.internal.config.STORAGE_UNROLL_MEMORY_THRESHOLD
 import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
@@ -39,7 +40,7 @@ class MemoryStoreSuite
   with ResetSystemProperties {
 
   var conf: SparkConf = new SparkConf(false)
-    .set("spark.storage.unrollMemoryThreshold", "512")
+    .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
index bbd252d..0bc26ad 100644
--- a/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, FileOutputStream}
 import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark._
+import org.apache.spark.internal.config.STORAGE_REPLICATION_TOPOLOGY_FILE
 import org.apache.spark.util.Utils
 
 class TopologyMapperSuite  extends SparkFunSuite
@@ -36,7 +37,7 @@ class TopologyMapperSuite  extends SparkFunSuite
     val propsFile = createPropertiesFile(props)
 
     val sparkConf = (new SparkConf(false))
-    sparkConf.set("spark.storage.replication.topologyFile", propsFile.getAbsolutePath)
+    sparkConf.set(STORAGE_REPLICATION_TOPOLOGY_FILE, propsFile.getAbsolutePath)
     val topologyMapper = new FileBasedTopologyMapper(sparkConf)
 
     props.foreach {case (host, topology) =>
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 1e03998..5efbf4e 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -58,7 +58,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
     conf.set("spark.shuffle.compress", codec.isDefined.toString)
-    codec.foreach { c => conf.set("spark.io.compression.codec", c) }
+    codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
     conf
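
The ExternalAppendOnlyMapSuite change keeps the Option-driven shape of the helper; only the inner call moves to the typed entry. A sketch under the same assumptions as the earlier examples (private[spark] overloads, String-valued IO_COMPRESSION_CODEC):

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.IO_COMPRESSION_CODEC

    def confWithCodec(codec: Option[String]): SparkConf = {
      val conf = new SparkConf(false)
      codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }   // only set when a codec is requested
      conf
    }

    confWithCodec(Some("lz4"))   // explicit codec
    confWithCodec(None)          // falls back to the entry's default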


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
