[ https://issues.apache.org/jira/browse/SPARK-47740?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maxim Martynov updated SPARK-47740:
-----------------------------------
    Description: 
{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.driver.memory", 
"2g").config("spark.another.property", "hi").getOrCreate()
spark.conf.get("spark.driver.memory")
# prints '2g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for JVM process before and after stop
spark.stop()

# try to recreate Spark session with different driver memory, and no "spark.another.property" at all
spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")
# prints '3g'
spark.conf.get("spark.another.property")
# prints 'hi'

# check for JVM process
{code}
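
The "check for JVM process" steps above were done with {{ps}} (output below). The same check can be done from Python; here is a small sketch, assuming {{psutil}} is installed (it is not a PySpark dependency):
{code:python}
# sketch: find driver JVMs started by spark-submit; psutil is an extra dependency
import psutil

def driver_jvm_pids() -> list[int]:
    """Return PIDs of local SparkSubmit (driver) JVM processes."""
    pids = []
    for proc in psutil.process_iter(["pid", "cmdline"]):
        cmdline = proc.info["cmdline"] or []
        if any("org.apache.spark.deploy.SparkSubmit" in arg for arg in cmdline):
            pids.append(proc.info["pid"])
    return pids

driver_jvm_pids()  # one PID while the session is running...
# spark.stop()
driver_jvm_pids()  # ...and the very same PID after spark.stop()
{code}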

After the first call to {{.getOrCreate()}}, a JVM process is started with the following options:
{code}
maxim      76625 10.9  2.7 7975292 447068 pts/8  Sl+  16:59   0:11 
/home/maxim/.sdkman/candidates/java/current/bin/java -cp 
/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/conf:/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/jars/*
 -Xmx2g -XX:+IgnoreUnrecognizedVMOptions 
--add-opens=java.base/java.lang=ALL-UNNAMED 
--add-opens=java.base/java.lang.invoke=ALL-UNNAMED 
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED 
--add-opens=java.base/java.io=ALL-UNNAMED 
--add-opens=java.base/java.net=ALL-UNNAMED 
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/java.util=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED 
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED 
--add-opens=java.base/sun.nio.ch=ALL-UNNAMED 
--add-opens=java.base/sun.nio.cs=ALL-UNNAMED 
--add-opens=java.base/sun.security.action=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED 
--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED 
-Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit 
--conf spark.driver.memory=2g --conf spark.another.property=hi pyspark-shell
{code}

But it is not stopped by the {{.stop()}} method call. The new Spark session reports 3g of driver memory, but it actually runs in the same JVM with 2g, and it still carries config options from the old session, which should have been closed at this point.
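
This is easy to verify from the second session: the driver JVM heap limit is still the one set by {{-Xmx2g}}. A small check (the exact number depends on the JVM, but it is close to 2, not 3):
{code:python}
spark.conf.get("spark.driver.memory")
# prints '3g'

# but the driver JVM is still the old process started with -Xmx2g
max_heap_bytes = spark._jvm.java.lang.Runtime.getRuntime().maxMemory()
print(max_heap_bytes / 1024 ** 3)
# roughly 2.0, not 3.0
{code}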

Summarizing:
* {{spark.stop()}} stops only the SparkContext, but not the JVM
* {{spark.stop()}} does not clean up the session config

This behavior has been observed for a long time, at least since 2.2.0, and it is still present in 3.5.1.

This could be solved by stopping the JVM after the Spark session is stopped.
But just calling {{spark._jvm.System.exit(0)}} will fail, because the py4j socket gets closed and it becomes impossible to start a new Spark session:
{code:python}
spark._jvm.System.exit(0)
{code}

{code}
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py",
 line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py",
 line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/clientserver.py",
 line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/java_gateway.py",
 line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/pyspark/errors/exceptions/captured.py",
 line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File 
"/home/maxim/Repo/devdocs/DataLineage/0-RnD/marquez/venv/lib/python3.11/site-packages/py4j/protocol.py",
 line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling z:java.lang.System.exit
{code}
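
And even if this exception is caught and ignored, the Python side is left broken: the global {{SparkContext._gateway}} / {{SparkContext._jvm}} objects still reference the dead JVM, so a new session cannot be created. Roughly (a sketch; the exact failure depends on the PySpark version):
{code:python}
try:
    spark._jvm.System.exit(0)
except Exception:
    pass  # the JVM is already gone at this point

# this does not launch a new JVM: PySpark tries to reuse the dead gateway and fails
spark = SparkSession.builder.getOrCreate()
{code}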

Here is an article describing the same issue and also providing a safe way to stop the JVM without breaking py4j (in Russian):
https://habr.com/ru/companies/sberbank/articles/805285/

{code:python}
from pyspark.sql import SparkSession
from pyspark import SparkContext
from threading import RLock
from py4j.protocol import CALL_COMMAND_NAME, END_COMMAND_PART


def stop_spark_session(spark_session: SparkSession):
    # stop the Spark session as usual
    spark_session.stop()
    # build a py4j command calling java.lang.System.exit(0) in the driver JVM
    jvm_sys_exit_obj = spark_session._jvm.System.exit
    jvm_sys_exit_command = jvm_sys_exit_obj.command_header
    jvm_sys_exit_args_command = jvm_sys_exit_obj._build_args(0)[0]
    command_to_exit_driver_jvm_process = (
        CALL_COMMAND_NAME + jvm_sys_exit_command + jvm_sys_exit_args_command + END_COMMAND_PART
    )
    gateway_connection = jvm_sys_exit_obj.gateway_client._get_connection()
    gateway_socket = gateway_connection.socket
    # send it over the raw socket, without waiting for an answer which will never come
    gateway_socket.sendall(command_to_exit_driver_jvm_process.encode("utf-8"))
    # close gateway sockets on the Python side
    SparkContext._gateway.shutdown()
    # reset globals to their initial state, so that a new JVM can be started later
    SparkContext._next_accum_id = 0
    SparkContext._active_spark_context = None
    SparkContext._lock = RLock()
    SparkContext._jvm = None
    SparkContext._gateway = None
    SparkContext._python_includes = None
{code}

Calling {{stop_spark_session(spark)}} stops the JVM and allows users to create a fresh new Spark session with PySpark. No config pollution, no weird semi-closed global contexts.
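
For example (a sketch of the expected behavior; the exact error raised for the now-missing option depends on the PySpark version):
{code:python}
spark = SparkSession.builder.config("spark.driver.memory", "2g").config("spark.another.property", "hi").getOrCreate()

stop_spark_session(spark)
# the SparkSubmit JVM process is gone at this point

spark = SparkSession.builder.config("spark.driver.memory", "3g").getOrCreate()
spark.conf.get("spark.driver.memory")
# prints '3g', and the new driver JVM is really started with -Xmx3g
spark.conf.get("spark.another.property")
# fails, the option from the old session is gone
{code}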

Can something like this be added to the {{SparkSession.stop()}} implementation? Only when py4j is used to start the session, of course.
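
A very rough sketch of what this could look like (a hypothetical {{stop_jvm}} flag, not an existing PySpark API; checking {{SparkContext._gateway}} is just one possible way to tell a classic py4j-backed session apart):
{code:python}
# hypothetical sketch of the proposed behaviour, not current PySpark code
def stop(session: SparkSession, stop_jvm: bool = False) -> None:
    if stop_jvm and SparkContext._gateway is not None:
        # classic py4j-backed session: also terminate the driver JVM
        # and reset the Python-side gateway globals
        stop_spark_session(session)
    else:
        # current behaviour: stop only the JVM-side context and session
        session.stop()
{code}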


> Stop JVM by calling SparkSession.stop from PySpark
> --------------------------------------------------
>
>                 Key: SPARK-47740
>                 URL: https://issues.apache.org/jira/browse/SPARK-47740
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.5.1
>            Reporter: Maxim Martynov
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
