Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21299#discussion_r187841313 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala --- @@ -90,13 +92,33 @@ object SQLExecution { * thread from the original one, this method can be used to connect the Spark jobs in this action * with the known executionId, e.g., `BroadcastExchangeExec.relationFuture`. */ - def withExecutionId[T](sc: SparkContext, executionId: String)(body: => T): T = { + def withExecutionId[T](sparkSession: SparkSession, executionId: String)(body: => T): T = { + val sc = sparkSession.sparkContext val oldExecutionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) + withSQLConfPropagated(sparkSession) { + try { + sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId) + body + } finally { + sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId) + } + } + } + + def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = { + // Set all the specified SQL configs to local properties, so that they can be available at + // the executor side. + val allConfigs = sparkSession.sessionState.conf.getAllConfs + for ((key, value) <- allConfigs) { + // Excludes external configs defined by users. + if (key.startsWith("spark")) sparkSession.sparkContext.setLocalProperty(key, value) + } try { - sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, executionId) body } finally { - sc.setLocalProperty(SQLExecution.EXECUTION_ID_KEY, oldExecutionId) + allConfigs.foreach { + case (key, _) => sparkSession.sparkContext.setLocalProperty(key, null) --- End diff -- good point, although it's very unlikely that users set some sql configs to local property. let me change it.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org