This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new 275079e1de [KYUUBI #6583] Support to cancel Spark python operation
275079e1de is described below

commit 275079e1de449e55a5c9db1831b741690da69968
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Aug 20 09:50:10 2024 -0700

    [KYUUBI #6583] Support to cancel Spark python operation
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6583
    
    ## Background and Goals
    Currently, Kyuubi cannot perform operation-level interrupts when executing 
Python code. When it is necessary to cancel an operation that has been running 
for a long time, the entire session needs to be interrupted, and the execution 
context will be lost, which is very unfriendly to users. Therefore, it is 
necessary to support operation-level interrupts so that the execution context 
is not lost when the user terminates an operation.
    
    ## Describe Your Solution 🔧
    
    Refer to the implementation of Jupyter Notebook: let the Python process 
register a handler for the SIGINT signal; when a SIGINT is received, interrupt 
the currently executing code and catch the resulting KeyboardInterrupt to 
treat the operation as cancelled.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6612 from yoock/features/support-operation-cancel.
    
    Closes #6583
    
    bf6334d8c [Wang, Fei] log error to do not break the cleanup process
    ae7ad3f3c [Wang, Fei] comments
    509627e65 [王龙] PySpark support operation cancel
    
    Lead-authored-by: Wang, Fei <[email protected]>
    Co-authored-by: 王龙 <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit 705bb2ae2c09b0add15172ab567a613a7da5c2f1)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../src/main/resources/python/execute_python.py    | 91 +++++++++++++---------
 .../engine/spark/operation/ExecutePython.scala     | 25 ++++++
 .../apache/kyuubi/engine/spark/PySparkTests.scala  | 35 +++++++++
 3 files changed, 113 insertions(+), 38 deletions(-)

diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py 
b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
index f33c10c400..3e7f3caa25 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
@@ -471,53 +471,68 @@ def main():
     try:
 
         while True:
-            line = sys_stdin.readline()
-
-            if line == "":
-                break
-            elif line == "\n":
-                continue
-
-            try:
-                content = json.loads(line)
-            except ValueError:
-                continue
-
-            if content["cmd"] == "exit_worker":
-                break
-
-            result = execute_request(content)
-
             try:
-                result = json.dumps(result)
-            except ValueError:
+                line = sys_stdin.readline()
+
+                if line == "":
+                    break
+                elif line == "\n":
+                    continue
+
+                try:
+                    content = json.loads(line)
+                except ValueError:
+                    continue
+
+                if content["cmd"] == "exit_worker":
+                    break
+
+                result = execute_request(content)
+
+                try:
+                    result = json.dumps(result)
+                except ValueError:
+                    result = json.dumps(
+                        {
+                            "msg_type": "inspect_reply",
+                            "content": {
+                                "status": "error",
+                                "ename": "ValueError",
+                                "evalue": "cannot json-ify %s" % result,
+                                "traceback": [],
+                            },
+                        }
+                    )
+                except Exception:
+                    exc_type, exc_value, tb = sys.exc_info()
+                    result = json.dumps(
+                        {
+                            "msg_type": "inspect_reply",
+                            "content": {
+                                "status": "error",
+                                "ename": str(exc_type.__name__),
+                                "evalue": "cannot json-ify %s: %s"
+                                % (result, str(exc_value)),
+                                "traceback": [],
+                            },
+                        }
+                    )
+
+                print(result, file=sys_stdout)
+            except KeyboardInterrupt:
                 result = json.dumps(
                     {
                         "msg_type": "inspect_reply",
                         "content": {
-                            "status": "error",
-                            "ename": "ValueError",
-                            "evalue": "cannot json-ify %s" % result,
+                            "status": "canceled",
+                            "ename": "KeyboardInterrupt",
+                            "evalue": "execution interrupted by user",
                             "traceback": [],
                         },
                     }
                 )
-            except Exception:
-                exc_type, exc_value, tb = sys.exc_info()
-                result = json.dumps(
-                    {
-                        "msg_type": "inspect_reply",
-                        "content": {
-                            "status": "error",
-                            "ename": str(exc_type.__name__),
-                            "evalue": "cannot json-ify %s: %s"
-                            % (result, str(exc_value)),
-                            "traceback": [],
-                        },
-                    }
-                )
-
-            print(result, file=sys_stdout)
+                print(result, file=sys_stdout)
+                print("execution interrupted by user: " + line, 
file=sys_stderr)
             sys_stdout.flush()
             clearOutputs()
     finally:
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index e350232c87..d58a22e45a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -40,8 +40,10 @@ import 
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYU
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.util.JsonUtils
 import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, 
OperationState}
+import org.apache.kyuubi.operation.OperationState.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.DynFields
 
 class ExecutePython(
     session: Session,
@@ -171,6 +173,14 @@ class ExecutePython(
       }
     }
   }
+
+  override def cleanup(targetState: OperationState): Unit = {
+    if (!isTerminalState(state)) {
+      info(s"Staring to cancel python code: $statement")
+      worker.interrupt()
+    }
+    super.cleanup(targetState)
+  }
 }
 
 case class SessionPythonWorker(
@@ -225,6 +235,21 @@ case class SessionPythonWorker(
     pythonWorkerMonitor.interrupt()
     workerProcess.destroy()
   }
+
+  def interrupt(): Unit = {
+    val pid = DynFields.builder()
+      .hiddenImpl(workerProcess.getClass, "pid")
+      .build[java.lang.Integer](workerProcess)
+      .get()
+    // sends a SIGINT (interrupt) signal, similar to Ctrl-C
+    val builder = new ProcessBuilder(Seq("kill", "-2", pid.toString).asJava)
+    val process = builder.start()
+    val exitCode = process.waitFor()
+    process.destroy()
+    if (exitCode != 0) {
+      error(s"Process `${builder.command().asScala.mkString(" ")}` exit with 
value: $exitCode")
+    }
+  }
 }
 
 object ExecutePython extends Logging {
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
index 4358728762..6ec0b87f1d 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
@@ -207,6 +207,41 @@ class PySparkTests extends WithKyuubiServer with 
HiveJDBCTestHelper {
     }
   }
 
+  test("Support to cancel Spark python operation") {
+    checkPythonRuntimeAndVersion()
+    withMultipleConnectionJdbcStatement()({ stmt =>
+      val statement = stmt.asInstanceOf[KyuubiStatement]
+      statement.executeQuery("SET kyuubi.operation.language=PYTHON")
+      val code1 =
+        """
+          |i = 0
+          |i
+          |""".stripMargin
+      val resultSet1 = statement.executeQuery(code1)
+      assert(resultSet1.next())
+      assert(resultSet1.getString("status") === "ok")
+      assert(resultSet1.getString("output") === "0")
+      val code2 =
+        """
+          |import time
+          |while True:
+          |   i +=1
+          |   time.sleep(1)
+          |""".stripMargin
+      statement.executeAsync(code2)
+      statement.cancel()
+
+      val code3 =
+        """
+          |i
+          |""".stripMargin
+      val resultSet3 = statement.executeQuery(code3)
+      assert(resultSet3.next())
+      assert(resultSet3.getString("status") === "ok")
+      assert(resultSet3.getString("output").toInt > 0)
+    })
+  }
+
   private def runPySparkTest(
       pyCode: String,
       output: String): Unit = {

Reply via email to