This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d96ee1f  [SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask
d96ee1f is described below

commit d96ee1f3b3d136971b1893741f4b022a9f15ae20
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Thu Dec 30 23:09:45 2021 +0900

    [SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add `Utils.isInRunningSparkTask` to check whether the
    current code is running in a Spark task, e.g., on executors.
    
    ### Why are the changes needed?
    
    There is currently no single call to check whether we're in a running Spark
    task (e.g., on executors); call sites spell out `TaskContext.get != null` by
    hand. We should factor this check out explicitly into `Utils`.
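
    A minimal sketch of the call-site change being factored out (hedged:
    `Utils` is `private[spark]`, so the helper is Spark-internal; the package
    and object names below are illustrative, not part of the patch):

    ```scala
    // Hypothetical Spark-internal file; `Utils` is private[spark], so the
    // sketch lives under the org.apache.spark package to compile.
    package org.apache.spark.demo

    import org.apache.spark.TaskContext
    import org.apache.spark.util.Utils

    object CallSiteSketch {
      // Before this patch: the intent hides in a raw null check.
      def inTaskOld: Boolean = TaskContext.get != null

      // After this patch: the same check, with the intent named.
      def inTaskNew: Boolean = Utils.isInRunningSparkTask
    }
    ```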
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, dev-only.
    
    ### How was this patch tested?
    
    Existing unit tests should cover this case.
    
    Closes #35065 from HyukjinKwon/mindor-util-at-executor.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala              | 2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala                | 5 +++++
 .../spark/sql/catalyst/expressions/EquivalentExpressions.scala       | 4 ++--
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala       | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala      | 4 ++--
 .../src/main/scala/org/apache/spark/sql/execution/Columnar.scala     | 3 ++-
 6 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a14efa5..0c5fb0a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2680,7 +2680,7 @@ object SparkContext extends Logging {
    * Throws an exception if a SparkContext is about to be created in executors.
    */
   private def assertOnDriver(): Unit = {
-    if (TaskContext.get != null) {
+    if (Utils.isInRunningSparkTask) {
       // we're accessing it during task execution, fail.
       throw new IllegalStateException(
         "SparkContext should only be created and accessed on the driver.")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 6597750..4410fe7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -875,6 +875,11 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Returns whether the current code is running in a Spark task, e.g., on executors.
+   */
+  def isInRunningSparkTask: Boolean = TaskContext.get() != null
+
+  /**
   * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS,
    * and returns only the directories that exist / could be created.
    *
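
Note: `TaskContext.get()` reads a thread-local that executors bind to the task's
thread for the duration of a task, so the new helper is effectively a
driver-vs-task probe. A hedged, self-contained sketch of that behavior
(illustrative only; it calls `TaskContext` directly because `Utils` is
`private[spark]`):

```scala
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object InTaskProbeDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("in-task-probe"))
    // On the driver thread no TaskContext is bound, so the probe is false.
    assert(TaskContext.get() == null)
    // Inside a task body the executor has bound a TaskContext to the thread,
    // so the same check (what Utils.isInRunningSparkTask wraps) is true.
    sc.parallelize(1 to 2).foreach(_ => assert(TaskContext.get() != null))
    sc.stop()
  }
}
```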
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 269ab31..59e2be4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -21,9 +21,9 @@ import java.util.Objects
 
 import scala.collection.mutable
 
-import org.apache.spark.TaskContext
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable
+import org.apache.spark.util.Utils
 
 /**
 * This class is used to compute equality of (sub)expression trees. Expressions can be added
@@ -197,7 +197,7 @@ class EquivalentExpressions {
       expr.find(_.isInstanceOf[LambdaVariable]).isDefined ||
       // `PlanExpression` wraps a query plan; comparing query plans of `PlanExpression`
       // on executors can cause errors like NPE.
-      (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null)
+      (expr.isInstanceOf[PlanExpression[_]] && Utils.isInRunningSparkTask)
 
     if (!skip && !updateExprInMap(expr, map, useCount)) {
       val uc = useCount.signum
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2ca68c6..105a1c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -198,7 +198,7 @@ object SQLConf {
    * run unit tests (that do not involve SparkSession) in serial order.
    */
   def get: SQLConf = {
-    if (TaskContext.get != null) {
+    if (Utils.isInRunningSparkTask) {
       val conf = existingConf.get()
       if (conf != null) {
         conf
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index cdd57d7..734b8e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -1023,7 +1023,7 @@ object SparkSession extends Logging {
    * @since 2.2.0
    */
   def getActiveSession: Option[SparkSession] = {
-    if (TaskContext.get != null) {
+    if (Utils.isInRunningSparkTask) {
       // Return None when running on executors.
       None
     } else {
@@ -1039,7 +1039,7 @@ object SparkSession extends Logging {
    * @since 2.2.0
    */
   def getDefaultSession: Option[SparkSession] = {
-    if (TaskContext.get != null) {
+    if (Utils.isInRunningSparkTask) {
       // Return None when running on executors.
       None
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 628d4a3..70a508e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.util.Utils
 
 /**
 * Holds a user defined rule that can be used to inject columnar implementations of various
@@ -66,7 +67,7 @@ trait ColumnarToRowTransition extends UnaryExecNode
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
   // supportsColumnar must only be called on the driver side; see also SPARK-37779.
-  assert(TaskContext.get != null || child.supportsColumnar)
+  assert(Utils.isInRunningSparkTask || child.supportsColumnar)
 
   override def output: Seq[Attribute] = child.output
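
One observable consequence of the guard rewritten in the SparkSession.scala hunk
above: the session accessors return None from inside tasks. A hedged end-to-end
sketch (illustrative; these assertions mirror what the patched code implies, not
an official test):

```scala
import org.apache.spark.sql.SparkSession

object ActiveSessionInTaskDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("active-session-in-task").getOrCreate()
    // On the driver, the active session is defined as usual.
    assert(SparkSession.getActiveSession.isDefined)
    // Inside a task, Utils.isInRunningSparkTask is true, so both accessors
    // short-circuit to None rather than leak driver-side state.
    spark.range(2).foreach { _ =>
      assert(SparkSession.getActiveSession.isEmpty)
      assert(SparkSession.getDefaultSession.isEmpty)
    }
    spark.stop()
  }
}
```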
 
