[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-10-07 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r143345713
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 _cleaner.foreach(_.start())
 
+_stopHooks = new SparkShutdownHookManager()
--- End diff --

@felixcheung I don't get the objection. This is not adding a new shutdown 
hook, just reusing the class to add "stop hooks" that are called by 
`sc.stop()`. Furthermore, this is needed because `sc.stop()` cannot call 
`stopAllQueries` directly without creating a circular package dependency.
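The stop-hook idea being defended here can be sketched roughly as follows. This is a hypothetical illustration of the approach, not the actual patch: the method name `addStopHook` is invented for the sketch, and `SparkShutdownHookManager` is assumed to expose priority-ordered `add`/`runAll` operations.

```scala
// Sketch (hypothetical): core reuses SparkShutdownHookManager as a
// registry of "stop hooks" that sc.stop() runs itself, rather than the
// JVM running them at shutdown. This lets the sql package register
// cleanup with core without core referencing sql classes, which avoids
// the circular package dependency.

// In SparkContext (core module):
private var _stopHooks: SparkShutdownHookManager = _

def addStopHook(hook: () => Unit): AnyRef = _stopHooks.add(0, hook)

def stop(): Unit = {
  _stopHooks.runAll()  // e.g. stops all active streaming queries
  // ... existing teardown ...
}

// In the sql module, at session creation time:
sparkContext.addStopHook(() => sparkSession.streams.stopAllQueries())
```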


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-09-01 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r136516938
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 _cleaner.foreach(_.start())
 
+_stopHooks = new SparkShutdownHookManager()
--- End diff --

I think that's my point.
If sc.stop() calls streams.stopAllQueries(),
then calling sc.stop() works;
and if someone shuts down the JVM, sc.stop() is already getting called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---




[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-08-31 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r136436631
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 _cleaner.foreach(_.start())
 
+_stopHooks = new SparkShutdownHookManager()
--- End diff --

The queries also need to be gracefully stopped if someone calls `sc.stop()` 
without shutting down the JVM.





[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-08-31 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r136410312
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -562,6 +563,8 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 _cleaner.foreach(_.start())
 
+_stopHooks = new SparkShutdownHookManager()
--- End diff --

there's already a shutdown hook to call sc.stop() - perhaps just add the 
cleanup in stop()

https://github.com/aray/spark/blob/005472ed10fad3d1bc8feff12fc55c5682724a0e/core/src/main/scala/org/apache/spark/SparkContext.scala#L584






[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-06-14 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r122122675
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala ---
@@ -239,6 +237,40 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
 }
   }
 
+  test("stopAllQueries") {
+val datasets = Seq.fill(5)(makeDataset._2)
+withQueriesOn(datasets: _*) { queries =>
+  assert(queries.forall(_.isActive))
+  spark.streams.stopAllQueries()
+  assert(queries.forall(_.isActive == false), "Queries are still running")
+}
+  }
+
+  test("stop session stops all queries") {
+val inputData = MemoryStream[Int]
+val mapped = inputData.toDS.map(6 / _)
+var query: StreamingQuery = null
+try {
+  query = mapped.toDF.writeStream
+.format("memory")
+.queryName(s"queryInNewSession")
+.outputMode("append")
+.start()
+  assert(query.isActive)
+  spark.stop()
+  assert(spark.sparkContext.isStopped)
+  assert(query.isActive == false, "Query is still running")
+} catch {
+  case NonFatal(e) =>
+if (query != null) query.stop()
+throw e
--- End diff --

why still use try/catch to stop the query? Since this is a test of this 
specific behavior, if the query isn't stopped, or throws, isn't the test 
actually failing?

more importantly, why ignore the NonFatal exception that is thrown?





[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-06-14 Thread aray
Github user aray commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r122112073
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -690,6 +690,7 @@ class SparkSession private(
* @since 2.0.0
*/
   def stop(): Unit = {
+streams.stopAllQueries()
--- End diff --

I did look into that method. It does not work because the [application end 
event is generated](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1897) 
after the [SparkContext stopped flag is set](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1888), 
which prevents the queries from stopping properly. We could add a new event 
type that gets fired before the stopped flag is set in the close method. Do you 
think that would be an appropriate solution?
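The ordering being described can be made concrete with a simplified sketch of `SparkContext.stop()` (illustrative only, condensed from the linked lines, not the actual source):

```scala
// Simplified sketch of the order of operations in SparkContext.stop():
def stop(): Unit = {
  stopped.set(true)     // 1. the stopped flag is set first ...
  // ... executors, scheduler, etc. are torn down ...
  postApplicationEnd()  // 2. ... and SparkListenerApplicationEnd fires after
}
// A listener reacting to the application-end event therefore runs while
// the context is already marked stopped, so query.stop() can no longer
// shut the queries down cleanly.
```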





[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-06-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r122092151
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---
@@ -690,6 +690,7 @@ class SparkSession private(
* @since 2.0.0
*/
   def stop(): Unit = {
+streams.stopAllQueries()
--- End diff --

This one actually doesn't stop queries in the child `SparkSession`s. You 
can register a SparkListener when creating StreamingQueryManager to monitor 
`SparkListenerApplicationEnd` and stop queries.
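The suggested listener-based approach might look roughly like this. This is a sketch only: the registration point and error handling are illustrative, and `logWarning` is assumed to be in scope from Spark's `Logging` trait.

```scala
import scala.util.control.NonFatal
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Sketch: registered when the StreamingQueryManager is created, so that
// queries in every session sharing the SparkContext are stopped when the
// application ends.
sparkSession.sparkContext.addSparkListener(new SparkListener {
  override def onApplicationEnd(end: SparkListenerApplicationEnd): Unit = {
    sparkSession.streams.active.foreach { query =>
      try query.stop()
      catch {
        case NonFatal(e) => logWarning(s"Failed to stop ${query.name}", e)
      }
    }
  }
})
```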





[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-06-14 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18306#discussion_r122091309
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---
@@ -321,6 +321,17 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
 query
   }
 
+  private[sql] def stopAllQueries(): Unit = {
+active.foreach { query =>
+  try {
+query.stop()
+  } catch {
+case e: Throwable =>
--- End diff --

nit: It's better to just catch `NonFatal(e)`
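Applying the nit, the method would look like this (a sketch; the log message is illustrative):

```scala
import scala.util.control.NonFatal

private[sql] def stopAllQueries(): Unit = {
  active.foreach { query =>
    try {
      query.stop()
    } catch {
      // NonFatal lets fatal errors (OutOfMemoryError, InterruptedException,
      // ControlThrowable, etc.) propagate instead of swallowing them, which
      // a bare `case e: Throwable` would not.
      case NonFatal(e) => logWarning(s"Failed to stop query ${query.id}", e)
    }
  }
}
```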





[GitHub] spark pull request #18306: [SPARK-21029][SS] All StreamingQuery should be st...

2017-06-14 Thread aray
GitHub user aray opened a pull request:

https://github.com/apache/spark/pull/18306

[SPARK-21029][SS] All StreamingQuery should be stopped when the 
SparkSession is stopped

## What changes were proposed in this pull request?

Adds a method to `StreamingQueryManager` that stops all active queries, and calls 
this method when `SparkSession.stop` is called.

## How was this patch tested?

Two unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aray/spark SPARK-21029

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18306.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18306


commit 7da14aefcaf618e9f64459d50c5f78502eeb1f59
Author: Andrew Ray 
Date:   2017-06-14T18:50:15Z

SPARK-21029

commit 38f9566fd6503dace04b47a4d79bc6b412e49113
Author: Andrew Ray 
Date:   2017-06-14T20:16:00Z

call in spark session close instead of shutdown hook

commit a64c0ff1c057dca48f0101232c433ef80d24e883
Author: Andrew Ray 
Date:   2017-06-14T20:29:26Z

clean imports



