[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...
Github user aray commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r143345713

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _cleaner.foreach(_.start())
+    _stopHooks = new SparkShutdownHookManager()
--- End diff --

@felixcheung I don't get the objection. This is not adding a new shutdown hook, just reusing the class to add "stop hooks" that are called by `sc.stop()`. Furthermore, this is needed because `sc.stop()` cannot simply call `stopAllQueries` itself without creating a circular package dependency.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
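For readers following along, the "stop hooks" idea aray describes — a priority-ordered registry of callbacks that `sc.stop()` runs — can be sketched in plain Scala. This is an illustrative toy under assumed names (`StopHooks`, `add`, `runAll`), not Spark's actual `SparkShutdownHookManager` API:

```scala
import scala.collection.mutable

// Minimal sketch of a priority-ordered hook registry, loosely modeled on
// the reuse of SparkShutdownHookManager described above. Names are
// illustrative, not Spark's.
class StopHooks {
  private val hooks = mutable.ArrayBuffer.empty[(Int, () => Unit)]

  // Register a hook with a priority; higher priorities run first.
  def add(priority: Int)(body: => Unit): Unit =
    hooks += ((priority, () => body))

  // Run all hooks in descending priority order, as a stop sequence would.
  def runAll(): Unit =
    hooks.sortBy(-_._1).foreach { case (_, hook) => hook() }
}
```

The point of reusing the manager class rather than `Runtime.addShutdownHook` is that these callbacks fire on every `sc.stop()`, not only at JVM exit, while keeping the same ordering guarantees.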
Github user felixcheung commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r136516938

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _cleaner.foreach(_.start())
+    _stopHooks = new SparkShutdownHookManager()
--- End diff --

I think that's my point: if `sc.stop()` calls `streams.stopAllQueries()`, then it works when someone calls `sc.stop()` directly, and if someone shuts down the JVM, `sc.stop()` is already getting called.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
Github user aray commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r136436631

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _cleaner.foreach(_.start())
+    _stopHooks = new SparkShutdownHookManager()
--- End diff --

The queries also need to be gracefully stopped if someone calls `sc.stop()` without shutting down the JVM.
Github user felixcheung commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r136410312

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     _cleaner.foreach(_.start())
+    _stopHooks = new SparkShutdownHookManager()
--- End diff --

There's already a shutdown hook to call `sc.stop()`; perhaps just add the cleanup in `stop()`:
https://github.com/aray/spark/blob/005472ed10fad3d1bc8feff12fc55c5682724a0e/core/src/main/scala/org/apache/spark/SparkContext.scala#L584
Github user felixcheung commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r122122675

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala ---
@@ -239,6 +237,40 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
     }
   }

+  test("stopAllQueries") {
+    val datasets = Seq.fill(5)(makeDataset._2)
+    withQueriesOn(datasets: _*) { queries =>
+      assert(queries.forall(_.isActive))
+      spark.streams.stopAllQueries()
+      assert(queries.forall(_.isActive == false), "Queries are still running")
+    }
+  }
+
+  test("stop session stops all queries") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS.map(6 / _)
+    var query: StreamingQuery = null
+    try {
+      query = mapped.toDF.writeStream
+        .format("memory")
+        .queryName(s"queryInNewSession")
+        .outputMode("append")
+        .start()
+      assert(query.isActive)
+      spark.stop()
+      assert(spark.sparkContext.isStopped)
+      assert(query.isActive == false, "Query is still running")
+    } catch {
+      case NonFatal(e) =>
+        if (query != null) query.stop()
+        throw e
--- End diff --

Why still try/catch to stop the query? Since this is a test of this specific behavior, if the query isn't stopped or throws, the test is actually failing, right? More importantly, why is the thrown NonFatal exception being intercepted?
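The alternative felixcheung hints at can be illustrated with a small plain-Scala helper (hypothetical, not part of the test suite): doing cleanup in `finally` lets the original exception propagate and fail the test, with no catch block to intercept it.

```scala
// Hypothetical sketch: run `body`, always run `cleanup` afterwards,
// and let any exception from `body` propagate unmodified.
def withCleanup[A](cleanup: => Unit)(body: => A): A =
  try body
  finally cleanup
```

With this shape, a test that starts a query and stops the session would stop the query in the `finally` block, and any assertion failure or unexpected exception still surfaces as a test failure.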
Github user aray commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r122112073

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -690,6 +690,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def stop(): Unit = {
+    streams.stopAllQueries()
--- End diff --

I did look into that method. It does not work because the [application end event is generated](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1897) after the [SparkContext stopped flag is set](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1888), which prevents the queries from stopping properly. We could add a new event type that gets fired before the stopped flag is set in the close method. Do you think that would be an appropriate solution?
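The ordering problem aray describes can be reduced to a toy model (plain Scala, not Spark code): if the stopped flag is set before listeners fire, any listener reacting to the end-of-application event already observes a stopped context.

```scala
// Toy illustration of the ordering issue: the flag is flipped before the
// listeners run, so a listener that tries to do stop-time work sees
// `stopped == true` and may refuse to proceed.
class ToyContext {
  @volatile var stopped = false
  private var listeners = List.empty[() => Unit]

  def addListener(f: () => Unit): Unit = listeners ::= f

  def stop(): Unit = {
    stopped = true          // flag set first...
    listeners.foreach(_())  // ...so every listener observes stopped == true
  }
}
```

This is why stopping queries from a `SparkListenerApplicationEnd` handler is problematic in the ordering described above: by the time the event fires, the context already reports itself stopped.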
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r122092151

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -690,6 +690,7 @@ class SparkSession private(
    * @since 2.0.0
    */
   def stop(): Unit = {
+    streams.stopAllQueries()
--- End diff --

This one actually doesn't stop queries in the child `SparkSession`s. You can register a SparkListener when creating StreamingQueryManager to monitor `SparkListenerApplicationEnd` and stop queries.
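zsxwing's suggestion — have each query manager listen for the application-end event and stop its own queries — can be sketched with a hypothetical plain-Scala event bus. Spark's real `SparkListener`/`SparkListenerApplicationEnd` API differs; the names `EventBus` and `ToyQueryManager` here are invented for illustration:

```scala
// Hypothetical stand-in for a listener bus: callbacks registered here
// fire when the "application end" event is posted.
object EventBus {
  private var onEnd = List.empty[() => Unit]
  def addApplicationEndListener(f: () => Unit): Unit = onEnd ::= f
  def postApplicationEnd(): Unit = onEnd.foreach(_())
}

// Each manager registers its own listener at construction time, so every
// session's queries get stopped regardless of which session owns them.
class ToyQueryManager {
  private var active = List("q1", "q2")
  EventBus.addApplicationEndListener(() => stopAllQueries())
  def stopAllQueries(): Unit = active = Nil
  def activeQueries: List[String] = active
}
```

The design point is that because each `StreamingQueryManager` (including those of child sessions) registers its own listener, the event reaches all of them, which is what a single `streams.stopAllQueries()` call on the parent session cannot do.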
Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18306#discussion_r122091309

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -321,6 +321,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     query
   }

+  private[sql] def stopAllQueries(): Unit = {
+    active.foreach { query =>
+      try {
+        query.stop()
+      } catch {
+        case e: Throwable =>
--- End diff --

nit: It's better to just catch `NonFatal(e)`
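The `NonFatal` pattern zsxwing recommends can be illustrated with a self-contained sketch. The `Query` trait below is a hypothetical stand-in for `StreamingQuery`, not Spark's API; the key behavior is that `NonFatal` deliberately does not match fatal errors such as `OutOfMemoryError`, so those still propagate instead of being swallowed:

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for a streaming query.
trait Query {
  def name: String
  def stop(): Unit
}

// Stop every query; log and continue on recoverable failures, but let
// fatal errors (OutOfMemoryError, etc.) propagate, since NonFatal does
// not match them.
def stopAll(queries: Seq[Query], log: String => Unit): Unit =
  queries.foreach { q =>
    try q.stop()
    catch {
      case NonFatal(e) => log(s"Failed to stop query ${q.name}: $e")
    }
  }
```

Catching `Throwable` instead would also trap fatal JVM errors, which is exactly what the review nit warns against.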
GitHub user aray opened a pull request:

https://github.com/apache/spark/pull/18306

[SPARK-21029][SS] All StreamingQuery should be stopped when the SparkSession is stopped

## What changes were proposed in this pull request?

Adds a method to `StreamingQueryManager` that stops all active queries, and calls this method when `SparkSession.stop` is called.

## How was this patch tested?

Two unit tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aray/spark SPARK-21029

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18306.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18306

----
commit 7da14aefcaf618e9f64459d50c5f78502eeb1f59
Author: Andrew Ray
Date: 2017-06-14T18:50:15Z

    SPARK-21029

commit 38f9566fd6503dace04b47a4d79bc6b412e49113
Author: Andrew Ray
Date: 2017-06-14T20:16:00Z

    call in spark session close instead of shutdown hook

commit a64c0ff1c057dca48f0101232c433ef80d24e883
Author: Andrew Ray
Date: 2017-06-14T20:29:26Z

    clean imports