Repository: spark
Updated Branches:
  refs/heads/master 926a93e54 -> abacf5f25


[HOTFIX][SQL] Don't stop ContinuousQuery inside `quietly`

## What changes were proposed in this pull request?

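Try to fix a flaky test hang: remove the `stopQuietly()` helper from `StreamTest` (it ran `cq.stop()` inside `quietly` with a 10-second `failAfter` timeout) and call `ContinuousQuery.stop()` directly in `DataFrameReaderWriterSuite`.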

## How was this patch tested?

Existing Jenkins tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #11909 from zsxwing/hotfix2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abacf5f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abacf5f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abacf5f2

Branch: refs/heads/master
Commit: abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea
Parents: 926a93e
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Wed Mar 23 00:00:35 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Mar 23 00:00:35 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/StreamTest.scala | 13 -----------
 .../streaming/DataFrameReaderWriterSuite.scala  | 24 ++++++++++----------
 2 files changed, 12 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abacf5f2/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
index 62dc492..2dd6416 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
@@ -65,19 +65,6 @@ import org.apache.spark.util.Utils
  */
 trait StreamTest extends QueryTest with Timeouts {
 
-  implicit class RichContinuousQuery(cq: ContinuousQuery) {
-    def stopQuietly(): Unit = quietly {
-      try {
-        failAfter(10.seconds) {
-          cq.stop()
-        }
-      } catch {
-        case e: TestFailedDueToTimeoutException =>
-          logError(e.getMessage(), e)
-      }
-    }
-  }
-
   implicit class RichSource(s: Source) {
     def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abacf5f2/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
index e485aa8..c1bab9b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala
@@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
   private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
 
   after {
-    sqlContext.streams.active.foreach(_.stopQuietly())
+    sqlContext.streams.active.foreach(_.stop())
   }
 
   test("resolve default source") {
@@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
   }
 
   test("resolve full class") {
@@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
   }
 
   test("options") {
@@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .options(map)
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("opt1") == "1")
     assert(LastOptions.parameters("opt2") == "2")
@@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream()
-      .stopQuietly()
+      .stop()
     assert(LastOptions.partitionColumns == Nil)
 
     df.write
@@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("checkpointLocation", newMetadataDir)
       .partitionBy("a")
       .startStream()
-      .stopQuietly()
+      .stop()
     assert(LastOptions.partitionColumns == Seq("a"))
 
     withSQLConf("spark.sql.caseSensitive" -> "false") {
@@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .option("checkpointLocation", newMetadataDir)
         .partitionBy("A")
         .startStream()
-        .stopQuietly()
+        .stop()
       assert(LastOptions.partitionColumns == Seq("a"))
     }
 
@@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
         .option("checkpointLocation", newMetadataDir)
         .partitionBy("b")
         .startStream()
-        .stopQuietly()
+        .stop()
     }
   }
 
@@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .format("org.apache.spark.sql.streaming.test")
       .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("path") == "/test")
   }
@@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
       .option("doubleOpt", 6.7)
       .option("checkpointLocation", newMetadataDir)
       .startStream("/test")
-      .stopQuietly()
+      .stop()
 
     assert(LastOptions.parameters("intOpt") == "56")
     assert(LastOptions.parameters("boolOpt") == "false")
@@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B
     }
 
     // Should be able to start query with that name after stopping the previous query
-    q1.stopQuietly()
+    q1.stop()
     val q5 = startQueryWithName("name")
     assert(activeStreamNames.contains("name"))
-    sqlContext.streams.active.foreach(_.stopQuietly())
+    sqlContext.streams.active.foreach(_.stop())
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to