This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6450c59 [SPARK-26992][STS] Fix STS scheduler pool correct delivery 6450c59 is described below commit 6450c5948adb58cbf3afaaf249560c81a4164cf6 Author: cxzl25 <cxz...@users.noreply.github.com> AuthorDate: Sat Apr 6 17:14:29 2019 -0500 [SPARK-26992][STS] Fix STS scheduler pool correct delivery ## What changes were proposed in this pull request? The user sets the value of spark.sql.thriftserver.scheduler.pool. The Spark Thrift Server stores this value as a thread-local SparkContext local property but does not clear it after the statement finishes, so other sessions that later run on the same thread are scheduled in the previously set pool. ## How was this patch tested? Manual tests. Closes #23895 from cxzl25/thrift_server_scheduler_pool_pollute. Lead-authored-by: cxzl25 <cxz...@users.noreply.github.com> Co-authored-by: sychen <syc...@ctrip.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../SparkExecuteStatementOperation.scala | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index b05307e..3862d6c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -109,7 +109,7 @@ private[hive] class SparkExecuteStatementOperation( } } - def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = { + def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = withSchedulerPool { validateDefaultFetchOrientation(order) assertState(OperationState.FINISHED) setHasResultSet(true) @@ -210,7 +210,7 @@ private[hive] class SparkExecuteStatementOperation( } } - private def execute(): Unit = 
{ + private def execute(): Unit = withSchedulerPool { statementId = UUID.randomUUID().toString logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) @@ -225,10 +225,6 @@ private[hive] class SparkExecuteStatementOperation( statementId, parentSession.getUsername) sqlContext.sparkContext.setJobGroup(statementId, statement) - val pool = sessionToActivePool.get(parentSession.getSessionHandle) - // It may have unpredictably behavior since we use thread pools to execute quries and - // the 'spark.scheduler.pool' may not be 'default' when we did not set its value.(SPARK-26914) - sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) try { result = sqlContext.sql(statement) logDebug(result.queryExecution.toString()) @@ -291,6 +287,20 @@ private[hive] class SparkExecuteStatementOperation( sqlContext.sparkContext.cancelJobGroup(statementId) } } + + private def withSchedulerPool[T](body: => T): T = { + val pool = sessionToActivePool.get(parentSession.getSessionHandle) + if (pool != null) { + sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, pool) + } + try { + body + } finally { + if (pool != null) { + sqlContext.sparkContext.setLocalProperty(SparkContext.SPARK_SCHEDULER_POOL, null) + } + } + } } object SparkExecuteStatementOperation { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org