Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3566e40a4 -> 55d2a1178


[SPARK-19055][SQL][PYSPARK] Fix SparkSession initialization when SparkContext is stopped

## What changes were proposed in this pull request?

During SparkSession initialization, we store the created SparkSession instance 
in the class variable _instantiatedContext, so that a later call to 
SparkSession.builder.getOrCreate() can retrieve the existing SparkSession 
instance.
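
For illustration, here is a simplified sketch of this caching pattern, using a hypothetical CachedSession class; it is not the actual pyspark.sql.session source, only the shape of the pre-fix logic:

```python
from pyspark import SparkContext

# Hypothetical sketch of the pre-fix caching logic; names are illustrative.
class CachedSession(object):
    _instantiatedContext = None  # class-level cache of the live session

    def __init__(self, sparkContext):
        self._sc = sparkContext
        if CachedSession._instantiatedContext is None:
            CachedSession._instantiatedContext = self

    @classmethod
    def getOrCreate(cls):
        session = cls._instantiatedContext
        if session is None:
            session = cls(SparkContext.getOrCreate())
        # Pre-fix bug: a session whose SparkContext was stopped is
        # returned unchanged, because only `is None` is checked.
        return session
```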

However, when the active SparkContext is stopped and a new SparkContext is 
created, the existing SparkSession remains associated with the stopped 
SparkContext, so operations on that SparkSession will fail.

We need to detect such a case in SparkSession and renew the class variable 
_instantiatedContext when needed.
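
As a concrete illustration, a minimal reproduction sketch (assuming a local PySpark environment with no other SparkContext running); with this patch the final collect succeeds instead of failing against the stopped context:

```python
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext('local[2]', 'repro')
spark = SparkSession(sc)   # cached in the class variable
sc.stop()                  # the cached session is now stale

sc2 = SparkContext('local[2]', 'repro')
spark2 = SparkSession.builder.getOrCreate()
# Before this patch, spark2 is the stale session bound to the stopped
# SparkContext and the collect below fails; with the patch, a fresh
# session is created on top of sc2 and it succeeds.
spark2.createDataFrame([(1, 2)], ["a", "b"]).collect()
```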

## How was this patch tested?

New test added in PySpark.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #16454 from viirya/fix-pyspark-sparksession.

(cherry picked from commit c6c37b8af714c8ddc8c77ac943a379f703558f27)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55d2a117
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55d2a117
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55d2a117

Branch: refs/heads/branch-2.0
Commit: 55d2a117805d76fd27e2960f92ece88238488231
Parents: 3566e40
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Thu Jan 12 20:53:31 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Jan 12 20:54:41 2017 +0800

----------------------------------------------------------------------
 python/pyspark/sql/session.py | 16 ++++++++++------
 python/pyspark/sql/tests.py   | 23 +++++++++++++++++++++++
 2 files changed, 33 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/session.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index d25823d..79017c6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -161,8 +161,8 @@ class SparkSession(object):
             with self._lock:
                 from pyspark.context import SparkContext
                 from pyspark.conf import SparkConf
-                session = SparkSession._instantiatedContext
-                if session is None:
+                session = SparkSession._instantiatedSession
+                if session is None or session._sc._jsc is None:
                     sparkConf = SparkConf()
                     for key, value in self._options.items():
                         sparkConf.set(key, value)
@@ -183,7 +183,7 @@ class SparkSession(object):
 
     builder = Builder()
 
-    _instantiatedContext = None
+    _instantiatedSession = None
 
     @ignore_unicode_prefix
     def __init__(self, sparkContext, jsparkSession=None):
@@ -214,8 +214,12 @@ class SparkSession(object):
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
         _monkey_patch_RDD(self)
         install_exception_handler()
-        if SparkSession._instantiatedContext is None:
-            SparkSession._instantiatedContext = self
+        # If we had an instantiated SparkSession attached to a SparkContext
+        # which is stopped now, we need to renew the instantiated SparkSession.
+        # Otherwise, we will use an invalid SparkSession when we call Builder.getOrCreate.
+        if SparkSession._instantiatedSession is None \
+                or SparkSession._instantiatedSession._sc._jsc is None:
+            SparkSession._instantiatedSession = self
 
     @since(2.0)
     def newSession(self):
@@ -597,7 +601,7 @@ class SparkSession(object):
         """Stop the underlying :class:`SparkContext`.
         """
         self._sc.stop()
-        SparkSession._instantiatedContext = None
+        SparkSession._instantiatedSession = None
 
     @since(2.0)
     def __enter__(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/55d2a117/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b3cf72b..796b964 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -46,6 +46,7 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
+from pyspark import SparkContext
 from pyspark.sql import SparkSession, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type
@@ -1772,6 +1773,28 @@ class HiveSparkSubmitTests(SparkSubmitTests):
         self.assertTrue(os.path.exists(metastore_path))
 
 
+class SQLTests2(ReusedPySparkTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        ReusedPySparkTestCase.setUpClass()
+        cls.spark = SparkSession(cls.sc)
+
+    @classmethod
+    def tearDownClass(cls):
+        ReusedPySparkTestCase.tearDownClass()
+        cls.spark.stop()
+
+    # We can't include this test in SQLTests because it stops the class's
+    # SparkContext and causes other tests to fail.
+    def test_sparksession_with_stopped_sparkcontext(self):
+        self.sc.stop()
+        sc = SparkContext('local[4]', self.sc.appName)
+        spark = SparkSession.builder.getOrCreate()
+        df = spark.createDataFrame([(1, 2)], ["c", "c"])
+        df.collect()
+
+
 class HiveContextSQLTests(ReusedPySparkTestCase):
 
     @classmethod

