Repository: spark
Updated Branches:
  refs/heads/master 93da8565f -> 2235cd444


[SPARK-11823][SQL] Fix flaky JDBC cancellation test in 
HiveThriftBinaryServerSuite

This patch fixes a flaky "test jdbc cancel" test in 
HiveThriftBinaryServerSuite. This test is prone to a race-condition which 
causes it to block indefinitely with while waiting for an extremely slow query 
to complete, which caused many Jenkins builds to time out.

For more background, see my comments on #6207 (the PR which introduced this 
test).

Author: Josh Rosen <joshro...@databricks.com>

Closes #10425 from JoshRosen/SPARK-11823.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2235cd44
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2235cd44
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2235cd44

Branch: refs/heads/master
Commit: 2235cd44407e3b6b401fb84a2096ade042c51d36
Parents: 93da856
Author: Josh Rosen <joshro...@databricks.com>
Authored: Mon Dec 21 23:12:05 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Dec 21 23:12:05 2015 -0800

----------------------------------------------------------------------
 .../thriftserver/HiveThriftServer2Suites.scala  | 85 +++++++++++++-------
 1 file changed, 56 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2235cd44/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 139d8e8..ebb2575 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -23,9 +23,8 @@ import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Promise, future}
+import scala.concurrent.{Await, ExecutionContext, Promise, future}
 import scala.io.Source
 import scala.util.{Random, Try}
 
@@ -43,7 +42,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SparkFunSuite}
 
 object TestData {
@@ -356,31 +355,54 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
         s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE 
test_map")
 
       queries.foreach(statement.execute)
-
-      val largeJoin = "SELECT COUNT(*) FROM test_map " +
-        List.fill(10)("join test_map").mkString(" ")
-      val f = future { Thread.sleep(100); statement.cancel(); }
-      val e = intercept[SQLException] {
-        statement.executeQuery(largeJoin)
+      implicit val ec = ExecutionContext.fromExecutorService(
+        ThreadUtils.newDaemonSingleThreadExecutor("test-jdbc-cancel"))
+      try {
+        // Start a very-long-running query that will take hours to finish, 
then cancel it in order
+        // to demonstrate that cancellation works.
+        val f = future {
+          statement.executeQuery(
+            "SELECT COUNT(*) FROM test_map " +
+            List.fill(10)("join test_map").mkString(" "))
+        }
+        // Note that this is slightly race-prone: if the cancel is issued 
before the statement
+        // begins executing then we'll fail with a timeout. As a result, this 
fixed delay is set
+        // slightly more conservatively than may be strictly necessary.
+        Thread.sleep(1000)
+        statement.cancel()
+        val e = intercept[SQLException] {
+          Await.result(f, 3.minute)
+        }
+        assert(e.getMessage.contains("cancelled"))
+
+        // Cancellation is a no-op if spark.sql.hive.thriftServer.async=false
+        statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
+        try {
+          val sf = future {
+            statement.executeQuery(
+              "SELECT COUNT(*) FROM test_map " +
+                List.fill(4)("join test_map").mkString(" ")
+            )
+          }
+          // Similarly, this is also slightly race-prone on fast machines 
where the query above
+          // might race and complete before we issue the cancel.
+          Thread.sleep(1000)
+          statement.cancel()
+          val rs1 = Await.result(sf, 3.minute)
+          rs1.next()
+          assert(rs1.getInt(1) === math.pow(5, 5))
+          rs1.close()
+
+          val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
+          rs2.next()
+          assert(rs2.getInt(1) === 5)
+          rs2.close()
+        } finally {
+          statement.executeQuery("SET spark.sql.hive.thriftServer.async=true")
+        }
+      } finally {
+        ec.shutdownNow()
       }
-      assert(e.getMessage contains "cancelled")
-      Await.result(f, 3.minute)
-
-      // cancel is a noop
-      statement.executeQuery("SET spark.sql.hive.thriftServer.async=false")
-      val sf = future { Thread.sleep(100); statement.cancel(); }
-      val smallJoin = "SELECT COUNT(*) FROM test_map " +
-        List.fill(4)("join test_map").mkString(" ")
-      val rs1 = statement.executeQuery(smallJoin)
-      Await.result(sf, 3.minute)
-      rs1.next()
-      assert(rs1.getInt(1) === math.pow(5, 5))
-      rs1.close()
-
-      val rs2 = statement.executeQuery("SELECT COUNT(*) FROM test_map")
-      rs2.next()
-      assert(rs2.getInt(1) === 5)
-      rs2.close()
     }
   }
 
@@ -817,6 +839,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
   }
 
   override protected def beforeAll(): Unit = {
+    super.beforeAll()
     // Chooses a random port between 10000 and 19999
     listeningPort = 10000 + Random.nextInt(10000)
     diagnosisBuffer.clear()
@@ -838,7 +861,11 @@ abstract class HiveThriftServer2Test extends SparkFunSuite 
with BeforeAndAfterAl
   }
 
   override protected def afterAll(): Unit = {
-    stopThriftServer()
-    logInfo("HiveThriftServer2 stopped")
+    try {
+      stopThriftServer()
+      logInfo("HiveThriftServer2 stopped")
+    } finally {
+      super.afterAll()
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to