Repository: spark Updated Branches: refs/heads/master 926a93e54 -> abacf5f25
[HOTFIX][SQL] Don't stop ContinuousQuery quietly ## What changes were proposed in this pull request? Try to fix a flaky hang ## How was this patch tested? Existing Jenkins test Author: Shixiong Zhu <shixi...@databricks.com> Closes #11909 from zsxwing/hotfix2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abacf5f2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abacf5f2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abacf5f2 Branch: refs/heads/master Commit: abacf5f258e9bc5c9218ddbee3909dfe5c08d0ea Parents: 926a93e Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Mar 23 00:00:35 2016 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed Mar 23 00:00:35 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/StreamTest.scala | 13 ----------- .../streaming/DataFrameReaderWriterSuite.scala | 24 ++++++++++---------- 2 files changed, 12 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/abacf5f2/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala index 62dc492..2dd6416 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala @@ -65,19 +65,6 @@ import org.apache.spark.util.Utils */ trait StreamTest extends QueryTest with Timeouts { - implicit class RichContinuousQuery(cq: ContinuousQuery) { - def stopQuietly(): Unit = quietly { - try { - failAfter(10.seconds) { - cq.stop() - } - } catch { - case e: TestFailedDueToTimeoutException => - logError(e.getMessage(), e) - } - } - } - implicit class 
RichSource(s: Source) { def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s)) http://git-wip-us.apache.org/repos/asf/spark/blob/abacf5f2/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala index e485aa8..c1bab9b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/DataFrameReaderWriterSuite.scala @@ -72,7 +72,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath after { - sqlContext.streams.active.foreach(_.stopQuietly()) + sqlContext.streams.active.foreach(_.stop()) } test("resolve default source") { @@ -83,7 +83,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() } test("resolve full class") { @@ -94,7 +94,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() } test("options") { @@ -121,7 +121,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .options(map) .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() assert(LastOptions.parameters("opt1") == "1") assert(LastOptions.parameters("opt2") == "2") @@ -137,7 +137,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B 
.format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Nil) df.write @@ -145,7 +145,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("a") .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Seq("a")) withSQLConf("spark.sql.caseSensitive" -> "false") { @@ -154,7 +154,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("A") .startStream() - .stopQuietly() + .stop() assert(LastOptions.partitionColumns == Seq("a")) } @@ -164,7 +164,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("checkpointLocation", newMetadataDir) .partitionBy("b") .startStream() - .stopQuietly() + .stop() } } @@ -182,7 +182,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .format("org.apache.spark.sql.streaming.test") .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stopQuietly() + .stop() assert(LastOptions.parameters("path") == "/test") } @@ -207,7 +207,7 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B .option("doubleOpt", 6.7) .option("checkpointLocation", newMetadataDir) .startStream("/test") - .stopQuietly() + .stop() assert(LastOptions.parameters("intOpt") == "56") assert(LastOptions.parameters("boolOpt") == "false") @@ -269,9 +269,9 @@ class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with B } // Should be able to start query with that name after stopping the previous query - q1.stopQuietly() + q1.stop() val q5 = startQueryWithName("name") assert(activeStreamNames.contains("name")) - sqlContext.streams.active.foreach(_.stopQuietly()) + sqlContext.streams.active.foreach(_.stop()) } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org