This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7296999  [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
7296999 is described below

commit 7296999c4751cfddcca5b77e3348354cff65d069
Author: Pralabh Kumar <pkum...@linkedin.com>
AuthorDate: Tue Jan 15 12:50:07 2019 -0600

    [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories

    ## What changes were proposed in this pull request?

    Make the following hardcoded configs use ConfigEntry:

    spark.memory
    spark.storage
    spark.io
    spark.buffer
    spark.rdd
    spark.locality
    spark.broadcast
    spark.reducer

    ## How was this patch tested?

    Existing tests.

    Closes #23447 from pralabhkumar/execution_categories.

    Authored-by: Pralabh Kumar <pkum...@linkedin.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/HeartbeatReceiver.scala |   4 +-
 .../main/scala/org/apache/spark/SparkConf.scala    |   8 +-
 .../org/apache/spark/api/python/PythonRDD.scala    |   3 +-
 .../org/apache/spark/api/python/PythonRunner.scala |   4 +-
 .../scala/org/apache/spark/api/r/RRunner.scala     |   4 +-
 .../apache/spark/broadcast/TorrentBroadcast.scala  |   8 +-
 .../org/apache/spark/deploy/SparkHadoopUtil.scala  |   3 +-
 .../scala/org/apache/spark/executor/Executor.scala |   2 +-
 .../org/apache/spark/internal/config/package.scala | 191 ++++++++++++++++++++-
 .../org/apache/spark/io/CompressionCodec.scala     |  13 +-
 .../org/apache/spark/memory/MemoryManager.scala    |   2 +-
 .../apache/spark/memory/UnifiedMemoryManager.scala |   4 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   3 +-
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   |  10 +-
 .../main/scala/org/apache/spark/rdd/UnionRDD.scala |   3 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  13 +-
 .../spark/serializer/SerializerManager.scala       |   5 +-
 .../spark/shuffle/BlockStoreShuffleReader.scala    |   4 +-
 .../org/apache/spark/storage/BlockManager.scala    |   7 +-
 .../spark/storage/BlockManagerMasterEndpoint.scala |   6 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |   2 +-
 .../org/apache/spark/storage/TopologyMapper.scala  |   4 +-
 .../apache/spark/storage/memory/MemoryStore.scala  |   4 +-
 .../scala/org/apache/spark/DistributedSuite.scala  |   8 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |   4 +-
 .../apache/spark/broadcast/BroadcastSuite.scala    |   7 +-
 .../spark/memory/UnifiedMemoryManagerSuite.scala   |  20 ++-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |   5 +-
 .../scheduler/BlacklistIntegrationSuite.scala      |   2 +-
 .../scheduler/EventLoggingListenerSuite.scala      |   2 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |   4 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |   4 +-
 .../shuffle/sort/ShuffleExternalSorterSuite.scala  |   4 +-
 .../storage/BlockManagerReplicationSuite.scala     |  17 +-
 .../apache/spark/storage/BlockManagerSuite.scala   |  18 +-
 .../org/apache/spark/storage/DiskStoreSuite.scala  |   2 +-
 .../apache/spark/storage/MemoryStoreSuite.scala    |   3 +-
 .../apache/spark/storage/TopologyMapperSuite.scala |   3 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    |   2 +-
 39 files changed, 309 insertions(+), 103 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index ab0ae55..67f2c27 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 import scala.concurrent.Future

 import org.apache.spark.executor.ExecutorMetrics
-import
org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerId @@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" private val timeoutIntervalMs = - sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") + sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL) private val checkTimeoutIntervalMs = sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 1100868..22bcb81 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -532,7 +532,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } // Validate memory fractions - for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) { + for (key <- Seq(MEMORY_FRACTION.key, MEMORY_STORAGE_FRACTION.key)) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').") @@ -664,7 +664,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", // Translate old value to a duration, with 10s wait time per try. translation = s => s"${s.toLong * 10}s")), - "spark.reducer.maxSizeInFlight" -> Seq( + REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq( AlternateConfig("spark.reducer.maxMbInFlight", "1.4")), "spark.kryoserializer.buffer" -> Seq( AlternateConfig("spark.kryoserializer.buffer.mb", "1.4", @@ -675,9 +675,9 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")), EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq( AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")), - "spark.io.compression.snappy.blockSize" -> Seq( + IO_COMPRESSION_SNAPPY_BLOCKSIZE.key -> Seq( AlternateConfig("spark.io.compression.snappy.block.size", "1.4")), - "spark.io.compression.lz4.blockSize" -> Seq( + IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq( AlternateConfig("spark.io.compression.lz4.block.size", "1.4")), "spark.rpc.numRetries" -> Seq( AlternateConfig("spark.akka.num.retries", "1.4")), diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5ed5070..14ea289 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -39,6 +39,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD import org.apache.spark.security.SocketAuthHelper @@ -604,7 +605,7 @@ private[spark] class PythonAccumulatorV2( Utils.checkHost(serverHost) - val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536) + val bufferSize = SparkEnv.get.conf.get(BUFFER_SIZE) /** * We try to reuse a single Socket to 
transfer accumulator updates, as they are all added diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 5168e93..b7f14e0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.EXECUTOR_CORES +import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES} import org.apache.spark.internal.config.Python._ import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -71,7 +71,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") private val conf = SparkEnv.get.conf - private val bufferSize = conf.getInt("spark.buffer.size", 65536) + private val bufferSize = conf.get(BUFFER_SIZE) private val reuseWorker = conf.get(PYTHON_WORKER_REUSE) // each python worker gets an equal part of the allocation. the worker pool will grow to the // number of concurrent tasks, which is determined by the number of cores in this executor. diff --git a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala index 3fdea04..b367c7f 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRunner.scala @@ -27,6 +27,7 @@ import scala.util.Try import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.internal.config.R._ import org.apache.spark.util.Utils @@ -124,7 +125,8 @@ private[spark] class RRunner[U]( partitionIndex: Int): Unit = { val env = SparkEnv.get val taskContext = TaskContext.get() - val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt + val bufferSize = System.getProperty(BUFFER_SIZE.key, + BUFFER_SIZE.defaultValueString).toInt val stream = new BufferedOutputStream(output, bufferSize) new Thread("writer for R") { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 26ead57..6410866 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -27,7 +27,7 @@ import scala.reflect.ClassTag import scala.util.Random import org.apache.spark._ -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer import org.apache.spark.storage._ @@ -74,14 +74,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) @transient private var blockSize: Int = _ private def setConf(conf: SparkConf) { - compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { + compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) { Some(CompressionCodec.createCodec(conf)) } else { None } // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided - blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024 - checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true) + 
blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024 + checksumEnabled = conf.get(config.BROADCAST_CHECKSUM) } setConf(SparkEnv.get.conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 9371992..a97d072 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.util.Utils /** @@ -442,7 +443,7 @@ private[spark] object SparkHadoopUtil { } } appendSparkHadoopConfigs(conf, hadoopConf) - val bufferSize = conf.get("spark.buffer.size", "65536") + val bufferSize = conf.get(BUFFER_SIZE).toString hadoopConf.set("io.file.buffer.size", bufferSize) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a30a501..a6759c4 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -442,7 +442,7 @@ private[spark] class Executor( val errMsg = s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" + releasedLocks.mkString("[", ", ", "]") - if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false)) { + if (conf.get(STORAGE_EXCEPTION_PIN_LEAK)) { throw new SparkException(errMsg) } else { logInfo(errMsg) diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index c942c27..95becfa 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit import org.apache.spark.launcher.SparkLauncher import org.apache.spark.network.util.ByteUnit import org.apache.spark.scheduler.EventLoggingListener +import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.util.Utils @@ -187,6 +188,82 @@ package object config { .checkValue(_ >= 0, "The off-heap memory size must not be negative") .createWithDefault(0) + private[spark] val MEMORY_STORAGE_FRACTION = ConfigBuilder("spark.memory.storageFraction") + .doc("Amount of storage memory immune to eviction, expressed as a fraction of the " + + "size of the region set aside by spark.memory.fraction. The higher this is, the " + + "less working memory may be available to execution and tasks may spill to disk more " + + "often. Leaving this at the default value is recommended. ") + .doubleConf + .createWithDefault(0.5) + + private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction") + .doc("Fraction of (heap space - 300MB) used for execution and storage. The " + + "lower this is, the more frequently spills and cached data eviction occur. " + + "The purpose of this config is to set aside memory for internal metadata, " + + "user data structures, and imprecise size estimation in the case of sparse, " + + "unusually large records. Leaving this at the default value is recommended. 
") + .doubleConf + .createWithDefault(0.6) + + private[spark] val STORAGE_SAFETY_FRACTION = ConfigBuilder("spark.storage.safetyFraction") + .doubleConf + .createWithDefault(0.9) + + private[spark] val STORAGE_UNROLL_MEMORY_THRESHOLD = + ConfigBuilder("spark.storage.unrollMemoryThreshold") + .doc("Initial memory to request before unrolling any block") + .longConf + .createWithDefault(1024 * 1024) + + private[spark] val STORAGE_REPLICATION_PROACTIVE = + ConfigBuilder("spark.storage.replication.proactive") + .doc("Enables proactive block replication for RDD blocks. " + + "Cached RDD block replicas lost due to executor failures are replenished " + + "if there are any existing available replicas. This tries to " + + "get the replication level of the block to the initial number") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_MEMORY_MAP_THRESHOLD = + ConfigBuilder("spark.storage.memoryMapThreshold") + .doc("Size in bytes of a block above which Spark memory maps when " + + "reading a block from disk. " + + "This prevents Spark from memory mapping very small blocks. " + + "In general, memory mapping has high overhead for blocks close to or below " + + "the page size of the operating system.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("2m") + + private[spark] val STORAGE_REPLICATION_POLICY = + ConfigBuilder("spark.storage.replication.policy") + .stringConf + .createWithDefaultString(classOf[RandomBlockReplicationPolicy].getName) + + private[spark] val STORAGE_REPLICATION_TOPOLOGY_MAPPER = + ConfigBuilder("spark.storage.replication.topologyMapper") + .stringConf + .createWithDefaultString(classOf[DefaultTopologyMapper].getName) + + private[spark] val STORAGE_CACHED_PEERS_TTL = ConfigBuilder("spark.storage.cachedPeersTtl") + .intConf.createWithDefault(60 * 1000) + + private[spark] val STORAGE_MAX_REPLICATION_FAILURE = + ConfigBuilder("spark.storage.maxReplicationFailures") + .intConf.createWithDefault(1) + + private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE = + ConfigBuilder("spark.storage.replication.topologyFile").stringConf.createOptional + + private[spark] val STORAGE_EXCEPTION_PIN_LEAK = + ConfigBuilder("spark.storage.exceptionOnPinLeak") + .booleanConf + .createWithDefault(false) + + private[spark] val STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL = + ConfigBuilder("spark.storage.blockManagerTimeoutIntervalMs") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("60s") + private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal() .booleanConf.createWithDefault(false) @@ -400,7 +477,7 @@ package object config { private[spark] val IGNORE_MISSING_FILES = ConfigBuilder("spark.files.ignoreMissingFiles") .doc("Whether to ignore missing files. If true, the Spark jobs will continue to run when " + - "encountering missing files and the contents that have been read will still be returned.") + "encountering missing files and the contents that have been read will still be returned.") .booleanConf .createWithDefault(false) @@ -772,4 +849,116 @@ package object config { private[spark] val MASTER_UI_PORT = ConfigBuilder("spark.master.ui.port") .intConf .createWithDefault(8080) + + private[spark] val IO_COMPRESSION_SNAPPY_BLOCKSIZE = + ConfigBuilder("spark.io.compression.snappy.blockSize") + .doc("Block size in bytes used in Snappy compression, in the case when " + + "Snappy compression codec is used. 
Lowering this block size " + + "will also lower shuffle memory usage when Snappy is used") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("32k") + + private[spark] val IO_COMPRESSION_LZ4_BLOCKSIZE = + ConfigBuilder("spark.io.compression.lz4.blockSize") + .doc("Block size in bytes used in LZ4 compression, in the case when LZ4 compression" + + "codec is used. Lowering this block size will also lower shuffle memory " + + "usage when LZ4 is used.") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("32k") + + private[spark] val IO_COMPRESSION_CODEC = + ConfigBuilder("spark.io.compression.codec") + .doc("The codec used to compress internal data such as RDD partitions, event log, " + + "broadcast variables and shuffle outputs. By default, Spark provides four codecs: " + + "lz4, lzf, snappy, and zstd. You can also use fully qualified class names to specify " + + "the codec") + .stringConf + .createWithDefaultString("lz4") + + private[spark] val IO_COMPRESSION_ZSTD_BUFFERSIZE = + ConfigBuilder("spark.io.compression.zstd.bufferSize") + .doc("Buffer size in bytes used in Zstd compression, in the case when Zstd " + + "compression codec is used. Lowering this size will lower the shuffle " + + "memory usage when Zstd is used, but it might increase the compression " + + "cost because of excessive JNI call overhead") + .bytesConf(ByteUnit.BYTE) + .createWithDefaultString("32k") + + private[spark] val IO_COMPRESSION_ZSTD_LEVEL = + ConfigBuilder("spark.io.compression.zstd.level") + .doc("Compression level for Zstd compression codec. Increasing the compression " + + "level will result in better compression at the expense of more CPU and memory") + .intConf + .createWithDefault(1) + + private[spark] val BUFFER_SIZE = + ConfigBuilder("spark.buffer.size") + .intConf + .createWithDefault(65536) + + private[spark] val LOCALITY_WAIT_PROCESS = ConfigBuilder("spark.locality.wait.process") + .fallbackConf(LOCALITY_WAIT) + + private[spark] val LOCALITY_WAIT_NODE = ConfigBuilder("spark.locality.wait.node") + .fallbackConf(LOCALITY_WAIT) + + private[spark] val LOCALITY_WAIT_RACK = ConfigBuilder("spark.locality.wait.rack") + .fallbackConf(LOCALITY_WAIT) + + private[spark] val REDUCER_MAX_SIZE_IN_FLIGHT = ConfigBuilder("spark.reducer.maxSizeInFlight") + .doc("Maximum size of map outputs to fetch simultaneously from each reduce task, " + + "in MiB unless otherwise specified. Since each output requires us to create a " + + "buffer to receive it, this represents a fixed memory overhead per reduce task, " + + "so keep it small unless you have a large amount of memory") + .bytesConf(ByteUnit.MiB) + .createWithDefaultString("48m") + + private[spark] val REDUCER_MAX_REQS_IN_FLIGHT = ConfigBuilder("spark.reducer.maxReqsInFlight") + .doc("This configuration limits the number of remote requests to fetch blocks at " + + "any given point. When the number of hosts in the cluster increase, " + + "it might lead to very large number of inbound connections to one or more nodes, " + + "causing the workers to fail under load. By allowing it to limit the number of " + + "fetch requests, this scenario can be mitigated") + .intConf + .createWithDefault(Int.MaxValue) + + private[spark] val BROADCAST_COMPRESS = ConfigBuilder("spark.broadcast.compress") + .doc("Whether to compress broadcast variables before sending them. " + + "Generally a good idea. 
Compression will use spark.io.compression.codec") + .booleanConf.createWithDefault(true) + + private[spark] val BROADCAST_BLOCKSIZE = ConfigBuilder("spark.broadcast.blockSize") + .doc("Size of each piece of a block for TorrentBroadcastFactory, in " + + "KiB unless otherwise specified. Too large a value decreases " + + "parallelism during broadcast (makes it slower); however, " + + "if it is too small, BlockManager might take a performance hit") + .bytesConf(ByteUnit.KiB) + .createWithDefaultString("4m") + + private[spark] val BROADCAST_CHECKSUM = ConfigBuilder("spark.broadcast.checksum") + .doc("Whether to enable checksum for broadcast. If enabled, " + + "broadcasts will include a checksum, which can help detect " + + "corrupted blocks, at the cost of computing and sending a little " + + "more data. It's possible to disable it if the network has other " + + "mechanisms to guarantee data won't be corrupted during broadcast") + .booleanConf.createWithDefault(true) + + private[spark] val RDD_COMPRESS = ConfigBuilder("spark.rdd.compress") + .doc("Whether to compress serialized RDD partitions " + + "(e.g. for StorageLevel.MEMORY_ONLY_SER in Scala " + + "or StorageLevel.MEMORY_ONLY in Python). Can save substantial " + + "space at the cost of some extra CPU time. " + + "Compression will use spark.io.compression.codec") + .booleanConf.createWithDefault(false) + + private[spark] val RDD_PARALLEL_LISTING_THRESHOLD = + ConfigBuilder("spark.rdd.parallelListingThreshold") + .intConf + .createWithDefault(10) + + private[spark] val RDD_LIMIT_SCALE_UP_FACTOR = + ConfigBuilder("spark.rdd.limit.scaleUpFactor") + .intConf + .createWithDefault(4) + } diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index c4f4b18..288c0d1 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -27,6 +27,7 @@ import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** @@ -51,7 +52,7 @@ trait CompressionCodec { private[spark] object CompressionCodec { - private val configKey = "spark.io.compression.codec" + private val configKey = IO_COMPRESSION_CODEC.key private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = { (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec] @@ -65,7 +66,7 @@ private[spark] object CompressionCodec { "zstd" -> classOf[ZStdCompressionCodec].getName) def getCodecName(conf: SparkConf): String = { - conf.get(configKey, DEFAULT_COMPRESSION_CODEC) + conf.get(IO_COMPRESSION_CODEC) } def createCodec(conf: SparkConf): CompressionCodec = { @@ -117,7 +118,7 @@ private[spark] object CompressionCodec { class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getSizeAsBytes("spark.io.compression.lz4.blockSize", "32k").toInt + val blockSize = conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt new LZ4BlockOutputStream(s, blockSize) } @@ -166,7 +167,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { } override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt + val 
blockSize = conf.get(IO_COMPRESSION_SNAPPY_BLOCKSIZE).toInt new SnappyOutputStream(s, blockSize) } @@ -185,10 +186,10 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { @DeveloperApi class ZStdCompressionCodec(conf: SparkConf) extends CompressionCodec { - private val bufferSize = conf.getSizeAsBytes("spark.io.compression.zstd.bufferSize", "32k").toInt + private val bufferSize = conf.get(IO_COMPRESSION_ZSTD_BUFFERSIZE).toInt // Default compression level for zstd compression to 1 because it is // fastest of all with reasonably high compression ratio. - private val level = conf.getInt("spark.io.compression.zstd.level", 1) + private val level = conf.get(IO_COMPRESSION_ZSTD_LEVEL) override def compressedOutputStream(s: OutputStream): OutputStream = { // Wrap the zstd output stream in a buffered output stream, so that we can diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 4fde2d0..4ba4107 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -57,7 +57,7 @@ private[spark] abstract class MemoryManager( protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE) protected[this] val offHeapStorageMemory = - (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong + (maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory) offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index a0fbbbd..7282a83 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -203,7 +203,7 @@ object UnifiedMemoryManager { conf, maxHeapMemory = maxMemory, onHeapStorageRegionSize = - (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, + (maxMemory * conf.get(config.MEMORY_STORAGE_FRACTION)).toLong, numCores = numCores) } @@ -230,7 +230,7 @@ object UnifiedMemoryManager { } } val usableMemory = systemMemory - reservedMemory - val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6) + val memoryFraction = conf.get(config.MEMORY_FRACTION) (usableMemory * memoryFraction).toLong } } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6a25ee2..b2ed2d3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -36,6 +36,7 @@ import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Experimental, Since} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator import org.apache.spark.partial.GroupedCountEvaluator @@ -1349,7 +1350,7 @@ abstract class RDD[T: ClassTag]( * an exception if called on an RDD of `Nothing` or `Null`. 
*/ def take(num: Int): Array[T] = withScope { - val scaleUpFactor = Math.max(conf.getInt("spark.rdd.limit.scaleUpFactor", 4), 2) + val scaleUpFactor = Math.max(conf.get(RDD_LIMIT_SCALE_UP_FACTOR), 2) if (num == 0) { new Array[T](0) } else { diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala index 8273d8a..d165610 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.CHECKPOINT_COMPRESS +import org.apache.spark.internal.config.{BUFFER_SIZE, CHECKPOINT_COMPRESS} import org.apache.spark.io.CompressionCodec import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -176,7 +176,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { val tempOutputPath = new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}") - val bufferSize = env.conf.getInt("spark.buffer.size", 65536) + val bufferSize = env.conf.get(BUFFER_SIZE) val fileOutputStream = if (blockSize < 0) { val fileStream = fs.create(tempOutputPath, false, bufferSize) @@ -222,7 +222,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { sc: SparkContext, partitioner: Partitioner, checkpointDirPath: Path): Unit = { try { val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName) - val bufferSize = sc.conf.getInt("spark.buffer.size", 65536) + val bufferSize = sc.conf.get(BUFFER_SIZE) val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration) val fileOutputStream = fs.create(partitionerFilePath, false, bufferSize) val serializer = SparkEnv.get.serializer.newInstance() @@ -249,7 +249,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { sc: SparkContext, checkpointDirPath: String): Option[Partitioner] = { try { - val bufferSize = sc.conf.getInt("spark.buffer.size", 65536) + val bufferSize = sc.conf.get(BUFFER_SIZE) val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName) val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration) val fileInputStream = fs.open(partitionerFilePath, bufferSize) @@ -287,7 +287,7 @@ private[spark] object ReliableCheckpointRDD extends Logging { context: TaskContext): Iterator[T] = { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) - val bufferSize = env.conf.getInt("spark.buffer.size", 65536) + val bufferSize = env.conf.get(BUFFER_SIZE) val fileInputStream = { val fileStream = fs.open(path, bufferSize) if (env.conf.get(CHECKPOINT_COMPRESS)) { diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 60e383a..6480e87 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -26,6 +26,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.util.Utils /** @@ -71,7 +72,7 @@ class UnionRDD[T: ClassTag]( // visible for testing private[spark] val isPartitionListingParallel: Boolean = - rdds.length > 
conf.getInt("spark.rdd.parallelListingThreshold", 10) + rdds.length > conf.get(RDD_PARALLEL_LISTING_THRESHOLD) override def getPartitions: Array[Partition] = { val parRDDs = if (isPartitionListingParallel) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 41f032c..1b42cb4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -1043,16 +1043,15 @@ private[spark] class TaskSetManager( } private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { - val defaultWait = conf.get(config.LOCALITY_WAIT) - val localityWaitKey = level match { - case TaskLocality.PROCESS_LOCAL => "spark.locality.wait.process" - case TaskLocality.NODE_LOCAL => "spark.locality.wait.node" - case TaskLocality.RACK_LOCAL => "spark.locality.wait.rack" + val localityWait = level match { + case TaskLocality.PROCESS_LOCAL => config.LOCALITY_WAIT_PROCESS + case TaskLocality.NODE_LOCAL => config.LOCALITY_WAIT_NODE + case TaskLocality.RACK_LOCAL => config.LOCALITY_WAIT_RACK case _ => null } - if (localityWaitKey != null) { - conf.getTimeAsMs(localityWaitKey, defaultWait.toString) + if (localityWait != null) { + conf.get(localityWait) } else { 0L } diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 1d4b05c..1e233ca 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -23,6 +23,7 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag import org.apache.spark.SparkConf +import org.apache.spark.internal.config import org.apache.spark.io.CompressionCodec import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.storage._ @@ -63,11 +64,11 @@ private[spark] class SerializerManager( } // Whether to compress broadcast variables that are stored - private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true) + private[this] val compressBroadcast = conf.get(config.BROADCAST_COMPRESS) // Whether to compress shuffle output that are stored private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) // Whether to compress RDD partitions that are stored serialized - private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false) + private[this] val compressRdds = conf.get(config.RDD_COMPRESS) // Whether to compress shuffle output temporarily spilled to disk private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true) diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala index 27e2f98..37f5169 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala @@ -50,8 +50,8 @@ private[spark] class BlockStoreShuffleReader[K, C]( mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition), serializerManager.wrapStream, // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility - SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024, - SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", 
Int.MaxValue), + SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024, + SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT), SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS), SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM), SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true), diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 1dfbc6e..5bfe778 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -233,8 +233,7 @@ private[spark] class BlockManager( shuffleClient.init(appId) blockReplicationPolicy = { - val priorityClass = conf.get( - "spark.storage.replication.policy", classOf[RandomBlockReplicationPolicy].getName) + val priorityClass = conf.get(config.STORAGE_REPLICATION_POLICY) val clazz = Utils.classForName(priorityClass) val ret = clazz.getConstructor().newInstance().asInstanceOf[BlockReplicationPolicy] logInfo(s"Using $priorityClass for block replication policy") @@ -1339,7 +1338,7 @@ private[spark] class BlockManager( */ private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = { peerFetchLock.synchronized { - val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds + val cachedPeersTtl = conf.get(config.STORAGE_CACHED_PEERS_TTL) // milliseconds val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl if (cachedPeers == null || forceFetch || timeout) { cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode) @@ -1393,7 +1392,7 @@ private[spark] class BlockManager( classTag: ClassTag[_], existingReplicas: Set[BlockManagerId] = Set.empty): Unit = { - val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1) + val maxReplicationFailures = conf.get(config.STORAGE_MAX_REPLICATION_FAILURE) val tLevel = StorageLevel( useDisk = level.useDisk, useMemory = level.useMemory, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index f984cf7..f5d6029 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -27,7 +27,7 @@ import scala.util.Random import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerMessages._ @@ -60,7 +60,7 @@ class BlockManagerMasterEndpoint( private val topologyMapper = { val topologyMapperClassName = conf.get( - "spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName) + config.STORAGE_REPLICATION_TOPOLOGY_MAPPER) val clazz = Utils.classForName(topologyMapperClassName) val mapper = clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper] @@ -68,7 +68,7 @@ class BlockManagerMasterEndpoint( mapper } - val proactivelyReplicate = conf.get("spark.storage.replication.proactive", "false").toBoolean + val proactivelyReplicate = conf.get(config.STORAGE_REPLICATION_PROACTIVE) logInfo("BlockManagerMasterEndpoint up") diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 29963a9..36cbaeb 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -45,7 +45,7 @@ private[spark] class DiskStore( diskManager: DiskBlockManager, securityManager: SecurityManager) extends Logging { - private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") + private val minMemoryMapBytes = conf.get(config.STORAGE_MEMORY_MAP_THRESHOLD) private val maxMemoryMapBytes = conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS) private val blockSizes = new ConcurrentHashMap[BlockId, Long]() diff --git a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala index a150a8e..3c2c4b4 100644 --- a/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala +++ b/core/src/main/scala/org/apache/spark/storage/TopologyMapper.scala @@ -19,7 +19,7 @@ package org.apache.spark.storage import org.apache.spark.SparkConf import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.util.Utils /** @@ -68,7 +68,7 @@ class DefaultTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with L */ @DeveloperApi class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Logging { - val topologyFile = conf.getOption("spark.storage.replication.topologyFile") + val topologyFile = conf.get(config.STORAGE_REPLICATION_TOPOLOGY_FILE) require(topologyFile.isDefined, "Please specify topology file via " + "spark.storage.replication.topologyFile for FileBasedTopologyMapper.") val topologyMap = Utils.getPropertiesFromFile(topologyFile.get) diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala index 8513359..375d05b 100644 --- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala @@ -29,7 +29,7 @@ import com.google.common.io.ByteStreams import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR} +import org.apache.spark.internal.config.{STORAGE_UNROLL_MEMORY_THRESHOLD, UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR} import org.apache.spark.memory.{MemoryManager, MemoryMode} import org.apache.spark.serializer.{SerializationStream, SerializerManager} import org.apache.spark.storage._ @@ -100,7 +100,7 @@ private[spark] class MemoryStore( // Initial memory to request before unrolling any block private val unrollMemoryThreshold: Long = - conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) + conf.get(STORAGE_UNROLL_MEMORY_THRESHOLD) /** Total amount of memory available for storage, in bytes. 
*/ private def maxMemory: Long = { diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 21050e4..17ffc7b 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -87,7 +87,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("groupByKey where map output sizes exceed maxMbInFlight") { - val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "1m") + val conf = new SparkConf().set(config.REDUCER_MAX_SIZE_IN_FLIGHT.key, "1m") sc = new SparkContext(clusterUrl, "test", conf) // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output // file should be about 2.5 MB @@ -217,8 +217,9 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex test("compute without caching when no partitions fit in memory") { val size = 10000 val conf = new SparkConf() - .set("spark.storage.unrollMemoryThreshold", "1024") + .set(config.STORAGE_UNROLL_MEMORY_THRESHOLD, 1024L) .set(TEST_MEMORY, size.toLong / 2) + sc = new SparkContext(clusterUrl, "test", conf) val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY) assert(data.count() === size) @@ -233,8 +234,9 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex val size = 10000 val numPartitions = 20 val conf = new SparkConf() - .set("spark.storage.unrollMemoryThreshold", "1024") + .set(config.STORAGE_UNROLL_MEMORY_THRESHOLD, 1024L) .set(TEST_MEMORY, size.toLong) + sc = new SparkContext(clusterUrl, "test", conf) val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY) assert(data.count() === size) diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 9a6abbd..a8849ab 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -290,12 +290,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst // set the conf in the deprecated way conf.set("spark.io.compression.lz4.block.size", "12345") // get the conf in the recommended way - assert(conf.get("spark.io.compression.lz4.blockSize") === "12345") + assert(conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE.key) === "12345") // we can still get the conf in the deprecated way assert(conf.get("spark.io.compression.lz4.block.size") === "12345") // the contains() also works as expected assert(conf.contains("spark.io.compression.lz4.block.size")) - assert(conf.contains("spark.io.compression.lz4.blockSize")) + assert(conf.contains(IO_COMPRESSION_LZ4_BLOCKSIZE.key)) assert(conf.contains("spark.io.unknown") === false) } diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 6976464..6d74812 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -24,6 +24,7 @@ import scala.util.Random import org.scalatest.Assertions import org.apache.spark._ +import org.apache.spark.internal.config import org.apache.spark.io.SnappyCompressionCodec import org.apache.spark.rdd.RDD import org.apache.spark.security.EncryptionFunSuite @@ -68,7 +69,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with 
Encryptio encryptionTest("Accessing TorrentBroadcast variables in a local cluster") { conf => val numSlaves = 4 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") - conf.set("spark.broadcast.compress", "true") + conf.set(config.BROADCAST_COMPRESS, true) sc = new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) @@ -145,7 +146,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio encryptionTest("Cache broadcast to disk") { conf => conf.setMaster("local") .setAppName("test") - .set("spark.memory.storageFraction", "0.0") + .set(config.MEMORY_STORAGE_FRACTION, 0.0) sc = new SparkContext(conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) @@ -172,7 +173,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio val conf = new SparkConf() .setMaster("local[4]") .setAppName("test") - .set("spark.memory.storageFraction", "0.0") + .set(config.MEMORY_STORAGE_FRACTION, 0.0) sc = new SparkContext(conf) val list = List[Int](1, 2, 3, 4) diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 8556e92..0a689f8 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -43,10 +43,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes maxOnHeapExecutionMemory: Long, maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = { val conf = new SparkConf() - .set("spark.memory.fraction", "1") + .set(MEMORY_FRACTION, 1.0) .set(TEST_MEMORY, maxOnHeapExecutionMemory) .set(MEMORY_OFFHEAP_SIZE, maxOffHeapExecutionMemory) - .set("spark.memory.storageFraction", storageFraction.toString) + .set(MEMORY_STORAGE_FRACTION, storageFraction) UnifiedMemoryManager(conf, numCores = 1) } @@ -223,9 +223,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val reservedMemory = 300L * 1024 val memoryFraction = 0.8 val conf = new SparkConf() - .set("spark.memory.fraction", memoryFraction.toString) + .set(MEMORY_FRACTION, memoryFraction) .set(TEST_MEMORY, systemMemory) .set(TEST_RESERVED_MEMORY, reservedMemory) + val mm = UnifiedMemoryManager(conf, numCores = 1) val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong assert(mm.maxHeapMemory === expectedMaxMemory) @@ -243,9 +244,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val reservedMemory = 300L * 1024 val memoryFraction = 0.8 val conf = new SparkConf() - .set("spark.memory.fraction", memoryFraction.toString) + .set(MEMORY_FRACTION, memoryFraction) .set(TEST_MEMORY, systemMemory) .set(TEST_RESERVED_MEMORY, reservedMemory) + val mm = UnifiedMemoryManager(conf, numCores = 1) // Try using an executor memory that's too small @@ -258,9 +260,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") { val conf = new SparkConf() - .set("spark.memory.fraction", "1") - .set("spark.memory.storageFraction", "0") + .set(MEMORY_FRACTION, 1.0) + .set(MEMORY_STORAGE_FRACTION, 0.0) .set(TEST_MEMORY, 1000L) + val mm = UnifiedMemoryManager(conf, numCores = 2) val ms = makeMemoryStore(mm) val memoryMode = MemoryMode.ON_HEAP @@ 
-284,9 +287,10 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes test("SPARK-15260: atomically resize memory pools") { val conf = new SparkConf() - .set("spark.memory.fraction", "1") - .set("spark.memory.storageFraction", "0") + .set(MEMORY_FRACTION, 1.0) + .set(MEMORY_STORAGE_FRACTION, 0.0) .set(TEST_MEMORY, 1000L) + val mm = UnifiedMemoryManager(conf, numCores = 2) makeBadMemoryStore(mm) val memoryMode = MemoryMode.ON_HEAP diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 2227698..e957340 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.{FileSplit, TextInputFormat} import org.apache.spark._ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD import org.apache.spark.rdd.RDDSuiteUtils._ import org.apache.spark.util.{ThreadUtils, Utils} @@ -136,10 +137,10 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false) - sc.conf.set("spark.rdd.parallelListingThreshold", "1") + sc.conf.set(RDD_PARALLEL_LISTING_THRESHOLD, 1) val parallelUnion = sc.union(nums1, nums2) val actual = parallelUnion.collect().toList - sc.conf.remove("spark.rdd.parallelListingThreshold") + sc.conf.remove(RDD_PARALLEL_LISTING_THRESHOLD.key) assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true) assert(expected === actual) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index 2215f7f..aadddca 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -85,7 +85,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM extraConfs = Seq( config.BLACKLIST_ENABLED.key -> "true", // just to avoid this test taking too long - "spark.locality.wait" -> "10ms" + config.LOCALITY_WAIT.key -> "10ms" ) ) { val rdd = new MockRDDWithLocalityPrefs(sc, 10, Nil, badHost) diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 04987e6..c0cb158 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -533,7 +533,7 @@ object EventLoggingListenerSuite { conf.set(EVENT_LOG_DIR, logDir.toString) compressionCodec.foreach { codec => conf.set(EVENT_LOG_COMPRESS, true) - conf.set("spark.io.compression.codec", codec) + conf.set(IO_COMPRESSION_CODEC, codec) } conf.set(EVENT_LOG_STAGE_EXECUTOR_METRICS, true) conf diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 9c555a9..85c87b9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -1011,7 +1011,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("Locality should be used for 
bulk offers even with delay scheduling off") { val conf = new SparkConf() - .set("spark.locality.wait", "0") + .set(config.LOCALITY_WAIT.key, "0") sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) // we create a manual clock just so we can be sure the clock doesn't advance at all in this test val clock = new ManualClock() @@ -1058,7 +1058,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("With delay scheduling off, tasks can be run at any locality level immediately") { val conf = new SparkConf() - .set("spark.locality.wait", "0") + .set(config.LOCALITY_WAIT.key, "0") sc = new SparkContext("local", "TaskSchedulerImplSuite", conf) // we create a manual clock just so we can be sure the clock doesn't advance at all in this test diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index f9dfd2c..502a013 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -166,7 +166,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg private val conf = new SparkConf - val LOCALITY_WAIT_MS = conf.getTimeAsMs("spark.locality.wait", "3s") + val LOCALITY_WAIT_MS = conf.get(config.LOCALITY_WAIT) val MAX_TASK_FAILURES = 4 var sched: FakeTaskScheduler = null @@ -429,7 +429,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg set(config.BLACKLIST_ENABLED, true). set(config.BLACKLIST_TIMEOUT_CONF, rescheduleDelay). // don't wait to jump locality levels in this test - set("spark.locality.wait", "0") + set(config.LOCALITY_WAIT.key, "0") sc = new SparkContext("local", "test", conf) // two executors on same host, one on different. 
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
index 43621cb..9d71647 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala
@@ -24,6 +24,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config.MEMORY_FRACTION
 import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory._
 import org.apache.spark.unsafe.Platform
@@ -36,7 +37,8 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
       .setAppName("ShuffleExternalSorterSuite")
       .set(IS_TESTING, true)
       .set(TEST_MEMORY, 1600L)
-      .set("spark.memory.fraction", "1")
+      .set(MEMORY_FRACTION, 1.0)
+
     sc = new SparkContext(conf)
     val memoryManager = UnifiedMemoryManager(conf, 1)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1070e87..10855bf 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -85,18 +85,17 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
   before {
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
-    conf.set(NETWORK_AUTH_ENABLED, false)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
     conf.set(IS_TESTING, true)
-    conf.set("spark.memory.fraction", "1")
-    conf.set("spark.memory.storageFraction", "1")
-    conf.set("spark.storage.unrollMemoryThreshold", "512")
+    conf.set(MEMORY_FRACTION, 1.0)
+    conf.set(MEMORY_STORAGE_FRACTION, 1.0)
+    conf.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
     // to make a replication attempt to inactive store fail fast
     conf.set("spark.core.connection.ack.wait.timeout", "1s")
     // to make cached peers refresh frequently
-    conf.set("spark.storage.cachedPeersTtl", "10")
+    conf.set(STORAGE_CACHED_PEERS_TTL, 10)
     sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
@@ -424,8 +423,8 @@ class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior {
 class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
   val conf = new SparkConf(false).set("spark.app.id", "test")
   conf.set("spark.kryoserializer.buffer", "1m")
-  conf.set("spark.storage.replication.proactive", "true")
-  conf.set("spark.storage.exceptionOnPinLeak", "true")
+  conf.set(STORAGE_REPLICATION_PROACTIVE, true)
+  conf.set(STORAGE_EXCEPTION_PIN_LEAK, true)
   (2 to 5).foreach { i =>
     test(s"proactive block replication - $i replicas - ${i - 1} block manager deletions") {
@@ -500,10 +499,10 @@ class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationB
   val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
   conf.set("spark.kryoserializer.buffer", "1m")
   conf.set(
-    "spark.storage.replication.policy",
+    STORAGE_REPLICATION_POLICY,
     classOf[BasicBlockReplicationPolicy].getName)
   conf.set(
-    "spark.storage.replication.topologyMapper",
+    STORAGE_REPLICATION_TOPOLOGY_MAPPER,
     classOf[DummyTopologyMapper].getName)
 }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index c232641..cafd980 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -117,10 +117,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     conf = new SparkConf(false)
       .set("spark.app.id", "test")
       .set(IS_TESTING, true)
-      .set("spark.memory.fraction", "1")
-      .set("spark.memory.storageFraction", "1")
+      .set(MEMORY_FRACTION, 1.0)
+      .set(MEMORY_STORAGE_FRACTION, 1.0)
       .set("spark.kryoserializer.buffer", "1m")
-      .set("spark.storage.unrollMemoryThreshold", "512")
+      .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set(DRIVER_PORT, rpcEnv.address.port)
@@ -854,7 +854,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     store.stop()
     store = null
-    conf.set("spark.broadcast.compress", "true")
+    conf.set(BROADCAST_COMPRESS, true)
     store = makeBlockManager(20000, "exec3")
     store.putSingle(
       BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
@@ -863,7 +863,7 @@
     store.stop()
     store = null
-    conf.set("spark.broadcast.compress", "false")
+    conf.set(BROADCAST_COMPRESS, false)
     store = makeBlockManager(20000, "exec4")
     store.putSingle(
       BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
@@ -871,14 +871,14 @@
     store.stop()
     store = null
-    conf.set("spark.rdd.compress", "true")
+    conf.set(RDD_COMPRESS, true)
     store = makeBlockManager(20000, "exec5")
     store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
     assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
     store.stop()
     store = null
-    conf.set("spark.rdd.compress", "false")
+    conf.set(RDD_COMPRESS, false)
     store = makeBlockManager(20000, "exec6")
     store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
     assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
@@ -893,8 +893,8 @@
       store = null
     } finally {
       System.clearProperty("spark.shuffle.compress")
-      System.clearProperty("spark.broadcast.compress")
-      System.clearProperty("spark.rdd.compress")
+      System.clearProperty(BROADCAST_COMPRESS.key)
+      System.clearProperty(RDD_COMPRESS.key)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 6f60b08..97b9c97 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -39,7 +39,7 @@ class DiskStoreSuite extends SparkFunSuite {
     // It will cause error when we tried to re-open the filestore and the
     // memory-mapped byte buffer tot he file has not been GC on Windows.
    assume(!Utils.isWindows)
-    val confKey = "spark.storage.memoryMapThreshold"
+    val confKey = config.STORAGE_MEMORY_MAP_THRESHOLD.key
     // Create a non-trivial (not all zeros) byte array
     val bytes = Array.tabulate[Byte](1000)(_.toByte)
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 7cdcd0f..5595ce4 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -26,6 +26,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 import org.apache.spark._
+import org.apache.spark.internal.config.STORAGE_UNROLL_MEMORY_THRESHOLD
 import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
@@ -39,7 +40,7 @@ class MemoryStoreSuite
   with ResetSystemProperties {
   var conf: SparkConf = new SparkConf(false)
-    .set("spark.storage.unrollMemoryThreshold", "512")
+    .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
   val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
diff --git a/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
index bbd252d..0bc26ad 100644
--- a/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/TopologyMapperSuite.scala
@@ -22,6 +22,7 @@ import java.io.{File, FileOutputStream}
 import org.scalatest.{BeforeAndAfter, Matchers}
 import org.apache.spark._
+import org.apache.spark.internal.config.STORAGE_REPLICATION_TOPOLOGY_FILE
 import org.apache.spark.util.Utils
 class TopologyMapperSuite extends SparkFunSuite
@@ -36,7 +37,7 @@ class TopologyMapperSuite extends SparkFunSuite
     val propsFile = createPropertiesFile(props)
     val sparkConf = (new SparkConf(false))
-    sparkConf.set("spark.storage.replication.topologyFile", propsFile.getAbsolutePath)
+    sparkConf.set(STORAGE_REPLICATION_TOPOLOGY_FILE, propsFile.getAbsolutePath)
     val topologyMapper = new FileBasedTopologyMapper(sparkConf)
     props.foreach {case (host, topology) =>
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 1e03998..5efbf4e 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -58,7 +58,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
     conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
     conf.set("spark.shuffle.compress", codec.isDefined.toString)
-    codec.foreach { c => conf.set("spark.io.compression.codec", c) }
+    codec.foreach { c => conf.set(IO_COMPRESSION_CODEC, c) }
     // Ensure that we actually have multiple batches per spill file
     conf.set("spark.shuffle.spill.batchSize", "10")
     conf

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
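
For readers skimming the diff, the pattern being applied is uniform: each hard-coded string key gets a ConfigEntry constant that bundles the key with its value type, parser, and default, and call sites then use the typed SparkConf set/get overloads, or entry.key where a plain string is still required (for example System.clearProperty above). The sketch below is illustrative only and is not part of this commit: the EXAMPLE_* names and spark.example.* keys are invented, the real entries referenced in the diff (LOCALITY_WAIT, MEMORY_FRACTION, BROADCAST_COMPRESS, RDD_COMPRESS, IO_COMPRESSION_CODEC, and so on) are declared in core's org.apache.spark.internal.config package object, and because ConfigBuilder and the typed SparkConf accessors are private[spark], the sketch assumes it compiles inside the org.apache.spark package tree.

    package org.apache.spark.example  // hypothetical package, deliberately under org.apache.spark

    import java.util.concurrent.TimeUnit

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config.{ConfigBuilder, ConfigEntry}

    object ConfigEntrySketch {

      // Time-valued entry, analogous in shape to LOCALITY_WAIT: the key string exists in
      // exactly one place, values are parsed as durations, and reads come back in ms.
      val EXAMPLE_LOCALITY_WAIT: ConfigEntry[Long] =
        ConfigBuilder("spark.example.locality.wait")
          .timeConf(TimeUnit.MILLISECONDS)
          .createWithDefaultString("3s")

      // Boolean entry, analogous in shape to RDD_COMPRESS / BROADCAST_COMPRESS.
      val EXAMPLE_COMPRESS: ConfigEntry[Boolean] =
        ConfigBuilder("spark.example.compress")
          .booleanConf
          .createWithDefault(true)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          .set(EXAMPLE_COMPRESS, false)           // typed set: takes a Boolean, not the string "false"
          .set(EXAMPLE_LOCALITY_WAIT.key, "0")    // .key yields the raw string for string-only APIs

        val waitMs: Long = conf.get(EXAMPLE_LOCALITY_WAIT)   // typed get; the "3s" default applies if unset
        val compress: Boolean = conf.get(EXAMPLE_COMPRESS)
        println(s"locality wait = $waitMs ms, compress = $compress")
      }
    }

Nothing in this pattern changes runtime behavior; the benefit, visible in every hunk above, is that a mistyped key becomes a compile error and a default such as "3s" lives next to its key instead of being repeated at each getTimeAsMs call site.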