Repository: spark
Updated Branches:
  refs/heads/master 9d9ca91fe -> 9eb49d413


[SPARK-3809][SQL] Fixes test suites in hive-thriftserver

As scwf pointed out, `HiveThriftServer2Suite` isn't effective anymore after the 
Thrift server was made a daemon. In addition, these test suites were known to 
be flaky; PR #2214 tried to fix them but failed because of an unknown Jenkins 
build error. This PR fixes both sets of issues.

In this PR, instead of watching the `start-thriftserver.sh` output, the test 
code starts a `tail` process to watch the log file. A `Thread.sleep` has to be 
introduced because the `kill` command used in `stop-thriftserver.sh` is not 
synchronous.
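The log-watching approach described above boils down to completing a promise/future from a watcher process's output and awaiting it with a timeout. The following is a minimal, self-contained sketch of that pattern (in Java rather than the suite's Scala; `echo` stands in for `tail -f` on the real log file, and `LogWatchSketch` is an invented name, not code from this commit):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class LogWatchSketch {
    // Completed once the readiness marker shows up in the watched output.
    static final CompletableFuture<Void> serverRunning = new CompletableFuture<>();

    static void captureLogOutput(String line) {
        if (line.contains("ThriftBinaryCLIService listening on")) {
            serverRunning.complete(null);
        }
    }

    public static void main(String[] args) throws Exception {
        // `echo` plays the role of `tail -f <log file>` following the daemon's log.
        Process tail = new ProcessBuilder(
                "sh", "-c", "echo 'ThriftBinaryCLIService listening on 10000'")
                .redirectErrorStream(true)
                .start();

        // Feed each output line to the capture callback on a separate thread.
        Thread watcher = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(tail.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    captureLogOutput(line);
                }
            } catch (Exception ignored) {
            }
        });
        watcher.start();

        // Fail fast instead of hanging forever if the marker never appears.
        serverRunning.get(10, TimeUnit.SECONDS);
        tail.waitFor();
        System.out.println("server marker seen: " + serverRunning.isDone());
    }
}
```

The bounded `get` is what turns "server never came up" into a prompt test failure rather than a hang.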

As for the root cause of the mysterious Jenkins build failure, please refer to 
[this comment](https://github.com/apache/spark/pull/2675#issuecomment-58464189) 
for details.

----

(Copied from PR description of #2214)

This PR fixes two issues in `HiveThriftServer2Suite` and brings one enhancement:

1. Although the metastore and warehouse directories and the listening port are 
randomly chosen, all test cases share the same configuration. Due to parallel 
test execution, one of the two test cases is doomed to fail.
2. We caught exceptions thrown from a test case and printed diagnostic 
information, but forgot to re-throw the exception...
3. When the forked server process ends prematurely (e.g., fails to start), the 
`serverRunning` promise is completed with a failure, which prevents the test 
code from waiting until the timeout expires.

So, embarrassingly, this test suite had been failing continuously for several 
days without anyone noticing... Fortunately, no bugs in the production code 
were hidden by these failures.
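Item 1 above depends on each run picking its own port; the usual trick (also visible in the diff below) is to bind a socket to port 0 and let the OS assign a free ephemeral port. A minimal Java sketch of the idea, with `FreePortSketch` an invented name:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortSketch {
    // Binding to port 0 asks the OS for an unused ephemeral port. Note the
    // small race window between close() and actual reuse: another process may
    // grab the port in between, so collisions become unlikely, not impossible.
    static int freePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = freePort();
        // Ephemeral ports are always within the valid TCP port range.
        System.out.println(port > 0 && port <= 65535);
    }
}
```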

Author: Cheng Lian <lian.cs....@gmail.com>
Author: wangfei <wangf...@huawei.com>

Closes #2675 from liancheng/fix-thriftserver-tests and squashes the following commits:

1c384b7 [Cheng Lian] Minor code cleanup, restore the logging level hack in TestHive.scala
7805c33 [wangfei] reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths
af2b5a9 [Cheng Lian] Removes log level hacks from TestHiveContext
d116405 [wangfei] make sure that log4j level is INFO
ee92a82 [Cheng Lian] Relaxes timeout
7fd6757 [Cheng Lian] Fixes test suites in hive-thriftserver


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9eb49d41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9eb49d41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9eb49d41

Branch: refs/heads/master
Commit: 9eb49d4134e23a15142fb592d54d920e89bd8786
Parents: 9d9ca91
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Mon Oct 13 13:50:27 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Oct 13 13:50:27 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/thriftserver/CliSuite.scala  | 13 ++-
 .../thriftserver/HiveThriftServer2Suite.scala   | 86 ++++++++++++--------
 2 files changed, 60 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9eb49d41/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d68dd09..fc97a25 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 
 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -62,8 +62,11 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
 
     def captureOutput(source: String)(line: String) {
       buffer += s"$source> $line"
+      // If we haven't found all expected answers...
       if (next.get() < expectedAnswers.size) {
+        // If another expected answer is found...
         if (line.startsWith(expectedAnswers(next.get()))) {
+          // If all expected answers have been found...
           if (next.incrementAndGet() == expectedAnswers.size) {
             foundAllExpectedAnswers.trySuccess(())
           }
@@ -77,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
 
     Future {
       val exitValue = process.exitValue()
-      logInfo(s"Spark SQL CLI process exit value: $exitValue")
+      foundAllExpectedAnswers.tryFailure(
+        new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
     }
 
     try {
@@ -98,6 +102,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
            |End CliSuite failure output
            |===========================
          """.stripMargin, cause)
+      throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
@@ -109,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
 
-    runCliWithin(1.minute)(
+    runCliWithin(3.minute)(
       "CREATE TABLE hive_test(key INT, val STRING);"
         -> "OK",
       "SHOW TABLES;"
@@ -120,7 +125,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
         -> "Time taken: ",
       "SELECT COUNT(*) FROM hive_test;"
         -> "5",
-      "DROP TABLE hive_test"
+      "DROP TABLE hive_test;"
         -> "Time taken: "
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9eb49d41/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 38977ff..e3b4e45 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-
 import java.io.File
 import java.net.ServerSocket
 import java.sql.{DriverManager, Statement}
 import java.util.concurrent.TimeoutException
 
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Promise}
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
+
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.jdbc.HiveDriver
 import org.scalatest.FunSuite
@@ -41,25 +41,25 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
 class HiveThriftServer2Suite extends FunSuite with Logging {
   Class.forName(classOf[HiveDriver].getCanonicalName)
 
-  private val listeningHost = "localhost"
-  private val listeningPort =  {
-    // Let the system to choose a random available port to avoid collision with other parallel
-    // builds.
-    val socket = new ServerSocket(0)
-    val port = socket.getLocalPort
-    socket.close()
-    port
-  }
-
-  private val warehousePath = getTempFilePath("warehouse")
-  private val metastorePath = getTempFilePath("metastore")
-  private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
-  def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
-    val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+  def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+    val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+    val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+
+    val warehousePath = getTempFilePath("warehouse")
+    val metastorePath = getTempFilePath("metastore")
+    val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+    val listeningHost = "localhost"
+    val listeningPort =  {
+      // Let the system to choose a random available port to avoid collision with other parallel
+      // builds.
+      val socket = new ServerSocket(0)
+      val port = socket.getLocalPort
+      socket.close()
+      port
+    }
 
     val command =
-      s"""$serverScript
+      s"""$startScript
          |  --master local
          |  --hiveconf hive.root.logger=INFO,console
          |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
@@ -68,29 +68,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
          |  --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
        """.stripMargin.split("\\s+").toSeq
 
-    val serverStarted = Promise[Unit]()
+    val serverRunning = Promise[Unit]()
     val buffer = new ArrayBuffer[String]()
+    val LOGGING_MARK =
+      s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
+    var logTailingProcess: Process = null
+    var logFilePath: String = null
 
-    def captureOutput(source: String)(line: String) {
-      buffer += s"$source> $line"
+    def captureLogOutput(line: String): Unit = {
+      buffer += line
       if (line.contains("ThriftBinaryCLIService listening on")) {
-        serverStarted.success(())
+        serverRunning.success(())
       }
     }
 
-    val process = Process(command).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    Future {
-      val exitValue = process.exitValue()
-      logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+    def captureThriftServerOutput(source: String)(line: String): Unit = {
+      if (line.startsWith(LOGGING_MARK)) {
+        logFilePath = line.drop(LOGGING_MARK.length).trim
+        // Ensure that the log file is created so that the `tail' command won't fail
+        Try(new File(logFilePath).createNewFile())
+        logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
+          .run(ProcessLogger(captureLogOutput, _ => ()))
+      }
     }
 
+    // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+    Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+      captureThriftServerOutput("stdout"),
+      captureThriftServerOutput("stderr")))
+
     val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
     val user = System.getProperty("user.name")
 
     try {
-      Await.result(serverStarted.future, timeout)
+      Await.result(serverRunning.future, timeout)
 
       val connection = DriverManager.getConnection(jdbcUri, user, "")
       val statement = connection.createStatement()
@@ -122,10 +133,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
              |End HiveThriftServer2Suite failure output
              |=========================================
            """.stripMargin, cause)
+        throw cause
     } finally {
       warehousePath.delete()
       metastorePath.delete()
-      process.destroy()
+      Process(stopScript).run().exitValue()
+      // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
+      Thread.sleep(3.seconds.toMillis)
+      Option(logTailingProcess).map(_.destroy())
+      Option(logFilePath).map(new File(_).delete())
     }
   }
 

