srowen commented on a change in pull request #25545: [SPARK-28843][PYTHON] Set OMP_NUM_THREADS to executor cores for python
URL: https://github.com/apache/spark/pull/25545#discussion_r317627588
########## File path: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ##########

```scala
@@ -106,6 +106,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(f => f.getPath()).mkString(",")
+    // If OMP_NUM_THREADS is not explicitly set, override it with the number of cores.
+    if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
+      // SPARK-28843: limit the OpenMP thread pool to the number of cores assigned to this executor.
+      // This avoids high memory consumption with pandas/numpy caused by a large OpenMP thread pool.
+      // See https://github.com/numpy/numpy/issues/10455
+      conf.getOption("spark.executor.cores").foreach(envVars.put("OMP_NUM_THREADS", _))
```

Review comment:

1 is just the default in YARN; the meaning is the same everywhere. Let me break it down further to illustrate. All this change does is ensure that a process isn't using more cores than it was allocated, which ought always to be a good thing. (And it saves memory along the way.) Suppose there's a 16-core machine.

Case 1: JVM Spark
a) `spark.executor.cores` = 16: there is one JVM using all 16 cores.
b) `spark.executor.cores` = 4: there are (up to) 4 JVMs using 4 cores each.

Case 2: PySpark
a) `spark.executor.cores` = 16: there are 16 Python worker processes.
b) `spark.executor.cores` = 4: there are still 16 Python worker processes.

In case 1a, imagine using MLlib backed by OpenBLAS or MKL. By default, OpenMP will use all 16 cores. This is fine, and does not change with this change. In case 1b, each of the 4 JVMs will start 16 OpenMP threads, so OpenMP will attempt to use 64 threads in total (to my understanding) right now. This change caps that at 16 threads (4 JVMs x 4 threads each). That's better. In cases 2a and 2b, imagine using numpy: each of the 16 Python processes starts 16 OpenMP threads, so 256 threads will be used in total on the machine! That's bad; it's a little slower because of all the context switching, but it also uses more memory.
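The counting above can be sketched as a small Python helper. This is purely illustrative (the function `total_omp_threads` is hypothetical, not part of Spark), and it assumes OpenMP's default of one thread per visible core when `OMP_NUM_THREADS` is unset:

```python
def total_omp_threads(machine_cores, executor_cores, pyspark, omp_cap=None):
    """Rough count of OpenMP threads on one machine.

    omp_cap models setting OMP_NUM_THREADS as this PR does; when it is
    None, each process's OpenMP pool defaults to all visible cores.
    """
    threads_per_process = omp_cap if omp_cap is not None else machine_cores
    if pyspark:
        # One Python worker per task slot, i.e. one per core on the machine.
        processes = machine_cores
    else:
        # One JVM per executor; executors fill the machine.
        processes = machine_cores // executor_cores
    return processes * threads_per_process

# Case 1b without the change: 4 JVMs x 16 threads each = 64
print(total_omp_threads(16, 4, pyspark=False))
# Case 1b with OMP_NUM_THREADS = executor cores: 4 x 4 = 16
print(total_omp_threads(16, 4, pyspark=False, omp_cap=4))
# Case 2b without the change: 16 workers x 16 threads each = 256
print(total_omp_threads(16, 4, pyspark=True))
# Case 2b with the cap: 16 x 4 = 64
print(total_omp_threads(16, 4, pyspark=True, omp_cap=4))
```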
This change does not, however, help case 2a: `OMP_NUM_THREADS` gets set to 16, which is already OpenMP's default on a 16-core machine. It does help case 2b, where "at least" only 64 threads are started instead of 256. The more aggressive change would be to always default `OMP_NUM_THREADS` to 1 for PySpark, since that matches the execution model better (each Python worker is meant to occupy a single core). This PR is a more conservative step that merely caps the pool at the number of allocated executor cores. Yes, the situation isn't as bad if the executor isn't actually fully utilized, but I don't think we should optimize for that case. At least this more conservative change still errs on the side of over-committing the cores at the cost of memory, just not nearly as extremely as the default does.
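Note that because the new code only fills in `OMP_NUM_THREADS` when `spark.executorEnv.OMP_NUM_THREADS` is unset, users who want the more aggressive setting (or any explicit value) can still override it per job. A sketch of how that might look with `spark-submit` (the master and application name are placeholders):

```shell
# Explicitly pin each Python worker to a single OpenMP thread.
# Because spark.executorEnv.OMP_NUM_THREADS is set, the new
# default in PythonRunner is skipped.
spark-submit \
  --master yarn \
  --conf spark.executor.cores=4 \
  --conf spark.executorEnv.OMP_NUM_THREADS=1 \
  my_app.py
```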