http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
new file mode 100644
index 0000000..c43de58
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.scheduler.SparkListenerEvent
+
+/**
+ * :: Experimental ::
+ * Interface for listening to events related to [[StreamingQuery StreamingQueries]].
+ * @note The methods are not thread-safe as they may be called from different threads.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class StreamingQueryListener {
+
+  import StreamingQueryListener._
+
+  /**
+   * Called when a query is started.
+   * @note This is called synchronously with
+   *       [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]],
+   *       that is, `onQueryStarted` will be called on all listeners before
+   *       `DataFrameWriter.startStream()` returns the corresponding [[StreamingQuery]].
+   *       Please don't block this method as it will block your query.
+   * @since 2.0.0
+   */
+  def onQueryStarted(queryStarted: QueryStarted): Unit
+
+  /**
+   * Called when there is some status update (ingestion rate updated, etc.)
+   *
+   * @note This method is asynchronous. The status in [[StreamingQuery]] will always be the
+   *       latest no matter when this method is called. Therefore, the status of
+   *       [[StreamingQuery]] may have changed before/when you process the event. E.g., you may
+   *       find that [[StreamingQuery]] is terminated when you are processing [[QueryProgress]].
+   * @since 2.0.0
+   */
+  def onQueryProgress(queryProgress: QueryProgress): Unit
+
+  /**
+   * Called when a query is stopped, with or without error.
+   * @since 2.0.0
+   */
+  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
+}
+
+
+/**
+ * :: Experimental ::
+ * Companion object of [[StreamingQueryListener]] that defines the listener events.
+ * @since 2.0.0
+ */
+@Experimental
+object StreamingQueryListener {
+
+  /**
+   * :: Experimental ::
+   * Base type of [[StreamingQueryListener]] events
+   * @since 2.0.0
+   */
+  @Experimental
+  trait Event extends SparkListenerEvent
+
+  /**
+   * :: Experimental ::
+   * Event representing the start of a query
+   * @since 2.0.0
+   */
+  @Experimental
+  class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event
+
+  /**
+   * :: Experimental ::
+   * Event representing any progress updates in a query
+   * @since 2.0.0
+   */
+  @Experimental
+  class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends Event
+
+  /**
+   * :: Experimental ::
+   * Event representing the termination of a query
+   *
+   * @param queryInfo Information about the status of the query.
+   * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
+   *                  with an exception. Otherwise, it will be `None`.
+   * @param stackTrace The stack trace of the exception if the query was terminated with an
+   *                   exception. It will be empty if there was no error.
+   * @since 2.0.0
+   */
+  @Experimental
+  class QueryTerminated private[sql](
+      val queryInfo: StreamingQueryInfo,
+      val exception: Option[String],
+      val stackTrace: Seq[StackTraceElement]) extends Event
+}
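
For reviewers, a minimal usage sketch of the listener API added above (not part of the patch; the local SparkSession setup and the println bodies are illustrative assumptions):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder().master("local[2]").appName("listener-demo").getOrCreate()

    // Log the life cycle of every streaming query started on this session.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(queryStarted: QueryStarted): Unit =
        println(s"Started: ${queryStarted.queryInfo.name} (id = ${queryStarted.queryInfo.id})")
      override def onQueryProgress(queryProgress: QueryProgress): Unit =
        println(s"Progress: ${queryProgress.queryInfo.name}")
      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
        println(s"Terminated: ${queryTerminated.queryInfo.name}, " +
          s"exception = ${queryTerminated.exception}")
    })

onQueryStarted is delivered synchronously with query start, so the callback should return quickly; onQueryProgress and onQueryTerminated arrive asynchronously on the listener bus.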

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
new file mode 100644
index 0000000..bae7f56
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Clock, SystemClock, Utils}
+
+/**
+ * :: Experimental ::
+ * A class to manage all the [[StreamingQuery]]s active on a [[SparkSession]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
+
+  private[sql] val stateStoreCoordinator =
+    StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
+  private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
+  private val activeQueries = new mutable.HashMap[Long, StreamingQuery]
+  private val activeQueriesLock = new Object
+  private val awaitTerminationLock = new Object
+
+  private var lastTerminatedQuery: StreamingQuery = null
+
+  /**
+   * Returns a list of active queries associated with this SQLContext
+   *
+   * @since 2.0.0
+   */
+  def active: Array[StreamingQuery] = activeQueriesLock.synchronized {
+    activeQueries.values.toArray
+  }
+
+  /**
+   * Returns the query if there is an active query with the given id, or null.
+   *
+   * @since 2.0.0
+   */
+  def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
+    activeQueries.get(id).orNull
+  }
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated since the
+   * creation of the context, or since `resetTerminated()` was called. If any query was terminated
+   * with an exception, then the exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
+   * return immediately (if the query was terminated by `query.stop()`),
+   * or throw the exception immediately (if the query was terminated with an exception). Use
+   * `resetTerminated()` to clear past terminations and wait for new terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()` was called,
+   * if any query has terminated with an exception, then `awaitAnyTermination()` will
+   * throw any one of those exceptions. To correctly handle exceptions across multiple queries,
+   * users need to stop all of them after any of them terminates with an exception, and then
+   * check `query.exception()` for each query.
+   *
+   * @throws StreamingQueryException if any query has terminated with an exception
+   *
+   * @since 2.0.0
+   */
+  def awaitAnyTermination(): Unit = {
+    awaitTerminationLock.synchronized {
+      while (lastTerminatedQuery == null) {
+        awaitTerminationLock.wait(10)
+      }
+      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
+        throw lastTerminatedQuery.exception.get
+      }
+    }
+  }
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated since the
+   * creation of the context, or since `resetTerminated()` was called. Returns whether any query
+   * has terminated or not (multiple may have terminated). If any query has terminated with an
+   * exception, then the exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
+   * return `true` immediately (if the query was terminated by `query.stop()`),
+   * or throw the exception immediately (if the query was terminated with an exception). Use
+   * `resetTerminated()` to clear past terminations and wait for new terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()` was called,
+   * if any query has terminated with an exception, then `awaitAnyTermination()` will
+   * throw any one of those exceptions. To correctly handle exceptions across multiple queries,
+   * users need to stop all of them after any of them terminates with an exception, and then
+   * check `query.exception()` for each query.
+   *
+   * @throws StreamingQueryException if any query has terminated with an exception
+   *
+   * @since 2.0.0
+   */
+  def awaitAnyTermination(timeoutMs: Long): Boolean = {
+
+    val startTime = System.currentTimeMillis
+    def isTimedout = System.currentTimeMillis - startTime >= timeoutMs
+
+    awaitTerminationLock.synchronized {
+      while (!isTimedout && lastTerminatedQuery == null) {
+        awaitTerminationLock.wait(10)
+      }
+      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
+        throw lastTerminatedQuery.exception.get
+      }
+      lastTerminatedQuery != null
+    }
+  }
+
+  /**
+   * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to
+   * wait for new terminations.
+   *
+   * @since 2.0.0
+   */
+  def resetTerminated(): Unit = {
+    awaitTerminationLock.synchronized {
+      lastTerminatedQuery = null
+    }
+  }
+
+  /**
+   * Register a [[StreamingQueryListener]] to receive up-calls for life cycle events of
+   * [[StreamingQuery]].
+   *
+   * @since 2.0.0
+   */
+  def addListener(listener: StreamingQueryListener): Unit = {
+    listenerBus.addListener(listener)
+  }
+
+  /**
+   * Deregister a [[StreamingQueryListener]].
+   *
+   * @since 2.0.0
+   */
+  def removeListener(listener: StreamingQueryListener): Unit = {
+    listenerBus.removeListener(listener)
+  }
+
+  /** Post a listener event */
+  private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = {
+    listenerBus.post(event)
+  }
+
+  /**
+   * Start a [[StreamingQuery]].
+   * @param userSpecifiedName Query name optionally specified by the user.
+   * @param userSpecifiedCheckpointLocation  Checkpoint location optionally specified by the user.
+   * @param df Streaming DataFrame.
+   * @param sink  Sink to write the streaming outputs.
+   * @param outputMode  Output mode for the sink.
+   * @param useTempCheckpointLocation  Whether to use a temporary checkpoint location when the
+   *                                   user has not specified one. If false, an error will be
+   *                                   thrown.
+   * @param recoverFromCheckpointLocation  Whether to recover the query from the checkpoint
+   *                                       location. If false and the checkpoint location exists,
+   *                                       an error will be thrown.
+   * @param trigger [[Trigger]] for the query.
+   * @param triggerClock [[Clock]] to use for the triggering.
+   */
+  private[sql] def startQuery(
+      userSpecifiedName: Option[String],
+      userSpecifiedCheckpointLocation: Option[String],
+      df: DataFrame,
+      sink: Sink,
+      outputMode: OutputMode,
+      useTempCheckpointLocation: Boolean = false,
+      recoverFromCheckpointLocation: Boolean = true,
+      trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock()): StreamingQuery = {
+    activeQueriesLock.synchronized {
+      val id = StreamExecution.nextId
+      val name = userSpecifiedName.getOrElse(s"query-$id")
+      if (activeQueries.values.exists(_.name == name)) {
+        throw new IllegalArgumentException(
+          s"Cannot start query with name $name as a query with that name is already active")
+      }
+      val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
+        new Path(userSpecified).toUri.toString
+      }.orElse {
+        df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location =>
+          new Path(location, name).toUri.toString
+        }
+      }.getOrElse {
+        if (useTempCheckpointLocation) {
+          Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
+        } else {
+          throw new AnalysisException(
+            "checkpointLocation must be specified either " +
+              """through option("checkpointLocation", ...) or """ +
+              s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
+        }
+      }
+
+      // If offsets have already been created, we are trying to resume a query.
+      if (!recoverFromCheckpointLocation) {
+        val checkpointPath = new Path(checkpointLocation, "offsets")
+        val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
+        if (fs.exists(checkpointPath)) {
+          throw new AnalysisException(
+            s"This query does not support recovering from checkpoint location. " +
+              s"Delete $checkpointPath to start over.")
+        }
+      }
+
+      val analyzedPlan = df.queryExecution.analyzed
+      df.queryExecution.assertAnalyzed()
+
+      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+      }
+
+      var nextSourceId = 0L
+
+      val logicalPlan = analyzedPlan.transform {
+        case StreamingRelation(dataSource, _, output) =>
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema`, as attributes
+          // in "df.logicalPlan" already refer to the attributes of the previous `output`.
+          StreamingExecutionRelation(source, output)
+      }
+      val query = new StreamExecution(
+        sparkSession,
+        id,
+        name,
+        checkpointLocation,
+        logicalPlan,
+        sink,
+        trigger,
+        triggerClock,
+        outputMode)
+      query.start()
+      activeQueries.put(id, query)
+      query
+    }
+  }
+
+  /** Notify (by the StreamingQuery) that the query has been terminated */
+  private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = {
+    activeQueriesLock.synchronized {
+      activeQueries -= terminatedQuery.id
+    }
+    awaitTerminationLock.synchronized {
+      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
+        lastTerminatedQuery = terminatedQuery
+      }
+      awaitTerminationLock.notifyAll()
+    }
+  }
+}
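
A short, hedged sketch of how the manager above is driven from user code (illustrative only; the placeholder paths and the local SparkSession are assumptions, not part of this patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

    val spark = SparkSession.builder().master("local[2]").appName("manager-demo").getOrCreate()

    // Placeholder paths, assumed for illustration only.
    val inputDir = "/tmp/manager-demo/input"
    val checkpointDir = "/tmp/manager-demo/checkpoints"

    // Start a named query; the manager is reached through `spark.streams`.
    val q1: StreamingQuery = spark.readStream.format("text").load(inputDir)
      .writeStream
      .format("memory")
      .queryName("q1")
      .option("checkpointLocation", s"$checkpointDir/q1")
      .outputMode("append")
      .start()

    assert(spark.streams.active.map(_.name).contains("q1"))  // listed while running
    assert(spark.streams.get(q1.id) eq q1)                    // lookup by id

    try {
      // Block for up to 30 seconds until any query on this session terminates.
      if (spark.streams.awaitAnyTermination(30000)) {
        spark.streams.resetTerminated()  // arm the wait for the next termination
      }
    } catch {
      case e: StreamingQueryException =>
        // Some query died with an error; q1.exception carries the cause if it was q1.
        spark.streams.resetTerminated()
    }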

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
index d3fdbac..55be7a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
@@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval
 
 /**
  * :: Experimental ::
- * Used to indicate how often results should be produced by a [[ContinuousQuery]].
+ * Used to indicate how often results should be produced by a [[StreamingQuery]].
  *
  * @since 2.0.0
  */
@@ -65,7 +65,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
 
 /**
  * :: Experimental ::
- * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s.
+ * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s.
  *
  * @since 2.0.0
  */
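
For context, a sketch of the renamed trigger doc in practice: ProcessingTime, shown in the hunk header above, is built from an interval in milliseconds, and `startQuery` in the manager defaults to ProcessingTime(0), i.e. run batches as fast as possible. The `trigger(...)` call on the stream writer below is an assumed usage, not part of this hunk:

    import org.apache.spark.sql.streaming.ProcessingTime

    // Fire a micro-batch roughly every 10 seconds.
    val every10s = ProcessingTime(10000L)

    // Assumed wiring into a streaming write (commented out, for illustration only):
    // df.writeStream.trigger(every10s).format("memory").queryName("demo").start()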

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
deleted file mode 100644
index 8e1de09..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.concurrent.ConcurrentLinkedQueue
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.PrivateMethodTester._
-import org.scalatest.concurrent.AsyncAssertions.Waiter
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.JsonProtocol
-
-
-class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
-
-  import testImplicits._
-  import ContinuousQueryListener._
-
-  after {
-    spark.streams.active.foreach(_.stop())
-    assert(spark.streams.active.isEmpty)
-    assert(addedListeners.isEmpty)
-    // Make sure we don't leak any events to the next test
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-  }
-
-  test("single listener") {
-    val listener = new QueryStatusCollector
-    val input = MemoryStream[Int]
-    withListenerAdded(listener) {
-      testStream(input.toDS)(
-        StartStream(),
-        AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
-          val status = listener.startStatus
-          assert(status != null)
-          assert(status.name === query.name)
-          assert(status.id === query.id)
-          assert(status.sourceStatuses.size === 1)
-          assert(status.sourceStatuses(0).description.contains("Memory"))
-
-          // The source and sink offsets must be None as this must be called 
before the
-          // batches have started
-          assert(status.sourceStatuses(0).offsetDesc === None)
-          assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: 
Nil).toString)
-
-          // No progress events or termination events
-          assert(listener.progressStatuses.isEmpty)
-          assert(listener.terminationStatus === null)
-        },
-        AddDataMemory(input, Seq(1, 2, 3)),
-        CheckAnswer(1, 2, 3),
-        AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
-          eventually(Timeout(streamingTimeout)) {
-
-            // There should be only on progress event as batch has been 
processed
-            assert(listener.progressStatuses.size === 1)
-            val status = listener.progressStatuses.peek()
-            assert(status != null)
-            assert(status.name === query.name)
-            assert(status.id === query.id)
-            assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
-            assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
-
-            // No termination events
-            assert(listener.terminationStatus === null)
-          }
-        },
-        StopStream,
-        AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
-          eventually(Timeout(streamingTimeout)) {
-            val status = listener.terminationStatus
-            assert(status != null)
-            assert(status.name === query.name)
-            assert(status.id === query.id)
-            assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
-            assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
-            assert(listener.terminationStackTrace.isEmpty)
-            assert(listener.terminationException === None)
-          }
-          listener.checkAsyncErrors()
-        }
-      )
-    }
-  }
-
-  test("adding and removing listener") {
-    def isListenerActive(listener: QueryStatusCollector): Boolean = {
-      listener.reset()
-      testStream(MemoryStream[Int].toDS)(
-        StartStream(),
-        StopStream
-      )
-      listener.startStatus != null
-    }
-
-    try {
-      val listener1 = new QueryStatusCollector
-      val listener2 = new QueryStatusCollector
-
-      spark.streams.addListener(listener1)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === false)
-      spark.streams.addListener(listener2)
-      assert(isListenerActive(listener1) === true)
-      assert(isListenerActive(listener2) === true)
-      spark.streams.removeListener(listener1)
-      assert(isListenerActive(listener1) === false)
-      assert(isListenerActive(listener2) === true)
-    } finally {
-      addedListeners.foreach(spark.streams.removeListener)
-    }
-  }
-
-  test("event ordering") {
-    val listener = new QueryStatusCollector
-    withListenerAdded(listener) {
-      for (i <- 1 to 100) {
-        listener.reset()
-        require(listener.startStatus === null)
-        testStream(MemoryStream[Int].toDS)(
-          StartStream(),
-          Assert(listener.startStatus !== null, "onQueryStarted not called 
before query returned"),
-          StopStream,
-          Assert { listener.checkAsyncErrors() }
-        )
-      }
-    }
-  }
-
-  test("exception should be reported in QueryTerminated") {
-    val listener = new QueryStatusCollector
-    withListenerAdded(listener) {
-      val input = MemoryStream[Int]
-      testStream(input.toDS.map(_ / 0))(
-        StartStream(),
-        AddData(input, 1),
-        ExpectFailure[SparkException](),
-        Assert {
-          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationStatus !== null)
-          assert(listener.terminationException.isDefined)
-          
assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationStackTrace.nonEmpty)
-        }
-      )
-    }
-  }
-
-  test("QueryStarted serialization") {
-    val queryStartedInfo = new ContinuousQueryInfo(
-      "name",
-      1,
-      Seq(new SourceStatus("source1", None), new SourceStatus("source2", 
None)),
-      new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
-    val queryStarted = new 
ContinuousQueryListener.QueryStarted(queryStartedInfo)
-    val json = JsonProtocol.sparkEventToJson(queryStarted)
-    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryStarted]
-    assertContinuousQueryInfoEquals(queryStarted.queryInfo, 
newQueryStarted.queryInfo)
-  }
-
-  test("QueryProgress serialization") {
-    val queryProcessInfo = new ContinuousQueryInfo(
-      "name",
-      1,
-      Seq(
-        new SourceStatus("source1", Some(LongOffset(0).toString)),
-        new SourceStatus("source2", Some(LongOffset(1).toString))),
-      new SinkStatus("sink", new CompositeOffset(Array(None, 
Some(LongOffset(1)))).toString))
-    val queryProcess = new 
ContinuousQueryListener.QueryProgress(queryProcessInfo)
-    val json = JsonProtocol.sparkEventToJson(queryProcess)
-    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryProgress]
-    assertContinuousQueryInfoEquals(queryProcess.queryInfo, 
newQueryProcess.queryInfo)
-  }
-
-  test("QueryTerminated serialization") {
-    val queryTerminatedInfo = new ContinuousQueryInfo(
-      "name",
-      1,
-      Seq(
-        new SourceStatus("source1", Some(LongOffset(0).toString)),
-        new SourceStatus("source2", Some(LongOffset(1).toString))),
-      new SinkStatus("sink", new CompositeOffset(Array(None, 
Some(LongOffset(1)))).toString))
-    val exception = new RuntimeException("exception")
-    val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
-      queryTerminatedInfo,
-      Some(exception.getMessage),
-      exception.getStackTrace)
-    val json =
-      JsonProtocol.sparkEventToJson(queryQueryTerminated)
-    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
-      .asInstanceOf[ContinuousQueryListener.QueryTerminated]
-    assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, 
newQueryTerminated.queryInfo)
-    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
-  }
-
-  private def assertContinuousQueryInfoEquals(
-      expected: ContinuousQueryInfo,
-      actual: ContinuousQueryInfo): Unit = {
-    assert(expected.name === actual.name)
-    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
-    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
-      case (expectedSource, actualSource) =>
-        assertSourceStatus(expectedSource, actualSource)
-    }
-    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
-  }
-
-  private def assertSourceStatus(expected: SourceStatus, actual: 
SourceStatus): Unit = {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
-  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit 
= {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
-  private def withListenerAdded(listener: ContinuousQueryListener)(body: => 
Unit): Unit = {
-    try {
-      failAfter(1 minute) {
-        spark.streams.addListener(listener)
-        body
-      }
-    } finally {
-      spark.streams.removeListener(listener)
-    }
-  }
-
-  private def addedListeners(): Array[ContinuousQueryListener] = {
-    val listenerBusMethod =
-      PrivateMethod[ContinuousQueryListenerBus]('listenerBus)
-    val listenerBus = spark.streams invokePrivate listenerBusMethod()
-    listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener])
-  }
-
-  class QueryStatusCollector extends ContinuousQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: ContinuousQueryInfo = null
-    @volatile var terminationStatus: ContinuousQueryInfo = null
-    @volatile var terminationException: Option[String] = null
-    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
-
-    val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(streamingTimeout))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
-      asyncTestWaiter {
-        startStatus = queryStarted.queryInfo
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before 
onQueryStarted")
-        progressStatuses.add(queryProgress.queryInfo)
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before 
onQueryStarted")
-        terminationStatus = queryTerminated.queryInfo
-        terminationException = queryTerminated.exception
-        terminationStackTrace = queryTerminated.stackTrace
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
deleted file mode 100644
index ef2fcbf..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala
+++ /dev/null
@@ -1,299 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import scala.concurrent.Future
-import scala.util.Random
-import scala.util.control.NonFatal
-
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.Span
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.Dataset
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
-
-class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter {
-
-  import AwaitTerminationTester._
-  import testImplicits._
-
-  override val streamingTimeout = 20.seconds
-
-  before {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
-  }
-
-  after {
-    assert(spark.streams.active.isEmpty)
-    spark.streams.resetTerminated()
-  }
-
-  testQuietly("listing") {
-    val (m1, ds1) = makeDataset
-    val (m2, ds2) = makeDataset
-    val (m3, ds3) = makeDataset
-
-    withQueriesOn(ds1, ds2, ds3) { queries =>
-      require(queries.size === 3)
-      assert(spark.streams.active.toSet === queries.toSet)
-      val (q1, q2, q3) = (queries(0), queries(1), queries(2))
-
-      assert(spark.streams.get(q1.id).eq(q1))
-      assert(spark.streams.get(q2.id).eq(q2))
-      assert(spark.streams.get(q3.id).eq(q3))
-      assert(spark.streams.get(-1) === null) // non-existent id
-      q1.stop()
-
-      assert(spark.streams.active.toSet === Set(q2, q3))
-      assert(spark.streams.get(q1.id) === null)
-      assert(spark.streams.get(q2.id).eq(q2))
-
-      m2.addData(0)   // q2 should terminate with error
-
-      eventually(Timeout(streamingTimeout)) {
-        require(!q2.isActive)
-        require(q2.exception.isDefined)
-      }
-      assert(spark.streams.get(q2.id) === null)
-      assert(spark.streams.active.toSet === Set(q3))
-    }
-  }
-
-  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
-    val datasets = Seq.fill(5)(makeDataset._2)
-    withQueriesOn(datasets: _*) { queries =>
-      require(queries.size === datasets.size)
-      assert(spark.streams.active.toSet === queries.toSet)
-
-      // awaitAnyTermination should be blocking
-      testAwaitAnyTermination(ExpectBlocked)
-
-      // Stop a query asynchronously and see if it is reported through 
awaitAnyTermination
-      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = 
false)
-      testAwaitAnyTermination(ExpectNotBlocked)
-      require(!q1.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
-
-      // All subsequent calls to awaitAnyTermination should be non-blocking
-      testAwaitAnyTermination(ExpectNotBlocked)
-
-      // Resetting termination should make awaitAnyTermination() blocking again
-      spark.streams.resetTerminated()
-      testAwaitAnyTermination(ExpectBlocked)
-
-      // Terminate a query asynchronously with exception and see 
awaitAnyTermination throws
-      // the exception
-      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
-      testAwaitAnyTermination(ExpectException[SparkException])
-      require(!q2.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
-
-      // All subsequent calls to awaitAnyTermination should throw the exception
-      testAwaitAnyTermination(ExpectException[SparkException])
-
-      // Resetting termination should make awaitAnyTermination() blocking again
-      spark.streams.resetTerminated()
-      testAwaitAnyTermination(ExpectBlocked)
-
-      // Terminate multiple queries, one with failure and see whether 
awaitAnyTermination throws
-      // the exception
-      val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
-      testAwaitAnyTermination(ExpectNotBlocked)
-      require(!q3.isActive)
-      val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
-      eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
-      // After q4 terminates with exception, awaitAnyTerm should start 
throwing exception
-      testAwaitAnyTermination(ExpectException[SparkException])
-    }
-  }
-
-  testQuietly("awaitAnyTermination with timeout and resetTerminated") {
-    val datasets = Seq.fill(6)(makeDataset._2)
-    withQueriesOn(datasets: _*) { queries =>
-      require(queries.size === datasets.size)
-      assert(spark.streams.active.toSet === queries.toSet)
-
-      // awaitAnyTermination should be blocking or non-blocking depending on 
timeout values
-      testAwaitAnyTermination(
-        ExpectBlocked,
-        awaitTimeout = 4 seconds,
-        expectedReturnedValue = false,
-        testBehaviorFor = 2 seconds)
-
-      testAwaitAnyTermination(
-        ExpectNotBlocked,
-        awaitTimeout = 50 milliseconds,
-        expectedReturnedValue = false,
-        testBehaviorFor = 1 second)
-
-      // Stop a query asynchronously within timeout and awaitAnyTerm should be 
unblocked
-      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = 
false)
-      testAwaitAnyTermination(
-        ExpectNotBlocked,
-        awaitTimeout = 2 seconds,
-        expectedReturnedValue = true,
-        testBehaviorFor = 4 seconds)
-      require(!q1.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
-
-      // All subsequent calls to awaitAnyTermination should be non-blocking 
even if timeout is high
-      testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = 
true)
-
-      // Resetting termination should make awaitAnyTermination() blocking again
-      spark.streams.resetTerminated()
-      testAwaitAnyTermination(
-        ExpectBlocked,
-        awaitTimeout = 4 seconds,
-        expectedReturnedValue = false,
-        testBehaviorFor = 1 second)
-
-      // Terminate a query asynchronously with exception within timeout, 
awaitAnyTermination should
-      // throws the exception
-      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
-      testAwaitAnyTermination(
-        ExpectException[SparkException],
-        awaitTimeout = 4 seconds,
-        testBehaviorFor = 6 seconds)
-      require(!q2.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
-
-      // All subsequent calls to awaitAnyTermination should throw the exception
-      testAwaitAnyTermination(
-        ExpectException[SparkException],
-        awaitTimeout = 2 seconds,
-        testBehaviorFor = 4 seconds)
-
-      // Terminate a query asynchronously outside the timeout, awaitAnyTerm 
should be blocked
-      spark.streams.resetTerminated()
-      val q3 = stopRandomQueryAsync(2 seconds, withError = true)
-      testAwaitAnyTermination(
-        ExpectNotBlocked,
-        awaitTimeout = 100 milliseconds,
-        expectedReturnedValue = false,
-        testBehaviorFor = 4 seconds)
-
-      // After that query is stopped, awaitAnyTerm should throw exception
-      eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait 
for query to stop
-      testAwaitAnyTermination(
-        ExpectException[SparkException],
-        awaitTimeout = 100 milliseconds,
-        testBehaviorFor = 4 seconds)
-
-
-      // Terminate multiple queries, one with failure and see whether 
awaitAnyTermination throws
-      // the exception
-      spark.streams.resetTerminated()
-
-      val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
-      testAwaitAnyTermination(
-        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = 
true)
-      require(!q4.isActive)
-      val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
-      eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
-      // After q5 terminates with exception, awaitAnyTerm should start 
throwing exception
-      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 
2 seconds)
-    }
-  }
-
-
-  /** Run a body of code by defining a query on each dataset */
-  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] 
=> Unit): Unit = {
-    failAfter(streamingTimeout) {
-      val queries = withClue("Error starting queries") {
-        datasets.zipWithIndex.map { case (ds, i) =>
-          @volatile var query: StreamExecution = null
-          try {
-            val df = ds.toDF
-            val metadataRoot =
-              Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath
-            query =
-              df.writeStream
-                .format("memory")
-                .queryName(s"query$i")
-                .option("checkpointLocation", metadataRoot)
-                .outputMode("append")
-                .start()
-                .asInstanceOf[StreamExecution]
-          } catch {
-            case NonFatal(e) =>
-              if (query != null) query.stop()
-              throw e
-          }
-          query
-        }
-      }
-      try {
-        body(queries)
-      } finally {
-        queries.foreach(_.stop())
-      }
-    }
-  }
-
-  /** Test the behavior of awaitAnyTermination */
-  private def testAwaitAnyTermination(
-      expectedBehavior: ExpectedBehavior,
-      expectedReturnedValue: Boolean = false,
-      awaitTimeout: Span = null,
-      testBehaviorFor: Span = 4 seconds
-    ): Unit = {
-
-    def awaitTermFunc(): Unit = {
-      if (awaitTimeout != null && awaitTimeout.toMillis > 0) {
-        val returnedValue = 
spark.streams.awaitAnyTermination(awaitTimeout.toMillis)
-        assert(returnedValue === expectedReturnedValue, "Returned value does 
not match expected")
-      } else {
-        spark.streams.awaitAnyTermination()
-      }
-    }
-
-    AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, 
testBehaviorFor)
-  }
-
-  /** Stop a random active query either with `stop()` or with an error */
-  private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): 
ContinuousQuery = {
-
-    import scala.concurrent.ExecutionContext.Implicits.global
-
-    val activeQueries = spark.streams.active
-    val queryToStop = activeQueries(Random.nextInt(activeQueries.length))
-    Future {
-      Thread.sleep(stopAfter.toMillis)
-      if (withError) {
-        logDebug(s"Terminating query ${queryToStop.name} with error")
-        queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect {
-          case StreamingExecutionRelation(source, _) =>
-            source.asInstanceOf[MemoryStream[Int]].addData(0)
-        }
-      } else {
-        logDebug(s"Stopping query ${queryToStop.name}")
-        queryToStop.stop()
-      }
-    }
-    queryToStop
-  }
-
-  private def makeDataset: (MemoryStream[Int], Dataset[Int]) = {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS.map(6 / _)
-    (inputData, mapped)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
deleted file mode 100644
index ad6bc27..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.SparkException
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, 
MemoryStream, StreamExecution}
-import org.apache.spark.util.Utils
-
-
-class ContinuousQuerySuite extends StreamTest with BeforeAndAfter {
-
-  import AwaitTerminationTester._
-  import testImplicits._
-
-  after {
-    sqlContext.streams.active.foreach(_.stop())
-  }
-
-  test("names unique across active queries, ids unique across all started 
queries") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map { 6 / _}
-
-    def startQuery(queryName: String): ContinuousQuery = {
-      val metadataRoot = Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath
-      val writer = mapped.writeStream
-      writer
-        .queryName(queryName)
-        .format("memory")
-        .option("checkpointLocation", metadataRoot)
-        .start()
-    }
-
-    val q1 = startQuery("q1")
-    assert(q1.name === "q1")
-
-    // Verify that another query with same name cannot be started
-    val e1 = intercept[IllegalArgumentException] {
-      startQuery("q1")
-    }
-    Seq("q1", "already active").foreach { s => 
assert(e1.getMessage.contains(s)) }
-
-    // Verify q1 was unaffected by the above exception and stop it
-    assert(q1.isActive)
-    q1.stop()
-
-    // Verify another query can be started with name q1, but will have 
different id
-    val q2 = startQuery("q1")
-    assert(q2.name === "q1")
-    assert(q2.id !== q1.id)
-    q2.stop()
-  }
-
-  testQuietly("lifecycle states and awaitTermination") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map { 6 / _}
-
-    testStream(mapped)(
-      AssertOnQuery(_.isActive === true),
-      AssertOnQuery(_.exception.isEmpty),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      TestAwaitTermination(ExpectBlocked),
-      TestAwaitTermination(ExpectBlocked, timeoutMs = 2000),
-      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, 
expectedReturnValue = false),
-      StopStream,
-      AssertOnQuery(_.isActive === false),
-      AssertOnQuery(_.exception.isEmpty),
-      TestAwaitTermination(ExpectNotBlocked),
-      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, 
expectedReturnValue = true),
-      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, 
expectedReturnValue = true),
-      StartStream(),
-      AssertOnQuery(_.isActive === true),
-      AddData(inputData, 0),
-      ExpectFailure[SparkException],
-      AssertOnQuery(_.isActive === false),
-      TestAwaitTermination(ExpectException[SparkException]),
-      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
-      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
-      AssertOnQuery(
-        q =>
-          q.exception.get.startOffset.get === 
q.committedOffsets.toCompositeOffset(Seq(inputData)),
-        "incorrect start offset on exception")
-    )
-  }
-
-  testQuietly("source and sink statuses") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(6 / _)
-
-    testStream(mapped)(
-      AssertOnQuery(_.sourceStatuses.length === 1),
-      AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
-      AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: 
Nil).toString),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString)),
-      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString),
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(1).toString)),
-      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(1)).toString),
-      AddData(inputData, 0),
-      ExpectFailure[SparkException],
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(2).toString)),
-      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(1)).toString)
-    )
-  }
-
-  /**
-   * A [[StreamAction]] to test the behavior of 
`ContinuousQuery.awaitTermination()`.
-   *
-   * @param expectedBehavior  Expected behavior (not blocked, blocked, or 
exception thrown)
-   * @param timeoutMs         Timeout in milliseconds
-   *                          When timeoutMs <= 0, awaitTermination() is 
tested (i.e. w/o timeout)
-   *                          When timeoutMs > 0, awaitTermination(timeoutMs) 
is tested
-   * @param expectedReturnValue Expected return value when 
awaitTermination(timeoutMs) is used
-   */
-  case class TestAwaitTermination(
-      expectedBehavior: ExpectedBehavior,
-      timeoutMs: Int = -1,
-      expectedReturnValue: Boolean = false
-    ) extends AssertOnQuery(
-      TestAwaitTermination.assertOnQueryCondition(expectedBehavior, timeoutMs, 
expectedReturnValue),
-      "Error testing awaitTermination behavior"
-    ) {
-    override def toString(): String = {
-      s"TestAwaitTermination($expectedBehavior, timeoutMs = $timeoutMs, " +
-        s"expectedReturnValue = $expectedReturnValue)"
-    }
-  }
-
-  object TestAwaitTermination {
-
-    /**
-     * Tests the behavior of `ContinuousQuery.awaitTermination`.
-     *
-     * @param expectedBehavior  Expected behavior (not blocked, blocked, or 
exception thrown)
-     * @param timeoutMs         Timeout in milliseconds
-     *                          When timeoutMs <= 0, awaitTermination() is 
tested (i.e. w/o timeout)
-     *                          When timeoutMs > 0, 
awaitTermination(timeoutMs) is tested
-     * @param expectedReturnValue Expected return value when 
awaitTermination(timeoutMs) is used
-     */
-    def assertOnQueryCondition(
-        expectedBehavior: ExpectedBehavior,
-        timeoutMs: Int,
-        expectedReturnValue: Boolean
-      )(q: StreamExecution): Boolean = {
-
-      def awaitTermFunc(): Unit = {
-        if (timeoutMs <= 0) {
-          q.awaitTermination()
-        } else {
-          val returnedValue = q.awaitTermination(timeoutMs)
-          assert(returnedValue === expectedReturnValue, "Returned value does 
not match expected")
-        }
-      }
-      AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
-      true // If the control reached here, then everything worked as expected
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index a5acc97..9d0a2b3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -124,7 +124,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-    var query: ContinuousQuery = null
+    var query: StreamingQuery = null
 
     try {
       query =
@@ -156,7 +156,7 @@ class FileStreamSinkSuite extends StreamTest {
     val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
     val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-    var query: ContinuousQuery = null
+    var query: StreamingQuery = null
 
     try {
       query =
@@ -240,7 +240,7 @@ class FileStreamSinkSuite extends StreamTest {
       val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
       val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
 
-      var query: ContinuousQuery = null
+      var query: StreamingQuery = null
 
       try {
         val writer =

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
index 0e157cf..f9e236c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala
@@ -57,7 +57,7 @@ class FileStressSuite extends StreamTest {
     @volatile
     var continue = true
     @volatile
-    var stream: ContinuousQuery = null
+    var stream: StreamingQuery = null
 
     val writer = new Thread("stream writer") {
       override def run(): Unit = {
@@ -100,7 +100,7 @@ class FileStressSuite extends StreamTest {
 
     val input = spark.readStream.format("text").load(inputDir)
 
-    def startStream(): ContinuousQuery = {
+    def startStream(): StreamingQuery = {
       val output = input
         .repartition(5)
         .as[String]
@@ -139,7 +139,7 @@ class FileStressSuite extends StreamTest {
         try {
           stream.awaitTermination()
         } catch {
-          case ce: ContinuousQueryException =>
+          case ce: StreamingQueryException =>
             failures += 1
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index cbfa6ff..720ffaf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -353,7 +353,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           case ef: ExpectFailure[_] =>
             verify(currentStream != null, "can not expect failure when stream is not running")
             try failAfter(streamingTimeout) {
-              val thrownException = intercept[ContinuousQueryException] {
+              val thrownException = intercept[StreamingQueryException] {
                 currentStream.awaitTermination()
               }
               eventually("microbatch thread not stopped after termination with failure") {
@@ -563,7 +563,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         case e: ExpectException[_] =>
           val thrownException =
             withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") {
-              intercept[ContinuousQueryException] {
+              intercept[StreamingQueryException] {
                 failAfter(testTimeout) {
                   awaitTermFunc()
                 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 7f44227..8681199 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -40,8 +40,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
 
   import testImplicits._
 
-
-
   test("simple count, update mode") {
     val inputData = MemoryStream[Int]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
new file mode 100644
index 0000000..7f4d28c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester._
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.JsonProtocol
+
+
+class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
+
+  import testImplicits._
+  import StreamingQueryListener._
+
+  after {
+    spark.streams.active.foreach(_.stop())
+    assert(spark.streams.active.isEmpty)
+    assert(addedListeners.isEmpty)
+    // Make sure we don't leak any events to the next test
+    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+  }
+
+  test("single listener") {
+    val listener = new QueryStatusCollector
+    val input = MemoryStream[Int]
+    withListenerAdded(listener) {
+      testStream(input.toDS)(
+        StartStream(),
+        AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
+          val status = listener.startStatus
+          assert(status != null)
+          assert(status.name === query.name)
+          assert(status.id === query.id)
+          assert(status.sourceStatuses.size === 1)
+          assert(status.sourceStatuses(0).description.contains("Memory"))
+
+          // The source and sink offsets must be None as this must be called before the
+          // batches have started
+          assert(status.sourceStatuses(0).offsetDesc === None)
+          assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString)
+
+          // No progress events or termination events
+          assert(listener.progressStatuses.isEmpty)
+          assert(listener.terminationStatus === null)
+        },
+        AddDataMemory(input, Seq(1, 2, 3)),
+        CheckAnswer(1, 2, 3),
+        AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
+          eventually(Timeout(streamingTimeout)) {
+
+            // There should be only one progress event as the batch has been processed
+            assert(listener.progressStatuses.size === 1)
+            val status = listener.progressStatuses.peek()
+            assert(status != null)
+            assert(status.name === query.name)
+            assert(status.id === query.id)
+            assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
+            assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
+
+            // No termination events
+            assert(listener.terminationStatus === null)
+          }
+        },
+        StopStream,
+        AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
+          eventually(Timeout(streamingTimeout)) {
+            val status = listener.terminationStatus
+            assert(status != null)
+            assert(status.name === query.name)
+            assert(status.id === query.id)
+            assert(status.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString))
+            assert(status.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString)
+            assert(listener.terminationStackTrace.isEmpty)
+            assert(listener.terminationException === None)
+          }
+          listener.checkAsyncErrors()
+        }
+      )
+    }
+  }
+
+  test("adding and removing listener") {
+    def isListenerActive(listener: QueryStatusCollector): Boolean = {
+      listener.reset()
+      testStream(MemoryStream[Int].toDS)(
+        StartStream(),
+        StopStream
+      )
+      listener.startStatus != null
+    }
+
+    try {
+      val listener1 = new QueryStatusCollector
+      val listener2 = new QueryStatusCollector
+
+      spark.streams.addListener(listener1)
+      assert(isListenerActive(listener1) === true)
+      assert(isListenerActive(listener2) === false)
+      spark.streams.addListener(listener2)
+      assert(isListenerActive(listener1) === true)
+      assert(isListenerActive(listener2) === true)
+      spark.streams.removeListener(listener1)
+      assert(isListenerActive(listener1) === false)
+      assert(isListenerActive(listener2) === true)
+    } finally {
+      addedListeners.foreach(spark.streams.removeListener)
+    }
+  }
+
+  test("event ordering") {
+    val listener = new QueryStatusCollector
+    withListenerAdded(listener) {
+      for (i <- 1 to 100) {
+        listener.reset()
+        require(listener.startStatus === null)
+        testStream(MemoryStream[Int].toDS)(
+          StartStream(),
+          Assert(listener.startStatus !== null, "onQueryStarted not called 
before query returned"),
+          StopStream,
+          Assert { listener.checkAsyncErrors() }
+        )
+      }
+    }
+  }
+
+  test("exception should be reported in QueryTerminated") {
+    val listener = new QueryStatusCollector
+    withListenerAdded(listener) {
+      val input = MemoryStream[Int]
+      testStream(input.toDS.map(_ / 0))(
+        StartStream(),
+        AddData(input, 1),
+        ExpectFailure[SparkException](),
+        Assert {
+          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+          assert(listener.terminationStatus !== null)
+          assert(listener.terminationException.isDefined)
+          
assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationStackTrace.nonEmpty)
+        }
+      )
+    }
+  }
+
+  test("QueryStarted serialization") {
+    val queryStartedInfo = new StreamingQueryInfo(
+      "name",
+      1,
+      Seq(new SourceStatus("source1", None), new SourceStatus("source2", 
None)),
+      new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
+    val queryStarted = new 
StreamingQueryListener.QueryStarted(queryStartedInfo)
+    val json = JsonProtocol.sparkEventToJson(queryStarted)
+    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[StreamingQueryListener.QueryStarted]
+    assertStreamingQueryInfoEquals(queryStarted.queryInfo, 
newQueryStarted.queryInfo)
+  }
+
+  test("QueryProgress serialization") {
+    val queryProcessInfo = new StreamingQueryInfo(
+      "name",
+      1,
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, 
Some(LongOffset(1)))).toString))
+    val queryProcess = new 
StreamingQueryListener.QueryProgress(queryProcessInfo)
+    val json = JsonProtocol.sparkEventToJson(queryProcess)
+    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[StreamingQueryListener.QueryProgress]
+    assertStreamingQueryInfoEquals(queryProcess.queryInfo, 
newQueryProcess.queryInfo)
+  }
+
+  test("QueryTerminated serialization") {
+    val queryTerminatedInfo = new StreamingQueryInfo(
+      "name",
+      1,
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, 
Some(LongOffset(1)))).toString))
+    val exception = new RuntimeException("exception")
+    val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
+      queryTerminatedInfo,
+      Some(exception.getMessage),
+      exception.getStackTrace)
+    val json =
+      JsonProtocol.sparkEventToJson(queryQueryTerminated)
+    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[StreamingQueryListener.QueryTerminated]
+    assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, 
newQueryTerminated.queryInfo)
+    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+  }
+
+  private def assertStreamingQueryInfoEquals(
+      expected: StreamingQueryInfo,
+      actual: StreamingQueryInfo): Unit = {
+    assert(expected.name === actual.name)
+    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
+    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
+      case (expectedSource, actualSource) =>
+        assertSourceStatus(expectedSource, actualSource)
+    }
+    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
+  }
+
+  private def assertSourceStatus(expected: SourceStatus, actual: 
SourceStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+
+  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit 
= {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+
+  private def withListenerAdded(listener: StreamingQueryListener)(body: => 
Unit): Unit = {
+    try {
+      failAfter(1 minute) {
+        spark.streams.addListener(listener)
+        body
+      }
+    } finally {
+      spark.streams.removeListener(listener)
+    }
+  }
+
+  private def addedListeners(): Array[StreamingQueryListener] = {
+    val listenerBusMethod =
+      PrivateMethod[StreamingQueryListenerBus]('listenerBus)
+    val listenerBus = spark.streams invokePrivate listenerBusMethod()
+    listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
+  }
+
+  class QueryStatusCollector extends StreamingQueryListener {
+    // to catch errors in the async listener events
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startStatus: StreamingQueryInfo = null
+    @volatile var terminationStatus: StreamingQueryInfo = null
+    @volatile var terminationException: Option[String] = null
+    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
+
+    val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
+
+    def reset(): Unit = {
+      startStatus = null
+      terminationStatus = null
+      progressStatuses.clear()
+      asyncTestWaiter = new Waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(streamingTimeout))
+    }
+
+
+    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+      asyncTestWaiter {
+        startStatus = queryStarted.queryInfo
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryProgress called before 
onQueryStarted")
+        progressStatuses.add(queryProgress.queryInfo)
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryTerminated called before 
onQueryStarted")
+        terminationStatus = queryTerminated.queryInfo
+        terminationException = queryTerminated.exception
+        terminationStackTrace = queryTerminated.stackTrace
+      }
+      asyncTestWaiter.dismiss()
+    }
+  }
+}
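
As context for the suite above, a minimal sketch of the listener surface it exercises; the PrintingListener class, the local-mode SparkSession and the println calls are illustrative only:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // A bare-bones listener that just prints the lifecycle events the suite asserts on.
    class PrintingListener extends StreamingQueryListener {
      override def onQueryStarted(queryStarted: QueryStarted): Unit =
        println(s"started: ${queryStarted.queryInfo.name}")
      override def onQueryProgress(queryProgress: QueryProgress): Unit =
        println(s"progress offsets: ${queryProgress.queryInfo.sourceStatuses.map(_.offsetDesc)}")
      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
        println(s"terminated, error = ${queryTerminated.exception.getOrElse("none")}")
    }

    val spark = SparkSession.builder().master("local[2]").appName("listener-sketch").getOrCreate()
    val listener = new PrintingListener
    spark.streams.addListener(listener)     // progress/termination events arrive asynchronously
    // ... start and run streaming queries here ...
    spark.streams.removeListener(listener)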

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
new file mode 100644
index 0000000..41ffd56
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.concurrent.Future
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.Utils
+
+class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+
+  import AwaitTerminationTester._
+  import testImplicits._
+
+  override val streamingTimeout = 20.seconds
+
+  before {
+    assert(spark.streams.active.isEmpty)
+    spark.streams.resetTerminated()
+  }
+
+  after {
+    assert(spark.streams.active.isEmpty)
+    spark.streams.resetTerminated()
+  }
+
+  testQuietly("listing") {
+    val (m1, ds1) = makeDataset
+    val (m2, ds2) = makeDataset
+    val (m3, ds3) = makeDataset
+
+    withQueriesOn(ds1, ds2, ds3) { queries =>
+      require(queries.size === 3)
+      assert(spark.streams.active.toSet === queries.toSet)
+      val (q1, q2, q3) = (queries(0), queries(1), queries(2))
+
+      assert(spark.streams.get(q1.id).eq(q1))
+      assert(spark.streams.get(q2.id).eq(q2))
+      assert(spark.streams.get(q3.id).eq(q3))
+      assert(spark.streams.get(-1) === null) // non-existent id
+      q1.stop()
+
+      assert(spark.streams.active.toSet === Set(q2, q3))
+      assert(spark.streams.get(q1.id) === null)
+      assert(spark.streams.get(q2.id).eq(q2))
+
+      m2.addData(0)   // q2 should terminate with error
+
+      eventually(Timeout(streamingTimeout)) {
+        require(!q2.isActive)
+        require(q2.exception.isDefined)
+      }
+      assert(spark.streams.get(q2.id) === null)
+      assert(spark.streams.active.toSet === Set(q3))
+    }
+  }
+
+  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+    val datasets = Seq.fill(5)(makeDataset._2)
+    withQueriesOn(datasets: _*) { queries =>
+      require(queries.size === datasets.size)
+      assert(spark.streams.active.toSet === queries.toSet)
+
+      // awaitAnyTermination should be blocking
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Stop a query asynchronously and see if it is reported through 
awaitAnyTermination
+      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = 
false)
+      testAwaitAnyTermination(ExpectNotBlocked)
+      require(!q1.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should be non-blocking
+      testAwaitAnyTermination(ExpectNotBlocked)
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Terminate a query asynchronously with exception and see 
awaitAnyTermination throws
+      // the exception
+      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      testAwaitAnyTermination(ExpectException[SparkException])
+      require(!q2.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should throw the exception
+      testAwaitAnyTermination(ExpectException[SparkException])
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Terminate multiple queries, one with failure, and see whether awaitAnyTermination
+      // throws the exception
+      val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      testAwaitAnyTermination(ExpectNotBlocked)
+      require(!q3.isActive)
+      val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
+      // After q4 terminates with exception, awaitAnyTerm should start 
throwing exception
+      testAwaitAnyTermination(ExpectException[SparkException])
+    }
+  }
+
+  testQuietly("awaitAnyTermination with timeout and resetTerminated") {
+    val datasets = Seq.fill(6)(makeDataset._2)
+    withQueriesOn(datasets: _*) { queries =>
+      require(queries.size === datasets.size)
+      assert(spark.streams.active.toSet === queries.toSet)
+
+      // awaitAnyTermination should be blocking or non-blocking depending on 
timeout values
+      testAwaitAnyTermination(
+        ExpectBlocked,
+        awaitTimeout = 4 seconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 2 seconds)
+
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 50 milliseconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 1 second)
+
+      // Stop a query asynchronously within timeout and awaitAnyTerm should be 
unblocked
+      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = 
false)
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 2 seconds,
+        expectedReturnedValue = true,
+        testBehaviorFor = 4 seconds)
+      require(!q1.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should be non-blocking 
even if timeout is high
+      testAwaitAnyTermination(
+        ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = 
true)
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(
+        ExpectBlocked,
+        awaitTimeout = 4 seconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 1 second)
+
+      // Terminate a query asynchronously with exception within timeout; awaitAnyTermination
+      // should throw the exception
+      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 4 seconds,
+        testBehaviorFor = 6 seconds)
+      require(!q2.isActive) // should be inactive by the time the prev 
awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should throw the exception
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 2 seconds,
+        testBehaviorFor = 4 seconds)
+
+      // Terminate a query asynchronously outside the timeout, awaitAnyTerm 
should be blocked
+      spark.streams.resetTerminated()
+      val q3 = stopRandomQueryAsync(2 seconds, withError = true)
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 100 milliseconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 4 seconds)
+
+      // After that query is stopped, awaitAnyTerm should throw exception
+      eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait 
for query to stop
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 100 milliseconds,
+        testBehaviorFor = 4 seconds)
+
+
+      // Terminate multiple queries, one with failure, and see whether awaitAnyTermination
+      // throws the exception
+      spark.streams.resetTerminated()
+
+      val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      testAwaitAnyTermination(
+        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = 
true)
+      require(!q4.isActive)
+      val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
+      // After q5 terminates with exception, awaitAnyTerm should start 
throwing exception
+      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 
2 seconds)
+    }
+  }
+
+
+  /** Run a body of code by defining a query on each dataset */
+  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] 
=> Unit): Unit = {
+    failAfter(streamingTimeout) {
+      val queries = withClue("Error starting queries") {
+        datasets.zipWithIndex.map { case (ds, i) =>
+          @volatile var query: StreamExecution = null
+          try {
+            val df = ds.toDF
+            val metadataRoot =
+              Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath
+            query =
+              df.writeStream
+                .format("memory")
+                .queryName(s"query$i")
+                .option("checkpointLocation", metadataRoot)
+                .outputMode("append")
+                .start()
+                .asInstanceOf[StreamExecution]
+          } catch {
+            case NonFatal(e) =>
+              if (query != null) query.stop()
+              throw e
+          }
+          query
+        }
+      }
+      try {
+        body(queries)
+      } finally {
+        queries.foreach(_.stop())
+      }
+    }
+  }
+
+  /** Test the behavior of awaitAnyTermination */
+  private def testAwaitAnyTermination(
+      expectedBehavior: ExpectedBehavior,
+      expectedReturnedValue: Boolean = false,
+      awaitTimeout: Span = null,
+      testBehaviorFor: Span = 4 seconds
+    ): Unit = {
+
+    def awaitTermFunc(): Unit = {
+      if (awaitTimeout != null && awaitTimeout.toMillis > 0) {
+        val returnedValue = 
spark.streams.awaitAnyTermination(awaitTimeout.toMillis)
+        assert(returnedValue === expectedReturnedValue, "Returned value does 
not match expected")
+      } else {
+        spark.streams.awaitAnyTermination()
+      }
+    }
+
+    AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, 
testBehaviorFor)
+  }
+
+  /** Stop a random active query either with `stop()` or with an error */
+  private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): 
StreamingQuery = {
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    val activeQueries = spark.streams.active
+    val queryToStop = activeQueries(Random.nextInt(activeQueries.length))
+    Future {
+      Thread.sleep(stopAfter.toMillis)
+      if (withError) {
+        logDebug(s"Terminating query ${queryToStop.name} with error")
+        queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect {
+          case StreamingExecutionRelation(source, _) =>
+            source.asInstanceOf[MemoryStream[Int]].addData(0)
+        }
+      } else {
+        logDebug(s"Stopping query ${queryToStop.name}")
+        queryToStop.stop()
+      }
+    }
+    queryToStop
+  }
+
+  private def makeDataset: (MemoryStream[Int], Dataset[Int]) = {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS.map(6 / _)
+    (inputData, mapped)
+  }
+}
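
For orientation, a small sketch of the manager-level calls this suite drives; the waitForAnyQuery helper is illustrative, and it assumes a SparkSession with one or more streaming queries already started:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryException

    /** Wait for any active query to terminate, mirroring the suite's awaitTermFunc. */
    def waitForAnyQuery(spark: SparkSession, timeoutMs: Long = -1): Unit = {
      try {
        if (timeoutMs > 0) {
          // Timed variant: returns true if some query terminated within the timeout.
          val terminated = spark.streams.awaitAnyTermination(timeoutMs)
          println(s"terminated within ${timeoutMs}ms: $terminated")
        } else {
          spark.streams.awaitAnyTermination()   // blocks until any query terminates
        }
      } catch {
        case e: StreamingQueryException =>
          println(s"a query failed: ${e.getMessage}")
      } finally {
        // Forget past terminations so the next call blocks again (see before/after above).
        spark.streams.resetTerminated()
      }
    }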

http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
new file mode 100644
index 0000000..9d58315
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, 
MemoryStream, StreamExecution}
+import org.apache.spark.util.Utils
+
+
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
+
+  import AwaitTerminationTester._
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("names unique across active queries, ids unique across all started 
queries") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _}
+
+    def startQuery(queryName: String): StreamingQuery = {
+      val metadataRoot = Utils.createTempDir(namePrefix = 
"streaming.checkpoint").getCanonicalPath
+      val writer = mapped.writeStream
+      writer
+        .queryName(queryName)
+        .format("memory")
+        .option("checkpointLocation", metadataRoot)
+        .start()
+    }
+
+    val q1 = startQuery("q1")
+    assert(q1.name === "q1")
+
+    // Verify that another query with same name cannot be started
+    val e1 = intercept[IllegalArgumentException] {
+      startQuery("q1")
+    }
+    Seq("q1", "already active").foreach { s => 
assert(e1.getMessage.contains(s)) }
+
+    // Verify q1 was unaffected by the above exception and stop it
+    assert(q1.isActive)
+    q1.stop()
+
+    // Verify another query can be started with name q1, but will have 
different id
+    val q2 = startQuery("q1")
+    assert(q2.name === "q1")
+    assert(q2.id !== q1.id)
+    q2.stop()
+  }
+
+  testQuietly("lifecycle states and awaitTermination") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _}
+
+    testStream(mapped)(
+      AssertOnQuery(_.isActive === true),
+      AssertOnQuery(_.exception.isEmpty),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      TestAwaitTermination(ExpectBlocked),
+      TestAwaitTermination(ExpectBlocked, timeoutMs = 2000),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, 
expectedReturnValue = false),
+      StopStream,
+      AssertOnQuery(_.isActive === false),
+      AssertOnQuery(_.exception.isEmpty),
+      TestAwaitTermination(ExpectNotBlocked),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, 
expectedReturnValue = true),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, 
expectedReturnValue = true),
+      StartStream(),
+      AssertOnQuery(_.isActive === true),
+      AddData(inputData, 0),
+      ExpectFailure[SparkException],
+      AssertOnQuery(_.isActive === false),
+      TestAwaitTermination(ExpectException[SparkException]),
+      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
+      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
+      AssertOnQuery(
+        q =>
+          q.exception.get.startOffset.get === 
q.committedOffsets.toCompositeOffset(Seq(inputData)),
+        "incorrect start offset on exception")
+    )
+  }
+
+  testQuietly("source and sink statuses") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(6 / _)
+
+    testStream(mapped)(
+      AssertOnQuery(_.sourceStatuses.length === 1),
+      AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
+      AssertOnQuery(_.sinkStatus.description.contains("Memory")),
+      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: 
Nil).toString),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(0).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(0)).toString),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3, 6, 3),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(1).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(1)).toString),
+      AddData(inputData, 0),
+      ExpectFailure[SparkException],
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === 
Some(LongOffset(2).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === 
CompositeOffset.fill(LongOffset(1)).toString)
+    )
+  }
+
+  /**
+   * A [[StreamAction]] to test the behavior of 
`StreamingQuery.awaitTermination()`.
+   *
+   * @param expectedBehavior  Expected behavior (not blocked, blocked, or 
exception thrown)
+   * @param timeoutMs         Timeout in milliseconds
+   *                          When timeoutMs <= 0, awaitTermination() is 
tested (i.e. w/o timeout)
+   *                          When timeoutMs > 0, awaitTermination(timeoutMs) 
is tested
+   * @param expectedReturnValue Expected return value when 
awaitTermination(timeoutMs) is used
+   */
+  case class TestAwaitTermination(
+      expectedBehavior: ExpectedBehavior,
+      timeoutMs: Int = -1,
+      expectedReturnValue: Boolean = false
+    ) extends AssertOnQuery(
+      TestAwaitTermination.assertOnQueryCondition(expectedBehavior, timeoutMs, 
expectedReturnValue),
+      "Error testing awaitTermination behavior"
+    ) {
+    override def toString(): String = {
+      s"TestAwaitTermination($expectedBehavior, timeoutMs = $timeoutMs, " +
+        s"expectedReturnValue = $expectedReturnValue)"
+    }
+  }
+
+  object TestAwaitTermination {
+
+    /**
+     * Tests the behavior of `StreamingQuery.awaitTermination`.
+     *
+     * @param expectedBehavior  Expected behavior (not blocked, blocked, or 
exception thrown)
+     * @param timeoutMs         Timeout in milliseconds
+     *                          When timeoutMs <= 0, awaitTermination() is 
tested (i.e. w/o timeout)
+     *                          When timeoutMs > 0, 
awaitTermination(timeoutMs) is tested
+     * @param expectedReturnValue Expected return value when 
awaitTermination(timeoutMs) is used
+     */
+    def assertOnQueryCondition(
+        expectedBehavior: ExpectedBehavior,
+        timeoutMs: Int,
+        expectedReturnValue: Boolean
+      )(q: StreamExecution): Boolean = {
+
+      def awaitTermFunc(): Unit = {
+        if (timeoutMs <= 0) {
+          q.awaitTermination()
+        } else {
+          val returnedValue = q.awaitTermination(timeoutMs)
+          assert(returnedValue === expectedReturnValue, "Returned value does 
not match expected")
+        }
+      }
+      AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
+      true // If the control reached here, then everything worked as expected
+    }
+  }
+}
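
Finally, a condensed sketch of the query-handle surface covered above; it assumes a local SparkSession and reuses MemoryStream, the in-memory test source from org.apache.spark.sql.execution.streaming used throughout these suites (not a production source):

    import java.nio.file.Files

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.streaming.MemoryStream
    import org.apache.spark.sql.streaming.StreamingQuery

    val spark = SparkSession.builder().master("local[2]").appName("query-sketch").getOrCreate()
    import spark.implicits._
    implicit val sqlContext = spark.sqlContext   // MemoryStream needs an implicit SQLContext

    val input = MemoryStream[Int]
    val checkpointDir = Files.createTempDirectory("streaming.checkpoint").toString

    val query: StreamingQuery = input.toDS().map(6 / _)
      .writeStream
      .queryName("q1")                           // names must be unique across active queries
      .format("memory")
      .option("checkpointLocation", checkpointDir)
      .start()

    input.addData(1, 2)
    val finished = query.awaitTermination(2000)  // timed wait: false while the query keeps running
    println(s"name=${query.name} id=${query.id} active=${query.isActive} finished=$finished")
    query.exception.foreach(e => println(s"failed with: $e"))  // set only after a failure
    query.stop()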

