This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ca02481b6be2 [SPARK-52971][PYTHON] Limit idle Python worker queue size
ca02481b6be2 is described below

commit ca02481b6be2dd87cf61e3a9ee682887c6cc8d6e
Author: Tibi Craiu <tibi.cr...@databricks.com>
AuthorDate: Tue Aug 5 11:34:59 2025 -0700

    [SPARK-52971][PYTHON] Limit idle Python worker queue size

    ### What changes were proposed in this pull request?

    Makes the maximum number of idle workers kept in the `PythonWorkerFactory` pool configurable.

    ### Why are the changes needed?

    Without a limit on the queue size, the idle worker pool can grow without bound. This change allows better control over the number of idle Python workers that are kept alive.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it adds a new optional configuration entry, `spark.python.factory.idleWorkerMaxPoolSize`, starting from Spark 4.1.0.

    ### How was this patch tested?

    This patch adds two new tests to verify the behavior with and without the worker limit configuration.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #51684 from craiuconstantintiberiu/SPARK-52971-idle-pool.

    Authored-by: Tibi Craiu <tibi.cr...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 .../spark/api/python/PythonWorkerFactory.scala     | 30 ++++++++----
 .../org/apache/spark/internal/config/Python.scala  | 11 +++++
 .../api/python/PythonWorkerFactorySuite.scala      | 53 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index bbce14a20749..e02f10cc3fe6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.Python.{PYTHON_UNIX_DOMAIN_SOCKET_DIR, PYTHON_UNIX_DOMAIN_SOCKET_ENABLED}
+import org.apache.spark.internal.config.Python.PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
@@ -98,8 +98,9 @@ private[spark] class PythonWorkerFactory(
     !Utils.isWindows && useDaemonEnabled
   }
 
-  private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
-  private val isUnixDomainSock = authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_ENABLED)
+  private val conf = SparkEnv.get.conf
+  private val authHelper = new SocketAuthHelper(conf)
+  private val isUnixDomainSock = authHelper.isUnixDomainSock
 
   @GuardedBy("self")
   private var daemon: Process = null
@@ -111,7 +112,11 @@ private[spark] class PythonWorkerFactory(
   @GuardedBy("self")
   private var daemonSockPath: String = _
   @GuardedBy("self")
-  private val idleWorkers = new mutable.Queue[PythonWorker]()
+  // Visible for testing
+  private[spark] val idleWorkers = new mutable.Queue[PythonWorker]()
+  @GuardedBy("self")
+  private val maxIdleWorkerPoolSize =
+    conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)
   @GuardedBy("self")
   private var lastActivityNs = 0L
   new MonitorThread().start()
@@ -127,7 +132,7 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[ProcessHandle]) = {
     if (useDaemon) {
       self.synchronized {
-        // Pull from idle workers until we one that is alive, otherwise create a new one.
+        // Pull from idle workers until we get one that is alive, otherwise create a new one.
         while (idleWorkers.nonEmpty) {
           val worker = idleWorkers.dequeue()
           daemonWorkers.get(worker).foreach { workerHandle =>
@@ -203,8 +208,7 @@ private[spark] class PythonWorkerFactory(
       blockingMode: Boolean): (PythonWorker, Option[ProcessHandle]) = {
     var serverSocketChannel: ServerSocketChannel = null
     lazy val sockPath = new File(
-      authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-        .getOrElse(System.getProperty("java.io.tmpdir")),
+      authHelper.sockDir,
       s".${UUID.randomUUID()}.sock")
     try {
       if (isUnixDomainSock) {
@@ -307,8 +311,7 @@ private[spark] class PythonWorkerFactory(
       if (isUnixDomainSock) {
         workerEnv.put(
           "PYTHON_WORKER_FACTORY_SOCK_DIR",
-          authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-            .getOrElse(System.getProperty("java.io.tmpdir")))
+          authHelper.sockDir)
         workerEnv.put("PYTHON_UNIX_DOMAIN_ENABLED", "True")
       } else {
         workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
@@ -483,6 +486,15 @@ private[spark] class PythonWorkerFactory(
     if (useDaemon) {
       self.synchronized {
         lastActivityNs = System.nanoTime()
+        if (maxIdleWorkerPoolSize.exists(idleWorkers.size >= _)) {
+          val oldestWorker = idleWorkers.dequeue()
+          try {
+            stopWorker(oldestWorker)
+          } catch {
+            case e: Exception =>
+              logWarning("Failed to stop evicted worker", e)
+          }
+        }
         idleWorkers.enqueue(worker)
       }
     } else {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 79f6d5bcb854..de95e2fa1f7a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -127,4 +127,15 @@ private[spark] object Python {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0, "The interval should be 0 or positive.")
       .createWithDefault(0)
+
+  val PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE =
+    ConfigBuilder("spark.python.factory.idleWorkerMaxPoolSize")
+      .doc("Maximum number of idle Python workers to keep. " +
+        "If unset, the number is unbounded. " +
+        "If set to a positive integer N, at most N idle workers are retained; " +
+        "least-recently used workers are evicted first.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
+      .createOptional
 }
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
index 4d994ea5dffa..4f9dafb6cbea 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import java.net.SocketTimeoutException
 
+import scala.collection.mutable
 // scalastyle:off executioncontextglobal
 import scala.concurrent.ExecutionContext.Implicits.global
 // scalastyle:on executioncontextglobal
@@ -56,4 +57,56 @@ class PythonWorkerFactorySuite extends SparkFunSuite with SharedSparkContext {
     // Timeout ensures that the test fails in 5 minutes if createSimplerWorker() doesn't return.
     ThreadUtils.awaitReady(createFuture, 5.minutes)
   }
+
+  test("idle worker pool is unbounded when idleWorkerMaxPoolSize is not set") {
+    sc.conf.remove("spark.python.factory.idleWorkerMaxPoolSize")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 3).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 3)
+
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
+
+  test("idle worker pool is bounded when idleWorkerMaxPoolSize is set") {
+    sc.conf.set("spark.python.factory.idleWorkerMaxPoolSize", "2")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 2).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 2)
+
+
+      val worker3 = {
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        PythonWorker(mockChannel)
+      }
+      mockWorkers.enqueue(worker3)
+      factory.releaseWorker(worker3)
+      assert(factory.idleWorkers.size === 2)
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
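As a brief usage sketch (not part of the commit): the new cap can be set like any other Spark configuration, e.g. `--conf spark.python.factory.idleWorkerMaxPoolSize=2` on spark-submit, or programmatically as below. The config key and eviction semantics come from the patch above; the application name, master URL, and the value 2 are assumptions chosen purely for illustration.

    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext

    // Illustrative only: cap the idle Python worker pool at 2 workers per executor.
    // "spark.python.factory.idleWorkerMaxPoolSize" is the optional entry added by this
    // patch (Spark 4.1.0+); leaving it unset keeps the previous unbounded behavior.
    val conf = new SparkConf()
      .setAppName("idle-worker-pool-demo")   // hypothetical app name
      .setMaster("local[2]")                 // hypothetical master
      .set("spark.python.factory.idleWorkerMaxPoolSize", "2")

    val sc = new SparkContext(conf)
    // ... run Python UDF / PySpark workloads; at most 2 idle daemon workers are kept,
    // and the least-recently used idle worker is stopped once the cap is reached.
    sc.stop()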