[ https://issues.apache.org/jira/browse/SPARK-38438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17502633#comment-17502633 ]
Rafal Wojdyla edited comment on SPARK-38438 at 3/9/22, 12:35 PM:
-----------------------------------------------------------------

The workaround actually doesn't stop the existing JVM. It does stop most of the threads in the JVM (including the Spark context related threads and the py4j gateway), but it turns out the only non-daemon thread left is the `main` thread:

{noformat}
"main" #1 prio=5 os_prio=31 cpu=1381.53ms elapsed=67.25s tid=0x00007fc478809000 nid=0x2703 runnable [0x000070000c094000]
   java.lang.Thread.State: RUNNABLE
	at java.io.FileInputStream.readBytes(java.base@11.0.9.1/Native Method)
	at java.io.FileInputStream.read(java.base@11.0.9.1/FileInputStream.java:279)
	at java.io.BufferedInputStream.fill(java.base@11.0.9.1/BufferedInputStream.java:252)
	at java.io.BufferedInputStream.read(java.base@11.0.9.1/BufferedInputStream.java:271)
	- locked <0x00000007c1012ca0> (a java.io.BufferedInputStream)
	at org.apache.spark.api.python.PythonGatewayServer$.main(PythonGatewayServer.scala:68)
	at org.apache.spark.api.python.PythonGatewayServer.main(PythonGatewayServer.scala)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.9.1/Native Method)
	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.9.1/NativeMethodAccessorImpl.java:62)
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.9.1/DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(java.base@11.0.9.1/Method.java:566)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{noformat}

That thread is waiting for the Python process to stop: https://github.com/apache/spark/blob/71991f75ff441e80a52cb71f66f46bfebdb05671/core/src/main/scala/org/apache/spark/api/python/PythonGatewayServer.scala#L68-L70

Would it make sense to simply close stdin to trigger shutdown of the JVM? In that case the hard reset would be:

{code:python}
s.stop()
s._sc._gateway.shutdown()
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None
SparkContext._jvm = None
{code}

Edit: updated the issue description with the extra line.
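A rough sketch (my addition, not part of the comment above): the steps can be wrapped into a reusable helper. The name `hard_reset_spark` is made up for illustration, and the body relies on the same private PySpark attributes (`_sc`, `_gateway`, `proc`) as the snippet above, so treat it as an unsupported workaround rather than an API.

{code:python}
from pyspark import SparkContext
from pyspark.sql import SparkSession


def hard_reset_spark(session: SparkSession) -> None:
    """Stop the session and tear down the py4j gateway/JVM so a new
    SparkSession can pick up a fresh spark.jars.packages value.

    Hypothetical helper built from the workaround above; uses private
    PySpark internals, not a public API.
    """
    session.stop()
    # Shut down the py4j gateway on the Python side.
    session._sc._gateway.shutdown()
    # Closing stdin of the gateway process unblocks PythonGatewayServer's
    # main thread (blocked on System.in, see the stack trace above) and
    # lets the JVM exit.
    session._sc._gateway.proc.stdin.close()
    # Forget the cached gateway/JVM so the next context launches a new one.
    SparkContext._gateway = None
    SparkContext._jvm = None


# Usage sketch: reset, then rebuild with the new package.
s = SparkSession.builder.getOrCreate()
hard_reset_spark(s)
s = (SparkSession.builder
     .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
     .getOrCreate())
{code}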
> Can't update spark.jars.packages on existing global/default context
> -------------------------------------------------------------------
>
>                 Key: SPARK-38438
>                 URL: https://issues.apache.org/jira/browse/SPARK-38438
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Spark Core
>    Affects Versions: 3.2.1
>         Environment: py: 3.9
> spark: 3.2.1
>            Reporter: Rafal Wojdyla
>            Priority: Major
>
> Reproduction:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # later on we want to update jars.packages, e.g. with spark-hats:
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # the line below returns None, the config was not propagated:
> s._sc._conf.get("spark.jars.packages")
> {code}
> Stopping the context doesn't help; in fact it's even more confusing, because
> the configuration is updated but has no effect:
> {code:python}
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> s.stop()
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # now this line returns 'za.co.absa:spark-hats_2.12:0.2.2', but the context
> # doesn't download the jar/package, as it would if there were no global context,
> # so the extra package is unusable: it's not downloaded or added to the classpath.
> s._sc._conf.get("spark.jars.packages")
> {code}
> One workaround is to stop the context AND kill the JVM gateway, which acts as
> a kind of hard reset:
> {code:python}
> from pyspark import SparkContext
> from pyspark.sql import SparkSession
> # default session:
> s = SparkSession.builder.getOrCreate()
> # Hard reset:
> s.stop()
> s._sc._gateway.shutdown()
> s._sc._gateway.proc.stdin.close()
> SparkContext._gateway = None
> SparkContext._jvm = None
> s = (SparkSession.builder
>      .config("spark.jars.packages", "za.co.absa:spark-hats_2.12:0.2.2")
>      .getOrCreate())
> # Now we are guaranteed there's a new Spark session, and the packages are
> # downloaded, added to the classpath, etc.
> {code}
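For completeness, a small verification sketch of my own (not taken from the issue), using the same spark-hats coordinate as the examples above: after the hard reset, the builder config should actually reach the newly created context, unlike the first reproduction where the lookup returned None.

{code:python}
from pyspark import SparkContext
from pyspark.sql import SparkSession

PKG = "za.co.absa:spark-hats_2.12:0.2.2"  # example coordinate from the issue

s = SparkSession.builder.getOrCreate()

# Hard reset, same steps as the workaround quoted above:
s.stop()
s._sc._gateway.shutdown()
s._sc._gateway.proc.stdin.close()
SparkContext._gateway = None
SparkContext._jvm = None

# A new JVM is launched for the new context, so the package is resolved
# and downloaded at startup.
s = (SparkSession.builder
     .config("spark.jars.packages", PKG)
     .getOrCreate())

# Unlike the first reproduction, the config is now visible on the new context;
# this should print 'za.co.absa:spark-hats_2.12:0.2.2'.
print(s._sc._conf.get("spark.jars.packages"))
{code}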