This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3b9cca7aa32  [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes
3b9cca7aa32 is described below

commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

    [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes

    Signed-off-by: Weichen Xu <weichen.xu@databricks.com>

    ### What changes were proposed in this pull request?

    Set the executor environment variable OMP_NUM_THREADS to the value of spark.task.cpus by default for Spark executor JVM processes.

    ### Why are the changes needed?

    This limits the number of threads an OpenBLAS routine may spawn to the number of cores assigned to each task, because some Spark ML algorithms call OpenBLAS via netlib-java. For example, Spark ALS estimator training calls the LAPACK API `dppsv`, which internally calls a BLAS library; if that library is OpenBLAS, it tries to use all CPU cores by default. But Spark launches multiple tasks on a worker, and each task may call `dppsv` at the same time, with each call internally creating as many threads as there are CPU cores. This causes CPU oversubscription.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manually.

    Closes #38699 from WeichenXu123/SPARK-41188.

    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Weichen Xu <weichen...@databricks.com>
    (cherry picked from commit 82a41d8ca273e7a93333268324c6958f8bb14d9e)
    Signed-off-by: Weichen Xu <weichen...@databricks.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala          | 10 ++++++++++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala    |  7 -------
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala   | 10 ++++++----
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7d8c799029..f991d2ea09c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser

+    if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // if OMP_NUM_THREADS is not explicitly set, override it with the value of "spark.task.cpus"
+      // SPARK-41188: limit the thread number for OpenBLAS routines to the number of cores assigned
+      // to this executor, because some Spark ML algorithms call OpenBLAS via netlib-java
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor;
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+    }
+
     _shuffleDriverComponents = ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
     _shuffleDriverComponents.initializeApplication().asScala.foreach { case (k, v) =>
       _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
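As a quick illustration (not part of this commit), with the new default a driver only needs to set `spark.task.cpus`; an explicit `spark.executorEnv.OMP_NUM_THREADS` still takes precedence because the code above only fills the variable in when that key is absent. The app name and core counts below are hypothetical:

```scala
// Hypothetical driver-side configuration; the app name and values are illustrative.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("omp-num-threads-demo")
  // Each task reserves 4 cores, so after this patch each executor JVM is
  // launched with OMP_NUM_THREADS=4 unless the user overrides it.
  .set("spark.task.cpus", "4")
  // Uncomment to override the new default explicitly:
  // .set("spark.executorEnv.OMP_NUM_THREADS", "2")
```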
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 3a3e7e04e7f..d854874c0e8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val execCoresProp = Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
     val memoryMb = Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
-    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
-    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
-      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
-      // see https://github.com/numpy/numpy/issues/10455
-      execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-    }
     envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 92676cc4e73..0a2c0cef31e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -128,8 +128,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -157,8 +158,9 @@
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "USER").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.VALUE)
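A rough way to verify the behavior end to end (again, not from this patch; the object and app name are made up) is to read the variable back from the executors. This assumes a real cluster deployment, since in local mode the driver process is not launched with `executorEnvs`:

```scala
// Hypothetical verification job: each task reports the OMP_NUM_THREADS value
// its executor JVM was launched with.
import org.apache.spark.{SparkConf, SparkContext}

object CheckOmpNumThreads {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("check-omp-num-threads"))
    val values = sc.parallelize(1 to 100)
      .map(_ => sys.env.getOrElse("OMP_NUM_THREADS", "<unset>"))
      .distinct()
      .collect()
    // With this patch, every executor should report the value of spark.task.cpus
    // ("1" by default) unless spark.executorEnv.OMP_NUM_THREADS was set explicitly.
    values.foreach(println)
    sc.stop()
  }
}
```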