This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ca02481b6be2 [SPARK-52971][PYTHON] Limit idle Python worker queue size
ca02481b6be2 is described below

commit ca02481b6be2dd87cf61e3a9ee682887c6cc8d6e
Author: Tibi Craiu <tibi.cr...@databricks.com>
AuthorDate: Tue Aug 5 11:34:59 2025 -0700

    [SPARK-52971][PYTHON] Limit idle Python worker queue size
    
    ### What changes were proposed in this pull request?
    
    Makes the number of idle workers in the `PythonWorkerFactory` pool configurable.
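    
    For illustration, a minimal standalone sketch of the bounding behavior
    (simplified names; not the actual Spark implementation):
    
        import scala.collection.mutable
    
        // A bounded idle pool: releasing a worker into a full pool first
        // evicts (and stops) the oldest, least-recently used entry.
        class BoundedIdlePool[W](maxSize: Option[Int], stop: W => Unit) {
          private val idle = mutable.Queue.empty[W]
    
          def release(worker: W): Unit = synchronized {
            if (maxSize.exists(idle.size >= _)) {
              stop(idle.dequeue()) // evict the oldest idle worker
            }
            idle.enqueue(worker)
          }
    
          def size: Int = synchronized(idle.size)
        }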
    
    ### Why are the changes needed?
    
    Without a limit on the queue size, the idle worker pool can grow
    unbounded. A configurable maximum gives better control over the number
    of idle workers that are kept alive.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, adds a new optional configuration entry,
    `spark.python.factory.idleWorkerMaxPoolSize`, available from Spark 4.1.0.
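    
    For example, users could cap the pool at an arbitrary size (8 here is
    purely illustrative) either on the command line or in code:
    
        // Via spark-submit:
        //   spark-submit --conf spark.python.factory.idleWorkerMaxPoolSize=8 app.py
        // or programmatically when building the SparkConf:
        import org.apache.spark.SparkConf
    
        val conf = new SparkConf()
          .set("spark.python.factory.idleWorkerMaxPoolSize", "8")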
    
    ### How was this patch tested?
    
    This patch adds two new tests to verify behavior with and without the
    worker limit configuration.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51684 from craiuconstantintiberiu/SPARK-52971-idle-pool.
    
    Authored-by: Tibi Craiu <tibi.cr...@databricks.com>
    Signed-off-by: Takuya Ueshin <ues...@databricks.com>
---
 .../spark/api/python/PythonWorkerFactory.scala     | 30 ++++++++----
 .../org/apache/spark/internal/config/Python.scala  | 11 +++++
 .../api/python/PythonWorkerFactorySuite.scala      | 53 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index bbce14a20749..e02f10cc3fe6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
-import org.apache.spark.internal.config.Python.{PYTHON_UNIX_DOMAIN_SOCKET_DIR, PYTHON_UNIX_DOMAIN_SOCKET_ENABLED}
+import org.apache.spark.internal.config.Python.PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE
 import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}
 
@@ -98,8 +98,9 @@ private[spark] class PythonWorkerFactory(
     !Utils.isWindows && useDaemonEnabled
   }
 
-  private val authHelper = new SocketAuthHelper(SparkEnv.get.conf)
-  private val isUnixDomainSock = authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_ENABLED)
+  private val conf = SparkEnv.get.conf
+  private val authHelper = new SocketAuthHelper(conf)
+  private val isUnixDomainSock = authHelper.isUnixDomainSock
 
   @GuardedBy("self")
   private var daemon: Process = null
@@ -111,7 +112,11 @@ private[spark] class PythonWorkerFactory(
   @GuardedBy("self")
   private var daemonSockPath: String = _
   @GuardedBy("self")
-  private val idleWorkers = new mutable.Queue[PythonWorker]()
+  // Visible for testing
+  private[spark] val idleWorkers = new mutable.Queue[PythonWorker]()
+  @GuardedBy("self")
+  private val maxIdleWorkerPoolSize =
+    conf.get(PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE)
   @GuardedBy("self")
   private var lastActivityNs = 0L
   new MonitorThread().start()
@@ -127,7 +132,7 @@ private[spark] class PythonWorkerFactory(
   def create(): (PythonWorker, Option[ProcessHandle]) = {
     if (useDaemon) {
       self.synchronized {
-        // Pull from idle workers until we one that is alive, otherwise create a new one.
+        // Pull from idle workers until we get one that is alive, otherwise create a new one.
         while (idleWorkers.nonEmpty) {
           val worker = idleWorkers.dequeue()
           daemonWorkers.get(worker).foreach { workerHandle =>
@@ -203,8 +208,7 @@ private[spark] class PythonWorkerFactory(
       blockingMode: Boolean): (PythonWorker, Option[ProcessHandle]) = {
     var serverSocketChannel: ServerSocketChannel = null
     lazy val sockPath = new File(
-      authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-        .getOrElse(System.getProperty("java.io.tmpdir")),
+      authHelper.sockDir,
       s".${UUID.randomUUID()}.sock")
     try {
       if (isUnixDomainSock) {
@@ -307,8 +311,7 @@ private[spark] class PythonWorkerFactory(
         if (isUnixDomainSock) {
           workerEnv.put(
             "PYTHON_WORKER_FACTORY_SOCK_DIR",
-            authHelper.conf.get(PYTHON_UNIX_DOMAIN_SOCKET_DIR)
-              .getOrElse(System.getProperty("java.io.tmpdir")))
+            authHelper.sockDir)
           workerEnv.put("PYTHON_UNIX_DOMAIN_ENABLED", "True")
         } else {
           workerEnv.put("PYTHON_WORKER_FACTORY_SECRET", authHelper.secret)
@@ -483,6 +486,15 @@ private[spark] class PythonWorkerFactory(
     if (useDaemon) {
       self.synchronized {
         lastActivityNs = System.nanoTime()
+        if (maxIdleWorkerPoolSize.exists(idleWorkers.size >= _)) {
+          val oldestWorker = idleWorkers.dequeue()
+          try {
+            stopWorker(oldestWorker)
+          } catch {
+            case e: Exception =>
+              logWarning("Failed to stop evicted worker", e)
+          }
+        }
         idleWorkers.enqueue(worker)
       }
     } else {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Python.scala b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
index 79f6d5bcb854..de95e2fa1f7a 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Python.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Python.scala
@@ -127,4 +127,15 @@ private[spark] object Python {
       .timeConf(TimeUnit.SECONDS)
       .checkValue(_ >= 0, "The interval should be 0 or positive.")
       .createWithDefault(0)
+
+  val PYTHON_FACTORY_IDLE_WORKER_MAX_POOL_SIZE =
+    ConfigBuilder("spark.python.factory.idleWorkerMaxPoolSize")
+      .doc("Maximum number of idle Python workers to keep. " +
+        "If unset, the number is unbounded. " +
+        "If set to a positive integer N, at most N idle workers are retained; 
" +
+        "least-recently used workers are evicted first.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(_ > 0, "If set, the idle worker max size must be > 0.")
+      .createOptional
 }
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala b/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
index 4d994ea5dffa..4f9dafb6cbea 100644
--- a/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
+++ b/core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import java.net.SocketTimeoutException
 
+import scala.collection.mutable
 // scalastyle:off executioncontextglobal
 import scala.concurrent.ExecutionContext.Implicits.global
 // scalastyle:on executioncontextglobal
@@ -56,4 +57,56 @@ class PythonWorkerFactorySuite extends SparkFunSuite with SharedSparkContext {
     // Timeout ensures that the test fails in 5 minutes if createSimplerWorker() doesn't return.
     ThreadUtils.awaitReady(createFuture, 5.minutes)
   }
+
+  test("idle worker pool is unbounded when idleWorkerMaxPoolSize is not set") {
+    sc.conf.remove("spark.python.factory.idleWorkerMaxPoolSize")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 3).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 3)
+
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
+
+  test("idle worker pool is bounded when idleWorkerMaxPoolSize is set") {
+    sc.conf.set("spark.python.factory.idleWorkerMaxPoolSize", "2")
+
+    val factory = new PythonWorkerFactory("python3", "pyspark.worker", Map.empty, true)
+
+    assert(factory.idleWorkers.size === 0)
+    val mockWorkers: mutable.Queue[PythonWorker] = mutable.Queue.empty
+    try {
+      (1 to 2).foreach { _ =>
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        mockWorkers.enqueue(PythonWorker(mockChannel))
+      }
+      mockWorkers.foreach(factory.releaseWorker)
+      assert(factory.idleWorkers.size === 2)
+
+
+      val worker3 = {
+        val mockChannel = java.nio.channels.SocketChannel.open()
+        mockChannel.configureBlocking(false)
+        PythonWorker(mockChannel)
+      }
+      mockWorkers.enqueue(worker3)
+      factory.releaseWorker(worker3)
+      assert(factory.idleWorkers.size === 2)
+    } finally {
+      mockWorkers.foreach(factory.stopWorker)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
