This is an automated email from the ASF dual-hosted git repository.

weichenxu123 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3b9cca7aa32 [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to 
be spark.task.cpus by default for spark executor JVM processes
3b9cca7aa32 is described below

commit 3b9cca7aa32f659ea7413abeb373fe7ed069e6f7
Author: Weichen Xu <weichen...@databricks.com>
AuthorDate: Sat Nov 19 17:23:20 2022 +0800

    [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be 
spark.task.cpus by default for spark executor JVM processes
    
    Signed-off-by: Weichen Xu <weichen.xudatabricks.com>
    
    ### What changes were proposed in this pull request?
    
    Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark 
executor JVM processes.
    
    ### Why are the changes needed?
    
    This limits the number of threads used by OpenBLAS routines to the number 
of cores assigned to this executor, because some Spark ML algorithms call 
OpenBLAS via netlib-java,
    e.g.:
    Spark ALS estimator training calls the LAPACK API `dppsv` (which internally 
calls a BLAS library). If that library is OpenBLAS, it will by default try to 
use all CPU cores. But Spark launches multiple tasks on a Spark worker, 
and each task might call `dppsv` at the same time; each call 
internally creates multiple threads (as many threads as there are CPU 
cores), which causes CPU oversubscription.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually.
    
    Closes #38699 from WeichenXu123/SPARK-41188.
    
    Authored-by: Weichen Xu <weichen...@databricks.com>
    Signed-off-by: Weichen Xu <weichen...@databricks.com>
    (cherry picked from commit 82a41d8ca273e7a93333268324c6958f8bb14d9e)
    Signed-off-by: Weichen Xu <weichen...@databricks.com>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala        | 10 ++++++++++
 .../main/scala/org/apache/spark/api/python/PythonRunner.scala  |  7 -------
 .../scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala | 10 ++++++----
 3 files changed, 16 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f7d8c799029..f991d2ea09c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -542,6 +542,16 @@ class SparkContext(config: SparkConf) extends Logging {
     executorEnvs ++= _conf.getExecutorEnv
     executorEnvs("SPARK_USER") = sparkUser
 
+    if (_conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // if OMP_NUM_THREADS is not explicitly set, override it with the value 
of "spark.task.cpus"
+      // SPARK-41188: limit the thread number for OpenBLAS routine to the 
number of cores assigned
+      // to this executor because some spark ML algorithms calls OpenBlAS via 
netlib-java
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
+      // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      executorEnvs.put("OMP_NUM_THREADS", _conf.get("spark.task.cpus", "1"))
+    }
+
     _shuffleDriverComponents = 
ShuffleDataIOUtils.loadShuffleDataIO(config).driver()
     _shuffleDriverComponents.initializeApplication().asScala.foreach { case 
(k, v) =>
       _conf.set(ShuffleDataIOUtils.SHUFFLE_SPARK_CONF_PREFIX + k, v)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 3a3e7e04e7f..d854874c0e8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -131,13 +131,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val execCoresProp = 
Option(context.getLocalProperty(EXECUTOR_CORES_LOCAL_PROPERTY))
     val memoryMb = 
Option(context.getLocalProperty(PYSPARK_MEMORY_LOCAL_PROPERTY)).map(_.toLong)
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => 
f.getPath()).mkString(",")
-    // if OMP_NUM_THREADS is not explicitly set, override it with the number 
of cores
-    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
-      // SPARK-28843: limit the OpenMP thread pool to the number of cores 
assigned to this executor
-      // this avoids high memory consumption with pandas/numpy because of a 
large OpenMP thread pool
-      // see https://github.com/numpy/numpy/issues/10455
-      execCoresProp.foreach(envVars.put("OMP_NUM_THREADS", _))
-    }
     envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor 
thread
     if (reuseWorker) {
       envVars.put("SPARK_REUSE_WORKER", "1")
diff --git 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 92676cc4e73..0a2c0cef31e 100644
--- 
a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ 
b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -128,8 +128,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env 
vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -157,8 +158,9 @@ object Utils {
       .getEnvironment
       .getVariablesList
       .asScala
-    assert(envVars
-      .count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env 
vars
+    assert(envVars.count { x =>
+      !x.getName.startsWith("SPARK_") && x.getName != "OMP_NUM_THREADS"
+    } == 2) // user-defined secret env vars
     val variableOne = envVars.filter(_.getName == "USER").head
     assert(variableOne.getSecret.isInitialized)
     assert(variableOne.getSecret.getType == Secret.Type.VALUE)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to