This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2fc562c  [SPARK-30556][SQL][2.4] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext
2fc562c is described below

commit 2fc562cafd71ec8f438f37a28b65118906ab2ad2
Author: Ajith <ajith2...@gmail.com>
AuthorDate: Thu Jan 23 09:00:01 2020 -0800

    [SPARK-30556][SQL][2.4] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext

    ### What changes were proposed in this pull request?

    In `org.apache.spark.sql.execution.SubqueryExec#relationFuture`, make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the sub-execution thread in `org.apache.spark.sql.execution.SubqueryExec#executionContext`.

    ### Why are the changes needed?

    Local properties set via sparkContext are not available as TaskContext properties when jobs are executed on thread pools that reuse idle threads.

    Explanation: In `SubqueryExec`, the `relationFuture` is evaluated on a separate thread. Those threads are created in the `executionContext` (a thread pool) and inherit `localProperties` from `sparkContext` because they are child threads. Each thread pool keeps idle threads alive for a default keepAliveSeconds of 60 seconds. When an idle thread is reused for a subsequent query, it does not re-inherit the thread-local properties from the Spark context (thread properties are inherited only at thread creation), so it ends up with stale or missing properties. As a result, task-set properties are missing when the child thread submits jobs via `sparkContext.runJob`/`submitJob`. A standalone sketch of this thread-reuse behaviour follows the commit message below.

    ### Does this PR introduce any user-facing change?

    No.

    ### How was this patch tested?

    Added a unit test.

    Closes #27340 from ajithme/subquerylocalprop2.

    Authored-by: Ajith <ajith2...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
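The failure mode described under "Why are the changes needed?" comes down to a JVM detail: `InheritableThreadLocal` values are copied only when a thread is constructed, never when an idle pool thread is reused. The following is a minimal, Spark-free Scala sketch of that behaviour; the names (`StaleLocalPropsDemo`, `localProps`) are illustrative stand-ins, not Spark code.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object StaleLocalPropsDemo {
  // Stand-in for SparkContext#localProperties (illustrative only, not Spark code).
  private val localProps = new InheritableThreadLocal[String] {
    override def initialValue(): String = "unset"
  }

  def main(args: Array[String]): Unit = {
    // One-thread pool: the worker is created lazily on the first submission,
    // and that is the only moment it inherits InheritableThreadLocal values.
    implicit val ec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

    localProps.set("query-1")
    println(Await.result(Future { localProps.get() }, 1.second)) // "query-1": inherited when the worker was created

    localProps.set("query-2")
    // The pool reuses the idle worker, so nothing is re-inherited:
    println(Await.result(Future { localProps.get() }, 1.second)) // still "query-1" (stale)

    ec.shutdown()
  }
}
```

The second submission reuses the worker created during the first one, so it still observes "query-1" even though the caller has since set "query-2"; this is exactly the stale-property situation the patch addresses.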
---
 .../main/scala/org/apache/spark/util/Utils.scala   |  7 ++++
 .../apache/spark/sql/internal/StaticSQLConf.scala  |  7 ++++
 .../apache/spark/sql/execution/SQLExecution.scala  | 20 +++++++++++-
 .../sql/execution/basicPhysicalOperators.scala     | 10 ++++--
 .../sql/internal/ExecutorSideSQLConfSuite.scala    | 38 ++++++++++++++++++++--
 5 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8f86b47..2e51614 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2863,6 +2863,13 @@ private[spark] object Utils extends Logging {
   def stringHalfWidth(str: String): Int = {
     if (str == null) 0 else str.length + fullWidthRegex.findAllIn(str).size
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.asScala.foreach(entry => resultProps.put(entry._1, entry._2))
+    resultProps
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index d9c354b..4b5bb85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -126,4 +126,11 @@ object StaticSQLConf {
     .intConf
     .createWithDefault(1000)
 
+  val SUBQUERY_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.subquery.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to execute the subquery.")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(16)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 439932b..296076d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import scala.concurrent.{ExecutionContext, Future}
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.util.Utils
 
 object SQLExecution {
@@ -129,4 +131,20 @@ object SQLExecution {
       }
     }
   }
+
+  /**
+   * Wrap passed function to ensure necessary thread-local variables like
+   * SparkContext local properties are forwarded to execution thread
+   */
+  def withThreadLocalCaptured[T](
+      sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
+    val activeSession = sparkSession
+    val sc = sparkSession.sparkContext
+    val localProps = Utils.cloneProperties(sc.getLocalProperties)
+    Future {
+      SparkSession.setActiveSession(activeSession)
+      sc.setLocalProperties(localProps)
+      body
+    }(exec)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 27aa9b8..29af844 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
@@ -658,7 +659,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
+      sqlContext.sparkSession,
+      SubqueryExec.executionContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -673,7 +676,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
         SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
         rows
       }
-    }(SubqueryExec.executionContext)
+    }
   }
 
   protected override def doPrepare(): Unit = {
@@ -691,5 +694,6 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
 object SubqueryExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+    ThreadUtils.newDaemonCachedThreadPool("subquery",
+      SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index ae7206b..2233f4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -129,6 +129,40 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-30556 propagate local properties to subquery execution thread") {
+    withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
+      withTempView("l", "m", "n") {
+        Seq(true).toDF().createOrReplaceTempView("l")
+        val confKey = "spark.sql.y"
+
+        def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+          Seq(true)
+            .toDF()
+            .mapPartitions { _ =>
+              TaskContext.get.getLocalProperty(confKey) == confValue match {
+                case true => Iterator(true)
+                case false => Iterator.empty
+              }
+            }
+        }
+
+        // set local configuration and assert
+        val confValue1 = "e"
+        createDataframe(confKey, confValue1).createOrReplaceTempView("m")
+        spark.sparkContext.setLocalProperty(confKey, confValue1)
+        val result1 = sql("SELECT value, (SELECT MAX(*) FROM m) x FROM l").collect
+        assert(result1.forall(_.getBoolean(1)))
+
+        // change the conf value and assert again
+        val confValue2 = "f"
+        createDataframe(confKey, confValue2).createOrReplaceTempView("n")
+        spark.sparkContext.setLocalProperty(confKey, confValue2)
+        val result2 = sql("SELECT value, (SELECT MAX(*) FROM n) x FROM l").collect
+        assert(result2.forall(_.getBoolean(1)))
+      }
+    }
+  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {
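For readers outside the Spark code base, here is a minimal, dependency-free Scala sketch of the capture-and-restore pattern that the patch introduces through `Utils.cloneProperties` and `SQLExecution.withThreadLocalCaptured`: snapshot the caller's thread-local properties before handing work to the pool, then re-install the snapshot on the worker thread before running the body. The names in the sketch (`CapturedPropsDemo`, `withCapturedProps`, `localProps`) are illustrative, not Spark APIs.

```scala
import java.util.Properties
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object CapturedPropsDemo {
  // Stand-in for SparkContext#localProperties (illustrative only, not Spark code).
  private val localProps = new InheritableThreadLocal[Properties] {
    override def initialValue(): Properties = new Properties()
  }

  // Snapshot the caller's properties, then re-install that snapshot on the
  // (possibly reused) pool thread before running the body.
  def withCapturedProps[T](exec: ExecutionContext)(body: => T): Future[T] = {
    val snapshot = localProps.get().clone().asInstanceOf[Properties] // captured on the caller thread
    Future {
      localProps.set(snapshot) // restored on the worker thread, even if the thread is reused
      body
    }(exec)
  }

  def main(args: Array[String]): Unit = {
    val exec: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))

    localProps.get().setProperty("spark.sql.y", "e")
    val r1 = withCapturedProps(exec) { localProps.get().getProperty("spark.sql.y") }

    localProps.get().setProperty("spark.sql.y", "f")
    val r2 = withCapturedProps(exec) { localProps.get().getProperty("spark.sql.y") }

    // Each future sees the value that was current when it was submitted,
    // even though both run on the same reused worker thread.
    println(Await.result(r1, 1.second)) // e
    println(Await.result(r2, 1.second)) // f

    exec.shutdown()
  }
}
```

Cloning on the caller thread is what makes each task immune both to later mutations by the caller and to whatever stale values the reused worker still carries.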