This is an automated email from the ASF dual-hosted git repository. weichenxu123 pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new f431cdf0944 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes f431cdf0944 is described below commit f431cdf09442b86133d17fa6e56cb8b77ca4e486 Author: Weichen Xu <weichen...@databricks.com> AuthorDate: Sat Nov 19 17:23:20 2022 +0800 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu <weichen.xudatabricks.com> ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This limits the number of threads used by OpenBLAS routines to the number of cores assigned to this executor, because some Spark ML algorithms call OpenBLAS via netlib-java. For example, Spark ALS estimator training calls the LAPACK API `dppsv` (which internally calls the BLAS library); if that library is OpenBLAS, it will by default try to use all CPU cores. But Spark launches multiple tasks on a Spark worker, each task might call the `dppsv` API at the same time, and each call internally creates multiple threads (as many threads as there are CPU cores), which causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. 
Authored-by: Weichen Xu <weichen...@databricks.com> Signed-off-by: Weichen Xu <weichen...@databricks.com> (cherry picked from commit 82a41d8ca273e7a93333268324c6958f8bb14d9e) Signed-off-by: Weichen Xu <weichen...@databricks.com> --- core/src/main/scala/org/apache/spark/SparkContext.scala | 10 ++++++++++ .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 7 ------- .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++++++---- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 02c58d2a9b4..0d0d4fe83a4 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -546,6 +546,16 @@ class SparkContext(config: SparkConf) extends Logging { executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser + if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { + // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus" + // SPARK-41188: limit the thread number for OpenBLAS routine to the number of cores assigned + // to this executor because some spark ML algorithms calls OpenBlAS via netlib-java + // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor + // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool + // see https://github.com/numpy/numpy/issues/10455 + executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1")) + } + _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver() _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) => _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
index f32c80f3ef5..bf5b862438a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -133,13 +133,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY)) val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong) val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",") - // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores - if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) { - // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor - // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool - // see https://github.com/numpy/numpy/issues/10455 - execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _)) - } envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuseWorker) { envVars.put("SPARK_REUSE_WORKER", "1") diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala index 92676cc4e73..0a2c0cef31e 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -128,8 +128,9 @@ object Utils { .getEnvironment .getVariablesList .asScala - assert(envVars - .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars + assert(envVars.count { x => + !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS" + } == 2) // user-defined secret env vars val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head assert(variableOne.getSecret.isInitialized) 
assert(variableOne.getSecret.getType == Secret.Type.REFERENCE) @@ -157,8 +158,9 @@ object Utils { .getEnvironment .getVariablesList .asScala - assert(envVars - .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars + assert(envVars.count { x => + !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS" + } == 2) // user-defined secret env vars val variableOne = envVars.filter(_.getName == "USER").head assert(variableOne.getSecret.isInitialized) assert(variableOne.getSecret.getType == Secret.Type.VALUE) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org