This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 705bb2ae2c [KYUUBI #6583] Support to cancel Spark python operation
705bb2ae2c is described below
commit 705bb2ae2c09b0add15172ab567a613a7da5c2f1
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Aug 20 09:50:10 2024 -0700
[KYUUBI #6583] Support to cancel Spark python operation
# :mag: Description
## Issue References 🔗
This pull request fixes #6583
## Background and Goals
Currently, kyuubi cannot perform operation level interrupts when executing
Python code. When it is necessary to cancel an operation that has been running
for a long time, the entire session needs to be interrupted, and the execution
context will be lost, which is very unfriendly to users. Therefore, it is
necessary to support operation level interrupts so that the execution context
is not lost when the user cancels an operation.
## Describe Your Solution 🔧
Referring to the implementation of Jupyter Notebook, we let the Python process
listen for the SIGINT signal; when a SIGINT signal is received, the currently
executing code is interrupted and the resulting KeyboardInterrupt is captured
and treated as a cancellation.
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6612 from yoock/features/support-operation-cancel.
Closes #6583
bf6334d8c [Wang, Fei] log error to do not break the cleanup process
ae7ad3f3c [Wang, Fei] comments
509627e65 [็้พ] PySpark support operation cancel
Lead-authored-by: Wang, Fei <[email protected]>
Co-authored-by: ็้พ <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../src/main/resources/python/execute_python.py | 91 +++++++++++++---------
.../engine/spark/operation/ExecutePython.scala | 25 ++++++
.../apache/kyuubi/engine/spark/PySparkTests.scala | 35 +++++++++
3 files changed, 113 insertions(+), 38 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
index f33c10c400..3e7f3caa25 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
+++
b/externals/kyuubi-spark-sql-engine/src/main/resources/python/execute_python.py
@@ -471,53 +471,68 @@ def main():
try:
while True:
- line = sys_stdin.readline()
-
- if line == "":
- break
- elif line == "\n":
- continue
-
- try:
- content = json.loads(line)
- except ValueError:
- continue
-
- if content["cmd"] == "exit_worker":
- break
-
- result = execute_request(content)
-
try:
- result = json.dumps(result)
- except ValueError:
+ line = sys_stdin.readline()
+
+ if line == "":
+ break
+ elif line == "\n":
+ continue
+
+ try:
+ content = json.loads(line)
+ except ValueError:
+ continue
+
+ if content["cmd"] == "exit_worker":
+ break
+
+ result = execute_request(content)
+
+ try:
+ result = json.dumps(result)
+ except ValueError:
+ result = json.dumps(
+ {
+ "msg_type": "inspect_reply",
+ "content": {
+ "status": "error",
+ "ename": "ValueError",
+ "evalue": "cannot json-ify %s" % result,
+ "traceback": [],
+ },
+ }
+ )
+ except Exception:
+ exc_type, exc_value, tb = sys.exc_info()
+ result = json.dumps(
+ {
+ "msg_type": "inspect_reply",
+ "content": {
+ "status": "error",
+ "ename": str(exc_type.__name__),
+ "evalue": "cannot json-ify %s: %s"
+ % (result, str(exc_value)),
+ "traceback": [],
+ },
+ }
+ )
+
+ print(result, file=sys_stdout)
+ except KeyboardInterrupt:
result = json.dumps(
{
"msg_type": "inspect_reply",
"content": {
- "status": "error",
- "ename": "ValueError",
- "evalue": "cannot json-ify %s" % result,
+ "status": "canceled",
+ "ename": "KeyboardInterrupt",
+ "evalue": "execution interrupted by user",
"traceback": [],
},
}
)
- except Exception:
- exc_type, exc_value, tb = sys.exc_info()
- result = json.dumps(
- {
- "msg_type": "inspect_reply",
- "content": {
- "status": "error",
- "ename": str(exc_type.__name__),
- "evalue": "cannot json-ify %s: %s"
- % (result, str(exc_value)),
- "traceback": [],
- },
- }
- )
-
- print(result, file=sys_stdout)
+ print(result, file=sys_stdout)
+ print("execution interrupted by user: " + line,
file=sys_stderr)
sys_stdout.flush()
clearOutputs()
finally:
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index e350232c87..d58a22e45a 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -40,8 +40,10 @@ import
org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYU
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.util.JsonUtils
import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle,
OperationState}
+import org.apache.kyuubi.operation.OperationState.OperationState
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.DynFields
class ExecutePython(
session: Session,
@@ -171,6 +173,14 @@ class ExecutePython(
}
}
}
+
+ override def cleanup(targetState: OperationState): Unit = {
+ if (!isTerminalState(state)) {
+ info(s"Staring to cancel python code: $statement")
+ worker.interrupt()
+ }
+ super.cleanup(targetState)
+ }
}
case class SessionPythonWorker(
@@ -225,6 +235,21 @@ case class SessionPythonWorker(
pythonWorkerMonitor.interrupt()
workerProcess.destroy()
}
+
+ def interrupt(): Unit = {
+ val pid = DynFields.builder()
+ .hiddenImpl(workerProcess.getClass, "pid")
+ .build[java.lang.Integer](workerProcess)
+ .get()
+ // sends a SIGINT (interrupt) signal, similar to Ctrl-C
+ val builder = new ProcessBuilder(Seq("kill", "-2", pid.toString).asJava)
+ val process = builder.start()
+ val exitCode = process.waitFor()
+ process.destroy()
+ if (exitCode != 0) {
+ error(s"Process `${builder.command().asScala.mkString(" ")}` exit with
value: $exitCode")
+ }
+ }
}
object ExecutePython extends Logging {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
index 4358728762..6ec0b87f1d 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/PySparkTests.scala
@@ -207,6 +207,41 @@ class PySparkTests extends WithKyuubiServer with
HiveJDBCTestHelper {
}
}
+ test("Support to cancel Spark python operation") {
+ checkPythonRuntimeAndVersion()
+ withMultipleConnectionJdbcStatement()({ stmt =>
+ val statement = stmt.asInstanceOf[KyuubiStatement]
+ statement.executeQuery("SET kyuubi.operation.language=PYTHON")
+ val code1 =
+ """
+ |i = 0
+ |i
+ |""".stripMargin
+ val resultSet1 = statement.executeQuery(code1)
+ assert(resultSet1.next())
+ assert(resultSet1.getString("status") === "ok")
+ assert(resultSet1.getString("output") === "0")
+ val code2 =
+ """
+ |import time
+ |while True:
+ | i +=1
+ | time.sleep(1)
+ |""".stripMargin
+ statement.executeAsync(code2)
+ statement.cancel()
+
+ val code3 =
+ """
+ |i
+ |""".stripMargin
+ val resultSet3 = statement.executeQuery(code3)
+ assert(resultSet3.next())
+ assert(resultSet3.getString("status") === "ok")
+ assert(resultSet3.getString("output").toInt > 0)
+ })
+ }
+
private def runPySparkTest(
pyCode: String,
output: String): Unit = {