This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 2fc562c  [SPARK-30556][SQL][2.4] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext
2fc562c is described below

commit 2fc562cafd71ec8f438f37a28b65118906ab2ad2
Author: Ajith <ajith2...@gmail.com>
AuthorDate: Thu Jan 23 09:00:01 2020 -0800

    [SPARK-30556][SQL][2.4] Copy sparkContext.localProperties to child thread in SubqueryExec.executionContext
    
    ### What changes were proposed in this pull request?
    In `org.apache.spark.sql.execution.SubqueryExec#relationFuture`, make a copy of `org.apache.spark.SparkContext#localProperties` and pass it to the sub-execution thread in `org.apache.spark.sql.execution.SubqueryExec#executionContext`.
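    
    Roughly, the pattern is: snapshot the submitting thread's local properties before handing the work to the pool, then re-install that snapshot inside the `Future` body. A minimal, self-contained sketch of the idea (not the actual Spark code; `localProps` here only stands in for `SparkContext#localProperties`):
    
        import java.util.Properties
        import scala.concurrent.{ExecutionContext, Future}

        object CaptureSketch {
          // Stand-in for SparkContext#localProperties, which is an InheritableThreadLocal.
          private val localProps = new InheritableThreadLocal[Properties] {
            override def initialValue(): Properties = new Properties()
          }

          def withThreadLocalCaptured[T](exec: ExecutionContext)(body: => T): Future[T] = {
            // Snapshot the caller's properties on the submitting thread...
            val captured = localProps.get().clone().asInstanceOf[Properties]
            Future {
              // ...and restore them on whichever pooled thread runs the body,
              // so it no longer matters whether that thread is new or reused.
              localProps.set(captured)
              body
            }(exec)
          }
        }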
    
    ### Why are the changes needed?
    Local properties set via `sparkContext` are not available as `TaskContext` properties when jobs are executed on thread pools that reuse idle threads.
    
    Explanation:
    When `SubqueryExec` is executed, `relationFuture` is evaluated on a separate thread. Those threads inherit the `localProperties` from `sparkContext` because they are its child threads.
    The threads are created in the `executionContext` (thread pools). Each thread pool keeps idle threads alive for a default keepAliveSeconds of 60 seconds.
    When the thread pool has idle threads that are reused for a subsequent new query, the thread-local properties are not inherited from the Spark context (thread properties are inherited only on thread creation), so the reused thread ends up with stale or missing properties. This causes taskset properties to be missing when properties are transferred by the child thread via `sparkContext.runJob/submitJob`.
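    
    A self-contained demonstration of the inheritance gap (plain JDK thread pool, no Spark involved; `InheritableThreadLocal` is the same mechanism `SparkContext#localProperties` relies on):
    
        import java.util.concurrent.Executors

        object ReusedThreadDemo extends App {
          val prop = new InheritableThreadLocal[String]
          val pool = Executors.newFixedThreadPool(1) // a single, reusable worker thread

          prop.set("query-1")
          // The worker thread is created for this first task, so it inherits "query-1".
          pool.submit(new Runnable {
            def run(): Unit = println(s"task 1 sees: ${prop.get}")
          }).get()

          prop.set("query-2")
          // The worker already exists and is reused, so it still sees the stale "query-1".
          pool.submit(new Runnable {
            def run(): Unit = println(s"task 2 sees: ${prop.get}")
          }).get()

          pool.shutdown()
        }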
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    Added UT
    
    Closes #27340 from ajithme/subquerylocalprop2.
    
    Authored-by: Ajith <ajith2...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/util/Utils.scala   |  7 ++++
 .../apache/spark/sql/internal/StaticSQLConf.scala  |  7 ++++
 .../apache/spark/sql/execution/SQLExecution.scala  | 20 +++++++++++-
 .../sql/execution/basicPhysicalOperators.scala     | 10 ++++--
 .../sql/internal/ExecutorSideSQLConfSuite.scala    | 38 ++++++++++++++++++++--
 5 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8f86b47..2e51614 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2863,6 +2863,13 @@ private[spark] object Utils extends Logging {
   def stringHalfWidth(str: String): Int = {
     if (str == null) 0 else str.length + fullWidthRegex.findAllIn(str).size
   }
+
+  /** Create a new properties object with the same values as `props` */
+  def cloneProperties(props: Properties): Properties = {
+    val resultProps = new Properties()
+    props.asScala.foreach(entry => resultProps.put(entry._1, entry._2))
+    resultProps
+  }
 }
 
 private[util] object CallerContext extends Logging {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index d9c354b..4b5bb85 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -126,4 +126,11 @@ object StaticSQLConf {
       .intConf
       .createWithDefault(1000)
 
+  val SUBQUERY_MAX_THREAD_THRESHOLD =
+    buildStaticConf("spark.sql.subquery.maxThreadThreshold")
+      .internal()
+      .doc("The maximum degree of parallelism to execute the subquery.")
+      .intConf
+      .checkValue(thres => thres > 0 && thres <= 128, "The threshold must be in (0,128].")
+      .createWithDefault(16)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 439932b..296076d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.SparkContext
+import scala.concurrent.{ExecutionContext, Future}
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+import org.apache.spark.util.Utils
 
 object SQLExecution {
 
@@ -129,4 +131,20 @@ object SQLExecution {
       }
     }
   }
+
+  /**
+   * Wrap passed function to ensure necessary thread-local variables like
+   * SparkContext local properties are forwarded to execution thread
+   */
+  def withThreadLocalCaptured[T](
+      sparkSession: SparkSession, exec: ExecutionContext)(body: => T): Future[T] = {
+    val activeSession = sparkSession
+    val sc = sparkSession.sparkContext
+    val localProps = Utils.cloneProperties(sc.getLocalProperties)
+    Future {
+      SparkSession.setActiveSession(activeSession)
+      sc.setLocalProperties(localProps)
+      body
+    }(exec)
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 27aa9b8..29af844 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.types.{LongType, StructType}
 import org.apache.spark.util.ThreadUtils
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
@@ -658,7 +659,9 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
   private lazy val relationFuture: Future[Array[InternalRow]] = {
     // relationFuture is used in "doExecute". Therefore we can get the execution id correctly here.
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    Future {
+    SQLExecution.withThreadLocalCaptured[Array[InternalRow]](
+      sqlContext.sparkSession,
+      SubqueryExec.executionContext) {
       // This will run in another thread. Set the execution id so that we can connect these jobs
       // with the correct execution.
       SQLExecution.withExecutionId(sqlContext.sparkSession, executionId) {
@@ -673,7 +676,7 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
         SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
         rows
       }
-    }(SubqueryExec.executionContext)
+    }
   }
 
   protected override def doPrepare(): Unit = {
@@ -691,5 +694,6 @@ case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
 object SubqueryExec {
   private[execution] val executionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+    ThreadUtils.newDaemonCachedThreadPool("subquery",
+      SQLConf.get.getConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD)))
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index ae7206b..2233f4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,9 +17,9 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkException, SparkFunSuite, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -129,6 +129,40 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-30556 propagate local properties to subquery execution thread") {
+    withSQLConf(StaticSQLConf.SUBQUERY_MAX_THREAD_THRESHOLD.key -> "1") {
+      withTempView("l", "m", "n") {
+        Seq(true).toDF().createOrReplaceTempView("l")
+        val confKey = "spark.sql.y"
+
+        def createDataframe(confKey: String, confValue: String): Dataset[Boolean] = {
+          Seq(true)
+            .toDF()
+            .mapPartitions { _ =>
+              TaskContext.get.getLocalProperty(confKey) == confValue match {
+                case true => Iterator(true)
+                case false => Iterator.empty
+              }
+            }
+        }
+
+        // set local configuration and assert
+        val confValue1 = "e"
+        createDataframe(confKey, confValue1).createOrReplaceTempView("m")
+        spark.sparkContext.setLocalProperty(confKey, confValue1)
+        val result1 = sql("SELECT value, (SELECT MAX(*) FROM m) x FROM l").collect
+        assert(result1.forall(_.getBoolean(1)))
+
+        // change the conf value and assert again
+        val confValue2 = "f"
+        createDataframe(confKey, confValue2).createOrReplaceTempView("n")
+        spark.sparkContext.setLocalProperty(confKey, confValue2)
+        val result2 = sql("SELECT value, (SELECT MAX(*) FROM n) x FROM l").collect
+        assert(result2.forall(_.getBoolean(1)))
+      }
+    }
+  }
 }
 
 case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends LeafExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
