This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 7bf0794  [SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.
7bf0794 is described below

commit 7bf0794651f4d11547325539ebf7131a57ee1ba2
Author: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
AuthorDate: Tue Jan 22 07:44:36 2019 -0600

    [SPARK-26463][CORE] Use ConfigEntry for hardcoded configs for scheduler categories.

    ## What changes were proposed in this pull request?

    The PR makes the hardcoded `spark.dynamicAllocation`, `spark.scheduler`, `spark.rpc`, `spark.task`,
    `spark.speculation`, and `spark.cleaner` configs use `ConfigEntry`.

    ## How was this patch tested?

    Existing tests

    Closes #23416 from kiszk/SPARK-26463.

    Authored-by: Kazuaki Ishizaki <ishiz...@jp.ibm.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../scala/org/apache/spark/ContextCleaner.scala    |  11 +-
 .../apache/spark/ExecutorAllocationManager.scala   |  39 +++---
 .../scala/org/apache/spark/HeartbeatReceiver.scala |  16 +--
 .../scala/org/apache/spark/SecurityManager.scala   |   2 +-
 .../main/scala/org/apache/spark/SparkConf.scala    |  26 ++--
 .../main/scala/org/apache/spark/SparkContext.scala |   3 +-
 .../src/main/scala/org/apache/spark/SparkEnv.scala |   4 +-
 .../scala/org/apache/spark/deploy/Client.scala     |   5 +-
 .../apache/spark/deploy/SparkSubmitArguments.scala |   3 +-
 .../scala/org/apache/spark/executor/Executor.scala |  12 +-
 .../org/apache/spark/internal/config/Network.scala |  93 +++++++++++++++
 .../org/apache/spark/internal/config/package.scala | 131 +++++++++++++++++++--
 .../org/apache/spark/rdd/PairRDDFunctions.scala    |   3 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |   5 +-
 .../spark/rdd/ReliableRDDCheckpointData.scala      |   3 +-
 .../org/apache/spark/rpc/netty/Dispatcher.scala    |   5 +-
 .../org/apache/spark/rpc/netty/NettyRpcEnv.scala   |   7 +-
 .../scheduler/BarrierJobAllocationFailed.scala     |   3 +-
 .../apache/spark/scheduler/BlacklistTracker.scala  |   6 +-
 .../spark/scheduler/SchedulableBuilder.scala       |  10 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  13 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  11 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  12 +-
 .../apache/spark/status/AppStatusListener.scala    |   7 +-
 .../org/apache/spark/storage/BlockManager.scala    |   6 +-
 .../org/apache/spark/ui/UIWorkloadGenerator.scala  |   5 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala     |   3 +-
 .../scala/org/apache/spark/ui/jobs/JobsTab.scala   |   3 +-
 .../scala/org/apache/spark/ui/jobs/StagesTab.scala |   3 +-
 .../scala/org/apache/spark/util/RpcUtils.scala     |  13 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |   4 +-
 .../spark/BarrierStageOnSubmittedSuite.scala       |  30 ++---
 .../org/apache/spark/ContextCleanerSuite.scala     |  14 +--
 .../spark/ExecutorAllocationManagerSuite.scala     |  52 ++++----
 .../org/apache/spark/HeartbeatReceiverSuite.scala  |   3 +-
 .../org/apache/spark/JobCancellationSuite.scala    |  23 ++--
 .../org/apache/spark/MapOutputTrackerSuite.scala   |  13 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  17 +--
 .../scala/org/apache/spark/SparkContextSuite.scala |   2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |   4 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   6 +-
 .../netty/NettyBlockTransferSecuritySuite.scala    |   5 +-
 .../scala/org/apache/spark/rpc/RpcEnvSuite.scala   |  13 +-
 .../scheduler/BlacklistIntegrationSuite.scala      |   4 +-
 .../spark/scheduler/BlacklistTrackerSuite.scala    |   8 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       |   8 +-
 .../org/apache/spark/scheduler/PoolSuite.scala     |  12 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |   3 +-
 .../spark/scheduler/SparkListenerSuite.scala       |   3 +-
 .../spark/scheduler/TaskResultGetterSuite.scala    |   3 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  40 +++----
 .../KryoSerializerDistributedSuite.scala           |   2 +-
 .../spark/status/AppStatusListenerSuite.scala      |   2 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |  14 +--
 .../mllib/util/LocalClusterSparkContext.scala      |   3 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   3 +-
 .../mesos/MesosCoarseGrainedSchedulerBackend.scala |   3 +-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  |   6 +-
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   3 +-
 .../deploy/yarn/YarnShuffleIntegrationSuite.scala  |   5 +-
 .../org/apache/spark/sql/RuntimeConfigSuite.scala  |   2 +-
 .../streaming/state/StateStoreSuite.scala          |   3 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |   2 +-
 .../sql/sources/v2/SimpleWritableDataSource.scala  |   1 +
 .../SparkExecuteStatementOperation.scala           |   6 +-
 .../spark/sql/hive/execution/HiveFileFormat.scala  |   3 +-
 .../scheduler/ExecutorAllocationManagerSuite.scala |   5 +-
 68 files changed, 522 insertions(+), 281 deletions(-)
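Most of the hunks below repeat one mechanical change: a raw string key plus an inline default, such as conf.getBoolean("spark.dynamicAllocation.testing", false), becomes a single shared ConfigEntry definition read with conf.get(...). The following minimal, self-contained sketch illustrates that shape; Entry is an illustrative stand-in for Spark's private ConfigBuilder/ConfigEntry machinery, not the real API:

    // Illustrative stand-in for Spark's ConfigEntry: key, type, and default in one place.
    final case class Entry[T](key: String, default: T, parse: String => T)

    object ConfigEntrySketch {
      // Define each key exactly once...
      val DYN_ALLOCATION_TESTING = Entry("spark.dynamicAllocation.testing", false, _.toBoolean)
      val CPUS_PER_TASK = Entry("spark.task.cpus", 1, _.toInt)

      // ...and read it typed everywhere, instead of repeating the key and the
      // default at every call site.
      def get[T](settings: Map[String, String], e: Entry[T]): T =
        settings.get(e.key).map(e.parse).getOrElse(e.default)

      def main(args: Array[String]): Unit = {
        val settings = Map("spark.task.cpus" -> "2")
        println(get(settings, CPUS_PER_TASK))          // 2
        println(get(settings, DYN_ALLOCATION_TESTING)) // false (the default)
      }
    }

Besides removing stringly-typed duplication, this lets error messages interpolate ENTRY.key, so a key can never drift out of sync with its diagnostics.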
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4d884de..305ec46 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData}
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, ThreadUtils, Utils}
 
@@ -83,8 +84,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * on the driver, this may happen very occasionally or not at all. Not cleaning at all may
    * lead to executors running out of disk space after a while.
    */
-  private val periodicGCInterval =
-    sc.conf.getTimeAsSeconds("spark.cleaner.periodicGC.interval", "30min")
+  private val periodicGCInterval = sc.conf.get(CLEANER_PERIODIC_GC_INTERVAL)
 
   /**
    * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
@@ -96,8 +96,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
    * longer in scope.
    */
-  private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", true)
+  private val blockOnCleanupTasks = sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING)
 
   /**
    * Whether the cleaning thread will block on shuffle cleanup tasks.
@@ -109,8 +108,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
    * until the real RPC issue (referred to in the comment above `blockOnCleanupTasks`) is
    * resolved.
    */
-  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking.shuffle", false)
+  private val blockOnShuffleCleanupTasks =
+    sc.conf.get(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE)
 
   @volatile private var stopped = false
 
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 0807e65..c9da30e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -107,28 +107,25 @@ private[spark] class ExecutorAllocationManager(
   private val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
-  private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
+  private val schedulerBacklogTimeoutS = conf.get(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)
 
   // Same as above, but used only after `schedulerBacklogTimeoutS` is exceeded
-  private val sustainedSchedulerBacklogTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", s"${schedulerBacklogTimeoutS}s")
+  private val sustainedSchedulerBacklogTimeoutS =
+    conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
 
   // How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.executorIdleTimeout", "60s")
+  private val executorIdleTimeoutS = conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
 
-  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
-    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${Integer.MAX_VALUE}s")
+  private val cachedExecutorIdleTimeoutS = conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
 
   // During testing, the methods to actually kill and add executors are mocked out
-  private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
+  private val testing = conf.get(DYN_ALLOCATION_TESTING)
 
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
   private val tasksPerExecutorForFullParallelism =
-    conf.get(EXECUTOR_CORES) / conf.getInt("spark.task.cpus", 1)
+    conf.get(EXECUTOR_CORES) / conf.get(CPUS_PER_TASK)
 
   private val executorAllocationRatio = conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
@@ -195,27 +192,29 @@ private[spark] class ExecutorAllocationManager(
    */
  private def validateSettings(): Unit = {
    if (minNumExecutors < 0 || maxNumExecutors < 0) {
-      throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
+      throw new SparkException(
+        s"${DYN_ALLOCATION_MIN_EXECUTORS.key} and ${DYN_ALLOCATION_MAX_EXECUTORS.key} must be " +
+          "positive!")
     }
     if (maxNumExecutors == 0) {
-      throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_MAX_EXECUTORS.key} cannot be 0!")
     }
     if (minNumExecutors > maxNumExecutors) {
-      throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
-        s"be less than or equal to spark.dynamicAllocation.maxExecutors ($maxNumExecutors)!")
+      throw new SparkException(s"${DYN_ALLOCATION_MIN_EXECUTORS.key} ($minNumExecutors) must " +
+        s"be less than or equal to ${DYN_ALLOCATION_MAX_EXECUTORS.key} ($maxNumExecutors)!")
     }
     if (schedulerBacklogTimeoutS <= 0) {
-      throw new SparkException("spark.dynamicAllocation.schedulerBacklogTimeout must be > 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (sustainedSchedulerBacklogTimeoutS <= 0) {
       throw new SparkException(
-        "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout must be > 0!")
+        s"${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
     if (executorIdleTimeoutS < 0) {
-      throw new SparkException("spark.dynamicAllocation.executorIdleTimeout must be >= 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
     }
     if (cachedExecutorIdleTimeoutS < 0) {
-      throw new SparkException("spark.dynamicAllocation.cachedExecutorIdleTimeout must be >= 0!")
+      throw new SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 0!")
     }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
@@ -224,12 +223,12 @@ private[spark] class ExecutorAllocationManager(
         "shuffle service. You may enable this through spark.shuffle.service.enabled.")
     }
     if (tasksPerExecutorForFullParallelism == 0) {
-      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < spark.task.cpus.")
+      throw new SparkException(s"${EXECUTOR_CORES.key} must not be < ${CPUS_PER_TASK.key}.")
     }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
       throw new SparkException(
-        "spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
+        s"${DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO.key} must be > 0 and <= 1.0")
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index 67f2c27..55f2ae6 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -24,6 +24,7 @@ import scala.concurrent.Future
 
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Network
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerId
@@ -74,18 +75,9 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
-  // "spark.network.timeout" uses "seconds", while `spark.storage.blockManagerSlaveTimeoutMs` uses
-  // "milliseconds"
-  private val executorTimeoutMs =
-    sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-      s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s")
-
-  // "spark.network.timeoutInterval" uses "seconds", while
-  // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds"
-  private val timeoutIntervalMs =
-    sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL)
-  private val checkTimeoutIntervalMs =
-    sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000
+  private val executorTimeoutMs = sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT)
+
+  private val checkTimeoutIntervalMs = sc.conf.get(Network.NETWORK_TIMEOUT_INTERVAL)
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
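The HeartbeatReceiver hunk above collapses two reads that previously mixed seconds and milliseconds into entries declared with timeConf(...), so the unit lives in the definition and every reader gets a Long in one known unit. A hedged sketch of why that helps; TimeEntry is a stand-in for ConfigBuilder.timeConf, and the suffix grammar here is simplified relative to Spark's real parser:

    import java.util.concurrent.TimeUnit

    // Stand-in for ConfigBuilder.timeConf: the target unit is part of the entry.
    final case class TimeEntry(key: String, unit: TimeUnit, defaultStr: String) {
      private val suffixes = List(
        "ms" -> TimeUnit.MILLISECONDS, "min" -> TimeUnit.MINUTES,
        "h" -> TimeUnit.HOURS, "s" -> TimeUnit.SECONDS)

      // Parse values like "120s" or "30min"; bare numbers are taken as `unit`.
      private def parse(v: String): Long =
        suffixes.collectFirst { case (sfx, u) if v.endsWith(sfx) =>
          unit.convert(v.dropRight(sfx.length).trim.toLong, u)
        }.getOrElse(unit.convert(v.trim.toLong, unit))

      def get(settings: Map[String, String]): Long =
        parse(settings.getOrElse(key, defaultStr))
    }

    object TimeoutSketch {
      def main(args: Array[String]): Unit = {
        val NETWORK_TIMEOUT = TimeEntry("spark.network.timeout", TimeUnit.SECONDS, "120s")
        println(NETWORK_TIMEOUT.get(Map.empty))                              // 120
        println(NETWORK_TIMEOUT.get(Map("spark.network.timeout" -> "2min"))) // 120
      }
    }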
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index c64fdc0..0661b30 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -288,7 +288,7 @@ private[spark] class SecurityManager(
    * @return Whether to enable encryption when connecting to services that support it.
    */
   def isEncryptionEnabled(): Boolean = {
-    sparkConf.get(NETWORK_ENCRYPTION_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
+    sparkConf.get(Network.NETWORK_CRYPTO_ENABLED) || sparkConf.get(SASL_ENCRYPTION_ENABLED)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b596be0..7d49f10 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -29,6 +29,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Kryo._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -576,26 +577,27 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
       }
     }
 
-    if (contains(EXECUTOR_CORES) && contains("spark.task.cpus")) {
+    if (contains(EXECUTOR_CORES) && contains(CPUS_PER_TASK)) {
       val executorCores = get(EXECUTOR_CORES)
-      val taskCpus = getInt("spark.task.cpus", 1)
+      val taskCpus = get(CPUS_PER_TASK)
 
       if (executorCores < taskCpus) {
-        throw new SparkException(s"${EXECUTOR_CORES.key} must not be less than spark.task.cpus.")
+        throw new SparkException(
+          s"${EXECUTOR_CORES.key} must not be less than ${CPUS_PER_TASK.key}.")
       }
     }
 
-    val encryptionEnabled = get(NETWORK_ENCRYPTION_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
+    val encryptionEnabled = get(NETWORK_CRYPTO_ENABLED) || get(SASL_ENCRYPTION_ENABLED)
     require(!encryptionEnabled || get(NETWORK_AUTH_ENABLED),
       s"${NETWORK_AUTH_ENABLED.key} must be enabled when enabling encryption.")
 
-    val executorTimeoutThresholdMs =
-      getTimeAsSeconds("spark.network.timeout", "120s") * 1000
+    val executorTimeoutThresholdMs = get(NETWORK_TIMEOUT) * 1000
     val executorHeartbeatIntervalMs = get(EXECUTOR_HEARTBEAT_INTERVAL)
+    val networkTimeout = NETWORK_TIMEOUT.key
     // If spark.executor.heartbeatInterval bigger than spark.network.timeout,
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
-      s"spark.network.timeout=${executorTimeoutThresholdMs}ms must be greater than the value of " +
+      s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be greater than the value of " +
       s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
   }
 
@@ -680,13 +682,13 @@ private[spark] object SparkConf extends Logging {
         AlternateConfig("spark.io.compression.snappy.block.size", "1.4")),
       IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq(
         AlternateConfig("spark.io.compression.lz4.block.size", "1.4")),
-      "spark.rpc.numRetries" -> Seq(
+      RPC_NUM_RETRIES.key -> Seq(
         AlternateConfig("spark.akka.num.retries", "1.4")),
-      "spark.rpc.retry.wait" -> Seq(
+      RPC_RETRY_WAIT.key -> Seq(
         AlternateConfig("spark.akka.retry.wait", "1.4")),
-      "spark.rpc.askTimeout" -> Seq(
+      RPC_ASK_TIMEOUT.key -> Seq(
         AlternateConfig("spark.akka.askTimeout", "1.4")),
-      "spark.rpc.lookupTimeout" -> Seq(
+      RPC_LOOKUP_TIMEOUT.key -> Seq(
         AlternateConfig("spark.akka.lookupTimeout", "1.4")),
       "spark.streaming.fileStream.minRememberDuration" -> Seq(
         AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
@@ -694,7 +696,7 @@ private[spark] object SparkConf extends Logging {
         AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
       MEMORY_OFFHEAP_ENABLED.key -> Seq(
         AlternateConfig("spark.unsafe.offHeap", "1.6")),
-      "spark.rpc.message.maxSize" -> Seq(
+      RPC_MESSAGE_MAX_SIZE.key -> Seq(
         AlternateConfig("spark.akka.frameSize", "1.6")),
       "spark.yarn.jars" -> Seq(
         AlternateConfig("spark.yarn.jar", "2.0")),
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e0c0635..be70d42 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -553,7 +553,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _executorAllocationManager.foreach(_.start())
 
     _cleaner =
-      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
+      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
         Some(new ContextCleaner(this))
       } else {
         None
@@ -2538,6 +2538,7 @@ object SparkContext extends Logging {
   private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"
   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+  private[spark] val SPARK_SCHEDULER_POOL = "spark.scheduler.pool"
 
   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"
   private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 36998d1..93d5cd7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -417,8 +417,8 @@ object SparkEnv extends Logging {
     // Spark properties
     // This includes the scheduling mode whether or not it is configured (used by SparkUI)
     val schedulerMode =
-      if (!conf.contains("spark.scheduler.mode")) {
-        Seq(("spark.scheduler.mode", schedulingMode))
+      if (!conf.contains(SCHEDULER_MODE)) {
+        Seq((SCHEDULER_MODE.key, schedulingMode))
       } else {
         Seq.empty[(String, String)]
       }
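The SparkConf hunk above now spells the canonical keys as RPC_*.key instead of string literals, while the deprecated spark.akka.* names remain honored on read. A minimal sketch of that fallback lookup; Alt is a hypothetical stand-in for Spark's AlternateConfig, and the map is trimmed to one entry:

    // Stand-in for SparkConf's deprecated-key mapping: canonical key first,
    // then any older spelling that is still accepted.
    final case class Alt(key: String, since: String)

    object DeprecatedKeySketch {
      val configsWithAlternatives: Map[String, Seq[Alt]] = Map(
        "spark.rpc.message.maxSize" -> Seq(Alt("spark.akka.frameSize", "1.6")))

      def lookup(settings: Map[String, String], canonical: String): Option[String] =
        settings.get(canonical).orElse {
          configsWithAlternatives.getOrElse(canonical, Nil)
            .flatMap(alt => settings.get(alt.key)).headOption
        }

      def main(args: Array[String]): Unit = {
        val oldStyle = Map("spark.akka.frameSize" -> "256")
        println(lookup(oldStyle, "spark.rpc.message.maxSize")) // Some(256)
      }
    }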
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index d94b174..e65a494 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.util.{SparkExitCode, ThreadUtils, Utils}
 
@@ -226,8 +227,8 @@ private[spark] class ClientApp extends SparkApplication {
   override def start(args: Array[String], conf: SparkConf): Unit = {
     val driverArgs = new ClientArguments(args)
 
-    if (!conf.contains("spark.rpc.askTimeout")) {
-      conf.set("spark.rpc.askTimeout", "10s")
+    if (!conf.contains(RPC_ASK_TIMEOUT)) {
+      conf.set(RPC_ASK_TIMEOUT, "10s")
     }
     Logger.getRootLogger.setLevel(driverArgs.logLevel)
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9692d2a..e23d1f8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -32,6 +32,7 @@ import scala.util.Try
 import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
 import org.apache.spark.launcher.SparkSubmitArgumentsParser
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.Utils
 
@@ -208,7 +209,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       .orElse(sparkProperties.get("spark.yarn.principal"))
       .orNull
     dynamicAllocationEnabled =
-      sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)
+      sparkProperties.get(DYN_ALLOCATION_ENABLED.key).exists("true".equalsIgnoreCase)
 
     // Try to set main class from JAR if no --class argument is given
     if (mainClass == null && !isPython && !isR && primaryResource != null) {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index a3644e7..fccb1ac 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -124,7 +124,7 @@ private[spark] class Executor(
   private val userClassPathFirst = conf.get(EXECUTOR_USER_CLASS_PATH_FIRST)
 
   // Whether to monitor killed / interrupted tasks
-  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", false)
+  private val taskReaperEnabled = conf.get(TASK_REAPER_ENABLED)
 
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
@@ -163,7 +163,7 @@ private[spark] class Executor(
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.
   private val maxDirectResultSize = Math.min(
-    conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
+    conf.get(TASK_MAX_DIRECT_RESULT_SIZE),
     RpcUtils.maxMessageSizeBytes(conf))
 
   private val maxResultSize = conf.get(MAX_RESULT_SIZE)
 
@@ -667,13 +667,11 @@ private[spark] class Executor(
 
     private[this] val taskId: Long = taskRunner.taskId
 
-    private[this] val killPollingIntervalMs: Long =
-      conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+    private[this] val killPollingIntervalMs: Long = conf.get(TASK_REAPER_POLLING_INTERVAL)
 
-    private[this] val killTimeoutMs: Long = conf.getTimeAsMs("spark.task.reaper.killTimeout", "-1")
+    private[this] val killTimeoutMs: Long = conf.get(TASK_REAPER_KILL_TIMEOUT)
 
-    private[this] val takeThreadDump: Boolean =
-      conf.getBoolean("spark.task.reaper.threadDump", true)
+    private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
 
     override def run(): Unit = {
       val startTimeMs = System.currentTimeMillis()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Network.scala b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
new file mode 100644
index 0000000..129e31a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/config/Network.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.config
+
+import java.util.concurrent.TimeUnit
+
+private[spark] object Network {
+
+  private[spark] val NETWORK_CRYPTO_SASL_FALLBACK =
+    ConfigBuilder("spark.network.crypto.saslFallback")
+      .booleanConf
+      .createWithDefault(true)
+
+  private[spark] val NETWORK_CRYPTO_ENABLED =
+    ConfigBuilder("spark.network.crypto.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION =
+    ConfigBuilder("spark.network.remoteReadNioBufferConversion")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val NETWORK_TIMEOUT =
+    ConfigBuilder("spark.network.timeout")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("120s")
+
+  private[spark] val NETWORK_TIMEOUT_INTERVAL =
+    ConfigBuilder("spark.network.timeoutInterval")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString(STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL.defaultValueString)
+
+  private[spark] val RPC_ASK_TIMEOUT =
+    ConfigBuilder("spark.rpc.askTimeout")
+      .stringConf
+      .createOptional
+
+  private[spark] val RPC_CONNECT_THREADS =
+    ConfigBuilder("spark.rpc.connect.threads")
+      .intConf
+      .createWithDefault(64)
+
+  private[spark] val RPC_IO_NUM_CONNECTIONS_PER_PEER =
+    ConfigBuilder("spark.rpc.io.numConnectionsPerPeer")
+      .intConf
+      .createWithDefault(1)
+
+  private[spark] val RPC_IO_THREADS =
+    ConfigBuilder("spark.rpc.io.threads")
+      .intConf
+      .createOptional
+
+  private[spark] val RPC_LOOKUP_TIMEOUT =
+    ConfigBuilder("spark.rpc.lookupTimeout")
+      .stringConf
+      .createOptional
+
+  private[spark] val RPC_MESSAGE_MAX_SIZE =
+    ConfigBuilder("spark.rpc.message.maxSize")
+      .intConf
+      .createWithDefault(128)
+
+  private[spark] val RPC_NETTY_DISPATCHER_NUM_THREADS =
+    ConfigBuilder("spark.rpc.netty.dispatcher.numThreads")
+      .intConf
+      .createOptional
+
+  private[spark] val RPC_NUM_RETRIES =
+    ConfigBuilder("spark.rpc.numRetries")
+      .intConf
+      .createWithDefault(3)
+
+  private[spark] val RPC_RETRY_WAIT =
+    ConfigBuilder("spark.rpc.retry.wait")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("3s")
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1e72800..b591284 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.ByteUnit
-import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.scheduler.{EventLoggingListener, SchedulingMode}
 import org.apache.spark.storage.{DefaultTopologyMapper, RandomBlockReplicationPolicy}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.util.Utils
@@ -265,11 +265,22 @@ package object config {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("60s")
 
+  private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
+    ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString(Network.NETWORK_TIMEOUT.defaultValueString)
+
   private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython").internal()
     .booleanConf.createWithDefault(false)
 
   private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)
 
+  private[spark] val DYN_ALLOCATION_ENABLED =
ConfigBuilder("spark.dynamicAllocation.enabled").booleanConf.createWithDefault(false) + + private[spark] val DYN_ALLOCATION_TESTING = + ConfigBuilder("spark.dynamicAllocation.testing").booleanConf.createWithDefault(false) + private[spark] val DYN_ALLOCATION_MIN_EXECUTORS = ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0) @@ -284,6 +295,22 @@ package object config { ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio") .doubleConf.createWithDefault(1.0) + private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout") + .timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE) + + private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout") + .timeConf(TimeUnit.SECONDS).createWithDefault(60) + + private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout") + .timeConf(TimeUnit.SECONDS).createWithDefault(1) + + private[spark] val DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT = + ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout") + .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT) + private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait") .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("3s") @@ -316,11 +343,36 @@ package object config { .toSequence .createWithDefault(Nil) - private[spark] val MAX_TASK_FAILURES = + private[spark] val TASK_MAX_DIRECT_RESULT_SIZE = + ConfigBuilder("spark.task.maxDirectResultSize") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(1L << 20) + + private[spark] val TASK_MAX_FAILURES = ConfigBuilder("spark.task.maxFailures") .intConf .createWithDefault(4) + private[spark] val TASK_REAPER_ENABLED = + ConfigBuilder("spark.task.reaper.enabled") + .booleanConf + .createWithDefault(false) + + private[spark] val TASK_REAPER_KILL_TIMEOUT = + ConfigBuilder("spark.task.reaper.killTimeout") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(-1) + + private[spark] val TASK_REAPER_POLLING_INTERVAL = + ConfigBuilder("spark.task.reaper.pollingInterval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("10s") + + private[spark] val TASK_REAPER_THREAD_DUMP = + ConfigBuilder("spark.task.reaper.threadDump") + .booleanConf + .createWithDefault(true) + // Blacklist confs private[spark] val BLACKLIST_ENABLED = ConfigBuilder("spark.blacklist.enabled") @@ -574,11 +626,6 @@ package object config { "secret keys are only allowed when using Kubernetes.") .fallbackConf(AUTH_SECRET_FILE) - private[spark] val NETWORK_ENCRYPTION_ENABLED = - ConfigBuilder("spark.network.crypto.enabled") - .booleanConf - .createWithDefault(false) - private[spark] val BUFFER_WRITE_CHUNK_SIZE = ConfigBuilder("spark.buffer.write.chunkSize") .internal() @@ -930,6 +977,31 @@ package object config { .toSequence .createWithDefault(Nil) + private[spark] val CLEANER_PERIODIC_GC_INTERVAL = + ConfigBuilder("spark.cleaner.periodicGC.interval") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("30min") + + private[spark] val CLEANER_REFERENCE_TRACKING = + ConfigBuilder("spark.cleaner.referenceTracking") + .booleanConf + .createWithDefault(true) + + private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING = + ConfigBuilder("spark.cleaner.referenceTracking.blocking") + .booleanConf + .createWithDefault(true) + + private[spark] val CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE = + 
ConfigBuilder("spark.cleaner.referenceTracking.blocking.shuffle") + .booleanConf + .createWithDefault(false) + + private[spark] val CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS = + ConfigBuilder("spark.cleaner.referenceTracking.cleanCheckpoints") + .booleanConf + .createWithDefault(false) + private[spark] val EXECUTOR_LOGS_ROLLING_STRATEGY = ConfigBuilder("spark.executor.logs.rolling.strategy").stringConf.createWithDefault("") @@ -1103,4 +1175,49 @@ package object config { .stringConf .toSequence .createWithDefault(Nil) + + private[spark] val SCHEDULER_ALLOCATION_FILE = + ConfigBuilder("spark.scheduler.allocation.file") + .stringConf + .createOptional + + private[spark] val SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO = + ConfigBuilder("spark.scheduler.minRegisteredResourcesRatio") + .doubleConf + .createOptional + + private[spark] val SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME = + ConfigBuilder("spark.scheduler.maxRegisteredResourcesWaitingTime") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("30s") + + private[spark] val SCHEDULER_MODE = + ConfigBuilder("spark.scheduler.mode") + .stringConf + .createWithDefault(SchedulingMode.FIFO.toString) + + private[spark] val SCHEDULER_REVIVE_INTERVAL = + ConfigBuilder("spark.scheduler.revive.interval") + .timeConf(TimeUnit.MILLISECONDS) + .createOptional + + private[spark] val SPECULATION_ENABLED = + ConfigBuilder("spark.speculation") + .booleanConf + .createWithDefault(false) + + private[spark] val SPECULATION_INTERVAL = + ConfigBuilder("spark.speculation.interval") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefault(100) + + private[spark] val SPECULATION_MULTIPLIER = + ConfigBuilder("spark.speculation.multiplier") + .doubleConf + .createWithDefault(1.5) + + private[spark] val SPECULATION_QUANTILE = + ConfigBuilder("spark.speculation.quantile") + .doubleConf + .createWithDefault(0.75) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 4bf4f08..8b5f9bb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -36,6 +36,7 @@ import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.annotation.Experimental import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.SPECULATION_ENABLED import org.apache.spark.internal.io._ import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.serializer.Serializer @@ -1051,7 +1052,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) // When speculation is on and output committer class name contains "Direct", we should warn // users that they may loss data if they are using a direct output committer. 
-    val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
+    val speculationEnabled = self.conf.get(SPECULATION_ENABLED)
     val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b2ed2d3..954582f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -36,6 +36,7 @@ import org.apache.spark.Partitioner._
 import org.apache.spark.annotation.{DeveloperApi, Experimental, Since}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.RDD_LIMIT_SCALE_UP_FACTOR
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
@@ -1591,8 +1592,8 @@ abstract class RDD[T: ClassTag](
    * The checkpoint directory set through `SparkContext#setCheckpointDir` is not used.
    */
   def localCheckpoint(): this.type = RDDCheckpointData.synchronized {
-    if (conf.getBoolean("spark.dynamicAllocation.enabled", false) &&
-        conf.contains("spark.dynamicAllocation.cachedExecutorIdleTimeout")) {
+    if (conf.get(DYN_ALLOCATION_ENABLED) &&
+        conf.contains(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)) {
       logWarning("Local checkpointing is NOT safe to use with dynamic allocation, " +
         "which removes executors along with their cached blocks. If you must use both " +
         "features, you are advised to set `spark.dynamicAllocation.cachedExecutorIdleTimeout` " +
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index b6d723c..7a592ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS
 
 /**
  * An implementation of checkpointing that writes the RDD data to reliable storage.
@@ -58,7 +59,7 @@ private[spark] class ReliableRDDCheckpointData[T: ClassTag](@transient private v
     val newRDD = ReliableCheckpointRDD.writeRDDToCheckpointDirectory(rdd, cpDir)
 
     // Optionally clean our checkpoint files if the reference is out of scope
-    if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
+    if (rdd.conf.get(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS)) {
       rdd.context.cleaner.foreach { cleaner =>
         cleaner.registerRDDCheckpointDataForCleanup(newRDD, rdd.id)
       }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 904c4d0..ce238a2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.RPC_NETTY_DISPATCHER_NUM_THREADS
 import org.apache.spark.network.client.RpcResponseCallback
 import org.apache.spark.rpc._
 import org.apache.spark.util.ThreadUtils
@@ -197,8 +198,8 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
   private val threadpool: ThreadPoolExecutor = {
     val availableCores =
       if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
-    val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
-      math.max(2, availableCores))
+    val numThreads = nettyEnv.conf.get(RPC_NETTY_DISPATCHER_NUM_THREADS)
+      .getOrElse(math.max(2, availableCores))
     val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 4757695..2540196 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -31,6 +31,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.TransportContext
 import org.apache.spark.network.client._
 import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap}
@@ -48,9 +49,9 @@ private[netty] class NettyRpcEnv(
     numUsableCores: Int) extends RpcEnv(conf) with Logging {
 
   private[netty] val transportConf = SparkTransportConf.fromSparkConf(
-    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
+    conf.clone.set(RPC_IO_NUM_CONNECTIONS_PER_PEER, 1),
     "rpc",
-    conf.getInt("spark.rpc.io.threads", numUsableCores))
+    conf.get(RPC_IO_THREADS).getOrElse(numUsableCores))
 
   private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
 
@@ -87,7 +88,7 @@ private[netty] class NettyRpcEnv(
   // TODO: a non-blocking TransportClientFactory.createClient in future
   private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
     "netty-rpc-connection",
-    conf.getInt("spark.rpc.connect.threads", 64))
+    conf.get(RPC_CONNECT_THREADS))
 
   @volatile private var server: TransportServer = _
 
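Entries built with createOptional, such as RPC_IO_THREADS and RPC_NETTY_DISPATCHER_NUM_THREADS used in the Dispatcher and NettyRpcEnv hunks above, have no default: conf.get yields an Option[T] and the caller supplies a computed fallback, which is exactly the .getOrElse(...) shape in those hunks. A small stand-in sketch, not the real API:

    // Stand-in for a createOptional entry: an absent key reads as None.
    final case class OptionalEntry[T](key: String, parse: String => T) {
      def get(settings: Map[String, String]): Option[T] = settings.get(key).map(parse)
    }

    object OptionalEntrySketch {
      val RPC_IO_THREADS = OptionalEntry("spark.rpc.io.threads", _.toInt)

      def main(args: Array[String]): Unit = {
        val numUsableCores = Runtime.getRuntime.availableProcessors()
        // Mirrors conf.get(RPC_IO_THREADS).getOrElse(numUsableCores) in NettyRpcEnv.
        println(RPC_IO_THREADS.get(Map.empty).getOrElse(numUsableCores))
        println(RPC_IO_THREADS.get(Map("spark.rpc.io.threads" -> "8")).getOrElse(numUsableCores))
      }
    }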
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
index 803a0a1..64a02b3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BarrierJobAllocationFailed.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.SparkException
+import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
 
 /**
  * Exception thrown when submitting a job with barrier stage(s) failing a required check.
@@ -51,7 +52,7 @@ private[spark] object BarrierJobAllocationFailed {
   val ERROR_MESSAGE_RUN_BARRIER_WITH_DYN_ALLOCATION =
     "[SPARK-24942]: Barrier execution mode does not support dynamic resource allocation for " +
       "now. You can disable dynamic resource allocation by setting Spark conf " +
-      "\"spark.dynamicAllocation.enabled\" to \"false\"."
+      s""""${DYN_ALLOCATION_ENABLED.key}" to "false"."""
 
   // Error message when running a barrier stage that requires more slots than current total number.
   val ERROR_MESSAGE_BARRIER_REQUIRE_MORE_SLOTS_THAN_CURRENT_TOTAL_NUMBER =
diff --git a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
index ef6d02d..9e524c5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala
@@ -460,15 +460,15 @@ private[spark] object BlacklistTracker extends Logging {
       }
     }
 
-    val maxTaskFailures = conf.get(config.MAX_TASK_FAILURES)
+    val maxTaskFailures = conf.get(config.TASK_MAX_FAILURES)
     val maxNodeAttempts = conf.get(config.MAX_TASK_ATTEMPTS_PER_NODE)
 
     if (maxNodeAttempts >= maxTaskFailures) {
       throw new IllegalArgumentException(s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " +
-        s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " +
+        s"( = ${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " +
         s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " +
Decrease " + - s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " + + s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.TASK_MAX_FAILURES.key}, " + s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}") } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala index 5f3c280..c85c74f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala @@ -23,8 +23,9 @@ import java.util.{Locale, NoSuchElementException, Properties} import scala.util.control.NonFatal import scala.xml.{Node, XML} -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.util.Utils @@ -56,10 +57,9 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) extends SchedulableBuilder with Logging { - val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" - val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY) + val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE) val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml" - val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool" + val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL val DEFAULT_POOL_NAME = "default" val MINIMUM_SHARES_PROPERTY = "minShare" val SCHEDULING_MODE_PROPERTY = "schedulingMode" @@ -85,7 +85,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf) } else { logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " + s"FIFO order. 
        s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
-        s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+        s"set ${SCHEDULER_ALLOCATION_FILE.key} to a file that contains the configuration.")
       None
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 61556ea..d551fb7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,6 +31,7 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -61,7 +62,7 @@ private[spark] class TaskSchedulerImpl(
   import TaskSchedulerImpl._
 
   def this(sc: SparkContext) = {
-    this(sc, sc.conf.get(config.MAX_TASK_FAILURES))
+    this(sc, sc.conf.get(config.TASK_MAX_FAILURES))
   }
 
   // Lazily initializing blacklistTrackerOpt to avoid getting empty ExecutorAllocationClient,
@@ -71,7 +72,7 @@ private[spark] class TaskSchedulerImpl(
   val conf = sc.conf
 
   // How often to check for speculative tasks
-  val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
+  val SPECULATION_INTERVAL_MS = conf.get(SPECULATION_INTERVAL)
 
   // Duplicate copies of a task will only be launched if the original copy has been running for
   // at least this amount of time. This is to avoid the overhead of launching speculative copies
@@ -85,7 +86,7 @@ private[spark] class TaskSchedulerImpl(
   val STARVATION_TIMEOUT_MS = conf.getTimeAsMs("spark.starvation.timeout", "15s")
 
   // CPUs to request per task
-  val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
+  val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.
@@ -131,7 +132,7 @@ private[spark] class TaskSchedulerImpl(
   private var schedulableBuilder: SchedulableBuilder = null
 
   // default scheduler is FIFO
-  private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
+  private val schedulingModeConf = conf.get(SCHEDULER_MODE)
   val schedulingMode: SchedulingMode =
     try {
       SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
@@ -183,7 +184,7 @@ private[spark] class TaskSchedulerImpl(
   override def start() {
     backend.start()
 
-    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
+    if (!isLocal && conf.get(SPECULATION_ENABLED)) {
       logInfo("Starting speculative execution thread")
       speculationScheduler.scheduleWithFixedDelay(new Runnable {
         override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
@@ -857,7 +858,7 @@ private[spark] class TaskSchedulerImpl(
 
 private[spark] object TaskSchedulerImpl {
 
-  val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"
+  val SCHEDULER_MODE_PROPERTY = SCHEDULER_MODE.key
 
   /**
   * Used to balance containers across hosts.
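SCHEDULER_MODE stays a plain string entry, so TaskSchedulerImpl still parses it into the SchedulingMode enumeration itself, as the hunk above shows. A sketch of that parse, with a trimmed local enumeration standing in for org.apache.spark.scheduler.SchedulingMode (the real one also defines NONE):

    import java.util.Locale

    // Trimmed stand-in for SchedulingMode.
    object SchedulingModeSketch extends Enumeration {
      val FIFO, FAIR = Value

      // Default to FIFO; withName throws NoSuchElementException on an unknown
      // mode, which the try/catch visible in the hunk above guards against.
      def fromConf(settings: Map[String, String]): Value = {
        val raw = settings.getOrElse("spark.scheduler.mode", FIFO.toString)
        withName(raw.toUpperCase(Locale.ROOT))
      }

      def main(args: Array[String]): Unit = {
        println(fromConf(Map("spark.scheduler.mode" -> "fair"))) // FAIR
        println(fromConf(Map.empty))                             // FIFO
      }
    }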
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 1b42cb4..6f3f77c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -61,12 +62,12 @@ private[spark] class TaskSetManager(
   private val addedFiles = HashMap[String, Long](sched.sc.addedFiles.toSeq: _*)
 
   // Quantile of tasks at which to start speculation
-  val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75)
-  val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5)
+  val speculationQuantile = conf.get(SPECULATION_QUANTILE)
+  val speculationMultiplier = conf.get(SPECULATION_MULTIPLIER)
 
   val maxResultSize = conf.get(config.MAX_RESULT_SIZE)
 
-  val speculationEnabled = conf.getBoolean("spark.speculation", false)
+  val speculationEnabled = conf.get(SPECULATION_ENABLED)
 
   // Serializer for closures and tasks.
   val env = SparkEnv.get
@@ -1015,13 +1016,13 @@ private[spark] class TaskSetManager(
       return false
     }
     var foundTasks = false
-    val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
+    val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
     logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
 
     if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
       val time = clock.getTimeMillis()
       val medianDuration = successfulTaskDurations.median
-      val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
+      val threshold = max(speculationMultiplier * medianDuration, minTimeToSpeculation)
       // TODO: Threshold should also look at standard deviation of task durations and have a lower
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 98ed2ff..3b05875 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -30,6 +30,8 @@ import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, Tas
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -58,11 +60,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   private val _minRegisteredRatio =
-    math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
+    math.min(1, conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).getOrElse(0.0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
   // if minRegisteredRatio has not yet been reached
   private val maxRegisteredWaitingTimeMs =
-    conf.getTimeAsMs("spark.scheduler.maxRegisteredResourcesWaitingTime", "30s")
+    conf.get(SCHEDULER_MAX_REGISTERED_RESOURCE_WAITING_TIME)
   private val createTime = System.currentTimeMillis()
 
   // Accessing `executorDataMap` in `DriverEndpoint.receive/receiveAndReply` doesn't need any
@@ -118,7 +120,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
-      val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
+      val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
 
       reviveThread.scheduleAtFixedRate(new Runnable {
         override def run(): Unit = Utils.tryLogNonFatalError {
@@ -301,8 +303,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           Option(scheduler.taskIdToTaskSetManager.get(task.taskId)).foreach { taskSetMgr =>
             try {
               var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
-                "spark.rpc.message.maxSize or using broadcast variables for large values."
+                s"${RPC_MESSAGE_MAX_SIZE.key} (%d bytes). Consider increasing " +
+                s"${RPC_MESSAGE_MAX_SIZE.key} or using broadcast variables for large values."
               msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
               taskSetMgr.abort(msg)
             } catch {
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 12c5d4d..1067cdc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -25,8 +25,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
@@ -151,7 +152,7 @@ private[spark] class AppStatusListener(
       details.getOrElse("System Properties", Nil),
       details.getOrElse("Classpath Entries", Nil))
 
-    coresPerTask = envInfo.sparkProperties.toMap.get("spark.task.cpus").map(_.toInt)
+    coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
       .getOrElse(coresPerTask)
 
     kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
@@ -434,7 +435,7 @@ private[spark] class AppStatusListener(
     val stage = getOrCreateStage(event.stageInfo)
     stage.status = v1.StageStatus.ACTIVE
     stage.schedulingPool = Option(event.properties).flatMap { p =>
-      Option(p.getProperty("spark.scheduler.pool"))
+      Option(p.getProperty(SparkContext.SPARK_SCHEDULER_POOL))
     }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
 
     // Look at all active jobs to find the ones that mention this stage.
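AppStatusListener resolves a stage's pool from the job-local property that SparkContext.SPARK_SCHEDULER_POOL now names in one place. From user code the same property is set through the public setLocalProperty API; a runnable sketch (assumes Spark on the classpath and a local master):

    import org.apache.spark.{SparkConf, SparkContext}

    object PoolPropertySketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("pool-sketch"))
        // Jobs submitted from this thread are tagged with the "reports" pool;
        // listeners read the pool name back from the job's properties.
        sc.setLocalProperty("spark.scheduler.pool", "reports")
        println(sc.parallelize(1 to 100).sum())
        sc.setLocalProperty("spark.scheduler.pool", null) // clear the tag
        sc.stop()
      }
    }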
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 8f993bf..dd05cb3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -36,7 +36,9 @@ import com.codahale.metrics.{MetricRegistry, MetricSet}
 
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Network
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.network._
@@ -133,7 +135,7 @@ private[spark] class BlockManager(
   private[spark] val externalShuffleServiceEnabled = conf.get(config.SHUFFLE_SERVICE_ENABLED)
 
   private val remoteReadNioBufferConversion =
-    conf.getBoolean("spark.network.remoteReadNioBufferConversion", false)
+    conf.get(Network.NETWORK_REMOTE_READ_NIO_BUFFER_CONVERSION)
 
   val diskBlockManager = {
     // Only perform cleanup if an external service is not serving our shuffle files.
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 6229e80..8845dcf 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.Semaphore
 import scala.util.Random
 
 import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler.SchedulingMode
 
 // scalastyle:off
@@ -50,14 +51,14 @@ private[spark] object UIWorkloadGenerator {
 
     val schedulingMode = SchedulingMode.withName(args(1))
     if (schedulingMode == SchedulingMode.FAIR) {
-      conf.set("spark.scheduler.mode", "FAIR")
+      conf.set(SCHEDULER_MODE, "FAIR")
     }
     val nJobSet = args(2).toInt
     val sc = new SparkContext(conf)
 
     def setProperties(s: String): Unit = {
       if (schedulingMode == SchedulingMode.FAIR) {
-        sc.setLocalProperty("spark.scheduler.pool", s)
+        sc.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, s)
       }
       sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s)
     }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index b35ea5b..4523326 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -28,6 +28,7 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.internal.config.SCHEDULER_MODE
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1
@@ -295,7 +296,7 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
     }
 
     val schedulingMode = store.environmentInfo().sparkProperties.toMap
-      .get("spark.scheduler.mode")
+      .get(SCHEDULER_MODE.key)
       .map { mode => SchedulingMode.withName(mode).toString }
       .getOrElse("Unknown")
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 37bb292..1c1915e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -22,6 +22,7 @@ import javax.servlet.http.HttpServletRequest import scala.collection.JavaConverters._ import org.apache.spark.JobExecutionStatus +import org.apache.spark.internal.config.SCHEDULER_MODE import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.status.AppStatusStore import org.apache.spark.ui._ @@ -37,7 +38,7 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) store .environmentInfo() .sparkProperties - .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) + .contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString)) } def getSparkUser: String = parent.getSparkUser diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index e16c337..b74f3db 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest +import org.apache.spark.internal.config.SCHEDULER_MODE import org.apache.spark.scheduler.SchedulingMode import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1.StageStatus @@ -40,7 +41,7 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore) store .environmentInfo() .sparkProperties - .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString)) + .contains((SCHEDULER_MODE.key, SchedulingMode.FAIR.toString)) } def handleKillRequest(request: HttpServletRequest): Unit = { diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala index 902e48f..7272b37 100644 --- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import org.apache.spark.SparkConf import org.apache.spark.internal.config +import org.apache.spark.internal.config.Network._ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout} private[spark] object RpcUtils { @@ -35,32 +36,32 @@ private[spark] object RpcUtils { /** Returns the configured number of times to retry connecting */ def numRetries(conf: SparkConf): Int = { - conf.getInt("spark.rpc.numRetries", 3) + conf.get(RPC_NUM_RETRIES) } /** Returns the configured number of milliseconds to wait on each retry */ def retryWaitMs(conf: SparkConf): Long = { - conf.getTimeAsMs("spark.rpc.retry.wait", "3s") + conf.get(RPC_RETRY_WAIT) } /** Returns the default Spark timeout to use for RPC ask operations. */ def askRpcTimeout(conf: SparkConf): RpcTimeout = { - RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s") + RpcTimeout(conf, Seq(RPC_ASK_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s") } /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */ def lookupRpcTimeout(conf: SparkConf): RpcTimeout = { - RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s") + RpcTimeout(conf, Seq(RPC_LOOKUP_TIMEOUT.key, NETWORK_TIMEOUT.key), "120s") } private val MAX_MESSAGE_SIZE_IN_MB = Int.MaxValue / 1024 / 1024 /** Returns the configured max message size for messages in bytes. 
*/ def maxMessageSizeBytes(conf: SparkConf): Int = { - val maxSizeInMB = conf.getInt("spark.rpc.message.maxSize", 128) + val maxSizeInMB = conf.get(RPC_MESSAGE_MAX_SIZE) if (maxSizeInMB > MAX_MESSAGE_SIZE_IN_MB) { throw new IllegalArgumentException( - s"spark.rpc.message.maxSize should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB") + s"${RPC_MESSAGE_MAX_SIZE.key} should not be greater than $MAX_MESSAGE_SIZE_IN_MB MB") } maxSizeInMB * 1024 * 1024 } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a6dc1f..0ad1ffc 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2465,9 +2465,9 @@ private[spark] object Utils extends Logging { * Return whether dynamic allocation is enabled in the given conf. */ def isDynamicAllocationEnabled(conf: SparkConf): Boolean = { - val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) + val dynamicAllocationEnabled = conf.get(DYN_ALLOCATION_ENABLED) dynamicAllocationEnabled && - (!isLocalMaster(conf) || conf.getBoolean("spark.dynamicAllocation.testing", false)) + (!isLocalMaster(conf) || conf.get(DYN_ALLOCATION_TESTING)) } /** diff --git a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala index d49ab4a..ca839d1 100644 --- a/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala +++ b/core/src/test/scala/org/apache/spark/BarrierStageOnSubmittedSuite.scala @@ -20,9 +20,9 @@ package org.apache.spark import scala.concurrent.duration._ import scala.language.postfixOps +import org.apache.spark.internal.config._ import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import org.apache.spark.scheduler.BarrierJobAllocationFailed._ -import org.apache.spark.scheduler.DAGScheduler import org.apache.spark.util.ThreadUtils /** @@ -157,8 +157,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext test("submit a barrier ResultStage with dynamic resource allocation enabled") { val conf = new SparkConf() - .set("spark.dynamicAllocation.enabled", "true") - .set("spark.dynamicAllocation.testing", "true") + .set(DYN_ALLOCATION_ENABLED, true) + .set(DYN_ALLOCATION_TESTING, true) .setMaster("local[4]") .setAppName("test") sc = createSparkContext(Some(conf)) @@ -172,8 +172,8 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext test("submit a barrier ShuffleMapStage with dynamic resource allocation enabled") { val conf = new SparkConf() - .set("spark.dynamicAllocation.enabled", "true") - .set("spark.dynamicAllocation.testing", "true") + .set(DYN_ALLOCATION_ENABLED, true) + .set(DYN_ALLOCATION_TESTING, true) .setMaster("local[4]") .setAppName("test") sc = createSparkContext(Some(conf)) @@ -191,9 +191,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext "mode") { val conf = new SparkConf() // Shorten the time interval between two failed checks to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s") // Reduce max check failures allowed to make the test fail faster. 
- .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3) .setMaster("local[4]") .setAppName("test") sc = createSparkContext(Some(conf)) @@ -208,9 +208,9 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext "local mode") { val conf = new SparkConf() // Shorten the time interval between two failed checks to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s") // Reduce max check failures allowed to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3) .setMaster("local[4]") .setAppName("test") sc = createSparkContext(Some(conf)) @@ -226,11 +226,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext test("submit a barrier ResultStage that requires more slots than current total under " + "local-cluster mode") { val conf = new SparkConf() - .set("spark.task.cpus", "2") + .set(CPUS_PER_TASK, 2) // Shorten the time interval between two failed checks to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s") // Reduce max check failures allowed to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3) .setMaster("local-cluster[4, 3, 1024]") .setAppName("test") sc = createSparkContext(Some(conf)) @@ -244,11 +244,11 @@ class BarrierStageOnSubmittedSuite extends SparkFunSuite with LocalSparkContext test("submit a barrier ShuffleMapStage that requires more slots than current total under " + "local-cluster mode") { val conf = new SparkConf() - .set("spark.task.cpus", "2") + .set(CPUS_PER_TASK, 2) // Shorten the time interval between two failed checks to make the test fail faster. - .set("spark.scheduler.barrier.maxConcurrentTasksCheck.interval", "1s") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_INTERVAL.key, "1s") // Reduce max check failures allowed to make the test fail faster. 
- .set("spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures", "3") + .set(BARRIER_MAX_CONCURRENT_TASKS_CHECK_MAX_FAILURES, 3) .setMaster("local-cluster[4, 3, 1024]") .setAppName("test") sc = createSparkContext(Some(conf)) diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 9e28284..b9b47d4 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -29,10 +29,10 @@ import org.scalatest.concurrent.PatienceConfiguration import org.scalatest.time.SpanSugar._ import org.apache.spark.internal.{config, Logging} +import org.apache.spark.internal.config._ import org.apache.spark.rdd.{RDD, ReliableRDDCheckpointData} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage._ -import org.apache.spark.util.Utils /** * An abstract base class for context cleaner tests, which sets up a context with a config @@ -46,9 +46,9 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[So val conf = new SparkConf() .setMaster("local[2]") .setAppName("ContextCleanerSuite") - .set("spark.cleaner.referenceTracking.blocking", "true") - .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") - .set("spark.cleaner.referenceTracking.cleanCheckpoints", "true") + .set(CLEANER_REFERENCE_TRACKING_BLOCKING, true) + .set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true) + .set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, true) .set(config.SHUFFLE_MANAGER, shuffleManager.getName) before { @@ -234,7 +234,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val conf = new SparkConf() .setMaster("local[2]") .setAppName("cleanupCheckpoint") - .set("spark.cleaner.referenceTracking.cleanCheckpoints", "false") + .set(CLEANER_REFERENCE_TRACKING_CLEAN_CHECKPOINTS, false) sc = new SparkContext(conf) rdd = newPairRDD() sc.setCheckpointDir(checkpointDir.toString) @@ -317,8 +317,8 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val conf2 = new SparkConf() .setMaster("local-cluster[2, 1, 1024]") .setAppName("ContextCleanerSuite") - .set("spark.cleaner.referenceTracking.blocking", "true") - .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") + .set(CLEANER_REFERENCE_TRACKING_BLOCKING, true) + .set(CLEANER_REFERENCE_TRACKING_BLOCKING_SHUFFLE, true) .set(config.SHUFFLE_MANAGER, shuffleManager.getName) sc = new SparkContext(conf2) diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 6b310b9..fdaea28 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -63,19 +63,19 @@ class ExecutorAllocationManagerSuite val conf = new SparkConf() .setMaster("myDummyLocalExternalClusterManager") .setAppName("test-executor-allocation-manager") - .set("spark.dynamicAllocation.enabled", "true") - .set("spark.dynamicAllocation.testing", "true") + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.DYN_ALLOCATION_TESTING, true) val sc0 = new SparkContext(conf) contexts += sc0 assert(sc0.executorAllocationManager.isDefined) sc0.stop() // Min < 0 - val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1") + val conf1 = conf.clone().set(config.DYN_ALLOCATION_MIN_EXECUTORS, -1) intercept[SparkException] { contexts += new 
SparkContext(conf1) } // Max < 0 - val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1") + val conf2 = conf.clone().set(config.DYN_ALLOCATION_MAX_EXECUTORS, -1) intercept[SparkException] { contexts += new SparkContext(conf2) } // Both min and max, but min > max @@ -151,11 +151,11 @@ class ExecutorAllocationManagerSuite val conf = new SparkConf() .setMaster("myDummyLocalExternalClusterManager") .setAppName("test-executor-allocation-manager") - .set("spark.dynamicAllocation.enabled", "true") - .set("spark.dynamicAllocation.testing", "true") - .set("spark.dynamicAllocation.maxExecutors", "15") - .set("spark.dynamicAllocation.minExecutors", "3") - .set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString) + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.DYN_ALLOCATION_TESTING, true) + .set(config.DYN_ALLOCATION_MAX_EXECUTORS, 15) + .set(config.DYN_ALLOCATION_MIN_EXECUTORS, 3) + .set(config.DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO, divisor) .set(config.EXECUTOR_CORES, cores) val sc = new SparkContext(conf) contexts += sc @@ -1093,14 +1093,14 @@ class ExecutorAllocationManagerSuite val initialExecutors = 1 val maxExecutors = 2 val conf = new SparkConf() - .set("spark.dynamicAllocation.enabled", "true") - .set(config.SHUFFLE_SERVICE_ENABLED.key, "true") - .set("spark.dynamicAllocation.minExecutors", minExecutors.toString) - .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString) - .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString) - .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1000ms") - .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1000ms") - .set("spark.dynamicAllocation.executorIdleTimeout", s"3000ms") + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.SHUFFLE_SERVICE_ENABLED, true) + .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) + .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors) + .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors) + .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms") + .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms") + .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3000ms") val mockAllocationClient = mock(classOf[ExecutorAllocationClient]) val mockBMM = mock(classOf[BlockManagerMaster]) val manager = new ExecutorAllocationManager( @@ -1155,16 +1155,16 @@ class ExecutorAllocationManagerSuite val conf = new SparkConf() .setMaster("myDummyLocalExternalClusterManager") .setAppName("test-executor-allocation-manager") - .set("spark.dynamicAllocation.enabled", "true") - .set("spark.dynamicAllocation.minExecutors", minExecutors.toString) - .set("spark.dynamicAllocation.maxExecutors", maxExecutors.toString) - .set("spark.dynamicAllocation.initialExecutors", initialExecutors.toString) - .set("spark.dynamicAllocation.schedulerBacklogTimeout", - s"${schedulerBacklogTimeout.toString}s") - .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors) + .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors) + .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors) + .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, + s"${schedulerBacklogTimeout.toString}s") + .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, s"${sustainedSchedulerBacklogTimeout.toString}s") - .set("spark.dynamicAllocation.executorIdleTimeout", s"${executorIdleTimeout.toString}s") - 
.set("spark.dynamicAllocation.testing", "true") + .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, s"${executorIdleTimeout.toString}s") + .set(config.DYN_ALLOCATION_TESTING, true) // SPARK-22864: effectively disable the allocation schedule by setting the period to a // really long value. .set(TEST_SCHEDULE_INTERVAL, 10000L) diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala index a69e589..dbe187d 100644 --- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala +++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala @@ -28,6 +28,7 @@ import org.mockito.Mockito.{mock, spy, verify, when} import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester} import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} +import org.apache.spark.internal.config.DYN_ALLOCATION_TESTING import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ @@ -67,7 +68,7 @@ class HeartbeatReceiverSuite val conf = new SparkConf() .setMaster("local[2]") .setAppName("test") - .set("spark.dynamicAllocation.testing", "true") + .set(DYN_ALLOCATION_TESTING, true) sc = spy(new SparkContext(conf)) scheduler = mock(classOf[TaskSchedulerImpl]) when(sc.taskScheduler).thenReturn(scheduler) diff --git a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala index f8adaf5..0c59259 100644 --- a/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala +++ b/core/src/test/scala/org/apache/spark/JobCancellationSuite.scala @@ -27,6 +27,7 @@ import scala.concurrent.duration._ import org.scalatest.BeforeAndAfter import org.scalatest.Matchers +import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Deploy._ import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd, SparkListenerTaskStart} import org.apache.spark.util.ThreadUtils @@ -52,7 +53,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft } test("local mode, FIFO scheduler") { - val conf = new SparkConf().set("spark.scheduler.mode", "FIFO") + val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO") sc = new SparkContext("local[2]", "test", conf) testCount() testTake() @@ -61,9 +62,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft } test("local mode, fair scheduler") { - val conf = new SparkConf().set("spark.scheduler.mode", "FAIR") + val conf = new SparkConf().set(SCHEDULER_MODE, "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - conf.set("spark.scheduler.allocation.file", xmlPath) + conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath) sc = new SparkContext("local[2]", "test", conf) testCount() testTake() @@ -72,7 +73,7 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft } test("cluster mode, FIFO scheduler") { - val conf = new SparkConf().set("spark.scheduler.mode", "FIFO") + val conf = new SparkConf().set(SCHEDULER_MODE, "FIFO") sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) testCount() testTake() @@ -81,9 +82,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft } test("cluster mode, fair scheduler") { - val conf = new SparkConf().set("spark.scheduler.mode", "FAIR") + val conf = new 
SparkConf().set(SCHEDULER_MODE, "FAIR") val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - conf.set("spark.scheduler.allocation.file", xmlPath) + conf.set(SCHEDULER_ALLOCATION_FILE, xmlPath) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) testCount() testTake() @@ -217,8 +218,8 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft test("task reaper kills JVM if killed tasks keep running for too long") { val conf = new SparkConf() - .set("spark.task.reaper.enabled", "true") - .set("spark.task.reaper.killTimeout", "5s") + .set(TASK_REAPER_ENABLED, true) + .set(TASK_REAPER_KILL_TIMEOUT.key, "5s") sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) // Add a listener to release the semaphore once any tasks are launched. @@ -254,9 +255,9 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft test("task reaper will not kill JVM if spark.task.killTimeout == -1") { val conf = new SparkConf() - .set("spark.task.reaper.enabled", "true") - .set("spark.task.reaper.killTimeout", "-1") - .set("spark.task.reaper.PollingInterval", "1s") + .set(TASK_REAPER_ENABLED, true) + .set(TASK_REAPER_KILL_TIMEOUT.key, "-1") + .set(TASK_REAPER_POLLING_INTERVAL.key, "1s") .set(MAX_EXECUTOR_RETRIES, 1) sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala index adaa069..d869759 100644 --- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala @@ -25,6 +25,7 @@ import org.mockito.Mockito._ import org.apache.spark.LocalSparkContext._ import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Network.{RPC_ASK_TIMEOUT, RPC_MESSAGE_MAX_SIZE} import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import org.apache.spark.scheduler.{CompressedMapStatus, MapStatus} import org.apache.spark.shuffle.FetchFailedException @@ -170,8 +171,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { test("remote fetch below max RPC message size") { val newConf = new SparkConf - newConf.set("spark.rpc.message.maxSize", "1") - newConf.set("spark.rpc.askTimeout", "1") // Fail fast + newConf.set(RPC_MESSAGE_MAX_SIZE, 1) + newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 1048576L) val masterTracker = newTrackerMaster(newConf) @@ -199,8 +200,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { test("min broadcast size exceeds max RPC message size") { val newConf = new SparkConf - newConf.set("spark.rpc.message.maxSize", "1") - newConf.set("spark.rpc.askTimeout", "1") // Fail fast + newConf.set(RPC_MESSAGE_MAX_SIZE, 1) + newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, Int.MaxValue.toLong) intercept[IllegalArgumentException] { newTrackerMaster(newConf) } @@ -243,8 +244,8 @@ class MapOutputTrackerSuite extends SparkFunSuite { test("remote fetch using broadcast") { val newConf = new SparkConf - newConf.set("spark.rpc.message.maxSize", "1") - newConf.set("spark.rpc.askTimeout", "1") // Fail fast + newConf.set(RPC_MESSAGE_MAX_SIZE, 1) + newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 10240L) // 10 KiB << 1MiB framesize // needs TorrentBroadcast so need a 
SparkContext diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 4071dd4..079170d 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -29,6 +29,7 @@ import com.esotericsoftware.kryo.Kryo import org.apache.spark.internal.config._ import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Kryo._ +import org.apache.spark.internal.config.Network._ import org.apache.spark.network.util.ByteUnit import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer} import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -142,7 +143,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("creating SparkContext with cpus per tasks bigger than cores per executors") { val conf = new SparkConf(false) .set(EXECUTOR_CORES, 1) - .set("spark.task.cpus", "2") + .set(CPUS_PER_TASK, 2) intercept[SparkException] { sc = new SparkContext(conf) } } @@ -268,10 +269,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("akka deprecated configs") { val conf = new SparkConf() - assert(!conf.contains("spark.rpc.numRetries")) - assert(!conf.contains("spark.rpc.retry.wait")) - assert(!conf.contains("spark.rpc.askTimeout")) - assert(!conf.contains("spark.rpc.lookupTimeout")) + assert(!conf.contains(RPC_NUM_RETRIES)) + assert(!conf.contains(RPC_RETRY_WAIT)) + assert(!conf.contains(RPC_ASK_TIMEOUT)) + assert(!conf.contains(RPC_LOOKUP_TIMEOUT)) conf.set("spark.akka.num.retries", "1") assert(RpcUtils.numRetries(conf) === 1) @@ -322,12 +323,12 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val conf = new SparkConf() conf.validateSettings() - conf.set(NETWORK_ENCRYPTION_ENABLED, true) + conf.set(NETWORK_CRYPTO_ENABLED, true) intercept[IllegalArgumentException] { conf.validateSettings() } - conf.set(NETWORK_ENCRYPTION_ENABLED, false) + conf.set(NETWORK_CRYPTO_ENABLED, false) conf.set(SASL_ENCRYPTION_ENABLED, true) intercept[IllegalArgumentException] { conf.validateSettings() @@ -341,7 +342,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst val conf = new SparkConf() conf.validateSettings() - conf.set("spark.network.timeout", "5s") + conf.set(NETWORK_TIMEOUT.key, "5s") intercept[IllegalArgumentException] { conf.validateSettings() } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 41d5dee..7a16f7b 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -423,7 +423,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu test("No exception when both num-executors and dynamic allocation set.") { noException should be thrownBy { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local") - .set("spark.dynamicAllocation.enabled", "true").set("spark.executor.instances", "6")) + .set(DYN_ALLOCATION_ENABLED, true).set("spark.executor.instances", "6")) assert(sc.executorAllocationManager.isEmpty) assert(sc.getConf.getInt("spark.executor.instances", 0) === 6) } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 30efbb0..3712d1a 100644 --- 
a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -195,7 +195,7 @@ class SparkSubmitSuite "--name", "myApp", "--class", "Foo", "--num-executors", "0", - "--conf", "spark.dynamicAllocation.enabled=true", + "--conf", s"${DYN_ALLOCATION_ENABLED.key}=true", "thejar.jar") new SparkSubmitArguments(clArgs1) @@ -203,7 +203,7 @@ class SparkSubmitSuite "--name", "myApp", "--class", "Foo", "--num-executors", "0", - "--conf", "spark.dynamicAllocation.enabled=false", + "--conf", s"${DYN_ALLOCATION_ENABLED.key}=false", "thejar.jar") val e = intercept[SparkException](new SparkSubmitArguments(clArgs2)) diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala index 8c3c38d..a6d5066 100644 --- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala @@ -457,9 +457,9 @@ class StandaloneDynamicAllocationSuite test("initial executor limit") { val initialExecutorLimit = 1 val myConf = appConf - .set("spark.dynamicAllocation.enabled", "true") - .set(config.SHUFFLE_SERVICE_ENABLED.key, "true") - .set("spark.dynamicAllocation.initialExecutors", initialExecutorLimit.toString) + .set(config.DYN_ALLOCATION_ENABLED, true) + .set(config.SHUFFLE_SERVICE_ENABLED, true) + .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutorLimit) sc = new SparkContext(myConf) val appId = sc.applicationId eventually(timeout(10.seconds), interval(10.millis)) { diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index f1cf14d..544d52d 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -33,6 +33,7 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Network import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.BlockFetchingListener @@ -101,8 +102,8 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi .set(NETWORK_AUTH_ENABLED, true) .set(AUTH_SECRET, "good") .set("spark.app.id", "app-id") - .set("spark.network.crypto.enabled", "true") - .set("spark.network.crypto.saslFallback", "false") + .set(Network.NETWORK_CRYPTO_ENABLED, true) + .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false) testConnection(conf, conf) match { case Success(_) => // expected case Failure(t) => fail(t) diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 51bf5c2..1b7f166 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark.{SecurityManager, SparkConf, SparkEnv, SparkException, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.config._ +import 
org.apache.spark.internal.config.Network import org.apache.spark.util.{ThreadUtils, Utils} /** @@ -172,8 +173,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { val conf = new SparkConf() val shortProp = "spark.rpc.short.timeout" - conf.set("spark.rpc.retry.wait", "0") - conf.set("spark.rpc.numRetries", "1") + conf.set(Network.RPC_RETRY_WAIT, 0L) + conf.set(Network.RPC_NUM_RETRIES, 1) val anotherEnv = createRpcEnv(conf, "remote", 0, clientMode = true) // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef(env.address, "ask-timeout") @@ -709,8 +710,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { testSend(new SparkConf() .set(NETWORK_AUTH_ENABLED, true) .set(AUTH_SECRET, "good") - .set("spark.network.crypto.enabled", "true") - .set("spark.network.crypto.saslFallback", "false")) + .set(Network.NETWORK_CRYPTO_ENABLED, true) + .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false)) } test("ask with authentication") { @@ -730,8 +731,8 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { testAsk(new SparkConf() .set(NETWORK_AUTH_ENABLED, true) .set(AUTH_SECRET, "good") - .set("spark.network.crypto.enabled", "true") - .set("spark.network.crypto.saslFallback", "false")) + .set(Network.NETWORK_CRYPTO_ENABLED, true) + .set(Network.NETWORK_CRYPTO_SASL_FALLBACK, false)) } test("construct RpcTimeout with conf property") { diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala index aadddca..0fe0e5b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistIntegrationSuite.scala @@ -58,7 +58,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM "With default settings, job can succeed despite multiple bad executors on node", extraConfs = Seq( config.BLACKLIST_ENABLED.key -> "true", - config.MAX_TASK_FAILURES.key -> "4", + config.TASK_MAX_FAILURES.key -> "4", TEST_N_HOSTS.key -> "2", TEST_N_EXECUTORS_HOST.key -> "5", TEST_N_CORES_EXECUTOR.key -> "10" @@ -106,7 +106,7 @@ class BlacklistIntegrationSuite extends SchedulerIntegrationSuite[MultiExecutorM TEST_N_HOSTS.key -> "2", TEST_N_EXECUTORS_HOST.key -> "1", TEST_N_CORES_EXECUTOR.key -> "1", - "spark.scheduler.blacklist.unschedulableTaskSetTimeout" -> "0s" + config.UNSCHEDULABLE_TASKSET_TIMEOUT.key -> "0s" ) ) { def runBackend(): Unit = { diff --git a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala index aea4c5f..0adfb07 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BlacklistTrackerSuite.scala @@ -443,20 +443,20 @@ class BlacklistTrackerSuite extends SparkFunSuite with BeforeAndAfterEach with M (2, 2), (2, 3) ).foreach { case (maxTaskFailures, maxNodeAttempts) => - conf.set(config.MAX_TASK_FAILURES, maxTaskFailures) + conf.set(config.TASK_MAX_FAILURES, maxTaskFailures) conf.set(config.MAX_TASK_ATTEMPTS_PER_NODE.key, maxNodeAttempts.toString) val excMsg = intercept[IllegalArgumentException] { BlacklistTracker.validateBlacklistConfs(conf) }.getMessage() assert(excMsg === s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key} " + - s"( = ${maxNodeAttempts}) was >= ${config.MAX_TASK_FAILURES.key} " + + s"( = 
${maxNodeAttempts}) was >= ${config.TASK_MAX_FAILURES.key} " + s"( = ${maxTaskFailures} ). Though blacklisting is enabled, with this configuration, " + s"Spark will not be robust to one bad node. Decrease " + - s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.MAX_TASK_FAILURES.key}, " + + s"${config.MAX_TASK_ATTEMPTS_PER_NODE.key}, increase ${config.TASK_MAX_FAILURES.key}, " + s"or disable blacklisting with ${config.BLACKLIST_ENABLED.key}") } - conf.remove(config.MAX_TASK_FAILURES) + conf.remove(config.TASK_MAX_FAILURES) conf.remove(config.MAX_TASK_ATTEMPTS_PER_NODE) Seq( diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index c5a3966..b31b8cf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -24,6 +24,8 @@ import scala.concurrent.duration._ import org.scalatest.concurrent.Eventually import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite} +import org.apache.spark.internal.config.CPUS_PER_TASK +import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.rdd.RDD import org.apache.spark.util.{RpcUtils, SerializableBuffer} @@ -34,7 +36,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo test("serialized task larger than max RPC message size") { val conf = new SparkConf - conf.set("spark.rpc.message.maxSize", "1") + conf.set(RPC_MESSAGE_MAX_SIZE, 1) conf.set("spark.default.parallelism", "1") sc = new SparkContext("local-cluster[2, 1, 1024]", "test", conf) val frameSize = RpcUtils.maxMessageSizeBytes(sc.conf) @@ -62,7 +64,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo test("compute max number of concurrent tasks can be launched when spark.task.cpus > 1") { val conf = new SparkConf() - .set("spark.task.cpus", "2") + .set(CPUS_PER_TASK, 2) .setMaster("local-cluster[4, 3, 1024]") .setAppName("test") sc = new SparkContext(conf) @@ -76,7 +78,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo test("compute max number of concurrent tasks can be launched when some executors are busy") { val conf = new SparkConf() - .set("spark.task.cpus", "2") + .set(CPUS_PER_TASK, 2) .setMaster("local-cluster[4, 3, 1024]") .setAppName("test") sc = new SparkContext(conf) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index 5bd3955..b953add 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -21,6 +21,7 @@ import java.io.FileNotFoundException import java.util.Properties import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE import org.apache.spark.scheduler.SchedulingMode._ /** @@ -31,7 +32,6 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val LOCAL = "local" val APP_NAME = "PoolSuite" - val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file" val TEST_POOL = "testPool" def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl) @@ -80,7 +80,7 @@ class PoolSuite extends SparkFunSuite with 
LocalSparkContext { */ test("Fair Scheduler Test") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) sc = new SparkContext(LOCAL, APP_NAME, conf) val taskScheduler = new TaskSchedulerImpl(sc) @@ -182,7 +182,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml") .getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) val rootPool = new Pool("", FAIR, 0, 0) val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf) @@ -218,7 +218,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { val taskSetManager1 = createTaskSetManager(stageId = 1, numTasks = 1, taskScheduler) val properties = new Properties() - properties.setProperty("spark.scheduler.pool", TEST_POOL) + properties.setProperty(SparkContext.SPARK_SCHEDULER_POOL, TEST_POOL) // When FIFO Scheduler is used and task sets are submitted, they should be added to // the root pool, and no additional pools should be created @@ -296,7 +296,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { test("Fair Scheduler should build fair scheduler when " + "valid spark.scheduler.allocation.file property is set") { val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile() - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath) + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath) sc = new SparkContext(LOCAL, APP_NAME, conf) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) @@ -326,7 +326,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext { test("Fair Scheduler should throw FileNotFoundException " + "when invalid spark.scheduler.allocation.file property is set") { - val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, "INVALID_FILE_PATH") + val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, "INVALID_FILE_PATH") sc = new SparkContext(LOCAL, APP_NAME, conf) val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0) diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index ff0f99b..aa6db8d 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -34,6 +34,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark._ import org.apache.spark.TaskState._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL import org.apache.spark.rdd.RDD import org.apache.spark.util.{CallSite, ThreadUtils, Utils} @@ -290,7 +291,7 @@ private[spark] abstract class MockBackend( // Periodically revive offers to allow delay scheduling to work private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") - private val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "10ms") + private val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(10L) /** * Test backends should call this to get a task 
that has been assigned to them by the scheduler. diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 6ffd1e8..2940aef 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.Matchers import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.internal.config._ +import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{ResetSystemProperties, RpcUtils} @@ -358,7 +359,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("onTaskGettingResult() called when result fetched remotely") { - val conf = new SparkConf().set("spark.rpc.message.maxSize", "1") + val conf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1) sc = new SparkContext("local", "SparkListenerSuite", conf) val listener = new SaveTaskEvents sc.addSparkListener(listener) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index ea1439c..5b17dbf 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -35,6 +35,7 @@ import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.TestUtils.JavaSourceFromString +import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE import org.apache.spark.storage.TaskResultBlockId import org.apache.spark.util.{MutableURLClassLoader, RpcUtils, Utils} @@ -110,7 +111,7 @@ class TaskResultGetterSuite extends SparkFunSuite with BeforeAndAfter with Local // Set the RPC message size to be as small as possible (it must be an integer, so 1 is as small // as we can make it) so the tests don't take too long. 
- def conf: SparkConf = new SparkConf().set("spark.rpc.message.maxSize", "1") + def conf: SparkConf = new SparkConf().set(RPC_MESSAGE_MAX_SIZE, 1) test("handling results smaller than max RPC message size") { sc = new SparkContext("local", "test", conf) diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 85c87b9..016adb8 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -92,7 +92,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B sc = new SparkContext(conf) taskScheduler = - new TaskSchedulerImpl(sc, sc.conf.getInt("spark.task.maxFailures", 4)) { + new TaskSchedulerImpl(sc, sc.conf.get(config.TASK_MAX_FAILURES)) { override def createTaskSetManager(taskSet: TaskSet, maxFailures: Int): TaskSetManager = { val tsm = super.createTaskSetManager(taskSet, maxFailures) // we need to create a spied tsm just so we can set the TaskSetBlacklist @@ -155,7 +155,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("Scheduler correctly accounts for multiple CPUs per task") { val taskCpus = 2 - val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) // Give zero core offers. Should not generate any tasks val zeroCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 0), new WorkerOffer("executor1", "host1", 0)) @@ -185,7 +185,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("Scheduler does not crash when tasks are not serializable") { val taskCpus = 2 - val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 1 val taskSet = new TaskSet( Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null) @@ -1208,7 +1208,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("don't schedule for a barrier taskSet if available slots are less than pending tasks") { val taskCpus = 2 - val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 3 val workerOffers = IndexedSeq( @@ -1225,7 +1225,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B test("schedule tasks for a barrier taskSet if all tasks can be launched together") { val taskCpus = 2 - val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString) + val taskScheduler = setupScheduler(config.CPUS_PER_TASK.key -> taskCpus.toString) val numFreeCores = 3 val workerOffers = IndexedSeq( diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 502a013..d0f98b5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -680,7 +680,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("[SPARK-13931] taskSetManager should not send Resubmitted tasks after being a zombie") { - val conf = new 
SparkConf().set("spark.speculation", "true") + val conf = new SparkConf().set(config.SPECULATION_ENABLED, true) sc = new SparkContext("local", "test", conf) sched = new FakeTaskScheduler(sc, ("execA", "host1"), ("execB", "host2")) @@ -747,12 +747,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") { - val conf = new SparkConf().set("spark.speculation", "true") + val conf = new SparkConf().set(config.SPECULATION_ENABLED, true) sc = new SparkContext("local", "test", conf) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation.quantile", "0.5") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + sc.conf.set(config.SPECULATION_ENABLED, true) var killTaskCalled = false sched = new FakeTaskScheduler(sc, ("exec1", "host1"), @@ -1013,8 +1013,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => @@ -1070,9 +1070,9 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(5) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation.quantile", "0.6") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_QUANTILE, 0.6) + sc.conf.set(config.SPECULATION_ENABLED, true) val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => @@ -1366,12 +1366,12 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg } test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") { - val conf = new SparkConf().set("spark.speculation", "true") + val conf = new SparkConf().set(config.SPECULATION_ENABLED, true) sc = new SparkContext("local", "test", conf) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation.quantile", "0.1") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_QUANTILE, 0.1) + sc.conf.set(config.SPECULATION_ENABLED, true) sched = new FakeTaskScheduler(sc) sched.initialize(new FakeSchedulerBackend()) @@ -1416,13 +1416,13 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("SPARK-24755 Executor loss can cause task to not be resubmitted") { - val conf = new 
SparkConf().set("spark.speculation", "true") + val conf = new SparkConf().set(config.SPECULATION_ENABLED, true) sc = new SparkContext("local", "test", conf) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) - sc.conf.set("spark.speculation.quantile", "0.5") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_QUANTILE, 0.5) + sc.conf.set(config.SPECULATION_ENABLED, true) var killTaskCalled = false sched = new FakeTaskScheduler(sc, ("exec1", "host1"), @@ -1538,8 +1538,8 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = FakeTask.createTaskSet(4) // Set the speculation multiplier to be 0 so speculative tasks are launched immediately - sc.conf.set("spark.speculation.multiplier", "0.0") - sc.conf.set("spark.speculation", "true") + sc.conf.set(config.SPECULATION_MULTIPLIER, 0.0) + sc.conf.set(config.SPECULATION_ENABLED, true) val clock = new ManualClock() val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala index ae87109..57ed1f6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerDistributedSuite.scala @@ -30,7 +30,7 @@ class KryoSerializerDistributedSuite extends SparkFunSuite with LocalSparkContex val conf = new SparkConf(false) .set(config.SERIALIZER, "org.apache.spark.serializer.KryoSerializer") .set(config.Kryo.KRYO_USER_REGISTRATORS, classOf[AppJarRegistrator].getName) - .set(config.MAX_TASK_FAILURES, 1) + .set(config.TASK_MAX_FAILURES, 1) .set(config.BLACKLIST_ENABLED, false) val jar = TestUtils.createJarWithClasses(List(AppJarRegistrator.customClassName)) diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 71eeb04..ede9466 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -154,7 +154,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val jobProps = new Properties() jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup") - jobProps.setProperty("spark.scheduler.pool", "schedPool") + jobProps.setProperty(SparkContext.SPARK_SCHEDULER_POOL, "schedPool") listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps)) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 7aca0ad..0086a79 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -833,32 +833,32 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { conf.set(SUBMIT_DEPLOY_MODE, "client") assert(Utils.isDynamicAllocationEnabled(conf) === false) assert(Utils.isDynamicAllocationEnabled( - conf.set("spark.dynamicAllocation.enabled", "false")) === false) + 
conf.set(DYN_ALLOCATION_ENABLED, false)) === false) assert(Utils.isDynamicAllocationEnabled( - conf.set("spark.dynamicAllocation.enabled", "true")) === true) + conf.set(DYN_ALLOCATION_ENABLED, true)) === true) assert(Utils.isDynamicAllocationEnabled( conf.set("spark.executor.instances", "1")) === true) assert(Utils.isDynamicAllocationEnabled( conf.set("spark.executor.instances", "0")) === true) assert(Utils.isDynamicAllocationEnabled(conf.set("spark.master", "local")) === false) - assert(Utils.isDynamicAllocationEnabled(conf.set("spark.dynamicAllocation.testing", "true"))) + assert(Utils.isDynamicAllocationEnabled(conf.set(DYN_ALLOCATION_TESTING, true))) } test("getDynamicAllocationInitialExecutors") { val conf = new SparkConf() assert(Utils.getDynamicAllocationInitialExecutors(conf) === 0) assert(Utils.getDynamicAllocationInitialExecutors( - conf.set("spark.dynamicAllocation.minExecutors", "3")) === 3) + conf.set(DYN_ALLOCATION_MIN_EXECUTORS, 3)) === 3) assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors conf.set("spark.executor.instances", "2")) === 3) assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances conf.set("spark.executor.instances", "4")) === 4) assert(Utils.getDynamicAllocationInitialExecutors( // should use executor.instances - conf.set("spark.dynamicAllocation.initialExecutors", "3")) === 4) + conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 3)) === 4) assert(Utils.getDynamicAllocationInitialExecutors( // should use initialExecutors - conf.set("spark.dynamicAllocation.initialExecutors", "5")) === 5) + conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 5)) === 5) assert(Utils.getDynamicAllocationInitialExecutors( // should use minExecutors - conf.set("spark.dynamicAllocation.initialExecutors", "2") + conf.set(DYN_ALLOCATION_INITIAL_EXECUTORS, 2) .set("spark.executor.instances", "1")) === 3) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala index 95d874b..2853b75 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/util/LocalClusterSparkContext.scala @@ -20,6 +20,7 @@ package org.apache.spark.mllib.util import org.scalatest.{BeforeAndAfterAll, Suite} import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite => @transient var sc: SparkContext = _ @@ -29,7 +30,7 @@ trait LocalClusterSparkContext extends BeforeAndAfterAll { self: Suite => val conf = new SparkConf() .setMaster("local-cluster[2, 1, 1024]") .setAppName("test-cluster") - .set("spark.rpc.message.maxSize", "1") // set to 1MB to detect direct serialization of data + .set(RPC_MESSAGE_MAX_SIZE, 1) // set to 1MB to detect direct serialization of data sc = new SparkContext(conf) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index e285e20..0d2737e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala 
@@ -26,6 +26,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
@@ -47,7 +48,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     ExecutionContext.fromExecutorService(requestExecutorsService)

   protected override val minRegisteredRatio =
-    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+    if (conf.get(SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
       0.8
     } else {
       super.minRegisteredRatio
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fb23535..be2854f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -630,8 +630,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
           .registerDriverWithShuffleService(
             slave.hostname,
             externalShufflePort,
-            sc.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
-              s"${sc.conf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
+            sc.conf.get(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT),
             sc.conf.get(config.EXECUTOR_HEARTBEAT_INTERVAL))
         slave.shuffleRegistered = true
       }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 0cfaa0a..f2fd5e6 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -591,7 +591,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val expectedCores = 1
     setBackend(Map(
       "spark.cores.max" -> expectedCores.toString,
-      "spark.scheduler.minRegisteredResourcesRatio" -> "1.0"))
+      SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO.key -> "1.0"))

     val offers = List(Resources(backend.executorMemory(sc), expectedCores))
     offerResources(offers)
@@ -604,8 +604,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite

   test("supports data locality with dynamic allocation") {
     setBackend(Map(
-      "spark.dynamicAllocation.enabled" -> "true",
-      "spark.dynamicAllocation.testing" -> "true",
+      DYN_ALLOCATION_ENABLED.key -> "true",
+      DYN_ALLOCATION_TESTING.key -> "true",
       "spark.locality.wait" -> "1s"))

     assert(backend.getExecutorIds().isEmpty)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index a7bed75..3dae11e 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -33,6 +33,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config
 import org.apache.spark.internal.config.UI._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -52,7 +53,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private val stopped = new AtomicBoolean(false)

   override val minRegisteredRatio =
-    if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
+    if (conf.get(config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO).isEmpty) {
       0.8
     } else {
       super.minRegisteredRatio
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index 37bccaf..8c62069 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.shuffle.ShuffleTestAccessor
 import org.apache.spark.network.yarn.{YarnShuffleService, YarnTestAccessor}
 import org.apache.spark.tags.ExtendedYarnTest
@@ -94,14 +95,14 @@ class YarnShuffleAuthSuite extends YarnShuffleIntegrationSuite {
   override def newYarnConfig(): YarnConfiguration = {
     val yarnConfig = super.newYarnConfig()
     yarnConfig.set(NETWORK_AUTH_ENABLED.key, "true")
-    yarnConfig.set(NETWORK_ENCRYPTION_ENABLED.key, "true")
+    yarnConfig.set(NETWORK_CRYPTO_ENABLED.key, "true")
     yarnConfig
   }

   override protected def extraSparkConf(): Map[String, String] = {
     super.extraSparkConf() ++ Map(
       NETWORK_AUTH_ENABLED.key -> "true",
-      NETWORK_ENCRYPTION_ENABLED.key -> "true"
+      NETWORK_CRYPTO_ENABLED.key -> "true"
     )
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
index 6196757..3284231 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RuntimeConfigSuite.scala
@@ -63,7 +63,7 @@ class RuntimeConfigSuite extends SparkFunSuite {
     assert(!conf.isModifiable("spark.sql.sources.schemaStringLengthThreshold"))
     assert(conf.isModifiable("spark.sql.streaming.checkpointLocation"))
     // Core configs
-    assert(!conf.isModifiable("spark.task.cpus"))
+    assert(!conf.isModifiable(config.CPUS_PER_TASK.key))
     assert(!conf.isModifiable("spark.executor.cores"))
     // Invalid config parameters
     assert(!conf.isModifiable(""))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 5e97314..12bc68c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.time.SpanSugar._

 import org.apache.spark._
 import org.apache.spark.LocalSparkContext._
+import org.apache.spark.internal.config.Network.RPC_NUM_RETRIES
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.util.quietly
@@ -393,7 +394,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
       // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
       // fails to talk to the StateStoreCoordinator and unloads all the StateStores
-      .set("spark.rpc.numRetries", "1")
+      .set(RPC_NUM_RETRIES, 1)
     val opId = 0
     val dir = newDir()
     val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), UUID.randomUUID)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index d79c0cf..4c008c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -603,7 +603,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
-      .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
+      .set(config.TASK_MAX_FAILURES, 1) // Don't retry the tasks to run this test quickly
       .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
       .set(ASYNC_TRACKING_ENABLED, false)
     withSpark(new SparkContext(conf)) { sc =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 6e4f2bb..daca65f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.SparkContext
+import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader._
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index e68c601..83dfa74 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -32,6 +32,7 @@ import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession

+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLContext}
 import org.apache.spark.sql.execution.HiveResult
@@ -226,7 +227,7 @@ private[hive] class SparkExecuteStatementOperation(
     sqlContext.sparkContext.setJobGroup(statementId, statement)
     val pool = sessionToActivePool.get(parentSession.getSessionHandle)
     if (pool != null) {
-      sqlContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
+      sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool)
     }
     try {
       result = sqlContext.sql(statement)
@@ -234,7 +235,8 @@ private[hive] class SparkExecuteStatementOperation(
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
           sessionToActivePool.put(parentSession.getSessionHandle, value)
-          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
+          logInfo(s"Setting ${SparkContext.SPARK_SCHEDULER_POOL}=$value for future statements " +
+            "in this session.")
         case _ =>
       }
       HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
index d8d2a80..2707107 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapred.{JobConf, Reporter}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPECULATION_ENABLED
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
@@ -69,7 +70,7 @@ class HiveFileFormat(fileSinkConf: FileSinkDesc)
     // When speculation is on and output committer class name contains "Direct", we should warn
     // users that they may loss data if they are using a direct output committer.
-    val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+    val speculationEnabled = sparkSession.sparkContext.conf.get(SPECULATION_ENABLED)
     val outputCommitterClass = conf.get("mapred.output.committer.class", "")
     if (speculationEnabled && outputCommitterClass.contains("Direct")) {
       val warningMessage =
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 7ec02c4..4707d6e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.mockito.MockitoSugar
 import org.scalatest.time.SpanSugar._

 import org.apache.spark.{ExecutorAllocationClient, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_TESTING}
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, StreamingContext}
 import org.apache.spark.util.{ManualClock, Utils}
@@ -332,8 +333,8 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite

     val confWithBothDynamicAllocationEnabled = new SparkConf()
       .set("spark.streaming.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.enabled", "true")
-      .set("spark.dynamicAllocation.testing", "true")
+      .set(DYN_ALLOCATION_ENABLED, true)
+      .set(DYN_ALLOCATION_TESTING, true)
     require(Utils.isDynamicAllocationEnabled(confWithBothDynamicAllocationEnabled) === true)
     withStreamingContext(confWithBothDynamicAllocationEnabled) { ssc =>
       intercept[IllegalArgumentException] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
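Every hunk above applies the same pattern: a hardcoded string key with ad hoc parsing (`conf.getBoolean`, `getTimeAsMs`, ...) is replaced by a typed `ConfigEntry` declared once in `org.apache.spark.internal.config`. Below is a minimal sketch of what such declarations look like, shown as if added to the existing `config` package object. The entry names mirror the diff, but the defaults are assumptions (stock Spark values), and `ConfigBuilder` is `private[spark]`, so this compiles only inside Spark itself, not against the public API.

```scala
// Sketch only: illustrative entries in the style of
// core/src/main/scala/org/apache/spark/internal/config/package.scala.
package org.apache.spark.internal

import java.util.concurrent.TimeUnit

package object config {

  // Typed replacement for hardcoded "spark.speculation" lookups; call sites read
  // conf.get(SPECULATION_ENABLED): Boolean instead of conf.getBoolean(..., false).
  private[spark] val SPECULATION_ENABLED =
    ConfigBuilder("spark.speculation")
      .booleanConf
      .createWithDefault(false)

  // Typed replacement for "spark.speculation.multiplier" (default assumed to be 1.5).
  private[spark] val SPECULATION_MULTIPLIER =
    ConfigBuilder("spark.speculation.multiplier")
      .doubleConf
      .createWithDefault(1.5)

  // Time configs parse "120s"-style strings once, centrally; conf.get(...) returns
  // Long milliseconds, replacing the nested getTimeAsMs/getTimeAsSeconds call seen
  // in the Mesos hunk above. (The real entry may instead fall back to the network
  // timeout; the string default here is a simplification.)
  private[spark] val STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT =
    ConfigBuilder("spark.storage.blockManagerSlaveTimeoutMs")
      .timeConf(TimeUnit.MILLISECONDS)
      .createWithDefaultString("120s")
}
```

Call sites then write `sc.conf.set(SPECULATION_ENABLED, true)` with compile-time typing, and places that still need a raw string (YARN/Mesos test maps, for example) use `SPECULATION_ENABLED.key`, which is exactly what the `.key -> "1.0"` rewrites in the scheduler backend suites above do.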