Repository: spark
Updated Branches:
  refs/heads/branch-1.6 17ea95133 -> f1f2cee4c


[SPARK-11043][SQL] BugFix: Set the operation log in the Thrift server.

In Hive 1.2.1, `SessionManager` sets the `operationLog` when the configuration
`hive.server2.logging.operation.enabled` is true.
Spark was not adapted to this change, so regardless of whether the
configuration is enabled, the Spark Thrift server always logs the warning message.
PS: if `hive.server2.logging.operation.enabled` is false, it should still log the
warning message (the same as the Hive Thrift server).

Author: huangzhaowei <carlmartin...@gmail.com>

Closes #9056 from SaintBacchus/SPARK-11043.

(cherry picked from commit d4a5e6f719079639ffd38470f4d8d1f6fde3228d)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1f2cee4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1f2cee4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1f2cee4

Branch: refs/heads/branch-1.6
Commit: f1f2cee4cebd624f86a3c59297aeb77e5a839e2a
Parents: 17ea951
Author: huangzhaowei <carlmartin...@gmail.com>
Authored: Tue Nov 24 23:24:49 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Nov 24 23:25:14 2015 +0800

----------------------------------------------------------------------
 .../SparkExecuteStatementOperation.scala            |  8 ++++----
 .../hive/thriftserver/SparkSQLSessionManager.scala  |  5 +++++
 .../hive/thriftserver/HiveThriftServer2Suites.scala | 16 +++++++++++++++-
 3 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
index 82fef92..e022ee8 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala
@@ -134,12 +134,12 @@ private[hive] class SparkExecuteStatementOperation(
 
   def getResultSetSchema: TableSchema = resultSchema
 
-  override def run(): Unit = {
+  override def runInternal(): Unit = {
     setState(OperationState.PENDING)
     setHasResultSet(true) // avoid no resultset for async run
 
     if (!runInBackground) {
-      runInternal()
+      execute()
     } else {
       val sparkServiceUGI = Utils.getUGI()
 
@@ -151,7 +151,7 @@ private[hive] class SparkExecuteStatementOperation(
           val doAsAction = new PrivilegedExceptionAction[Unit]() {
             override def run(): Unit = {
               try {
-                runInternal()
+                execute()
               } catch {
                 case e: HiveSQLException =>
                   setOperationException(e)
@@ -188,7 +188,7 @@ private[hive] class SparkExecuteStatementOperation(
     }
   }
 
-  override def runInternal(): Unit = {
+  private def execute(): Unit = {
     statementId = UUID.randomUUID().toString
     logInfo(s"Running query '$statement' with $statementId")
     setState(OperationState.RUNNING)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index af4fcdf..de4e9c6 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -41,6 +41,11 @@ private[hive] class SparkSQLSessionManager(hiveServer: 
HiveServer2, hiveContext:
   override def init(hiveConf: HiveConf) {
     setSuperField(this, "hiveConf", hiveConf)
 
+    // Create operation log root directory, if operation logging is enabled
+    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+      invoke(classOf[SessionManager], this, "initOperationLogRootDir")
+    }
+
     val backgroundPoolSize = 
hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
     setSuperField(this, "backgroundOperationPool", 
Executors.newFixedThreadPool(backgroundPoolSize))
     getAncestorField[Log](this, 3, "LOG").info(

http://git-wip-us.apache.org/repos/asf/spark/blob/f1f2cee4/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 1dd898a..139d8e8 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise, future}
+import scala.io.Source
 import scala.util.{Random, Try}
 
 import com.google.common.base.Charsets.UTF_8
@@ -507,6 +508,12 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
       assert(rs2.getInt(2) === 500)
     }
   }
+
+  test("SPARK-11043 check operation log root directory") {
+    val expectedLine =
+      "Operation log root directory is created: " + operationLogPath.getAbsoluteFile
+    assert(Source.fromFile(logPath).getLines().exists(_.contains(expectedLine)))
+  }
 }
 
 class SingleSessionSuite extends HiveThriftJdbcTest {
@@ -642,7 +649,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
   protected def metastoreJdbcUri = 
s"jdbc:derby:;databaseName=$metastorePath;create=true"
 
   private val pidDir: File = Utils.createTempDir("thriftserver-pid")
-  private var logPath: File = _
+  protected var logPath: File = _
+  protected var operationLogPath: File = _
   private var logTailingProcess: Process = _
   private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
 
@@ -679,6 +687,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
        |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
        |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
        |  --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
+       |  --hiveconf ${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
        |  --hiveconf $portConf=$port
        |  --driver-class-path $driverClassPath
        |  --driver-java-options -Dlog4j.debug
@@ -706,6 +715,8 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
     warehousePath.delete()
     metastorePath = Utils.createTempDir()
     metastorePath.delete()
+    operationLogPath = Utils.createTempDir()
+    operationLogPath.delete()
     logPath = null
     logTailingProcess = null
 
@@ -782,6 +793,9 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
     metastorePath.delete()
     metastorePath = null
 
+    operationLogPath.delete()
+    operationLogPath = null
+
     Option(logPath).foreach(_.delete())
     logPath = null
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to