ueshin commented on code in PR #51684:
URL: https://github.com/apache/spark/pull/51684#discussion_r2244010972
##########
core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala:
##########
@@ -33,6 +33,12 @@ import org.apache.spark.util.ThreadUtils
// Tests for PythonWorkerFactory.
class PythonWorkerFactorySuite extends SparkFunSuite with SharedSparkContext {
+ private def getIdleWorkerCount(factory: PythonWorkerFactory): Int = {
+ val field = factory.getClass.getDeclaredField("idleWorkers")
Review Comment:
I guess it's ok to make `idleWorkers` `private[spark]`? cc @HyukjinKwon
##########
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala:
##########
@@ -484,6 +486,15 @@ private[spark] class PythonWorkerFactory(
self.synchronized {
lastActivityNs = System.nanoTime()
idleWorkers.enqueue(worker)
+ if (idleWorkerPoolSize.exists(idleWorkers.size > _)) {
+ val oldestWorker = idleWorkers.dequeue()
+ try {
+ oldestWorker.stop()
Review Comment:
Should we call `stopWorker` here instead of `stop`?
##########
core/src/test/scala/org/apache/spark/api/python/PythonWorkerFactorySuite.scala:
##########
@@ -56,4 +62,49 @@ class PythonWorkerFactorySuite extends SparkFunSuite with
SharedSparkContext {
// Timeout ensures that the test fails in 5 minutes if
createSimplerWorker() doesn't return.
ThreadUtils.awaitReady(createFuture, 5.minutes)
}
+
+ test("idle worker pool is unbounded when idleWorkerMaxPoolSize is not set") {
+ sc.conf.remove("spark.python.factory.idleWorkerMaxPoolSize")
+
+ val factory = new PythonWorkerFactory("python3", "pyspark.worker",
Map.empty, true)
+
+ assert(getIdleWorkerCount(factory) === 0)
+
+ val mockWorkers = (1 to 3).map { _ =>
+ val mockChannel = java.nio.channels.SocketChannel.open()
+ mockChannel.configureBlocking(false)
+ PythonWorker(mockChannel)
+ }
+ mockWorkers.foreach(factory.releaseWorker)
+ assert(getIdleWorkerCount(factory) === 3)
+
+ mockWorkers.foreach(_.stop())
+ }
+
+ test("idle worker pool is bounded when idleWorkerMaxPoolSize is set") {
+ sc.conf.set("spark.python.factory.idleWorkerMaxPoolSize", "2")
+
+ val factory = new PythonWorkerFactory("python3", "pyspark.worker",
Map.empty, true)
+
+ assert(getIdleWorkerCount(factory) === 0)
+
+ val mockWorkers = (1 to 2).map { _ =>
+ val mockChannel = java.nio.channels.SocketChannel.open()
+ mockChannel.configureBlocking(false)
+ PythonWorker(mockChannel)
+ }
+ mockWorkers.foreach(factory.releaseWorker)
+ assert(getIdleWorkerCount(factory) === 2)
+
+ val worker3 = {
+ val mockChannel = java.nio.channels.SocketChannel.open()
+ mockChannel.configureBlocking(false)
+ PythonWorker(mockChannel)
+ }
+ factory.releaseWorker(worker3)
+ assert(getIdleWorkerCount(factory) === 2)
+
+ mockWorkers.foreach(_.stop())
+ worker3.stop()
Review Comment:
Shall we surround the above with a `try` block and put these in the `finally` clause
to make sure all the workers are cleaned up?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]