This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 86efa45  [SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors
86efa45 is described below

commit 86efa456d8e11a2b7e10bce70d4ead20c75acbe1
Author: Takuya UESHIN <ues...@databricks.com>
AuthorDate: Thu Jul 9 15:51:56 2020 +0900

    [SPARK-32160][CORE][PYSPARK] Disallow to create SparkContext in executors

    ### What changes were proposed in this pull request?

    This PR proposes to disallow creating a `SparkContext` in executors, e.g., in UDFs.

    ### Why are the changes needed?

    Currently, executors can create a SparkContext, but they shouldn't be able to:

    ```scala
    sc.range(0, 1).foreach { _ =>
      new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
    }
    ```

    ### Does this PR introduce _any_ user-facing change?

    Yes, users won't be able to create a `SparkContext` in executors.

    ### How was this patch tested?

    Adds tests.

    Closes #28986 from ueshin/issues/SPARK-32160/disallow_spark_context_in_executors.

    Authored-by: Takuya UESHIN <ues...@databricks.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
    (cherry picked from commit cfecc2030d8b4774c1f4754fe81f57fbc61c9c75)
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala  | 16 ++++++++++
 .../scala/org/apache/spark/SparkContextSuite.scala  | 12 ++++++++
 python/pyspark/context.py                           | 14 +++++++++
 python/pyspark/tests/test_context.py                |  8 +++++
 .../scala/org/apache/spark/sql/SparkSession.scala   |  2 +-
 .../ExternalAppendOnlyUnsafeRowArraySuite.scala     | 35 ++++++++++------------
 6 files changed, 67 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bcbb7e4..2761f0d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -82,6 +82,9 @@ class SparkContext(config: SparkConf) extends Logging {
   // The call site where this SparkContext was constructed.
   private val creationSite: CallSite = Utils.getCallSite()
 
+  // In order to prevent SparkContext from being created in executors.
+  SparkContext.assertOnDriver()
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having started construction.
   // NOTE: this must be placed at the beginning of the SparkContext constructor.
@@ -2540,6 +2543,19 @@ object SparkContext extends Logging {
   }
 
   /**
+   * Called to ensure that SparkContext is created or accessed only on the Driver.
+   *
+   * Throws an exception if a SparkContext is about to be created in executors.
+   */
+  private def assertOnDriver(): Unit = {
+    if (TaskContext.get != null) {
+      // we're accessing it during task execution, fail.
+      throw new IllegalStateException(
+        "SparkContext should only be created and accessed on the driver.")
+    }
+  }
+
+  /**
   * This function may be used to get or instantiate a SparkContext and register it as a
   * singleton object. Because we can only have one active SparkContext per JVM,
   * this is useful when applications may wish to share a SparkContext.
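As an illustration of the new behavior, here is a minimal, self-contained sketch (not code from this commit; the object name, app names, and the `local[2]` master are made up for the example). The guard relies on `TaskContext.get` being null on the driver and non-null inside a running task, so a `SparkContext` constructed inside a task now fails and the job is reported back to the driver as a `SparkException` carrying the driver-only message:

```scala
// Hypothetical example; assumes a plain local[2] master rather than the
// local-cluster setup used in the committed tests.
import org.apache.spark.{SparkConf, SparkContext, SparkException, TaskContext}

object DriverOnlyGuardExample {
  def main(args: Array[String]): Unit = {
    // On the driver no task is running, so TaskContext.get returns null and the
    // constructor passes the new SparkContext.assertOnDriver() check.
    val sc = new SparkContext(new SparkConf().setAppName("guard-example").setMaster("local[2]"))
    assert(TaskContext.get == null)

    try {
      // Inside a task, TaskContext.get is non-null, so the same constructor throws
      // IllegalStateException; the failed job surfaces on the driver as a SparkException
      // whose message contains the driver-only error text.
      sc.range(0, 1).foreach { _ =>
        new SparkContext(new SparkConf().setAppName("inner").setMaster("local"))
      }
    } catch {
      case e: SparkException =>
        assert(e.getMessage.contains(
          "SparkContext should only be created and accessed on the driver."))
    } finally {
      sc.stop()
    }
  }
}
```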
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 9f8fa89..2b1e110 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -950,6 +950,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-32160: Disallow to create SparkContext in executors") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local-cluster[3, 1, 1024]"))
+
+    val error = intercept[SparkException] {
+      sc.range(0, 1).foreach { _ =>
+        new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+      }
+    }.getMessage()
+
+    assert(error.contains("SparkContext should only be created and accessed on the driver."))
+  }
 }
 
 object SparkContextSuite {
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5bb991e..ecd171a 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -38,6 +38,7 @@ from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deseria
 from pyspark.storagelevel import StorageLevel
 from pyspark.resource import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
+from pyspark.taskcontext import TaskContext
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
 from pyspark.profiler import ProfilerCollector, BasicProfiler
@@ -118,6 +119,9 @@ class SparkContext(object):
         ...
         ValueError:...
         """
+        # In order to prevent SparkContext from being created in executors.
+        SparkContext._assert_on_driver()
+
         self._callsite = first_spark_call() or CallSite(None, None, None)
         if gateway is not None and gateway.gateway_parameters.auth_token is None:
             raise ValueError(
@@ -1145,6 +1149,16 @@ class SparkContext(object):
             resources[name] = ResourceInformation(name, addrs)
         return resources
 
+    @staticmethod
+    def _assert_on_driver():
+        """
+        Called to ensure that SparkContext is created only on the Driver.
+
+        Throws an exception if a SparkContext is about to be created in executors.
+        """
+        if TaskContext.get() is not None:
+            raise Exception("SparkContext should only be created and accessed on the driver.")
+
 
 def _test():
     import atexit
diff --git a/python/pyspark/tests/test_context.py b/python/pyspark/tests/test_context.py
index c7f435a..303635d 100644
--- a/python/pyspark/tests/test_context.py
+++ b/python/pyspark/tests/test_context.py
@@ -267,6 +267,14 @@ class ContextTests(unittest.TestCase):
         resources = sc.resources
         self.assertEqual(len(resources), 0)
 
+    def test_disallow_to_create_spark_context_in_executors(self):
+        # SPARK-32160: SparkContext should not be created in executors.
+        with SparkContext("local-cluster[3, 1, 1024]") as sc:
+            with self.assertRaises(Exception) as context:
+                sc.range(2).foreach(lambda _: SparkContext())
+            self.assertIn("SparkContext should only be created and accessed on the driver.",
+                          str(context.exception))
+
 
 
 class ContextTestsWithResources(unittest.TestCase):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 60a6037..e5d8710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1086,7 +1086,7 @@ object SparkSession extends Logging {
   }
 
   private def assertOnDriver(): Unit = {
-    if (Utils.isTesting && TaskContext.get != null) {
+    if (TaskContext.get != null) {
       // we're accessing it during task execution, fail.
       throw new IllegalStateException(
         "SparkSession should only be created and accessed on the driver.")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
index b29de9c..98aba3b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArraySuite.scala
@@ -27,32 +27,29 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 
 class ExternalAppendOnlyUnsafeRowArraySuite extends SparkFunSuite with LocalSparkContext {
   private val random = new java.util.Random()
-  private var taskContext: TaskContext = _
-
-  override def afterAll(): Unit = try {
-    TaskContext.unset()
-  } finally {
-    super.afterAll()
-  }
 
   private def withExternalArray(inMemoryThreshold: Int, spillThreshold: Int)
                                (f: ExternalAppendOnlyUnsafeRowArray => Unit): Unit = {
     sc = new SparkContext("local", "test", new SparkConf(false))
-    taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
     TaskContext.setTaskContext(taskContext)
-    val array = new ExternalAppendOnlyUnsafeRowArray(
-      taskContext.taskMemoryManager(),
-      SparkEnv.get.blockManager,
-      SparkEnv.get.serializerManager,
-      taskContext,
-      1024,
-      SparkEnv.get.memoryManager.pageSizeBytes,
-      inMemoryThreshold,
-      spillThreshold)
-    try f(array) finally {
-      array.clear()
+    try {
+      val array = new ExternalAppendOnlyUnsafeRowArray(
+        taskContext.taskMemoryManager(),
+        SparkEnv.get.blockManager,
+        SparkEnv.get.serializerManager,
+        taskContext,
+        1024,
+        SparkEnv.get.memoryManager.pageSizeBytes,
+        inMemoryThreshold,
+        spillThreshold)
+      try f(array) finally {
+        array.clear()
+      }
+    } finally {
+      TaskContext.unset()
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org