This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new d96ee1f [SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask d96ee1f is described below commit d96ee1f3b3d136971b1893741f4b022a9f15ae20 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Thu Dec 30 23:09:45 2021 +0900 [SPARK-37785][SQL][CORE] Add Utils.isInRunningSparkTask ### What changes were proposed in this pull request? This PR proposes to add `Utils.isInRunningSparkTask` to check whether the current code is running inside a Spark task, e.g., on executors. ### Why are the changes needed? There is currently no single call to check whether we're in a running Spark task (e.g., on executors); callers compare `TaskContext.get` against `null` directly for that purpose. It is better to factor this check out explicitly into `Utils`. ### Does this PR introduce _any_ user-facing change? No, dev-only. ### How was this patch tested? Existing unit tests should cover this case. Closes #35065 from HyukjinKwon/mindor-util-at-executor. Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 +++++ .../spark/sql/catalyst/expressions/EquivalentExpressions.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala | 4 ++-- .../src/main/scala/org/apache/spark/sql/execution/Columnar.scala | 3 ++- 6 files changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a14efa5..0c5fb0a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2680,7 +2680,7 @@ object SparkContext extends Logging { * Throws an exception if a SparkContext is about to be created in executors. 
*/ private def assertOnDriver(): Unit = { - if (TaskContext.get != null) { + if (Utils.isInRunningSparkTask) { // we're accessing it during task execution, fail. throw new IllegalStateException( "SparkContext should only be created and accessed on the driver.") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 6597750..4410fe7 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -875,6 +875,11 @@ private[spark] object Utils extends Logging { } /** + * Returns if the current codes are running in a Spark task, e.g., in executors. + */ + def isInRunningSparkTask: Boolean = TaskContext.get() != null + + /** * Gets or creates the directories listed in spark.local.dir or SPARK_LOCAL_DIRS, * and returns only the directories that exist / could be created. * diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala index 269ab31..59e2be4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala @@ -21,9 +21,9 @@ import java.util.Objects import scala.collection.mutable -import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable +import org.apache.spark.util.Utils /** * This class is used to compute equality of (sub)expression trees. Expressions can be added @@ -197,7 +197,7 @@ class EquivalentExpressions { expr.find(_.isInstanceOf[LambdaVariable]).isDefined || // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, // can cause error like NPE. 
- (expr.isInstanceOf[PlanExpression[_]] && TaskContext.get != null) + (expr.isInstanceOf[PlanExpression[_]] && Utils.isInRunningSparkTask) if (!skip && !updateExprInMap(expr, map, useCount)) { val uc = useCount.signum diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2ca68c6..105a1c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -198,7 +198,7 @@ object SQLConf { * run unit tests (that does not involve SparkSession) in serial order. */ def get: SQLConf = { - if (TaskContext.get != null) { + if (Utils.isInRunningSparkTask) { val conf = existingConf.get() if (conf != null) { conf diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index cdd57d7..734b8e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -1023,7 +1023,7 @@ object SparkSession extends Logging { * @since 2.2.0 */ def getActiveSession: Option[SparkSession] = { - if (TaskContext.get != null) { + if (Utils.isInRunningSparkTask) { // Return None when running on executors. None } else { @@ -1039,7 +1039,7 @@ object SparkSession extends Logging { * @since 2.2.0 */ def getDefaultSession: Option[SparkSession] = { - if (TaskContext.get != null) { + if (Utils.isInRunningSparkTask) { // Return None when running on executors. 
None } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala index 628d4a3..70a508e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} +import org.apache.spark.util.Utils /** * Holds a user defined rule that can be used to inject columnar implementations of various @@ -66,7 +67,7 @@ trait ColumnarToRowTransition extends UnaryExecNode */ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport { // supportsColumnar requires to be only called on driver side, see also SPARK-37779. - assert(TaskContext.get != null || child.supportsColumnar) + assert(Utils.isInRunningSparkTask || child.supportsColumnar) override def output: Seq[Attribute] = child.output --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org