Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21376#discussion_r189465307
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala ---
    @@ -90,13 +92,37 @@ object SQLExecution {
        * thread from the original one, this method can be used to connect the 
Spark jobs in this action
        * with the known executionId, e.g., 
`BroadcastExchangeExec.relationFuture`.
        */
    -  def withExecutionId[T](sc: SparkContext, executionId: String)(body: => 
T): T = {
    +  def withExecutionId[T](sparkSession: SparkSession, executionId: 
String)(body: => T): T = {
    +    val sc = sparkSession.sparkContext
         val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    +    withSQLConfPropagated(sparkSession) {
    +      try {
    +        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
    +        body
    +      } finally {
    +        sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
    +      }
    +    }
    +  }
    +
    +  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T 
= {
    +    val sc = sparkSession.sparkContext
    +    // Set all the specified SQL configs to local properties, so that they 
can be available at
    +    // the executor side.
    +    val allConfigs = sparkSession.sessionState.conf.getAllConfs
    +    val originalLocalProps = allConfigs.collect {
    +      case (key, value) if key.startsWith("spark") =>
    +        val originalValue = sc.getLocalProperty(key)
    +        sc.setLocalProperty(key, value)
    +        (key, originalValue)
    +    }
    +
         try {
    -      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId)
           body
         } finally {
    -      sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId)
    +      for ((key, value) <- originalLocalProps) {
    --- End diff --
    
    before we set the original one, should we reset the new key with null 
values?
    
    ```Scala
      def setLocalProperty(key: String, value: String) {
        if (value == null) {
          localProperties.get.remove(key)
        } else {
          localProperties.get.setProperty(key, value)
        }
      }
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to