This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 86efa45  [SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
86efa45 is described below

commit 86efa456d8e11a2b7e10bce70d4ead20c75acbe1
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Thu Jul 9 15:51:56 2020 +0900

    [SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to disallow creating `SparkContext` in executors, e.g., in UDFs.
    
    ### Why are the changes needed?
    
    Currently, executors can create a SparkContext, but they shouldn't be able to.
    
    ```scala
    sc.range(0, 1).foreach { _ =>
      new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    }
    ```
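    
    The guard follows the pattern `SparkSession.getOrCreate` already uses on the driver: if a `TaskContext` is available, the code is running inside a task on an executor, so construction is rejected. A minimal sketch of the check (the method body mirrors the `assertOnDriver` added in the diff below; the wrapper object is illustrative only):
    
    ```scala
    import org.apache.spark.TaskContext
    
    // Illustrative wrapper; in the actual change the check lives in object SparkContext.
    object DriverOnlyGuard {
      def assertOnDriver(): Unit = {
        // TaskContext.get returns a non-null TaskContext only while a task is
        // running on an executor, so a non-null value means we are off the driver.
        if (TaskContext.get != null) {
          throw new IllegalStateException(
            "SparkContext should only be created and accessed on the driver.")
        }
      }
    }
    ```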
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users won't be able to create `SparkContext` in executors.
    
    ### How was this patch tested?
    
    Added tests.
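    
    For reference, the Scala test added to `SparkContextSuite` exercises the check roughly as sketched here (condensed from the diff below; `intercept`/`assert` come from ScalaTest, and the explicit `sc.stop()` is only needed outside the suite, which relies on `LocalSparkContext` for cleanup):
    
    ```scala
    import org.apache.spark.{SparkConf, SparkContext, SparkException}
    import org.scalatest.Assertions._
    
    // A local-cluster master is required so the foreach closure really runs on executors.
    val sc = new SparkContext(
      new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
    try {
      val error = intercept[SparkException] {
        sc.range(0, 1).foreach { _ =>
          // This executor-side construction now fails fast.
          new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
        }
      }.getMessage()
      assert(error.contains("SparkContext should only be created and accessed on the driver."))
    } finally {
      sc.stop()
    }
    ```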
    
    Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.
    
    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit cfecc2030d8b4774c1f4754fe81f57fbc61c9c75)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala | 16 ++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala | 12 ++++++++
 python/pyspark/context.py                          | 14 +++++++++
 python/pyspark/tests/test_context.py               |  8 +++++
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../ExternalAppendOnlyUnsafeRowArraySuite.scala    | 35 ++++++++++------------
 6 files changed, 67 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bcbb7e4..2761f0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -82,6 +82,9 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
+  // In order to prevent SparkContext from being created in executors.
+  SparkContext.assertOnDriver()
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2540,6 +2543,19 @@ object SparkContext extends Logging {
   }
 
   /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
+      // we're accessing it during task execution, fail.
+      throw new IllegalStateException(
+        "SparkContext should only be created and accessed on the driver.")
+    }
+  }
+
+  /**
    * This function may be used to get or instantiate a SparkContext and register it as a
    * singleton object. Because we can only have one active SparkContext per JVM,
    * this is useful when applications may wish to share a SparkContext.
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 9f8fa89..2b1e110 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -950,6 +950,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkContext in executors") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    val error = intercept[SparkException] {
+      sc.range(0, 1).foreach { _ =>
+        new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkContext should only be created and accessed on the driver."))
+  }
 }
 
 object SparkContextSuite {
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5bb991e..ecd171a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -38,6 +38,7 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deseria
 from pyspark.storagelevel import StorageLevel
 from pyspark.resource import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
+from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ class SparkContext(object):
             ...
         ValueError:...
         """
+        # In order to prevent SparkContext from being created in executors.
+        SparkContext._assert_on_driver()
+
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
             raise ValueError(
@@ -1145,6 +1149,16 @@ class SparkContext(object):
             resources[name] = ResourceInformation(name, addrs)
         return resources
 
+    @staticmethod
+    def _assert_on_driver():
+        """
+        Called to ensure that SparkContext is created only on the Driver.
+
+        Throws an exception if a SparkContext is about to be created in executors.
+        """
+        if TaskContext.get() is not None:
+            raise Exception("SparkContext should only be created and accessed on the driver.")
+
 
 def _test():
     import atexit
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index c7f435a..303635d 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -267,6 +267,14 @@ class ContextTests(unittest.TestCase):
             resources = sc.resources
             self.assertEqual(len(resources), 0)
 
+    def test_disallow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext should not be created in executors.
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            with self.assertRaises(Exception) as context:
+                sc.range(2).foreach(lambda _: SparkContext())
+            self.assertIn("SparkContext should only be created and accessed on the driver.",
+                          str(context.exception))
+
 
 class ContextTestsWithResources(unittest.TestCase):
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 60a6037..e5d8710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1086,7 +1086,7 @@ object SparkSession extends Logging {
   }
 
   private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
+    if (TaskContext.get != null) {
       // we're accessing it during task execution, fail.
       throw new IllegalStateException(
         "SparkSession should only be created and accessed on the driver.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index b29de9c..98aba3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -27,32 +27,29 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
   private val random = new java.util.Random()
-  private var taskContext: TaskContext = _
-
-  override def afterAll(): Unit = try {
-    TaskContext.unset()
-  } finally {
-    super.afterAll()
-  }
 
   private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
 
-    taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
     TaskContext.setTaskContext(taskContext)
 
-    val array = new ExternalAppendOnlyUnsafeRowArray(
-      taskContext.taskMemoryManager(),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      taskContext,
-      1024,
-      SparkEnv.get.memoryManager.pageSizeBytes,
-      inMemoryThreshold,
-      spillThreshold)
-    try f(array) finally {
-      array.clear()
+    try {
+      val array = new ExternalAppendOnlyUnsafeRowArray(
+        taskContext.taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        taskContext,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        inMemoryThreshold,
+        spillThreshold)
+      try f(array) finally {
+        array.clear()
+      }
+    } finally {
+      TaskContext.unset()
     }
   }
 

