Repository: spark Updated Branches: refs/heads/master 6ebe419f3 -> eb19d3f75
[SPARK-6964] [SQL] Support Cancellation in the Thrift Server Support runInBackground in SparkExecuteStatementOperation, and add cancellation Author: Dong Wang <d...@databricks.com> Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits: 687c113 [Dong Wang] fix 100 characters 7bfa2a7 [Dong Wang] fix merge 380480f [Dong Wang] fix for liancheng's comments eb3e385 [Dong Wang] small nit 341885b [Dong Wang] small fix 3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag 04142c3 [Dong Wang] set SQLSession for async execution 184ec35 [Dong Wang] keep hive conf 819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb19d3f7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb19d3f7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb19d3f7 Branch: refs/heads/master Commit: eb19d3f75cbd002f7e72ce02017a8de67f562792 Parents: 6ebe419 Author: Dong Wang <d...@databricks.com> Authored: Fri Jun 5 17:41:12 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Fri Jun 5 17:41:12 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/SQLContext.scala | 5 + .../SparkExecuteStatementOperation.scala | 164 +++++++++++++++++-- .../server/SparkSQLOperationManager.scala | 7 +- .../thriftserver/HiveThriftServer2Suites.scala | 42 ++++- .../org/apache/spark/sql/hive/HiveContext.scala | 6 + 5 files changed, 208 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0aab7fa..ddb5402 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -916,6 +916,11 @@ class SQLContext(@transient val sparkContext: SparkContext) tlSession.remove() } + protected[sql] def setSession(session: SQLSession): Unit = { + detachSession() + tlSession.set(session) + } + protected[sql] class SQLSession { // Note that this is a lazy val so we can override the default value in subclasses. protected[sql] lazy val conf: SQLConf = new SQLConf http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index c0d1266..e071103 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -17,11 +17,23 @@ package org.apache.spark.sql.hive.thriftserver +import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} +import java.util.concurrent.RejectedExecutionException import java.util.{Map => JMap, UUID} +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, Map => SMap} +import scala.util.control.NonFatal + +import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.FieldSchema import org.apache.hive.service.cli._ +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.metadata.HiveException +import org.apache.hadoop.hive.ql.session.SessionState +import org.apache.hadoop.hive.shims.ShimLoader +import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.cli.operation.ExecuteStatementOperation import org.apache.hive.service.cli.session.HiveSession @@ -31,8 +43,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf} -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, Map => SMap} private[hive] class SparkExecuteStatementOperation( parentSession: HiveSession, @@ -40,17 +50,19 @@ private[hive] class SparkExecuteStatementOperation( confOverlay: JMap[String, String], runInBackground: Boolean = true) (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String]) - // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution - extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) + extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground) with Logging { private var result: DataFrame = _ private var iter: Iterator[SparkRow] = _ private var dataTypes: Array[DataType] = _ + private var statementId: String = _ def close(): Unit = { // RDDs will be cleaned automatically upon garbage collection. - logDebug("CLOSING") + hiveContext.sparkContext.clearJobGroup() + logDebug(s"CLOSING $statementId") + cleanup(OperationState.CLOSED) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) { @@ -114,10 +126,10 @@ private[hive] class SparkExecuteStatementOperation( } def getResultSetSchema: TableSchema = { - logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") - if (result.queryExecution.analyzed.output.size == 0) { + if (result == null || result.queryExecution.analyzed.output.size == 0) { new TableSchema(new FieldSchema("Result", "string", "") :: Nil) } else { + logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") val schema = result.queryExecution.analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } @@ -125,9 +137,73 @@ private[hive] class SparkExecuteStatementOperation( } } - def run(): Unit = { - val statementId = UUID.randomUUID().toString - logInfo(s"Running query '$statement'") + override def run(): Unit = { + setState(OperationState.PENDING) + setHasResultSet(true) // avoid no resultset for async run + + if (!runInBackground) { + runInternal() + } else { + val parentSessionState = SessionState.get() + val hiveConf = getConfigForOperation() + val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf) + val sessionHive = getCurrentHive() + val currentSqlSession = hiveContext.currentSession + + // Runnable impl to call runInternal asynchronously, + // from a different thread + val backgroundOperation = new Runnable() { + + override def run(): Unit = { + val doAsAction = new PrivilegedExceptionAction[Object]() { + override def run(): Object = { + + // User information is part of the metastore client member in Hive + hiveContext.setSession(currentSqlSession) + Hive.set(sessionHive) + SessionState.setCurrentSessionState(parentSessionState) + try { + runInternal() + } catch { + case e: HiveSQLException => + setOperationException(e) + log.error("Error running hive query: ", e) + } + return null + } + } + + try { + ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction) + } catch { + case e: Exception => + setOperationException(new HiveSQLException(e)) + logError("Error running hive query as user : " + + sparkServiceUGI.getShortUserName(), e) + } + } + } + try { + // This submit blocks if no background threads are available to run this operation + val backgroundHandle = + getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation) + setBackgroundHandle(backgroundHandle) + } catch { + case rejected: RejectedExecutionException => + setState(OperationState.ERROR) + throw new HiveSQLException("The background threadpool cannot accept" + + " new task for execution, please retry the operation", rejected) + case NonFatal(e) => + logError(s"Error executing query in background", e) + setState(OperationState.ERROR) + throw e + } + } + } + + private def runInternal(): Unit = { + statementId = UUID.randomUUID().toString + logInfo(s"Running query '$statement' with $statementId") setState(OperationState.RUNNING) HiveThriftServer2.listener.onStatementStart( statementId, @@ -159,18 +235,82 @@ private[hive] class SparkExecuteStatementOperation( } } dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray - setHasResultSet(true) } catch { + case e: HiveSQLException => + if (getStatus().getState() == OperationState.CANCELED) { + return + } else { + setState(OperationState.ERROR); + throw e + } // Actually do need to catch Throwable as some failures don't inherit from Exception and // HiveServer will silently swallow them. case e: Throwable => + val currentState = getStatus().getState() + logError(s"Error executing query, currentState $currentState, ", e) setState(OperationState.ERROR) HiveThriftServer2.listener.onStatementError( statementId, e.getMessage, e.getStackTraceString) - logError("Error executing query:", e) throw new HiveSQLException(e.toString) } setState(OperationState.FINISHED) HiveThriftServer2.listener.onStatementFinish(statementId) } + + override def cancel(): Unit = { + logInfo(s"Cancel '$statement' with $statementId") + if (statementId != null) { + hiveContext.sparkContext.cancelJobGroup(statementId) + } + cleanup(OperationState.CANCELED) + } + + private def cleanup(state: OperationState) { + setState(state) + if (runInBackground) { + val backgroundHandle = getBackgroundHandle() + if (backgroundHandle != null) { + backgroundHandle.cancel(true) + } + } + } + + /** + * If there are query specific settings to overlay, then create a copy of config + * There are two cases we need to clone the session config that's being passed to hive driver + * 1. Async query - + * If the client changes a config setting, that shouldn't reflect in the execution + * already underway + * 2. confOverlay - + * The query specific settings should only be applied to the query config and not session + * @return new configuration + * @throws HiveSQLException + */ + private def getConfigForOperation(): HiveConf = { + var sqlOperationConf = getParentSession().getHiveConf() + if (!getConfOverlay().isEmpty() || runInBackground) { + // clone the partent session config for this query + sqlOperationConf = new HiveConf(sqlOperationConf) + + // apply overlay query specific settings, if any + getConfOverlay().foreach { case (k, v) => + try { + sqlOperationConf.verifyAndSet(k, v) + } catch { + case e: IllegalArgumentException => + throw new HiveSQLException("Error applying statement specific settings", e) + } + } + } + return sqlOperationConf + } + + private def getCurrentHive(): Hive = { + try { + return Hive.get() + } catch { + case e: HiveException => + throw new HiveSQLException("Failed to get current Hive object", e); + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala index 9c0bf02..c8031ed 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala @@ -44,9 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext) confOverlay: JMap[String, String], async: Boolean): ExecuteStatementOperation = synchronized { - val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)( - hiveContext, sessionToActivePool) + val runInBackground = async && hiveContext.hiveThriftServerAsync + val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay, + runInBackground)(hiveContext, sessionToActivePool) handleToOperation.put(operation.getHandle, operation) + logDebug(s"Created Operation for $statement with session=$parentSession, " + + s"runInBackground=$runInBackground") operation } } http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index f57c708..178bd1f 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver import java.io.File import java.net.URL -import java.sql.{Date, DriverManager, Statement} +import java.nio.charset.StandardCharsets +import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ -import scala.concurrent.{Await, Promise} +import scala.concurrent.{Await, Promise, future} +import scala.concurrent.ExecutionContext.Implicits.global import scala.sys.process.{Process, ProcessLogger} import scala.util.{Random, Try} @@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } ) } + + test("test jdbc cancel") { + withJdbcStatement { statement => + val queries = Seq( + "DROP TABLE IF EXISTS test_map", + "CREATE TABLE test_map(key INT, value STRING)", + s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map") + + queries.foreach(statement.execute) + + val largeJoin = "SELECT COUNT(*) FROM test_map " + + List.fill(10)("join test_map").mkString(" ") + val f = future { Thread.sleep(100); statement.cancel(); } + val e = intercept[SQLException] { + statement.executeQuery(largeJoin) + } + assert(e.getMessage contains "cancelled") + Await.result(f, 3.minute) + + // cancel is a noop + statement.executeQuery("SET spark.sql.hive.thriftServer.async=false") + val sf = future { Thread.sleep(100); statement.cancel(); } + val smallJoin = "SELECT COUNT(*) FROM test_map " + + List.fill(4)("join test_map").mkString(" ") + val rs1 = statement.executeQuery(smallJoin) + Await.result(sf, 3.minute) + rs1.next() + assert(rs1.getInt(1) === math.pow(5, 5)) + rs1.close() + + val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map") + rs2.next() + assert(rs2.getInt(1) === 5) + rs2.close() + } + } } class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 800f51c..b8f294c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -144,6 +144,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) { getConf("spark.sql.hive.metastore.barrierPrefixes", "") .split(",").filterNot(_ == "") + /* + * hive thrift server use background spark sql thread pool to execute sql queries + */ + protected[hive] def hiveThriftServerAsync: Boolean = + getConf("spark.sql.hive.thriftServer.async", "true").toBoolean + @transient protected[sql] lazy val substitutor = new VariableSubstitution() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org