This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b3c11ef8e090 Revert "Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used"" b3c11ef8e090 is described below commit b3c11ef8e09082e0091ed9f459b432208a3ccd43 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Wed Apr 24 09:54:41 2024 +0900 Revert "Revert "[SPARK-45302][PYTHON] Remove PID communication between Python workers when no demon is used"" This reverts commit e8f529bb89a6edfcedbca7a08993e6e8d9612009. --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 ++-- .../main/scala/org/apache/spark/api/python/PythonRunner.scala | 10 +++++----- .../org/apache/spark/api/python/PythonWorkerFactory.scala | 11 +++++------ python/pyspark/daemon.py | 4 ++-- .../sql/connect/streaming/worker/foreach_batch_worker.py | 2 -- .../pyspark/sql/connect/streaming/worker/listener_worker.py | 2 -- python/pyspark/sql/worker/analyze_udtf.py | 3 --- python/pyspark/worker.py | 3 --- .../apache/spark/sql/execution/python/PythonArrowOutput.scala | 2 +- .../apache/spark/sql/execution/python/PythonUDFRunner.scala | 2 +- 10 files changed, 16 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 34c7d955fedd..50d0358004d4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -142,7 +142,7 @@ class SparkEnv ( workerModule: String, daemonModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Int]) = { + useDaemon: Boolean): (PythonWorker, Option[Long]) = { synchronized { val key = PythonWorkersKey(pythonExec, workerModule, daemonModule, envVars) val workerFactory = pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory( @@ -161,7 +161,7 @@ class SparkEnv ( pythonExec: String, workerModule: String, envVars: Map[String, String], - useDaemon: Boolean): (PythonWorker, Option[Int]) = { + useDaemon: Boolean): (PythonWorker, Option[Long]) = { createPythonWorker( pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule, envVars, useDaemon) } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 7ff782db210d..17cb0c5a55dd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -88,7 +88,7 @@ private object BasePythonRunner { private lazy val faultHandlerLogDir = Utils.createTempDir(namePrefix = "faulthandler") - private def faultHandlerLogPath(pid: Int): Path = { + private def faultHandlerLogPath(pid: Long): Path = { new File(faultHandlerLogDir, pid.toString).toPath } } @@ -204,7 +204,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( envVars.put("SPARK_JOB_ARTIFACT_UUID", jobArtifactUUID.getOrElse("default")) - val (worker: PythonWorker, pid: Option[Int]) = env.createPythonWorker( + val (worker: PythonWorker, pid: Option[Long]) = env.createPythonWorker( pythonExec, workerModule, daemonModule, envVars.asScala.toMap) // Whether is the worker released into idle pool or closed. When any codes try to release or // close a worker, they should use `releasedOrClosed.compareAndSet` to flip the state to make @@ -257,7 +257,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[OUT] @@ -465,7 +465,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext) extends Iterator[OUT] { @@ -842,7 +842,7 @@ private[spark] class PythonRunner( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[Array[Byte]] = { new ReaderIterator( diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 875cf6bc2770..eb740b72987c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -92,7 +92,7 @@ private[spark] class PythonWorkerFactory( envVars.getOrElse("PYTHONPATH", ""), sys.env.getOrElse("PYTHONPATH", "")) - def create(): (PythonWorker, Option[Int]) = { + def create(): (PythonWorker, Option[Long]) = { if (useDaemon) { self.synchronized { // Pull from idle workers until we one that is alive, otherwise create a new one. @@ -122,9 +122,9 @@ private[spark] class PythonWorkerFactory( * processes itself to avoid the high cost of forking from Java. This currently only works * on UNIX-based systems. */ - private def createThroughDaemon(): (PythonWorker, Option[Int]) = { + private def createThroughDaemon(): (PythonWorker, Option[Long]) = { - def createWorker(): (PythonWorker, Option[Int]) = { + def createWorker(): (PythonWorker, Option[Long]) = { val socketChannel = SocketChannel.open(new InetSocketAddress(daemonHost, daemonPort)) // These calls are blocking. val pid = new DataInputStream(Channels.newInputStream(socketChannel)).readInt() @@ -165,7 +165,7 @@ private[spark] class PythonWorkerFactory( /** * Launch a worker by executing worker.py (by default) directly and telling it to connect to us. */ - private[spark] def createSimpleWorker(blockingMode: Boolean): (PythonWorker, Option[Int]) = { + private[spark] def createSimpleWorker(blockingMode: Boolean): (PythonWorker, Option[Long]) = { var serverSocketChannel: ServerSocketChannel = null try { serverSocketChannel = ServerSocketChannel.open() @@ -209,8 +209,7 @@ private[spark] class PythonWorkerFactory( "Timed out while waiting for the Python worker to connect back") } authHelper.authClient(socketChannel.socket()) - // TODO: When we drop JDK 8, we can just use workerProcess.pid() - val pid = new DataInputStream(Channels.newInputStream(socketChannel)).readInt() + val pid = workerProcess.toHandle.pid() if (pid < 0) { throw new IllegalStateException("Python failed to launch worker with code " + pid) } diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index bbbc495d053e..b0e06d13beda 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -28,7 +28,7 @@ from errno import EINTR, EAGAIN from socket import AF_INET, AF_INET6, SOCK_STREAM, SOMAXCONN from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN, SIGINT -from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer +from pyspark.serializers import read_long, write_int, write_with_length, UTF8Deserializer if len(sys.argv) > 1: import importlib @@ -139,7 +139,7 @@ def manager(): if 0 in ready_fds: try: - worker_pid = read_int(stdin_bin) + worker_pid = read_long(stdin_bin) except EOFError: # Spark told us to exit by closing stdin shutdown(0) diff --git a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py index 0c92de6372b6..80cc69126916 100644 --- a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py +++ b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py @@ -96,6 +96,4 @@ if __name__ == "__main__": (sock_file, sock) = local_connect_and_auth(java_port, auth_secret) # There could be a long time between each micro batch. sock.settimeout(None) - write_int(os.getpid(), sock_file) - sock_file.flush() main(sock_file, sock_file) diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py index a7a5066ca0d7..3709e50ba026 100644 --- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py +++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py @@ -110,6 +110,4 @@ if __name__ == "__main__": (sock_file, sock) = local_connect_and_auth(java_port, auth_secret) # There could be a long time between each listener event. sock.settimeout(None) - write_int(os.getpid(), sock_file) - sock_file.flush() main(sock_file, sock_file) diff --git a/python/pyspark/sql/worker/analyze_udtf.py b/python/pyspark/sql/worker/analyze_udtf.py index 7dafb87c4221..d0a24363c0c1 100644 --- a/python/pyspark/sql/worker/analyze_udtf.py +++ b/python/pyspark/sql/worker/analyze_udtf.py @@ -264,7 +264,4 @@ if __name__ == "__main__": java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] (sock_file, _) = local_connect_and_auth(java_port, auth_secret) - # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8. - write_int(os.getpid(), sock_file) - sock_file.flush() main(sock_file, sock_file) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index e9c259e68a27..41f6c35bc445 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -1868,7 +1868,4 @@ if __name__ == "__main__": java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] (sock_file, _) = local_connect_and_auth(java_port, auth_secret) - # TODO: Remove the following two lines and use `Process.pid()` when we drop JDK 8. - write_int(os.getpid(), sock_file) - sock_file.flush() main(sock_file, sock_file) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala index e7d4aa9f0460..90922d89ad10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala @@ -49,7 +49,7 @@ private[python] trait PythonArrowOutput[OUT <: AnyRef] { self: BasePythonRunner[ startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[OUT] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala index 87ff5a0ec433..bbe9fbfc748d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala @@ -80,7 +80,7 @@ abstract class BasePythonUDFRunner( startTime: Long, env: SparkEnv, worker: PythonWorker, - pid: Option[Int], + pid: Option[Long], releasedOrClosed: AtomicBoolean, context: TaskContext): Iterator[Array[Byte]] = { new ReaderIterator( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org