srowen commented on a change in pull request #25545: [SPARK-28843][PYTHON] Set OMP_NUM_THREADS to executor cores for python
URL: https://github.com/apache/spark/pull/25545#discussion_r317627588
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
 ##########
 @@ -106,6 +106,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
+    // if OMP_NUM_THREADS is not explicitly set, override it with the number of cores
+    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor
+      // this avoids high memory consumption with pandas/numpy because of a large OpenMP thread pool
+      // see https://github.com/numpy/numpy/issues/10455
+      conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
 
 Review comment:
   1 is just the default in YARN; the meaning is the same everywhere.
   
   Let me break it down further, to illustrate. All this is doing is ensuring 
that a process isn't using more cores than it should, which ought to always be 
a good thing. (And it saves memory along the way.) Suppose there's a 16-core 
machine.
   
   Case 1: JVM Spark
   a) `spark.executor.cores` = 16. There is one JVM using all cores.
   b) `spark.executor.cores` = 4. There are (up to) 4 JVMs using 4 cores each.
   
   Case 2: Pyspark
   a) `spark.executor.cores` = 16. There are 16 Python processes.
   b) `spark.executor.cores` = 4. There are still 16 Python processes.
   
   In case 1a, imagine using MLlib backed by OpenBLAS or MKL. By default, OpenMP will use all 16 cores, which is fine and is unaffected by this change. In case 1b, each of the 4 JVMs still defaults its OpenMP pool to 16 threads, so OpenMP attempts 64 threads in total on the machine (to my understanding) right now. With this change, each JVM is capped at 4 threads, so 16 threads total (4 x 4). That's better.
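   To make the case 1b arithmetic concrete, here is a back-of-the-envelope sketch (illustrative numbers only, not code from the patch):
   
   ```python
   # Thread math for a 16-core machine in case 1b (JVM Spark, executor.cores = 4).
   cores_per_machine = 16
   executor_cores = 4
   
   executors_per_machine = cores_per_machine // executor_cores          # 4 JVMs
   threads_without_cap = executors_per_machine * cores_per_machine      # 4 * 16 = 64
   threads_with_cap = executors_per_machine * executor_cores            # 4 * 4  = 16
   print(threads_without_cap, threads_with_cap)
   ```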
   
   In cases 2a and 2b, imagine using numpy. Each of the 16 Python processes defaults its OpenMP pool to 16 threads, so 256 threads are used in total on the machine! That's bad: it's a little slower because of all the context switching, and it also uses more memory. However, this change does not help case 2a, since the cap equals the machine's core count there. It does help 2b, where 'only' 64 threads (16 x 4) are started.
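   To see what the cap means on the Python side, here is a minimal sketch (it assumes numpy is linked against an OpenMP-enabled BLAS; this is not code from the patch):
   
   ```python
   import os
   
   # Must be set before numpy is imported; this is effectively what the patch
   # does by putting OMP_NUM_THREADS into each Python worker's environment.
   os.environ["OMP_NUM_THREADS"] = "4"   # e.g. the value of spark.executor.cores
   
   import numpy as np
   
   # With an OpenMP-enabled BLAS, this matmul would otherwise spawn roughly one
   # thread per hardware core in every worker; with the cap, at most 4 threads.
   a = np.random.rand(2000, 2000)
   b = np.random.rand(2000, 2000)
   c = a @ b
   ```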
   
   The more aggressive change would be to set the default to 1, always, for Pyspark, as that matches its one-process-per-core execution model better. However, this is at least a more conservative step that merely caps it at the number of allocated executor cores.
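   Anyone who wants the aggressive behavior today can already opt in explicitly, and the guard in this patch leaves an explicit setting alone (a hypothetical pyspark snippet; the app name is made up):
   
   ```python
   from pyspark import SparkConf, SparkContext
   
   conf = (SparkConf()
           .setAppName("omp-cap-example")
           .set("spark.executor.cores", "4")
           # Explicitly pin OpenMP to one thread per Python worker; the new guard
           # (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty)
           # then skips its own override and keeps this user-provided value.
           .set("spark.executorEnv.OMP_NUM_THREADS", "1"))
   
   sc = SparkContext(conf=conf)
   ```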
   
   
   Yes, the situation isn't as bad if the executor isn't actually fully utilized, but I don't think we should optimize for that case. At least this more conservative change still errs on the side of over-committing cores at the cost of memory, just not nearly as extremely as the default does.
