LantaoJin commented on pull request #31119:
URL: https://github.com/apache/spark/pull/31119#issuecomment-761926681


   ```
   diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   index 50cc47d0f8..3d2827ecb7 100644
   --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
   @@ -977,6 +977,15 @@ object SQLConf {
          .timeConf(TimeUnit.SECONDS)
          .createWithDefault(0L)
   
   +  val THRIFTSERVER_BROADCAST_CANCEL =
   +    buildConf("spark.sql.thriftServer.broadcastCancel")
   +      .internal()
   +      .doc("When true, cancel the related broadcast sub-jobs when SQL statement is cancelled. " +
   +        "This configuration is only used internally and don't set it manually.")
   +      .version("3.2.0")
   +      .booleanConf
   +      .createWithDefault(false)
   +
      val THRIFTSERVER_UI_STATEMENT_LIMIT =
        buildConf("spark.sql.thriftserver.ui.retainedStatements")
          .doc("The number of SQL statements kept in the JDBC/ODBC web UI history.")
   diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   index c322d5eef5..39ea035d62 100644
   --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
   @@ -76,8 +76,12 @@ case class BroadcastExchangeExec(
   
      // Cancelling a SQL statement from Spark ThriftServer needs to cancel
      // its related broadcast sub-jobs. So set the run id to job group id if exists.
   -  override val runId: UUID = Option(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
   -      .map(UUID.fromString).getOrElse(UUID.randomUUID)
   +  override val runId: UUID =
   +    if (SQLConf.get.getConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL)) {
   +      UUID.fromString(sparkContext.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID))
   +    } else {
   +      UUID.randomUUID
   +    }
   
      override lazy val metrics = Map(
        "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
   diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   index 8ca0ab91a7..68a02842ef 100644
   --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
   @@ -285,7 +285,7 @@ private[hive] class SparkExecuteStatementOperation(
          if (!runInBackground) {
            parentSession.getSessionState.getConf.setClassLoader(executionHiveClassLoader)
          }
   -
   +      sqlContext.conf.setConf(SQLConf.THRIFTSERVER_BROADCAST_CANCEL, true)
          sqlContext.sparkContext.setJobGroup(statementId, substitutorStatement, forceCancel)
          result = sqlContext.sql(statement)
          logDebug(result.queryExecution.toString())
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to