Repository: spark Updated Branches: refs/heads/branch-2.1 616a78a56 -> 042e32d18
[SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped ## What changes were proposed in this pull request? In SparkSession initialization, we store the created SparkSession instance in a class variable _instantiatedContext. Next time we can use SparkSession.builder.getOrCreate() to retrieve the existing SparkSession instance. However, when the active SparkContext is stopped and we create a new SparkContext to use, the existing SparkSession is still associated with the stopped SparkContext, so operations on this existing SparkSession will fail. We need to detect such a case in SparkSession and renew the class variable _instantiatedContext if needed. ## How was this patch tested? New test added in PySpark. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #16454 from viirya/fix-pyspark-sparksession. (cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/042e32d1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/042e32d1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/042e32d1 Branch: refs/heads/branch-2.1 Commit: 042e32d18ad10be5c60907959e55b0324df5b2c0 Parents: 616a78a Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Thu Jan 12 20:53:31 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Jan 12 20:54:16 2017 +0800 ---------------------------------------------------------------------- python/pyspark/sql/session.py | 16 ++++++++++------ python/pyspark/sql/tests.py | 23 +++++++++++++++++++++++ 2 files changed, 33 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/session.py 
---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index 1e40b9c..9f4772e 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -161,8 +161,8 @@ class SparkSession(object): with self._lock: from pyspark.context import SparkContext from pyspark.conf import SparkConf - session = SparkSession._instantiatedContext - if session is None: + session = SparkSession._instantiatedSession + if session is None or session._sc._jsc is None: sparkConf = SparkConf() for key, value in self._options.items(): sparkConf.set(key, value) @@ -183,7 +183,7 @@ class SparkSession(object): builder = Builder() - _instantiatedContext = None + _instantiatedSession = None @ignore_unicode_prefix def __init__(self, sparkContext, jsparkSession=None): @@ -214,8 +214,12 @@ class SparkSession(object): self._wrapped = SQLContext(self._sc, self, self._jwrapped) _monkey_patch_RDD(self) install_exception_handler() - if SparkSession._instantiatedContext is None: - SparkSession._instantiatedContext = self + # If we had an instantiated SparkSession attached with a SparkContext + # which is stopped now, we need to renew the instantiated SparkSession. + # Otherwise, we will use invalid SparkSession when we call Builder.getOrCreate. + if SparkSession._instantiatedSession is None \ + or SparkSession._instantiatedSession._sc._jsc is None: + SparkSession._instantiatedSession = self @since(2.0) def newSession(self): @@ -595,7 +599,7 @@ class SparkSession(object): """Stop the underlying :class:`SparkContext`. 
""" self._sc.stop() - SparkSession._instantiatedContext = None + SparkSession._instantiatedSession = None @since(2.0) def __enter__(self): http://git-wip-us.apache.org/repos/asf/spark/blob/042e32d1/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6de63e6..fe034bc 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6): else: import unittest +from pyspark import SparkContext from pyspark.sql import SparkSession, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type @@ -1877,6 +1878,28 @@ class HiveSparkSubmitTests(SparkSubmitTests): self.assertTrue(os.path.exists(metastore_path)) +class SQLTests2(ReusedPySparkTestCase): + + @classmethod + def setUpClass(cls): + ReusedPySparkTestCase.setUpClass() + cls.spark = SparkSession(cls.sc) + + @classmethod + def tearDownClass(cls): + ReusedPySparkTestCase.tearDownClass() + cls.spark.stop() + + # We can't include this test into SQLTests because we will stop class's SparkContext and cause + # other tests failed. + def test_sparksession_with_stopped_sparkcontext(self): + self.sc.stop() + sc = SparkContext('local[4]', self.sc.appName) + spark = SparkSession.builder.getOrCreate() + df = spark.createDataFrame([(1, 2)], ["c", "c"]) + df.collect() + + class HiveContextSQLTests(ReusedPySparkTestCase): @classmethod --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org