Repository: spark Updated Branches: refs/heads/master e11eeb71f -> feaa3706f
SPARK-3470 [CORE] [STREAMING] Add Closeable / close() to Java context objects ... that expose a stop() lifecycle method. This doesn't add `AutoCloseable`, which is Java 7+ only. But it should be possible to use try-with-resources on a `Closeable` in Java 7, as long as the `close()` does not throw a checked exception, and these don't. Q.E.D. Author: Sean Owen <so...@cloudera.com> Closes #2346 from srowen/SPARK-3470 and squashes the following commits: 612c21d [Sean Owen] Add Closeable / close() to Java context objects that expose a stop() lifecycle method Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/feaa3706 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/feaa3706 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/feaa3706 Branch: refs/heads/master Commit: feaa3706f17e44efcdac9f0a543a5b91232771ce Parents: e11eeb7 Author: Sean Owen <so...@cloudera.com> Authored: Fri Sep 12 22:50:37 2014 -0700 Committer: Reynold Xin <r...@apache.org> Committed: Fri Sep 12 22:50:37 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/api/java/JavaSparkContext.scala | 7 ++++++- .../spark/streaming/api/java/JavaStreamingContext.scala | 7 +++++-- 2 files changed, 11 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/feaa3706/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 8e178bc..23f7e6b 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -17,6 +17,7 @@ package org.apache.spark.api.java +import java.io.Closeable import java.util import java.util.{Map => JMap} @@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD} * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones. */ -class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround { +class JavaSparkContext(val sc: SparkContext) + extends JavaSparkContextVarargsWorkaround with Closeable { + /** * Create a JavaSparkContext that loads settings from system properties (for instance, when * launching with ./bin/spark-submit). @@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.stop() } + override def close(): Unit = stop() + /** * Get Spark's home location from either a value set through the constructor, * or the spark.home Java property, or the SPARK_HOME environment variable http://git-wip-us.apache.org/repos/asf/spark/blob/feaa3706/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 18605ca..9dc26dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -21,7 +21,7 @@ package org.apache.spark.streaming.api.java import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import java.io.InputStream +import java.io.{Closeable, InputStream} import java.util.{List => JList, Map => JMap} import akka.actor.{Props, SupervisorStrategy} @@ -49,7 +49,7 @@ import org.apache.spark.streaming.receiver.Receiver * respectively. `context.awaitTransformation()` allows the current thread to wait for the * termination of a context by `stop()` or by an exception. */ -class JavaStreamingContext(val ssc: StreamingContext) { +class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { /** * Create a StreamingContext. @@ -540,6 +540,9 @@ class JavaStreamingContext(val ssc: StreamingContext) { def stop(stopSparkContext: Boolean, stopGracefully: Boolean) = { ssc.stop(stopSparkContext, stopGracefully) } + + override def close(): Unit = stop() + } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org