This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 088e05d2518 [SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O

088e05d2518 is described below

commit 088e05d2518883aa27d0b8144107e45f41dd6b90
Author: Ankur Dave <ankurd...@gmail.com>
AuthorDate: Tue Apr 12 12:01:19 2022 +0900

[SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O

### What changes were proposed in this pull request?

When calling a Python UDF on a DataFrame with large rows, a deadlock can occur involving the following three threads:

1. The Scala task executor thread. During task execution, this is responsible for reading output produced by the Python process. However, in this case the task has finished early, and this thread is no longer reading output produced by the Python process. Instead, it is waiting for the Scala WriterThread to exit so that it can finish the task.
2. The Scala WriterThread. This is trying to send a large row to the Python process, and is waiting for the Python process to read that row.
3. The Python process. This is trying to send a large output to the Scala task executor thread, and is waiting for that thread to read that output, which will never happen.

We considered the following three solutions for the deadlock:

1. When the task completes, make the Scala task executor thread close the socket before waiting for the Scala WriterThread to exit. If the WriterThread is blocked on a large write, this would interrupt that write and allow the WriterThread to exit. However, it would prevent Python worker reuse.
2. Modify PythonWorkerFactory to use interruptible I/O. [java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)) supports interruptible blocking operations. The goal is that when the WriterThread is interrupted, it should exit even if it was blocked on a large write. However, this would be invasive.
3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread to detect this deadlock and kill the Python worker. The MonitorThread currently kills the Python worker only if the task itself is interrupted. In this case, the task completes normally, so the MonitorThread does not take action. We want the new watchdog thread (WriterMonitorThread) to detect that the task is completed but the Python writer thread has not stopped, indicating a deadlock.

This PR implements Option 3.

### Why are the changes needed?

To fix a deadlock that can cause PySpark queries to hang.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test that previously encountered the deadlock and timed out, and now succeeds.

Closes #36065 from ankurdave/SPARK-38677.

Authored-by: Ankur Dave <ankurd...@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/api/python/PythonRunner.scala | 49 ++++++++++++++++++++++
 python/pyspark/tests/test_rdd.py                   | 35 ++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6a4871ba269..15707ab9157 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -183,6 +183,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
 
     writerThread.start()
+    new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
     if (reuseWorker) {
       val key = (worker, context.taskAttemptId)
       // SPARK-35009: avoid creating multiple monitor threads for the same python worker
@@ -646,6 +647,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
     }
   }
+
+  /**
+   * This thread monitors the WriterThread and kills it in case of deadlock.
+   *
+   * A deadlock can arise if the task completes while the writer thread is sending input to the
+   * Python process (e.g. due to the use of `take()`), and the Python process is still producing
+   * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of
+   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket.
+   */
+  class WriterMonitorThread(
+      env: SparkEnv, worker: Socket, writerThread: WriterThread, context: TaskContext)
+    extends Thread(s"Writer Monitor for $pythonExec (writer thread id ${writerThread.getId})") {
+
+    /**
+     * How long to wait before closing the socket if the writer thread has not exited after the task
+     * ends.
+     */
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
+
+    setDaemon(true)
+
+    override def run(): Unit = {
+      // Wait until the task is completed (or the writer thread exits, in which case this thread has
+      // nothing to do).
+      while (!context.isCompleted && writerThread.isAlive) {
+        Thread.sleep(2000)
+      }
+      if (writerThread.isAlive) {
+        Thread.sleep(taskKillTimeout)
+        // If the writer thread continues running, this indicates a deadlock. Kill the worker to
+        // resolve the deadlock.
+        if (writerThread.isAlive) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.attemptNumber} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(
+              s"Detected deadlock while completing task $taskName: " +
+                "Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }
+  }
 }
 
 private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index bf066e80b6b..d5d6cdbae8a 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -34,6 +34,7 @@ from pyspark.serializers import (
     UTF8Deserializer,
     NoOpSerializer,
 )
+from pyspark.sql import SparkSession
 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
 
 
@@ -697,6 +698,40 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
+        # Regression test for SPARK-38677.
+        #
+        # Create a DataFrame with many columns, call a Python function on each row, and take only
+        # the first result row.
+        #
+        # This produces large rows that trigger a deadlock involving the following three threads:
+        #
+        # 1. The Scala task executor thread. During task execution, this is responsible for reading
+        #    output produced by the Python process. However, in this case the task has finished
+        #    early, and this thread is no longer reading output produced by the Python process.
+        #    Instead, it is waiting for the Scala WriterThread to exit so that it can finish the
+        #    task.
+        #
+        # 2. The Scala WriterThread. This is trying to send a large row to the Python process, and
+        #    is waiting for the Python process to read that row.
+        #
+        # 3. The Python process. This is trying to send a large output to the Scala task executor
+        #    thread, and is waiting for that thread to read that output.
+        #
+        # For this test to succeed rather than hanging, the Scala MonitorThread must detect this
+        # deadlock and kill the Python worker.
+        import numpy as np
+        import pandas as pd
+
+        num_rows = 100000
+        num_columns = 134
+        data = np.zeros((num_rows, num_columns))
+        columns = map(str, range(num_columns))
+        df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data, columns=columns))
+        actual = CPickleSerializer().loads(df.rdd.map(list)._jrdd.first())
+        expected = [list(data[0])]
+        self.assertEqual(expected, actual)
+
     def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
         # Regression test for SPARK-5969
         seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence
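
The watchdog approach of Option 3, as implemented by `WriterMonitorThread.run()` in the diff above, can be reproduced outside Spark. The following is a minimal Python sketch, not Spark code, and it rests on several assumptions: a `socket.socketpair()` stands in for the JVM-to-worker connection, a `threading.Event` stands in for `context.isCompleted`, closing the worker's end of the pair stands in for `env.destroyPythonWorker`, and the hypothetical `POLL_INTERVAL` and `KILL_TIMEOUT` values are far shorter than the 2000 ms poll and `PYTHON_TASK_KILL_TIMEOUT` used by the commit. The writer blocks in `sendall()` because nothing reads the other end. Once the "task" completes and the writer is still alive after the grace period, the monitor closes the peer socket, which makes the blocked write fail with an `OSError` so the writer can exit.

```python
import socket
import threading
import time

# Hypothetical stand-ins, not Spark APIs: the commit polls every 2000 ms and then waits
# PYTHON_TASK_KILL_TIMEOUT; these values are shortened so the demo finishes quickly.
POLL_INTERVAL = 0.05
KILL_TIMEOUT = 0.2


def writer(sock, payload, errors):
    """Plays the role of WriterThread: send one large 'row' with a blocking write."""
    try:
        # Blocks once the socket buffers are full and nobody reads the other end.
        sock.sendall(payload)
    except OSError as e:
        # Raised after the peer end has been closed (the "worker" was killed).
        errors.append(e)


def writer_monitor(task_completed, writer_thread, kill_worker, killed):
    """Plays the role of WriterMonitorThread.run()."""
    # Wait until the task is completed, or until the writer exits on its own.
    while not task_completed.is_set() and writer_thread.is_alive():
        time.sleep(POLL_INTERVAL)
    if writer_thread.is_alive():
        time.sleep(KILL_TIMEOUT)
        # The writer outlived the completed task by the whole grace period:
        # assume a deadlock and kill the "worker" to unblock the write.
        if writer_thread.is_alive():
            kill_worker()
            killed.set()


def run_demo(payload_size):
    """Return (writer_still_alive, worker_killed, writer_errors)."""
    jvm_end, worker_end = socket.socketpair()
    errors = []
    task_completed = threading.Event()
    killed = threading.Event()

    writer_thread = threading.Thread(
        target=writer, args=(jvm_end, b"x" * payload_size, errors), daemon=True
    )
    writer_thread.start()
    monitor = threading.Thread(
        target=writer_monitor,
        args=(task_completed, writer_thread, worker_end.close, killed),
        daemon=True,
    )
    monitor.start()

    # The task finishes early (as with take()) and never reads from worker_end.
    task_completed.set()

    # Like the task thread waiting for WriterThread to exit before finishing the task.
    writer_thread.join(timeout=10)
    monitor.join(timeout=10)
    alive = writer_thread.is_alive()
    jvm_end.close()
    worker_end.close()  # no-op if the monitor already closed it
    return alive, killed.is_set(), errors
```

With a payload far larger than the socket buffers, such as `run_demo(32 * 1024 * 1024)`, the writer should end up unblocked after the monitor kills the "worker", and it records one `OSError` (typically `BrokenPipeError` or `ConnectionResetError`, depending on the platform). With a payload that fits in the buffers, such as `run_demo(1024)`, the writer finishes on its own and the monitor does nothing. As in the commit, the monitor only acts on a writer that outlives a completed task, so a task that is still running normally is never touched.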