[ https://issues.apache.org/jira/browse/SPARK-47740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maxim Martynov updated SPARK-47740:
-----------------------------------
Description:
{code:python}
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.memory", "2g")
    .config("spark.another.property", "hi")
    .getOrCreate()
)
spark.conf.get("spark.driver.memory")
# prints '2g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for JVM process before and after stop
spark.stop()

# try to recreate the Spark session with different driver memory, and no "spark.another.property" at all
spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")
# prints '3g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for JVM process
{code}

After the first call to {{.getOrCreate()}}, a JVM process with the following options has been started:
{code}
maxim 76625 10.9 2.7 7975292 447068 pts/8 Sl+ 16:59 0:11 /home/maxim/.sdkman/candidates/java/current/bin/java -cp /home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/conf:/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/jars/* -Xmx2g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --conf spark.driver.memory=2g --conf spark.another.property=hi pyspark-shell
{code}

But it has not been stopped by the {{.stop()}} call. The new Spark session, which reports 3g of driver memory, actually runs in the same JVM started with {{-Xmx2g}}, and it inherits config options from the old session, which should have been discarded at this point.

Summarizing:
* {{spark.stop()}} stops only the SparkContext, but not the JVM
* {{spark.stop()}} does not clean up the session config

This behavior has been observed for a long time, at least since 2.2.0, and is still present in 3.5.1.

This could be solved by stopping the JVM after the Spark session is stopped. But simply calling {{spark._jvm.System.exit(0)}} will fail, because the py4j socket is closed before a response can be read, and it then becomes impossible to start a new Spark session:
{code:python}
spark._jvm.System.exit(0)
{code}
{code}
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:java.lang.System.exit
{code}

Here is an article describing the same issue and also providing a safe way to stop the JVM without breaking py4j (in Russian): https://habr.com/ru/companies/sberbank/articles/805285/

{code:python}
from threading import RLock

from py4j.protocol import CALL_COMMAND_NAME, END_COMMAND_PART
from pyspark import SparkContext
from pyspark.sql import SparkSession


def stop_spark_session(spark_session: SparkSession):
    # stop the Spark session as usual
    spark_session.stop()

    # build a py4j command calling System.exit(0) in the driver JVM
    jvm_sys_exit_obj = spark_session._jvm.System.exit
    jvm_sys_exit_command = jvm_sys_exit_obj.command_header
    jvm_sys_exit_args_command = jvm_sys_exit_obj._build_args(0)[0]
    command_to_exit_driver_jvm_process = (
        CALL_COMMAND_NAME + jvm_sys_exit_command + jvm_sys_exit_args_command + END_COMMAND_PART
    )
    gateway_connection = jvm_sys_exit_obj.gateway_client._get_connection()
    gateway_socket = gateway_connection.socket

    # send it over the raw socket, without waiting for a response
    gateway_socket.sendall(command_to_exit_driver_jvm_process.encode("utf-8"))

    # close gateway sockets on the Python side
    SparkContext._gateway.shutdown()

    # reset globals to their initial state
    SparkContext._next_accum_id = 0
    SparkContext._active_spark_context = None
    SparkContext._lock = RLock()
    SparkContext._jvm = None
    SparkContext._gateway = None
    SparkContext._python_includes = None
{code}

Calling {{stop_spark_session(spark)}} stops the JVM and allows users to create a fresh new Spark session using PySpark.
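For reference, here is a pure-Python sketch of the wire command that {{stop_spark_session}} assembles and writes to the gateway socket. The constant values and layout below reflect my reading of the py4j text protocol (the real definitions live in {{py4j.protocol}}), so treat the names and values as illustrative assumptions rather than the authoritative protocol:

```python
# Sketch of the py4j "call" command sent over the gateway socket.
# Constants are assumptions mirroring py4j.protocol, not imported from it.
CALL_COMMAND_NAME = "c\n"  # marks a method-call command
END_COMMAND_PART = "e\n"   # terminates the argument list
INTEGER_TYPE = "i"         # type prefix for an int argument


def build_static_call(target_id: str, method: str, *int_args: int) -> str:
    # command_header on a py4j JavaMember is roughly "<target id>\n<method>\n"
    header = f"{target_id}\n{method}\n"
    # each int argument is serialized as "i<value>\n"
    args = "".join(f"{INTEGER_TYPE}{a}\n" for a in int_args)
    return CALL_COMMAND_NAME + header + args + END_COMMAND_PART


# the "z:" prefix denotes a static class binding, matching the
# "z:java.lang.System.exit" seen in the error message above
command = build_static_call("z:java.lang.System", "exit", 0)
print(repr(command))  # 'c\nz:java.lang.System\nexit\ni0\ne\n'
```

The important property is that the command is only written with {{sendall}} and no response is ever read: {{System.exit(0)}} kills the JVM before it can answer, which is exactly why the naive {{spark._jvm.System.exit(0)}} call fails with {{Py4JNetworkError}}.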
No config pollution, no weird semi-closed global contexts.

Can something like this be added to the {{SparkSession.stop()}} implementation? Only when py4j is used to start up the session, of course.


> Stop JVM by calling SparkSession.stop from PySpark
> --------------------------------------------------
>
>                 Key: SPARK-47740
>                 URL: https://issues.apache.org/jira/browse/SPARK-47740
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.5.1
>            Reporter: Maxim Martynov
>            Priority: Major


--
This message was sent by Atlassian Jira
(v8.20.10#820010)