Repository: spark
Updated Branches:
  refs/heads/master 6ebe419f3 -> eb19d3f75


[SPARK-6964] [SQL] Support Cancellation in the Thrift Server

Support runInBackground in SparkExecuteStatementOperation, and add cancellation
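
Cancellation is driven from the client side through plain JDBC: while executeQuery() blocks, a second thread calls Statement.cancel(), and the server now propagates that to the statement's Spark jobs. A minimal client-side sketch (mirroring the new test below; the connection URL, credentials, and table name are placeholders):

    import java.sql.{DriverManager, SQLException}

    object CancelDemo {
      def main(args: Array[String]): Unit = {
        val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
        val stmt = conn.createStatement()
        // Cancel from another thread shortly after the query starts.
        new Thread(new Runnable {
          override def run(): Unit = { Thread.sleep(100); stmt.cancel() }
        }).start()
        try {
          stmt.executeQuery("SELECT COUNT(*) FROM some_large_table")
        } catch {
          case e: SQLException => println(s"query cancelled: ${e.getMessage}")
        } finally {
          conn.close()
        }
      }
    }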

Author: Dong Wang <d...@databricks.com>

Closes #6207 from dongwang218/SPARK-6964-jdbc-cancel and squashes the following commits:

687c113 [Dong Wang] fix 100 characters
7bfa2a7 [Dong Wang] fix merge
380480f [Dong Wang] fix for liancheng's comments
eb3e385 [Dong Wang] small nit
341885b [Dong Wang] small fix
3d8ebf8 [Dong Wang] add spark.sql.hive.thriftServer.async flag
04142c3 [Dong Wang] set SQLSession for async execution
184ec35 [Dong Wang] keep hive conf
819ae03 [Dong Wang] [SPARK-6964][SQL][WIP] Support Cancellation in the Thrift Server


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb19d3f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb19d3f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb19d3f7

Branch: refs/heads/master
Commit: eb19d3f75cbd002f7e72ce02017a8de67f562792
Parents: 6ebe419
Author: Dong Wang <d...@databricks.com>
Authored: Fri Jun 5 17:41:12 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Jun 5 17:41:12 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   5 +
 .../SparkExecuteStatementOperation.scala        | 164 +++++++++++++++++--
 .../server/SparkSQLOperationManager.scala       |   7 +-
 .../thriftserver/HiveThriftServer2Suites.scala  |  42 ++++-
 .../org/apache/spark/sql/hive/HiveContext.scala |   6 +
 5 files changed, 208 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0aab7fa..ddb5402 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -916,6 +916,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     tlSession.remove()
   }
 
+  protected[sql] def setSession(session: SQLSession): Unit = {
+    detachSession()
+    tlSession.set(session)
+  }
+
   protected[sql] class SQLSession {
    // Note that this is a lazy val so we can override the default value in subclasses.
     protected[sql] lazy val conf: SQLConf = new SQLConf
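
The new setSession hook matters because the SQLSession lives in a ThreadLocal: a statement running on a background pool thread would otherwise see a fresh default session rather than the client's. A self-contained sketch of that hand-off pattern (plain Scala for illustration, not the Spark API itself):

    object SessionHandOff {
      final class Session(val name: String)
      private val tlSession = new ThreadLocal[Session]

      def setSession(session: Session): Unit = {
        tlSession.remove()
        tlSession.set(session)
      }

      def main(args: Array[String]): Unit = {
        tlSession.set(new Session("client-session"))
        val captured = tlSession.get()          // capture on the caller's thread
        val worker = new Thread(new Runnable {
          override def run(): Unit = {
            setSession(captured)                // re-attach before running the statement
            println(s"worker sees: ${tlSession.get().name}")
          }
        })
        worker.start()
        worker.join()
      }
    }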

http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index c0d1266..e071103 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -17,11 +17,23 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.security.PrivilegedExceptionAction
 import java.sql.{Date, Timestamp}
+import java.util.concurrent.RejectedExecutionException
 import java.util.{Map => JMap, UUID}
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, Map => SMap}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hive.service.cli._
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.metadata.HiveException
+import org.apache.hadoop.hive.ql.session.SessionState
+import org.apache.hadoop.hive.shims.ShimLoader
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hive.service.cli.operation.ExecuteStatementOperation
 import org.apache.hive.service.cli.session.HiveSession
 
@@ -31,8 +43,6 @@ import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{DataFrame, Row => SparkRow, SQLConf}
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, Map => SMap}
 
 private[hive] class SparkExecuteStatementOperation(
     parentSession: HiveSession,
@@ -40,17 +50,19 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)
    (hiveContext: HiveContext, sessionToActivePool: SMap[SessionHandle, String])
-  // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
-  extends ExecuteStatementOperation(parentSession, statement, confOverlay, false)
+  extends ExecuteStatementOperation(parentSession, statement, confOverlay, runInBackground)
   with Logging {
 
   private var result: DataFrame = _
   private var iter: Iterator[SparkRow] = _
   private var dataTypes: Array[DataType] = _
+  private var statementId: String = _
 
   def close(): Unit = {
     // RDDs will be cleaned automatically upon garbage collection.
-    logDebug("CLOSING")
+    hiveContext.sparkContext.clearJobGroup()
+    logDebug(s"CLOSING $statementId")
+    cleanup(OperationState.CLOSED)
   }
 
  def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
@@ -114,10 +126,10 @@ private[hive] class SparkExecuteStatementOperation(
   }
 
   def getResultSetSchema: TableSchema = {
-    logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
-    if (result.queryExecution.analyzed.output.size == 0) {
+    if (result == null || result.queryExecution.analyzed.output.size == 0) {
       new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
     } else {
+      logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}")
       val schema = result.queryExecution.analyzed.output.map { attr =>
        new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
       }
@@ -125,9 +137,73 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  def run(): Unit = {
-    val statementId = UUID.randomUUID().toString
-    logInfo(s"Running query '$statement'")
+  override def run(): Unit = {
+    setState(OperationState.PENDING)
+    setHasResultSet(true) // avoid no resultset for async run
+
+    if (!runInBackground) {
+      runInternal()
+    } else {
+      val parentSessionState = SessionState.get()
+      val hiveConf = getConfigForOperation()
+      val sparkServiceUGI = ShimLoader.getHadoopShims.getUGIForConf(hiveConf)
+      val sessionHive = getCurrentHive()
+      val currentSqlSession = hiveContext.currentSession
+
+      // Runnable impl to call runInternal asynchronously,
+      // from a different thread
+      val backgroundOperation = new Runnable() {
+
+        override def run(): Unit = {
+          val doAsAction = new PrivilegedExceptionAction[Object]() {
+            override def run(): Object = {
+
+              // User information is part of the metastore client member in Hive
+              hiveContext.setSession(currentSqlSession)
+              Hive.set(sessionHive)
+              SessionState.setCurrentSessionState(parentSessionState)
+              try {
+                runInternal()
+              } catch {
+                case e: HiveSQLException =>
+                  setOperationException(e)
+                  log.error("Error running hive query: ", e)
+              }
+              return null
+            }
+          }
+
+          try {
+            ShimLoader.getHadoopShims().doAs(sparkServiceUGI, doAsAction)
+          } catch {
+            case e: Exception =>
+              setOperationException(new HiveSQLException(e))
+              logError("Error running hive query as user : " +
+                sparkServiceUGI.getShortUserName(), e)
+          }
+        }
+      }
+      try {
+        // This submit blocks if no background threads are available to run this operation
+        val backgroundHandle =
+          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation)
+        setBackgroundHandle(backgroundHandle)
+      } catch {
+        case rejected: RejectedExecutionException =>
+          setState(OperationState.ERROR)
+          throw new HiveSQLException("The background threadpool cannot accept" +
+            " new task for execution, please retry the operation", rejected)
+        case NonFatal(e) =>
+          logError(s"Error executing query in background", e)
+          setState(OperationState.ERROR)
+          throw e
+      }
+    }
+  }
+
+  private def runInternal(): Unit = {
+    statementId = UUID.randomUUID().toString
+    logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)
     HiveThriftServer2.listener.onStatementStart(
       statementId,
@@ -159,18 +235,82 @@ private[hive] class SparkExecuteStatementOperation(
         }
       }
       dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
-      setHasResultSet(true)
     } catch {
+      case e: HiveSQLException =>
+        if (getStatus().getState() == OperationState.CANCELED) {
+          return
+        } else {
+          setState(OperationState.ERROR);
+          throw e
+        }
      // Actually do need to catch Throwable as some failures don't inherit from Exception and
       // HiveServer will silently swallow them.
       case e: Throwable =>
+        val currentState = getStatus().getState()
+        logError(s"Error executing query, currentState $currentState, ", e)
         setState(OperationState.ERROR)
         HiveThriftServer2.listener.onStatementError(
           statementId, e.getMessage, e.getStackTraceString)
-        logError("Error executing query:", e)
         throw new HiveSQLException(e.toString)
     }
     setState(OperationState.FINISHED)
     HiveThriftServer2.listener.onStatementFinish(statementId)
   }
+
+  override def cancel(): Unit = {
+    logInfo(s"Cancel '$statement' with $statementId")
+    if (statementId != null) {
+      hiveContext.sparkContext.cancelJobGroup(statementId)
+    }
+    cleanup(OperationState.CANCELED)
+  }
+
+  private def cleanup(state: OperationState) {
+    setState(state)
+    if (runInBackground) {
+      val backgroundHandle = getBackgroundHandle()
+      if (backgroundHandle != null) {
+        backgroundHandle.cancel(true)
+      }
+    }
+  }
+
+  /**
+   * If there are query-specific settings to overlay, then create a copy of the config.
+   * There are two cases where we need to clone the session config that's passed to the Hive driver:
+   * 1. Async query -
+   *    if the client changes a config setting, that shouldn't affect the execution
+   *    already underway
+   * 2. confOverlay -
+   *    query-specific settings should only be applied to the query config, not the session
+   * @return new configuration
+   * @throws HiveSQLException
+   */
+  private def getConfigForOperation(): HiveConf = {
+    var sqlOperationConf = getParentSession().getHiveConf()
+    if (!getConfOverlay().isEmpty() || runInBackground) {
+      // clone the parent session config for this query
+      sqlOperationConf = new HiveConf(sqlOperationConf)
+
+      // apply overlay query specific settings, if any
+      getConfOverlay().foreach { case (k, v) =>
+        try {
+          sqlOperationConf.verifyAndSet(k, v)
+        } catch {
+          case e: IllegalArgumentException =>
+            throw new HiveSQLException("Error applying statement specific settings", e)
+        }
+      }
+    }
+    return sqlOperationConf
+  }
+
+  private def getCurrentHive(): Hive = {
+    try {
+      return Hive.get()
+    } catch {
+      case e: HiveException =>
+        throw new HiveSQLException("Failed to get current Hive object", e);
+    }
+  }
 }
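
The cancel() implementation above leans on Spark job groups: runInternal tags the statement's jobs with statementId (via setJobGroup, in a part of runInternal not shown in this hunk), and cancelJobGroup then kills exactly those jobs. A standalone sketch of that pairing, assuming a local SparkContext:

    import org.apache.spark.{SparkConf, SparkContext}

    object JobGroupCancelDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("cancel-demo"))
        val statementId = java.util.UUID.randomUUID().toString

        val worker = new Thread(new Runnable {
          override def run(): Unit = {
            // Tag everything this thread submits with the statement's id.
            sc.setJobGroup(statementId, "long-running statement", interruptOnCancel = true)
            try {
              sc.parallelize(1 to 4, 4).foreach(_ => Thread.sleep(60000))
            } catch {
              case e: Exception => println(s"jobs cancelled: ${e.getMessage}")
            }
          }
        })
        worker.start()
        Thread.sleep(2000)
        sc.cancelJobGroup(statementId)  // what cancel() does for a running statement
        worker.join()
        sc.stop()
      }
    }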

http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 9c0bf02..c8031ed 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -44,9 +44,12 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
       confOverlay: JMap[String, String],
       async: Boolean): ExecuteStatementOperation = synchronized {
 
-    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay)(
-      hiveContext, sessionToActivePool)
+    val runInBackground = async && hiveContext.hiveThriftServerAsync
+    val operation = new SparkExecuteStatementOperation(parentSession, statement, confOverlay,
+      runInBackground)(hiveContext, sessionToActivePool)
     handleToOperation.put(operation.getHandle, operation)
+    logDebug(s"Created Operation for $statement with session=$parentSession, " 
+
+      s"runInBackground=$runInBackground")
     operation
   }
 }
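
Note the gating above: a statement runs in the background only when the client requested async execution and the server-side flag is enabled. A trivial illustration of that conjunction (not part of the patch):

    def shouldRunInBackground(clientAsync: Boolean, serverAsyncFlag: Boolean): Boolean =
      clientAsync && serverAsyncFlag

    assert(shouldRunInBackground(clientAsync = true, serverAsyncFlag = true))    // background
    assert(!shouldRunInBackground(clientAsync = true, serverAsyncFlag = false))  // sync: flag off
    assert(!shouldRunInBackground(clientAsync = false, serverAsyncFlag = true))  // sync: client's choice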

http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index f57c708..178bd1f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -19,11 +19,13 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.File
 import java.net.URL
-import java.sql.{Date, DriverManager, Statement}
+import java.nio.charset.StandardCharsets
+import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise}
+import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Random, Try}
 
@@ -338,6 +340,42 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
       }
     )
   }
+
+  test("test jdbc cancel") {
+    withJdbcStatement { statement =>
+      val queries = Seq(
+        "DROP TABLE IF EXISTS test_map",
+        "CREATE TABLE test_map(key INT, value STRING)",
+        s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE 
test_map")
+
+      queries.foreach(statement.execute)
+
+      val largeJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(10)("join test_map").mkString(" ")
+      val f = future { Thread.sleep(100); statement.cancel(); }
+      val e = intercept[SQLException] {
+        statement.executeQuery(largeJoin)
+      }
+      assert(e.getMessage contains "cancelled")
+      Await.result(f, 3.minute)
+
+      // cancel is a noop
+      statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+      val sf = future { Thread.sleep(100); statement.cancel(); }
+      val smallJoin = "SELECT COUNT(*) FROM test_map " +
+        List.fill(4)("join test_map").mkString(" ")
+      val rs1 = statement.executeQuery(smallJoin)
+      Await.result(sf, 3.minute)
+      rs1.next()
+      assert(rs1.getInt(1) === math.pow(5, 5))
+      rs1.close()
+
+      val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+      rs2.next()
+      assert(rs2.getInt(1) === 5)
+      rs2.close()
+    }
+  }
 }
 
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {

http://git-wip-us.apache.org/repos/asf/spark/blob/eb19d3f7/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 800f51c..b8f294c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -144,6 +144,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     getConf("spark.sql.hive.metastore.barrierPrefixes", "")
       .split(",").filterNot(_ == "")
 
+  /*
+   * The Hive Thrift server uses a background Spark SQL thread pool to execute SQL queries.
+   */
+  protected[hive] def hiveThriftServerAsync: Boolean =
+    getConf("spark.sql.hive.thriftServer.async", "true").toBoolean
+
   @transient
   protected[sql] lazy val substitutor = new VariableSubstitution()
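
The flag defaults to true, so background execution is on out of the box. As the new test exercises, a client can also disable it for its own session over JDBC (assuming stmt is a Statement on an open HiveServer2 connection):

    // Per-session opt-out; subsequent cancel() calls become no-ops.
    stmt.execute("SET spark.sql.hive.thriftServer.async=false")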
 

