This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e7443d6  [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs
e7443d6 is described below

commit e7443d6412582aa16769e298c31d889a5ba0143c
Author: wenxuanguan <choose_h...@126.com>
AuthorDate: Wed May 22 10:45:11 2019 +0900

    [SPARK-27774][CORE][MLLIB] Avoid hardcoded configs

    ## What changes were proposed in this pull request?

    Avoid hardcoded configs in `SparkConf` and `SparkSubmit`, and in tests.

    ## How was this patch tested?

    N/A

    Closes #24631 from wenxuanguan/minor-fix.

    Authored-by: wenxuanguan <choose_h...@126.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |  8 ++++----
 .../main/scala/org/apache/spark/api/r/RUtils.scala |  2 +-
 .../scala/org/apache/spark/deploy/RRunner.scala    |  3 ++-
 .../org/apache/spark/deploy/SparkSubmit.scala      |  6 +++---
 .../apache/spark/deploy/worker/DriverWrapper.scala |  2 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |  2 +-
 .../scala/org/apache/spark/SparkContextSuite.scala |  2 +-
 .../org/apache/spark/deploy/SparkSubmitSuite.scala |  4 ++--
 .../deploy/StandaloneDynamicAllocationSuite.scala  |  6 +++---
 .../deploy/rest/SubmitRestProtocolSuite.scala      | 24 +++++++++++-----------
 .../storage/BlockManagerReplicationSuite.scala     |  6 +++---
 .../apache/spark/storage/BlockManagerSuite.scala   |  5 +++--
 .../apache/spark/storage/MemoryStoreSuite.scala    |  5 +++--
 .../org/apache/spark/ml/feature/Word2Vec.scala     |  3 ++-
 .../org/apache/spark/mllib/feature/Word2Vec.scala  |  3 ++-
 .../cluster/mesos/MesosClusterScheduler.scala      |  2 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |  2 +-
 17 files changed, 45 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index aa93f42..bd2ef5b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -595,7 +595,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     // it will almost always cause ExecutorLostFailure. See SPARK-22754.
     require(executorTimeoutThresholdMs > executorHeartbeatIntervalMs, "The value of " +
       s"${networkTimeout}=${executorTimeoutThresholdMs}ms must be no less than the value of " +
-      s"spark.executor.heartbeatInterval=${executorHeartbeatIntervalMs}ms.")
+      s"${EXECUTOR_HEARTBEAT_INTERVAL.key}=${executorHeartbeatIntervalMs}ms.")
   }
 
   /**
@@ -667,12 +667,12 @@ private[spark] object SparkConf extends Logging {
         translation = s => s"${s.toLong * 10}s")),
     REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq(
       AlternateConfig("spark.reducer.maxMbInFlight", "1.4")),
-    "spark.kryoserializer.buffer" -> Seq(
+    KRYO_SERIALIZER_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.mb", "1.4",
         translation = s => s"${(s.toDouble * 1000).toInt}k")),
-    "spark.kryoserializer.buffer.max" -> Seq(
+    KRYO_SERIALIZER_MAX_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.kryoserializer.buffer.max.mb", "1.4")),
-    "spark.shuffle.file.buffer" -> Seq(
+    SHUFFLE_FILE_BUFFER_SIZE.key -> Seq(
       AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")),
     EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq(
       AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")),
diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
index 5a43302..311fade 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala
@@ -60,7 +60,7 @@ private[spark] object RUtils {
   def sparkRPackagePath(isDriver: Boolean): Seq[String] = {
     val (master, deployMode) =
       if (isDriver) {
-        (sys.props("spark.master"), sys.props("spark.submit.deployMode"))
+        (sys.props("spark.master"), sys.props(SUBMIT_DEPLOY_MODE.key))
       } else {
         val sparkConf = SparkEnv.get.conf
         (sparkConf.get("spark.master"), sparkConf.get(SUBMIT_DEPLOY_MODE))
diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
index 6284e6a..60ba047 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{SparkException, SparkUserAppException}
 import org.apache.spark.api.r.{RBackend, RUtils}
 import org.apache.spark.internal.config.R._
+import org.apache.spark.internal.config.SUBMIT_DEPLOY_MODE
 import org.apache.spark.util.RedirectThread
 
 /**
@@ -46,7 +47,7 @@ object RRunner {
       // but kept here for backward compatibility.
       var cmd = sys.props.getOrElse(SPARKR_COMMAND.key, SPARKR_COMMAND.defaultValue.get)
       cmd = sys.props.getOrElse(R_COMMAND.key, cmd)
-      if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
+      if (sys.props.getOrElse(SUBMIT_DEPLOY_MODE.key, "client") == "client") {
         cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
       }
       cmd
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 49d9395..59b638b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -517,7 +517,7 @@ private[spark] class SparkSubmit extends Logging {
       // All cluster managers
       OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
-        confKey = "spark.submit.deployMode"),
+        confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
       OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
@@ -747,8 +747,8 @@ private[spark] class SparkSubmit extends Logging {
 
     // Resolve paths in certain spark properties
     val pathConfigs = Seq(
-      "spark.jars",
-      "spark.files",
+      JARS.key,
+      FILES.key,
       "spark.yarn.dist.files",
       "spark.yarn.dist.archives",
       "spark.yarn.dist.jars")
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 1e8ad0b..9b51bea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -91,7 +91,7 @@ object DriverWrapper extends Logging {
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(packagesExclusions,
       packages, repositories, ivyRepoPath, Option(ivySettingsPath))
     val jars = {
-      val jarsProp = sys.props.get("spark.jars").orNull
+      val jarsProp = sys.props.get(config.JARS.key).orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
         DependencyUtils.mergeFileLists(jarsProp, resolvedMavenCoordinates)
       } else {
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 3a5de8a..83a9ea3 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -248,7 +248,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     assert(conf.getTimeAsSeconds("spark.yarn.am.waitTime") === 420)
 
     conf.set("spark.kryoserializer.buffer.mb", "1.1")
-    assert(conf.getSizeAsKb("spark.kryoserializer.buffer") === 1100)
+    assert(conf.getSizeAsKb(KRYO_SERIALIZER_BUFFER_SIZE.key) === 1100)
 
     conf.set("spark.history.fs.cleaner.maxAge.seconds", "42")
     assert(conf.get(MAX_LOG_AGE_S) === 42L)
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 3490eaf..1dcb2f7 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -671,7 +671,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       // task metrics via heartbeat to driver.
       .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
       // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
-      .set("spark.executor.heartbeatInterval", "1s")
+      .set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1s")
     sc = new SparkContext(conf)
     sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
     @volatile var runningTaskIds: Seq[Long] = null
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index ef6213e..b0c187d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -375,8 +375,8 @@ class SparkSubmitSuite
     confMap.keys should contain ("spark.app.name")
     confMap.keys should contain (JARS.key)
     confMap.keys should contain ("spark.driver.memory")
-    confMap.keys should contain ("spark.driver.cores")
-    confMap.keys should contain ("spark.driver.supervise")
+    confMap.keys should contain (DRIVER_CORES.key)
+    confMap.keys should contain (DRIVER_SUPERVISE.key)
     confMap.keys should contain (UI_ENABLED.key)
     confMap.keys should contain (SUBMIT_DEPLOY_MODE.key)
     conf.get(UI_ENABLED) should be (false)
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 39daf51..4c6a669 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -135,7 +135,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("dynamic allocation with max cores <= cores per worker") {
-    sc = new SparkContext(appConf.set("spark.cores.max", "8"))
+    sc = new SparkContext(appConf.set(config.CORES_MAX, 8))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -190,7 +190,7 @@ class StandaloneDynamicAllocationSuite
   }
 
   test("dynamic allocation with max cores > cores per worker") {
-    sc = new SparkContext(appConf.set("spark.cores.max", "16"))
+    sc = new SparkContext(appConf.set(config.CORES_MAX, 16))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
@@ -297,7 +297,7 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation with cores per executor AND max cores") {
     sc = new SparkContext(appConf
       .set(config.EXECUTOR_CORES, 2)
-      .set("spark.cores.max", "8"))
+      .set(config.CORES_MAX, 8))
     val appId = sc.applicationId
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
diff --git a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
index 87655f3..03102fd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/rest/SubmitRestProtocolSuite.scala
@@ -95,27 +95,27 @@ class SubmitRestProtocolSuite extends SparkFunSuite {
     message.validate()
     // optional fields
     conf.set(JARS, Seq("mayonnaise.jar", "ketchup.jar"))
-    conf.set("spark.files", "fireball.png")
+    conf.set(FILES.key, "fireball.png")
     conf.set("spark.driver.memory", s"${Utils.DEFAULT_DRIVER_MEM_MB}m")
-    conf.set("spark.driver.cores", "180")
+    conf.set(DRIVER_CORES, 180)
     conf.set("spark.driver.extraJavaOptions", " -Dslices=5 -Dcolor=mostly_red")
conf.set("spark.driver.extraClassPath", "food-coloring.jar") conf.set("spark.driver.extraLibraryPath", "pickle.jar") - conf.set("spark.driver.supervise", "false") + conf.set(DRIVER_SUPERVISE, false) conf.set("spark.executor.memory", "256m") - conf.set("spark.cores.max", "10000") + conf.set(CORES_MAX, 10000) message.sparkProperties = conf.getAll.toMap message.appArgs = Array("two slices", "a hint of cinnamon") message.environmentVariables = Map("PATH" -> "/dev/null") message.validate() // bad fields - var badConf = conf.clone().set("spark.driver.cores", "one hundred feet") + var badConf = conf.clone().set(DRIVER_CORES.key, "one hundred feet") message.sparkProperties = badConf.getAll.toMap intercept[SubmitRestProtocolException] { message.validate() } - badConf = conf.clone().set("spark.driver.supervise", "nope, never") + badConf = conf.clone().set(DRIVER_SUPERVISE.key, "nope, never") message.sparkProperties = badConf.getAll.toMap intercept[SubmitRestProtocolException] { message.validate() } - badConf = conf.clone().set("spark.cores.max", "two men") + badConf = conf.clone().set(CORES_MAX.key, "two men") message.sparkProperties = badConf.getAll.toMap intercept[SubmitRestProtocolException] { message.validate() } message.sparkProperties = conf.getAll.toMap @@ -127,17 +127,17 @@ class SubmitRestProtocolSuite extends SparkFunSuite { assert(newMessage.appResource === "honey-walnut-cherry.jar") assert(newMessage.mainClass === "org.apache.spark.examples.SparkPie") assert(newMessage.sparkProperties("spark.app.name") === "SparkPie") - assert(newMessage.sparkProperties("spark.jars") === "mayonnaise.jar,ketchup.jar") - assert(newMessage.sparkProperties("spark.files") === "fireball.png") + assert(newMessage.sparkProperties(JARS.key) === "mayonnaise.jar,ketchup.jar") + assert(newMessage.sparkProperties(FILES.key) === "fireball.png") assert(newMessage.sparkProperties("spark.driver.memory") === s"${Utils.DEFAULT_DRIVER_MEM_MB}m") - assert(newMessage.sparkProperties("spark.driver.cores") === "180") + assert(newMessage.sparkProperties(DRIVER_CORES.key) === "180") assert(newMessage.sparkProperties("spark.driver.extraJavaOptions") === " -Dslices=5 -Dcolor=mostly_red") assert(newMessage.sparkProperties("spark.driver.extraClassPath") === "food-coloring.jar") assert(newMessage.sparkProperties("spark.driver.extraLibraryPath") === "pickle.jar") - assert(newMessage.sparkProperties("spark.driver.supervise") === "false") + assert(newMessage.sparkProperties(DRIVER_SUPERVISE.key) === "false") assert(newMessage.sparkProperties("spark.executor.memory") === "256m") - assert(newMessage.sparkProperties("spark.cores.max") === "10000") + assert(newMessage.sparkProperties(CORES_MAX.key) === "10000") assert(newMessage.appArgs === message.appArgs) assert(newMessage.sparkProperties === message.sparkProperties) assert(newMessage.environmentVariables === message.environmentVariables) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index a739701..25c7ef3 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -416,12 +416,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite class BlockManagerReplicationSuite extends BlockManagerReplicationBehavior { val conf = new SparkConf(false).set("spark.app.id", "test") - conf.set("spark.kryoserializer.buffer", "1m") + 
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
 }
 
 class BlockManagerProactiveReplicationSuite extends BlockManagerReplicationBehavior {
   val conf = new SparkConf(false).set("spark.app.id", "test")
-  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
   conf.set(STORAGE_REPLICATION_PROACTIVE, true)
   conf.set(STORAGE_EXCEPTION_PIN_LEAK, true)
@@ -496,7 +496,7 @@ class DummyTopologyMapper(conf: SparkConf) extends TopologyMapper(conf) with Log
 
 class BlockManagerBasicStrategyReplicationSuite extends BlockManagerReplicationBehavior {
   val conf: SparkConf = new SparkConf(false).set("spark.app.id", "test")
-  conf.set("spark.kryoserializer.buffer", "1m")
+  conf.set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
   conf.set(
     STORAGE_REPLICATION_POLICY,
     classOf[BasicBlockReplicationPolicy].getName)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f3d8f2..78f24fa 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -74,7 +74,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
   val shuffleManager = new SortShuffleManager(new SparkConf(false))
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+  val serializer = new KryoSerializer(
+    new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
 
   // Implicitly convert strings to BlockIds for test clarity.
   implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
@@ -86,7 +87,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
       .set(IS_TESTING, true)
       .set(MEMORY_FRACTION, 1.0)
       .set(MEMORY_STORAGE_FRACTION, 0.999)
-      .set("spark.kryoserializer.buffer", "1m")
+      .set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m")
       .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index 958d57d..a723141 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.scalatest._
 
 import org.apache.spark._
-import org.apache.spark.internal.config.STORAGE_UNROLL_MEMORY_THRESHOLD
+import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryMode, UnifiedMemoryManager}
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
 import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
@@ -42,7 +42,8 @@ class MemoryStoreSuite
     .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 
   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
-  val serializer = new KryoSerializer(new SparkConf(false).set("spark.kryoserializer.buffer", "1m"))
+  val serializer = new KryoSerializer(
+    new SparkConf(false).set(Kryo.KRYO_SERIALIZER_BUFFER_SIZE.key, "1m"))
 
   val serializerManager = new SerializerManager(serializer, conf)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index fc9996d..6ae90b8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
@@ -339,7 +340,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       val wordVectors = instance.wordVectors.getVectors
       val dataPath = new Path(path, "data").toString
       val bufferSizeInBytes = Utils.byteStringAsBytes(
-        sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
+        sc.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
       val numPartitions = Word2VecModelWriter.calculateNumberOfPartitions(
         bufferSizeInBytes, instance.wordVectors.wordIndex.size, instance.getVectorSize)
       val spark = sparkSession
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 94c4fcc..9e19ff2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -32,6 +32,7 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd._
@@ -679,7 +680,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       // We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
       val bufferSize = Utils.byteStringAsBytes(
-        spark.conf.get("spark.kryoserializer.buffer.max", "64m"))
+        spark.conf.get(KRYO_SERIALIZER_MAX_BUFFER_SIZE.key, "64m"))
       // We calculate the approximate size of the model
       // We only calculate the array size, considering an
       // average string size of 15 bytes, the formula is:
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8566a30..289b109 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -541,7 +541,7 @@ private[spark] class MesosClusterScheduler(
     // --conf
     val replicatedOptionsBlacklist = Set(
       JARS.key, // Avoids duplicate classes in classpath
-      "spark.submit.deployMode", // this would be set to `cluster`, but we need client
+      SUBMIT_DEPLOY_MODE.key, // this would be set to `cluster`, but we need client
       "spark.master" // this contains the address of the dispatcher, not master
     )
     val defaultConf = conf.getAllWithPrefix(config.DISPATCHER_DRIVER_DEFAULT_PREFIX).toMap
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e4b6b3d..5a67caf 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -215,7 +215,7 @@ private[spark] class ApplicationMaster(
 
       // Set the master and deploy mode property to match the requested mode.
       System.setProperty("spark.master", "yarn")
-      System.setProperty("spark.submit.deployMode", "cluster")
+      System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")
 
       // Set this internal configuration if it is running on cluster mode, this
       // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
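For readers skimming the diff: every hunk applies the same pattern, replacing a raw
config-key string with the typed `ConfigEntry` constant from
`org.apache.spark.internal.config` that owns that key. Below is a minimal sketch of
that pattern, assuming Spark's `private[spark]` config API as of this commit; the
`MY_BUFFER_SIZE` entry and the `org.apache.spark.example` package are hypothetical,
invented here for illustration, while `ConfigBuilder`, `EXECUTOR_HEARTBEAT_INTERVAL`,
and the typed `SparkConf.set`/`get` overloads are the real internals the diff relies on.

    package org.apache.spark.example

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._
    import org.apache.spark.network.util.ByteUnit

    object ConfigEntrySketch {
      // Hypothetical entry, declared the way the constants referenced in the
      // diff (KRYO_SERIALIZER_BUFFER_SIZE, CORES_MAX, ...) are: one definition
      // owns the key string, the value type, and the default.
      val MY_BUFFER_SIZE = ConfigBuilder("spark.example.buffer.size")
        .doc("Illustrative only; not a shipped Spark config.")
        .bytesConf(ByteUnit.KiB)
        .createWithDefaultString("64k")

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(false)
          // Hardcoded key, invisible to the compiler (the pattern removed above):
          //   .set("spark.executor.heartbeatInterval", "1s")
          // ConfigEntry-backed key, which callers cannot misspell:
          .set(EXECUTOR_HEARTBEAT_INTERVAL.key, "1s")
          // Typed overload: the value's type is checked at compile time.
          .set(MY_BUFFER_SIZE, 128L)

        // Typed read: returns Long (KiB here), applying the default when unset.
        println(conf.get(MY_BUFFER_SIZE))
      }
    }

Centralizing the key, type, and default in one ConfigEntry definition is what lets the
compiler catch the typos that raw strings like "spark.kryoserializer.buffer" cannot.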