This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 088e05d2518 [SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O
088e05d2518 is described below

commit 088e05d2518883aa27d0b8144107e45f41dd6b90
Author: Ankur Dave <ankurd...@gmail.com>
AuthorDate: Tue Apr 12 12:01:19 2022 +0900

    [SPARK-38677][PYSPARK] Python MonitorThread should detect deadlock due to blocking I/O
    
    ### What changes were proposed in this pull request?
    When calling a Python UDF on a DataFrame with large rows, a deadlock can occur involving the following three threads (a minimal reproduction sketch follows the list):
    
    1. The Scala task executor thread. During task execution, this is responsible for reading output produced by the Python process. However, in this case the task has finished early, and this thread is no longer reading output produced by the Python process. Instead, it is waiting for the Scala WriterThread to exit so that it can finish the task.
    2. The Scala WriterThread. This is trying to send a large row to the Python process, and is waiting for the Python process to read that row.
    3. The Python process. This is trying to send a large output to the Scala task executor thread, and is waiting for that thread to read that output, which will never happen.
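
    A minimal sketch of the kind of query that can reach this state, modeled on the regression test added below (the local-mode session and the exact row and column counts are illustrative assumptions, and the public first() stands in for the test's direct call on the Java RDD):

        import numpy as np
        import pandas as pd

        from pyspark.sql import SparkSession

        # Many wide rows: each serialized row is large enough to fill the socket
        # buffers between the JVM writer thread and the Python worker.
        spark = SparkSession.builder.master("local[1]").getOrCreate()
        data = np.zeros((100000, 134))
        pdf = pd.DataFrame(data, columns=[str(i) for i in range(134)])
        df = spark.createDataFrame(pdf)

        # Mapping a Python function over each row and keeping only the first
        # result finishes the task early while both sides still have large
        # writes in flight; without the fix this call could hang.
        first_row = df.rdd.map(list).first()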
    
    We considered the following three solutions for the deadlock:
    
    1. When the task completes, make the Scala task executor thread close the socket before waiting for the Scala WriterThread to exit. If the WriterThread is blocked on a large write, this would interrupt that write and allow the WriterThread to exit. However, it would prevent Python worker reuse.
    2. Modify PythonWorkerFactory to use interruptible I/O. [java.nio.channels.SocketChannel](https://docs.oracle.com/javase/6/docs/api/java/nio/channels/SocketChannel.html#write(java.nio.ByteBuffer)) supports interruptible blocking operations. The goal is that when the WriterThread is interrupted, it should exit even if it was blocked on a large write. However, this would be invasive.
    3. Add a watchdog thread similar to the existing PythonRunner.MonitorThread to detect this deadlock and kill the Python worker. The MonitorThread currently kills the Python worker only if the task itself is interrupted. In this case, the task completes normally, so the MonitorThread does not take action. We want the new watchdog thread (WriterMonitorThread) to detect that the task is completed but the Python writer thread has not stopped, indicating a deadlock.
    
    This PR implements Option 3.
    
    ### Why are the changes needed?
    To fix a deadlock that can cause PySpark queries to hang.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a test that previously encountered the deadlock and timed out, and now succeeds.
    
    Closes #36065 from ankurdave/SPARK-38677.
    
    Authored-by: Ankur Dave <ankurd...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/api/python/PythonRunner.scala | 49 ++++++++++++++++++++++
 python/pyspark/tests/test_rdd.py                   | 35 ++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6a4871ba269..15707ab9157 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -183,6 +183,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     }
 
     writerThread.start()
+    new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
     if (reuseWorker) {
       val key = (worker, context.taskAttemptId)
       // SPARK-35009: avoid creating multiple monitor threads for the same python worker
@@ -646,6 +647,54 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
       }
     }
   }
+
+  /**
+   * This thread monitors the WriterThread and kills it in case of deadlock.
+   *
+   * A deadlock can arise if the task completes while the writer thread is sending input to the
+   * Python process (e.g. due to the use of `take()`), and the Python process is still producing
+   * output. When the inputs are sufficiently large, this can result in a deadlock due to the use of
+   * blocking I/O (SPARK-38677). To resolve the deadlock, we need to close the socket.
+   */
+  class WriterMonitorThread(
+      env: SparkEnv, worker: Socket, writerThread: WriterThread, context: TaskContext)
+    extends Thread(s"Writer Monitor for $pythonExec (writer thread id ${writerThread.getId})") {
+
+    /**
+     * How long to wait before closing the socket if the writer thread has not exited after the task
+     * ends.
+     */
+    private val taskKillTimeout = env.conf.get(PYTHON_TASK_KILL_TIMEOUT)
+
+    setDaemon(true)
+
+    override def run(): Unit = {
+      // Wait until the task is completed (or the writer thread exits, in which case this thread has
+      // nothing to do).
+      while (!context.isCompleted && writerThread.isAlive) {
+        Thread.sleep(2000)
+      }
+      if (writerThread.isAlive) {
+        Thread.sleep(taskKillTimeout)
+        // If the writer thread continues running, this indicates a deadlock. Kill the worker to
+        // resolve the deadlock.
+        if (writerThread.isAlive) {
+          try {
+            // Mimic the task name used in `Executor` to help the user find out the task to blame.
+            val taskName = s"${context.partitionId}.${context.attemptNumber} " +
+              s"in stage ${context.stageId} (TID ${context.taskAttemptId})"
+            logWarning(
+              s"Detected deadlock while completing task $taskName: " +
+                "Attempting to kill Python Worker")
+            env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker)
+          } catch {
+            case e: Exception =>
+              logError("Exception when trying to kill worker", e)
+          }
+        }
+      }
+    }
+  }
 }
 
 private[spark] object PythonRunner {
diff --git a/python/pyspark/tests/test_rdd.py b/python/pyspark/tests/test_rdd.py
index bf066e80b6b..d5d6cdbae8a 100644
--- a/python/pyspark/tests/test_rdd.py
+++ b/python/pyspark/tests/test_rdd.py
@@ -34,6 +34,7 @@ from pyspark.serializers import (
     UTF8Deserializer,
     NoOpSerializer,
 )
+from pyspark.sql import SparkSession
 from pyspark.testing.utils import ReusedPySparkTestCase, SPARK_HOME, QuietTest
 
 
@@ -697,6 +698,40 @@ class RDDTests(ReusedPySparkTestCase):
         rdd = self.sc.parallelize(range(1 << 20)).map(lambda x: str(x))
         rdd._jrdd.first()
 
+    def test_take_on_jrdd_with_large_rows_should_not_cause_deadlock(self):
+        # Regression test for SPARK-38677.
+        #
+        # Create a DataFrame with many columns, call a Python function on each row, and take only
+        # the first result row.
+        #
+        # This produces large rows that trigger a deadlock involving the following three threads:
+        #
+        # 1. The Scala task executor thread. During task execution, this is responsible for reading
+        #    output produced by the Python process. However, in this case the task has finished
+        #    early, and this thread is no longer reading output produced by the Python process.
+        #    Instead, it is waiting for the Scala WriterThread to exit so that it can finish the
+        #    task.
+        #
+        # 2. The Scala WriterThread. This is trying to send a large row to the Python process, and
+        #    is waiting for the Python process to read that row.
+        #
+        # 3. The Python process. This is trying to send a large output to the Scala task executor
+        #    thread, and is waiting for that thread to read that output.
+        #
+        # For this test to succeed rather than hanging, the Scala MonitorThread must detect this
+        # deadlock and kill the Python worker.
+        import numpy as np
+        import pandas as pd
+
+        num_rows = 100000
+        num_columns = 134
+        data = np.zeros((num_rows, num_columns))
+        columns = map(str, range(num_columns))
+        df = SparkSession(self.sc).createDataFrame(pd.DataFrame(data, columns=columns))
+        actual = CPickleSerializer().loads(df.rdd.map(list)._jrdd.first())
+        expected = [list(data[0])]
+        self.assertEqual(expected, actual)
+
     def test_sortByKey_uses_all_partitions_not_only_first_and_last(self):
         # Regression test for SPARK-5969
         seq = [(i * 59 % 101, i) for i in range(101)]  # unsorted sequence

