[ https://issues.apache.org/jira/browse/SPARK-38438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502633#comment-17502633 ]

Rafal Wojdyla edited comment on SPARK-38438 at 3/9/22, 12:35 PM:
-----------------------------------------------------------------

The workaround actually doesn't stop the existing JVM. It does stop most of the 
threads in the JVM (including the Spark context related ones and the py4j 
gateway), but it turns out the only non-daemon thread left is the `main` thread:

{noformat}
"main" #1 prio=5 os_prio=31 cpu=1381.53ms elapsed=67.25s tid=0x00007fc478809000 
nid=0x2703 runnable  [0x000070000c094000]
   java.lang.Thread.State: RUNNABLE
        at java.io.FileInputStream.readBytes(java.base@11.0.9.1/Native Method)
        at 
java.io.FileInputStream.read(java.base@11.0.9.1/FileInputStream.java:279)
        at 
java.io.BufferedInputStream.fill(java.base@11.0.9.1/BufferedInputStream.java:252)
        at 
java.io.BufferedInputStream.read(java.base@11.0.9.1/BufferedInputStream.java:271)
        - locked <0x00000007c1012ca0> (a java.io.BufferedInputStream)
        at 
org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:68)
        at 
org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
        at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.9.1/Native 
Method)
        at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.9.1/NativeMethodAccessorImpl.java:62)
        at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.9.1/DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(java.base@11.0.9.1/Method.java:566)
        at 
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}
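
For reference, a thread dump like the one above can also be captured from the 
Python side. A minimal sketch, assuming a JDK `jstack` binary on PATH and the 
`Popen` handle that PySpark attaches to the gateway as `proc`:

{code:python}
# Sketch: capture a thread dump of the gateway JVM from the Python side.
# Assumes a JDK `jstack` binary on PATH; `proc` is the subprocess.Popen
# handle that PySpark attaches to the py4j gateway.
import subprocess

from pyspark.sql import SparkSession

s = SparkSession.builder.getOrCreate()
jvm_pid = s._sc._gateway.proc.pid

dump = subprocess.run(["jstack", str(jvm_pid)], capture_output=True, text=True)
print(dump.stdout)
{code}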

The `main` thread is blocked here, waiting for the Python process to stop: 
https://github.com/apache/spark/blob/71991f75ff441e80a52cb71f66f46bfebdb05671/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala#L68-L70

Would it make sense to simply close the JVM's stdin to trigger its shutdown? In 
that case the hard reset would be:

{code:python}
s.stop()                            # stop the SparkSession/SparkContext
s._sc._gateway.shutdown()           # shut down the py4j gateway
s._sc._gateway.proc.stdin.close()   # EOF on stdin lets PythonGatewayServer's main thread exit
SparkContext._gateway = None
SparkContext._jvm = None
{code}
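
To sanity-check that closing stdin really does bring the old JVM down, the 
gateway's process handle can be polled after the reset. A minimal sketch, 
assuming only the py4j `Popen` handle that PySpark stores as `gateway.proc` 
(the same handle used above):

{code:python}
# Sketch: verify that the hard reset proposed above actually brings the
# gateway JVM down. `proc` is the subprocess.Popen handle that PySpark
# attaches to the py4j gateway.
import subprocess

from pyspark import SparkContext
from pyspark.sql import SparkSession

s = SparkSession.builder.getOrCreate()
proc = s._sc._gateway.proc

# Hard reset, as proposed above:
s.stop()
s._sc._gateway.shutdown()
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None
SparkContext._jvm = None

try:
    # PythonGatewayServer's main thread sees EOF on stdin and the JVM exits.
    exit_code = proc.wait(timeout=10)
    print(f"gateway JVM exited with code {exit_code}")
except subprocess.TimeoutExpired:
    print("gateway JVM is still running after 10s")
{code}

If `wait` times out, the old JVM is still alive and the reset has not fully 
taken effect.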

Edit: updated the issue description with the extra line.


> Can't update spark.jars.packages on existing global/default context
> -------------------------------------------------------------------
>
>                 Key: SPARK-38438
>                 URL: https://issues.apache.org/jira/browse/SPARK-38438
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.2.1
>         Environment: py: 3.9
> spark: 3.2.1
>            Reporter: Rafal Wojdyla
>            Priority: Major
>
> Reproduction:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # later on we want to update jars.packages, here's e.g. spark-hats
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # line below returns None, the config was not propagated:
> s._sc._conf.get("spark.jars.packages")
> {code}
> Stopping the context doesn't help; in fact it's even more confusing, because 
> the configuration is updated but has no effect:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> s.stop()
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # now this line returns 'za.co.absa:spark-hats_2.12:0.2.2', but the context
> # doesn't download the jar/package, as it would if there was no global context
> # thus the extra package is unusable. It's not downloaded, or added to the
> # classpath.
> s._sc._conf.get("spark.jars.packages")
> {code}
> One workaround is to stop the context AND kill the JVM gateway, which seems 
> to be a kind of hard reset:
> {code:python}
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # Hard reset:
> s.stop()
> s._sc._gateway.shutdown()
> s._sc._gateway.proc.stdin.close()
> SparkContext._gateway = None
> SparkContext._jvm = None
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # Now we are guaranteed there's a new spark session, and packages
> # are downloaded, added to the classpath etc.
> {code}


