[SPARK-15686][SQL] Move user-facing streaming classes into sql.streaming

## What changes were proposed in this pull request?
This patch moves all user-facing structured streaming classes into the
sql.streaming package. As part of this, I also added `@since` version annotations
to methods and classes that were missing them.
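
For downstream code the user-visible change is essentially an import-path move; the class
names and methods stay the same. A sketch of the before/after imports, based on the files
moved in this patch:

    // before this patch
    import org.apache.spark.sql.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
    import org.apache.spark.sql.util.ContinuousQueryListener

    // after this patch
    import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
    import org.apache.spark.sql.streaming.ContinuousQueryListener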

## How was this patch tested?
Updated tests to reflect the moves.

Author: Reynold Xin <r...@databricks.com>

Closes #13429 from rxin/SPARK-15686.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a71d1364
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a71d1364
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a71d1364

Branch: refs/heads/master
Commit: a71d1364ae87aa388128da34dd0b9b02ff85e458
Parents: d5012c2
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 1 10:14:40 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Jun 1 10:14:40 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |   3 +-
 python/pyspark/sql/utils.py                     |   2 +-
 .../java/org/apache/spark/sql/OutputMode.java   |  54 --
 .../apache/spark/sql/streaming/OutputMode.java  |  55 ++
 .../apache/spark/sql/InternalOutputModes.scala  |   2 +
 .../analysis/UnsupportedOperationChecker.scala  |   3 +-
 .../apache/spark/sql/JavaOutputModeSuite.java   |  31 -
 .../sql/streaming/JavaOutputModeSuite.java      |  31 +
 .../analysis/UnsupportedOperationsSuite.scala   |   3 +-
 .../org/apache/spark/sql/ContinuousQuery.scala  | 109 ----
 .../spark/sql/ContinuousQueryException.scala    |  54 --
 .../spark/sql/ContinuousQueryManager.scala      | 232 --------
 .../org/apache/spark/sql/DataFrameWriter.scala  |   1 +
 .../scala/org/apache/spark/sql/Dataset.scala    |   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +-
 .../scala/org/apache/spark/sql/SinkStatus.scala |  34 --
 .../org/apache/spark/sql/SourceStatus.scala     |  34 --
 .../org/apache/spark/sql/SparkSession.scala     |   7 +-
 .../scala/org/apache/spark/sql/Trigger.scala    | 133 -----
 .../spark/sql/execution/SparkStrategies.scala   |   3 +-
 .../sql/execution/datasources/DataSource.scala  |   1 +
 .../streaming/ContinuousQueryListenerBus.scala  |  11 +-
 .../streaming/IncrementalExecution.scala        |   3 +-
 .../execution/streaming/StreamExecution.scala   |   5 +-
 .../execution/streaming/TriggerExecutor.scala   |   2 +-
 .../spark/sql/execution/streaming/console.scala |   3 +-
 .../spark/sql/execution/streaming/memory.scala  |   1 +
 .../spark/sql/internal/SessionState.scala       |   3 +-
 .../apache/spark/sql/sources/interfaces.scala   |   1 +
 .../spark/sql/streaming/ContinuousQuery.scala   | 110 ++++
 .../streaming/ContinuousQueryException.scala    |  54 ++
 .../sql/streaming/ContinuousQueryListener.scala |  95 ++++
 .../sql/streaming/ContinuousQueryManager.scala  | 231 ++++++++
 .../apache/spark/sql/streaming/SinkStatus.scala |  34 ++
 .../spark/sql/streaming/SourceStatus.scala      |  34 ++
 .../apache/spark/sql/streaming/Trigger.scala    | 147 +++++
 .../sql/util/ContinuousQueryListener.scala      |  75 ---
 .../apache/spark/sql/ProcessingTimeSuite.scala  |   1 +
 .../scala/org/apache/spark/sql/StreamTest.scala | 565 ------------------
 .../streaming/ProcessingTimeExecutorSuite.scala |   2 +-
 .../ContinuousQueryListenerSuite.scala          | 216 +++++++
 .../streaming/ContinuousQueryManagerSuite.scala |   5 +-
 .../sql/streaming/ContinuousQuerySuite.scala    |   5 +-
 .../sql/streaming/FileStreamSinkSuite.scala     |   2 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   4 +-
 .../spark/sql/streaming/FileStressSuite.scala   |   4 +-
 .../spark/sql/streaming/MemorySinkSuite.scala   |   2 +-
 .../sql/streaming/MemorySourceStressSuite.scala |   4 +-
 .../spark/sql/streaming/StreamSuite.scala       |   2 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 567 +++++++++++++++++++
 .../streaming/StreamingAggregationSuite.scala   |   5 +-
 .../test/DataFrameReaderWriterSuite.scala       |   4 +-
 .../sql/util/ContinuousQueryListenerSuite.scala | 217 -------
 53 files changed, 1630 insertions(+), 1583 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 8238b8e..cd75622 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -201,7 +201,8 @@ class ProcessingTime(Trigger):
         self.interval = interval
 
     def _to_java_trigger(self, sqlContext):
-        return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval)
+        return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
+            self.interval)
 
 
 def _test():

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/python/pyspark/sql/utils.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py
index 8c8768f..9ddaf78 100644
--- a/python/pyspark/sql/utils.py
+++ b/python/pyspark/sql/utils.py
@@ -71,7 +71,7 @@ def capture_sql_exception(f):
                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '):
                 raise ParseException(s.split(': ', 1)[1], stackTrace)
-            if s.startswith('org.apache.spark.sql.ContinuousQueryException: '):
+            if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '):
                 raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace)
             if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '):
                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
deleted file mode 100644
index 1936d53..0000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.apache.spark.annotation.Experimental;
-
-/**
- * :: Experimental ::
- *
- * OutputMode describes what data will be written to a streaming sink when there is
- * new data available in a streaming DataFrame/Dataset.
- *
- * @since 2.0.0
- */
-@Experimental
-public class OutputMode {
-
-  /**
-   * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
-   * written to the sink. This output mode can only be used in queries that do not
-   * contain any aggregation.
-   *
-   * @since 2.0.0
-   */
-  public static OutputMode Append() {
-    return InternalOutputModes.Append$.MODULE$;
-  }
-
-  /**
-   * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
-   * to the sink every time there are some updates. This output mode can only be used in queries
-   * that contain aggregations.
-   *
-   * @since 2.0.0
-   */
-  public static OutputMode Complete() {
-    return InternalOutputModes.Complete$.MODULE$;
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
new file mode 100644
index 0000000..41e2582
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.apache.spark.annotation.Experimental;
+import org.apache.spark.sql.InternalOutputModes;
+
+/**
+ * :: Experimental ::
+ *
+ * OutputMode describes what data will be written to a streaming sink when there is
+ * new data available in a streaming DataFrame/Dataset.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+public class OutputMode {
+
+  /**
+   * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be
+   * written to the sink. This output mode can only be used in queries that do not
+   * contain any aggregation.
+   *
+   * @since 2.0.0
+   */
+  public static OutputMode Append() {
+    return InternalOutputModes.Append$.MODULE$;
+  }
+
+  /**
+   * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written
+   * to the sink every time there are some updates. This output mode can only be used in queries
+   * that contain aggregations.
+   *
+   * @since 2.0.0
+   */
+  public static OutputMode Complete() {
+    return InternalOutputModes.Complete$.MODULE$;
+  }
+}
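
As a quick illustration of the relocated class (a sketch, not part of the patch; the two
modes themselves are unchanged by the move):

    import org.apache.spark.sql.streaming.OutputMode

    // Append: only new rows reach the sink; for queries without aggregations.
    val append: OutputMode = OutputMode.Append()
    // Complete: the full updated result is written out; for queries with aggregations.
    val complete: OutputMode = OutputMode.Complete()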

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
index 8ef5d9a..153f9f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.streaming.OutputMode
+
 /**
  * Internal helper class to generate objects representing various [[OutputMode]]s,
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index f4c0347..8373fa3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode}
+import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.streaming.OutputMode
 
 /**
  * Analyzes the presence of unsupported operations in a logical plan.

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java 
b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
deleted file mode 100644
index 1764f33..0000000
--- a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql;
-
-import org.junit.Test;
-
-public class JavaOutputModeSuite {
-
-  @Test
-  public void testOutputModes() {
-    OutputMode o1 = OutputMode.Append();
-    assert(o1.toString().toLowerCase().contains("append"));
-    OutputMode o2 = OutputMode.Complete();
-    assert (o2.toString().toLowerCase().contains("complete"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
 
b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
new file mode 100644
index 0000000..e0a54fe
--- /dev/null
+++ 
b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming;
+
+import org.junit.Test;
+
+public class JavaOutputModeSuite {
+
+  @Test
+  public void testOutputModes() {
+    OutputMode o1 = OutputMode.Append();
+    assert(o1.toString().toLowerCase().contains("append"));
+    OutputMode o2 = OutputMode.Complete();
+    assert (o2.toString().toLowerCase().contains("complete"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index c2e3d47..378cca3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, OutputMode}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.InternalOutputModes._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.IntegerType
 
 /** A dummy command for testing unsupported operations. */

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
deleted file mode 100644
index 4d5afe2..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.annotation.Experimental
-
-/**
- * :: Experimental ::
- * A handle to a query that is executing continuously in the background as new data arrives.
- * All these methods are thread-safe.
- * @since 2.0.0
- */
-@Experimental
-trait ContinuousQuery {
-
-  /**
-   * Returns the name of the query.
-   * @since 2.0.0
-   */
-  def name: String
-
-  /**
-   * Returns the [[SparkSession]] associated with `this`.
-   * @since 2.0.0
-   */
-  def sparkSession: SparkSession
-
-  /**
-   * Whether the query is currently active or not
-   * @since 2.0.0
-   */
-  def isActive: Boolean
-
-  /**
-   * Returns the [[ContinuousQueryException]] if the query was terminated by an exception.
-   * @since 2.0.0
-   */
-  def exception: Option[ContinuousQueryException]
-
-  /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
-   */
-  def sourceStatuses: Array[SourceStatus]
-
-  /** Returns current status of the sink. */
-  def sinkStatus: SinkStatus
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
-   * If the query has terminated with an exception, then the exception will be thrown.
-   *
-   * If the query has terminated, then all subsequent calls to this method will either return
-   * immediately (if the query was terminated by `stop()`), or throw the exception
-   * immediately (if the query has terminated with exception).
-   *
-   * @throws ContinuousQueryException, if `this` query has terminated with an exception.
-   *
-   * @since 2.0.0
-   */
-  def awaitTermination(): Unit
-
-  /**
-   * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
-   * If the query has terminated with an exception, then the exception will be thrown.
-   * Otherwise, it returns whether the query has terminated or not within the `timeoutMs`
-   * milliseconds.
-   *
-   * If the query has terminated, then all subsequent calls to this method will either return
-   * `true` immediately (if the query was terminated by `stop()`), or throw the exception
-   * immediately (if the query has terminated with exception).
-   *
-   * @throws ContinuousQueryException, if `this` query has terminated with an exception
-   *
-   * @since 2.0.0
-   */
-  def awaitTermination(timeoutMs: Long): Boolean
-
-  /**
-   * Blocks until all available data in the source has been processed and committed to the sink.
-   * This method is intended for testing. Note that in the case of continually arriving data, this
-   * method may block forever. Additionally, this method is only guaranteed to block until data that
-   * has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]]
-   * prior to invocation. (i.e. `getOffset` must immediately reflect the addition).
-   */
-  def processAllAvailable(): Unit
-
-  /**
-   * Stops the execution of this query if it is running. This method blocks until the threads
-   * performing execution have stopped.
-   * @since 2.0.0
-   */
-  def stop(): Unit
-}
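
The relocated trait keeps the same members; only the package changes. A minimal sketch of
how a handle is typically driven, using only the methods declared above (starting the query
is outside this file, so the handle is taken as a parameter in this hypothetical helper):

    import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryException}

    def runUntilDone(query: ContinuousQuery): Unit = {
      println(s"query '${query.name}' active=${query.isActive}")
      try {
        query.awaitTermination()                    // blocks until stop() or a failure
      } catch {
        case e: ContinuousQueryException =>
          println(s"query failed: ${e.message}")    // terminated with an error
      } finally {
        if (query.isActive) query.stop()            // blocks until execution threads stop
      }
    }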

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
deleted file mode 100644
index fec3862..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
-
-/**
- * :: Experimental ::
- * Exception that stopped a [[ContinuousQuery]].
- * @param query      Query that caused the exception
- * @param message     Message of this exception
- * @param cause       Internal cause of this exception
- * @param startOffset Starting offset (if known) of the range of data in which exception occurred
- * @param endOffset   Ending offset (if known) of the range of data in which exception occurred
- * @since 2.0.0
- */
-@Experimental
-class ContinuousQueryException private[sql](
-    @transient val query: ContinuousQuery,
-    val message: String,
-    val cause: Throwable,
-    val startOffset: Option[Offset] = None,
-    val endOffset: Option[Offset] = None)
-  extends Exception(message, cause) {
-
-  /** Time when the exception occurred */
-  val time: Long = System.currentTimeMillis
-
-  override def toString(): String = {
-    val causeStr =
-      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", 
"\n|\t", "\n")}"
-    s"""
-       |$causeStr
-       |
-       |${query.asInstanceOf[StreamExecution].toDebugString}
-       """.stripMargin
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
deleted file mode 100644
index c686400..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import scala.collection.mutable
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
-import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.util.ContinuousQueryListener
-import org.apache.spark.util.{Clock, SystemClock}
-
-/**
- * :: Experimental ::
- * A class to manage all the [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active
- * on a [[SparkSession]].
- *
- * @since 2.0.0
- */
-@Experimental
-class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
-
-  private[sql] val stateStoreCoordinator =
-    StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
-  private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
-  private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
-  private val activeQueriesLock = new Object
-  private val awaitTerminationLock = new Object
-
-  private var lastTerminatedQuery: ContinuousQuery = null
-
-  /**
-   * Returns a list of active queries associated with this SQLContext
-   *
-   * @since 2.0.0
-   */
-  def active: Array[ContinuousQuery] = activeQueriesLock.synchronized {
-    activeQueries.values.toArray
-  }
-
-  /**
-   * Returns an active query from this SQLContext or throws exception if bad name
-   *
-   * @since 2.0.0
-   */
-  def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
-    activeQueries.getOrElse(name,
-      throw new IllegalArgumentException(s"There is no active query with name $name")
-  }
-
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated since the
-   * creation of the context, or since `resetTerminated()` was called. If any query was terminated
-   * with an exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
-   * return immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since `resetTermination()` was called,
-   * if any query has terminated with exception, then `awaitAnyTermination()` will
-   * throw any of the exception. For correctly documenting exceptions across multiple queries,
-   * users need to stop all of them after any of them terminates with exception, and then check the
-   * `query.exception()` for each query.
-   *
-   * @throws ContinuousQueryException, if any query has terminated with an exception
-   *
-   * @since 2.0.0
-   */
-  def awaitAnyTermination(): Unit = {
-    awaitTerminationLock.synchronized {
-      while (lastTerminatedQuery == null) {
-        awaitTerminationLock.wait(10)
-      }
-      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
-        throw lastTerminatedQuery.exception.get
-      }
-    }
-  }
-
-  /**
-   * Wait until any of the queries on the associated SQLContext has terminated since the
-   * creation of the context, or since `resetTerminated()` was called. Returns whether any query
-   * has terminated or not (multiple may have terminated). If any query has terminated with an
-   * exception, then the exception will be thrown.
-   *
-   * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either
-   * return `true` immediately (if the query was terminated by `query.stop()`),
-   * or throw the exception immediately (if the query was terminated with exception). Use
-   * `resetTerminated()` to clear past terminations and wait for new terminations.
-   *
-   * In the case where multiple queries have terminated since `resetTermination()` was called,
-   * if any query has terminated with exception, then `awaitAnyTermination()` will
-   * throw any of the exception. For correctly documenting exceptions across multiple queries,
-   * users need to stop all of them after any of them terminates with exception, and then check the
-   * `query.exception()` for each query.
-   *
-   * @throws ContinuousQueryException, if any query has terminated with an exception
-   *
-   * @since 2.0.0
-   */
-  def awaitAnyTermination(timeoutMs: Long): Boolean = {
-
-    val startTime = System.currentTimeMillis
-    def isTimedout = System.currentTimeMillis - startTime >= timeoutMs
-
-    awaitTerminationLock.synchronized {
-      while (!isTimedout && lastTerminatedQuery == null) {
-        awaitTerminationLock.wait(10)
-      }
-      if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) {
-        throw lastTerminatedQuery.exception.get
-      }
-      lastTerminatedQuery != null
-    }
-  }
-
-  /**
-   * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to
-   * wait for new terminations.
-   *
-   * @since 2.0.0
-   */
-  def resetTerminated(): Unit = {
-    awaitTerminationLock.synchronized {
-      lastTerminatedQuery = null
-    }
-  }
-
-  /**
-   * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of
-   * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]].
-   *
-   * @since 2.0.0
-   */
-  def addListener(listener: ContinuousQueryListener): Unit = {
-    listenerBus.addListener(listener)
-  }
-
-  /**
-   * Deregister a [[ContinuousQueryListener]].
-   *
-   * @since 2.0.0
-   */
-  def removeListener(listener: ContinuousQueryListener): Unit = {
-    listenerBus.removeListener(listener)
-  }
-
-  /** Post a listener event */
-  private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = {
-    listenerBus.post(event)
-  }
-
-  /** Start a query */
-  private[sql] def startQuery(
-      name: String,
-      checkpointLocation: String,
-      df: DataFrame,
-      sink: Sink,
-      outputMode: OutputMode,
-      trigger: Trigger = ProcessingTime(0),
-      triggerClock: Clock = new SystemClock()): ContinuousQuery = {
-    activeQueriesLock.synchronized {
-      if (activeQueries.contains(name)) {
-        throw new IllegalArgumentException(
-          s"Cannot start query with name $name as a query with that name is already active")
-      }
-      val analyzedPlan = df.queryExecution.analyzed
-      df.queryExecution.assertAnalyzed()
-
-      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
-        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
-      }
-
-      var nextSourceId = 0L
-
-      val logicalPlan = analyzedPlan.transform {
-        case StreamingRelation(dataSource, _, output) =>
-          // Materialize source to avoid creating it in every batch
-          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
-          val source = dataSource.createSource(metadataPath)
-          nextSourceId += 1
-          // We still need to use the previous `output` instead of `source.schema` as attributes in
-          // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
-      }
-      val query = new StreamExecution(
-        sparkSession,
-        name,
-        checkpointLocation,
-        logicalPlan,
-        sink,
-        trigger,
-        triggerClock,
-        outputMode)
-      query.start()
-      activeQueries.put(name, query)
-      query
-    }
-  }
-
-  /** Notify (by the ContinuousQuery) that the query has been terminated */
-  private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = {
-    activeQueriesLock.synchronized {
-      activeQueries -= terminatedQuery.name
-    }
-    awaitTerminationLock.synchronized {
-      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
-        lastTerminatedQuery = terminatedQuery
-      }
-      awaitTerminationLock.notifyAll()
-    }
-  }
-}
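
A sketch of the manager's user-facing surface after the move, assuming the manager is
obtained from the session's `streams` accessor (that accessor is not shown in this hunk):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.ContinuousQueryManager

    val spark = SparkSession.builder().getOrCreate()
    val streams: ContinuousQueryManager = spark.streams   // assumed accessor

    streams.active.foreach(q => println(s"${q.name} active=${q.isActive}"))
    streams.resetTerminated()       // forget past terminations
    streams.awaitAnyTermination()   // block until any active query stops or fails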

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 291b825..25678e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.Utils
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 7be49b1..3a6ec45 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
+import org.apache.spark.sql.streaming.ContinuousQuery
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0dc70c0..2e14c5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -30,13 +30,11 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming.ContinuousQueryManager
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -645,7 +643,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
 
   /**
    * Returns a [[ContinuousQueryManager]] that allows managing all the
-   * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context.
+   * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context.
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
deleted file mode 100644
index 5a98528..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Sink}
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming [[Sink]].
- *
- * @param description Description of the source corresponding to this status
- * @param offset      Current offset up to which data has been written by the sink
- * @since 2.0.0
- */
-@Experimental
-class SinkStatus private[sql](
-    val description: String,
-    val offset: Offset)

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala
deleted file mode 100644
index 2479e67..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Source}
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming [[Source]].
- *
- * @param description     Description of the source corresponding to this status
- * @param offset          Current offset of the source, if known
- * @since 2.0.0
- */
-@Experimental
-class SourceStatus private[sql] (
-    val description: String,
-    val offset: Option[Offset])
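
The two status classes stay structurally identical after the move; a sketch of reading them
off a query handle (the helper name is illustrative):

    import org.apache.spark.sql.streaming.{ContinuousQuery, SinkStatus, SourceStatus}

    def printStatus(query: ContinuousQuery): Unit = {
      query.sourceStatuses.foreach { s =>
        println(s"source ${s.description}: offset=${s.offset}")   // offset is an Option[Offset]
      }
      val sink: SinkStatus = query.sinkStatus
      println(s"sink ${sink.description}: offset=${sink.offset}")
    }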

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index dc4b72a..52bedf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.internal.Logging
@@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.ui.SQLListener
-import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf}
+import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState}
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{DataType, LongType, StructType}
 import org.apache.spark.sql.util.ExecutionListenerManager
 import org.apache.spark.util.Utils
@@ -182,7 +183,7 @@ class SparkSession private(
   /**
    * :: Experimental ::
    * Returns a [[ContinuousQueryManager]] that allows managing all the
-   * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
+   * [[ContinuousQuery ContinuousQueries]] active on `this`.
    *
    * @group basic
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
deleted file mode 100644
index 256e8a4..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration.Duration
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.unsafe.types.CalendarInterval
-
-/**
- * :: Experimental ::
- * Used to indicate how often results should be produced by a [[ContinuousQuery]].
- */
-@Experimental
-sealed trait Trigger {}
-
-/**
- * :: Experimental ::
- * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
- * the query will run as fast as possible.
- *
- * Scala Example:
- * {{{
- *   df.write.trigger(ProcessingTime("10 seconds"))
- *
- *   import scala.concurrent.duration._
- *   df.write.trigger(ProcessingTime(10.seconds))
- * }}}
- *
- * Java Example:
- * {{{
- *   df.write.trigger(ProcessingTime.create("10 seconds"))
- *
- *   import java.util.concurrent.TimeUnit
- *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
- * }}}
- */
-@Experimental
-case class ProcessingTime(intervalMs: Long) extends Trigger {
-  require(intervalMs >= 0, "the interval of trigger should not be negative")
-}
-
-/**
- * :: Experimental ::
- * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s.
- */
-@Experimental
-object ProcessingTime {
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.write.trigger(ProcessingTime("10 seconds"))
-   * }}}
-   */
-  def apply(interval: String): ProcessingTime = {
-    if (StringUtils.isBlank(interval)) {
-      throw new IllegalArgumentException(
-        "interval cannot be null or blank.")
-    }
-    val cal = if (interval.startsWith("interval")) {
-      CalendarInterval.fromString(interval)
-    } else {
-      CalendarInterval.fromString("interval " + interval)
-    }
-    if (cal == null) {
-      throw new IllegalArgumentException(s"Invalid interval: $interval")
-    }
-    if (cal.months > 0) {
-      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
-    }
-    new ProcessingTime(cal.microseconds / 1000)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import scala.concurrent.duration._
-   *   df.write.trigger(ProcessingTime(10.seconds))
-   * }}}
-   */
-  def apply(interval: Duration): ProcessingTime = {
-    new ProcessingTime(interval.toMillis)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.write.trigger(ProcessingTime.create("10 seconds"))
-   * }}}
-   */
-  def create(interval: String): ProcessingTime = {
-    apply(interval)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import java.util.concurrent.TimeUnit
-   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
-   * }}}
-   */
-  def create(interval: Long, unit: TimeUnit): ProcessingTime = {
-    new ProcessingTime(unit.toMillis(interval))
-  }
-}
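
Equivalent ways to construct the relocated ProcessingTime trigger (a sketch; the factory
methods are unchanged from the removed file above, only the package differs):

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._
    import org.apache.spark.sql.streaming.ProcessingTime

    val t1 = ProcessingTime("10 seconds")                  // parsed as a calendar interval
    val t2 = ProcessingTime(10.seconds)                    // from a Scala Duration
    val t3 = ProcessingTime.create(10, TimeUnit.SECONDS)   // Java-friendly factory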

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index e405252..7e3e45e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.streaming.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.streaming.ContinuousQuery
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SparkPlanner =>
@@ -201,7 +202,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   /**
   * Used to plan aggregation queries that are computed incrementally as part of a
-   * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner
+   * [[ContinuousQuery]]. Currently this rule is injected into the planner
    * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]]
    */
   object StatefulAggregationStrategy extends Strategy {

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 814880b..93f1ad0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
index b1d24b6..2a1be09 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerEvent}
-import org.apache.spark.sql.util.ContinuousQueryListener
-import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.sql.streaming.ContinuousQueryListener
 import org.apache.spark.util.ListenerBus
 
 /**
@@ -30,7 +29,10 @@ import org.apache.spark.util.ListenerBus
  * dispatch them to ContinuousQueryListener.
  */
 class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
-  extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+  extends SparkListener
+    with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+
+  import ContinuousQueryListener._
 
   sparkListenerBus.addListener(this)
 
@@ -74,7 +76,8 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
    * listener bus.
    */
   private case class WrappedContinuousQueryListenerEvent(
-      streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent {
+      streamingListenerEvent: ContinuousQueryListener.Event)
+    extends SparkListenerEvent {
 
     // Do not log streaming events in event log as history server does not support these events.
     protected[spark] override def logEvent: Boolean = false

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 5c86049..bc0e443 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.sql.execution.streaming
 
-import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession}
+import org.apache.spark.sql.{InternalOutputModes, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode}
+import org.apache.spark.sql.streaming.OutputMode
 
 /**
  * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]]

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ab0900d..16d38a2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.QueryExecution
-import org.apache.spark.sql.util.ContinuousQueryListener
-import org.apache.spark.sql.util.ContinuousQueryListener._
+import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
 
 /**
@@ -54,6 +53,8 @@ class StreamExecution(
     val outputMode: OutputMode)
   extends ContinuousQuery with Logging {
 
+  import org.apache.spark.sql.streaming.ContinuousQueryListener._
+
   /**
    * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 569907b..ac510df 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.ProcessingTime
+import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.util.{Clock, SystemClock}
 
 trait TriggerExecutor {

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
index 391f1e5..2ec2a3c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala
@@ -18,8 +18,9 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
+import org.apache.spark.sql.streaming.OutputMode
 
 class ConsoleSink(options: Map[String, String]) extends Sink with Logging {
   // Number of rows to display, by default 20 rows

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index e4a95e7..4496f41 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 object MemoryStream {

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 4c7bbf0..b2db377 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTableCommand
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, 
FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource}
+import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
 
@@ -142,7 +143,7 @@ private[sql] class SessionState(sparkSession: SparkSession) 
{
   lazy val listenerManager: ExecutionListenerManager = new 
ExecutionListenerManager
 
   /**
-   * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
+   * Interface to start and stop [[ContinuousQuery]]s.
    */
   lazy val continuousQueryManager: ContinuousQueryManager = {
     new ContinuousQueryManager(sparkSession)

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 3d4edbb..d2077a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
new file mode 100644
index 0000000..451cfd8
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.SparkSession
+
+/**
+ * :: Experimental ::
+ * A handle to a query that is executing continuously in the background as new 
data arrives.
+ * All these methods are thread-safe.
+ * @since 2.0.0
+ */
+@Experimental
+trait ContinuousQuery {
+
+  /**
+   * Returns the name of the query.
+   * @since 2.0.0
+   */
+  def name: String
+
+  /**
+   * Returns the [[SparkSession]] associated with `this`.
+   * @since 2.0.0
+   */
+  def sparkSession: SparkSession
+
+  /**
+   * Whether the query is currently active or not
+   * @since 2.0.0
+   */
+  def isActive: Boolean
+
+  /**
+   * Returns the [[ContinuousQueryException]] if the query was terminated by 
an exception.
+   * @since 2.0.0
+   */
+  def exception: Option[ContinuousQueryException]
+
+  /**
+   * Returns current status of all the sources.
+   * @since 2.0.0
+   */
+  def sourceStatuses: Array[SourceStatus]
+
+  /**
+   * Returns current status of the sink.
+   * @since 2.0.0
+   */
+  def sinkStatus: SinkStatus
+
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
+   * If the query has terminated with an exception, then the exception will be 
thrown.
+   *
+   * If the query has terminated, then all subsequent calls to this method 
will either return
+   * immediately (if the query was terminated by `stop()`), or throw the 
exception
+   * immediately (if the query has terminated with exception).
+   *
+   * @throws ContinuousQueryException, if `this` query has terminated with an 
exception.
+   *
+   * @since 2.0.0
+   */
+  def awaitTermination(): Unit
+
+  /**
+   * Waits for the termination of `this` query, either by `query.stop()` or by 
an exception.
+   * If the query has terminated with an exception, then the exception will be 
thrown.
+   * Otherwise, it returns whether the query has terminated or not within the 
`timeoutMs`
+   * milliseconds.
+   *
+   * If the query has terminated, then all subsequent calls to this method 
will either return
+   * `true` immediately (if the query was terminated by `stop()`), or throw 
the exception
+   * immediately (if the query has terminated with exception).
+   *
+   * @throws ContinuousQueryException, if `this` query has terminated with an 
exception
+   *
+   * @since 2.0.0
+   */
+  def awaitTermination(timeoutMs: Long): Boolean
+
+  /**
+   * Blocks until all available data in the source has been processed and
+   * committed to the sink. This method is intended for testing. Note that in
+   * the case of continually arriving data, this method may block forever.
+   * Additionally, this method is only guaranteed to block until data that has
+   * been synchronously appended to a
+   * [[org.apache.spark.sql.execution.streaming.Source]] prior to invocation has
+   * been processed (i.e. `getOffset` must immediately reflect the addition).
+   * @since 2.0.0
+   */
+  def processAllAvailable(): Unit
+
+  /**
+   * Stops the execution of this query if it is running. This method blocks
+   * until the threads performing execution have stopped.
+   * @since 2.0.0
+   */
+  def stop(): Unit
+}
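
The trait above is the user-facing handle returned when a stream is started. A
minimal usage sketch follows, assuming `spark` is an active SparkSession, `df` is
a streaming DataFrame obtained elsewhere, and the console sink registers under
the short name "console" (these are assumptions for illustration; only the
ContinuousQuery methods themselves come from the trait above):

    import org.apache.spark.sql.streaming.ContinuousQuery

    // Start the query in the background and keep its handle.
    val query: ContinuousQuery = df.write
      .format("console")   // assumed sink short name
      .startStream()

    // Non-blocking checks against the handle.
    println(s"${query.name}: active=${query.isActive}, sources=${query.sourceStatuses.length}")

    // Wait up to 30 seconds; returns false if the query is still running.
    val terminated: Boolean = query.awaitTermination(30000L)
    if (!terminated) {
      // Ask the query to stop and block until its execution threads have stopped.
      query.stop()
    }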

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
new file mode 100644
index 0000000..5196c5a
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}
+
+/**
+ * :: Experimental ::
+ * Exception that stopped a [[ContinuousQuery]].
+ * @param query      Query that caused the exception
+ * @param message     Message of this exception
+ * @param cause       Internal cause of this exception
+ * @param startOffset Starting offset (if known) of the range of data in which
+ *                    the exception occurred
+ * @param endOffset   Ending offset (if known) of the range of data in which
+ *                    the exception occurred
+ * @since 2.0.0
+ */
+@Experimental
+class ContinuousQueryException private[sql](
+    @transient val query: ContinuousQuery,
+    val message: String,
+    val cause: Throwable,
+    val startOffset: Option[Offset] = None,
+    val endOffset: Option[Offset] = None)
+  extends Exception(message, cause) {
+
+  /** Time when the exception occurred */
+  val time: Long = System.currentTimeMillis
+
+  override def toString(): String = {
+    val causeStr =
+      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", 
"\n|\t", "\n")}"
+    s"""
+       |$causeStr
+       |
+       |${query.asInstanceOf[StreamExecution].toDebugString}
+       """.stripMargin
+  }
+}
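
Because the exception keeps the query handle and the (optional) offset range
being processed when the failure happened, a caller blocking on
awaitTermination() can report fairly precise context. A sketch, assuming `query`
is the handle from the previous example:

    import org.apache.spark.sql.streaming.ContinuousQueryException

    try {
      query.awaitTermination()
    } catch {
      case e: ContinuousQueryException =>
        // startOffset/endOffset are Options; they are only populated when the
        // failing batch's offset range is known.
        val range = (e.startOffset, e.endOffset) match {
          case (Some(s), Some(t)) => s"while processing offsets [$s, $t]"
          case _                  => "at an unknown offset range"
        }
        println(s"query ${e.query.name} failed $range: ${e.message}")
        e.cause.printStackTrace()
    }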

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
new file mode 100644
index 0000000..6bdd513
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Interface for listening to events related to [[ContinuousQuery 
ContinuousQueries]].
+ * @note The methods are not thread-safe as they may be called from different 
threads.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+abstract class ContinuousQueryListener {
+
+  import ContinuousQueryListener._
+
+  /**
+   * Called when a query is started.
+   * @note This is called synchronously with
+   *       [[org.apache.spark.sql.DataFrameWriter 
`DataFrameWriter.startStream()`]],
+   *       that is, `onQueryStarted` will be called on all listeners before
+   *       `DataFrameWriter.startStream()` returns the corresponding
+   *       [[ContinuousQuery]]. Please don't block in this method, as it will
+   *       block your query.
+   * @since 2.0.0
+   */
+  def onQueryStarted(queryStarted: QueryStarted): Unit
+
+  /**
+   * Called when there is some status update (ingestion rate updated, etc.)
+   *
+   * @note This method is asynchronous. The status in [[ContinuousQuery]] will
+   *       always be the latest no matter when this method is called. Therefore,
+   *       the status of [[ContinuousQuery]] may change before/while you process
+   *       the event. E.g., you may find that the [[ContinuousQuery]] has
+   *       terminated while you are processing [[QueryProgress]].
+   * @since 2.0.0
+   */
+  def onQueryProgress(queryProgress: QueryProgress): Unit
+
+  /**
+   * Called when a query is stopped, with or without error.
+   * @since 2.0.0
+   */
+  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
+}
+
+
+/**
+ * :: Experimental ::
+ * Companion object of [[ContinuousQueryListener]] that defines the listener 
events.
+ * @since 2.0.0
+ */
+@Experimental
+object ContinuousQueryListener {
+
+  /**
+   * Base type of [[ContinuousQueryListener]] events.
+   * @since 2.0.0
+   */
+  trait Event
+
+  /**
+   * Event representing the start of a query.
+   * @since 2.0.0
+   */
+  class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+
+  /**
+   * Event representing any progress updates in a query.
+   * @since 2.0.0
+   */
+  class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+
+  /**
+   * Event representing the termination of a query.
+   * @since 2.0.0
+   */
+  class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+}
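
A sketch of a listener implementation and its registration. The registration
call assumes the ContinuousQueryManager is reachable as `spark.streams`, in line
with the SparkSession changes in this patch; the event and method names are the
ones defined above:

    import org.apache.spark.sql.streaming.ContinuousQueryListener
    import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

    // Logs life cycle events. onQueryStarted runs synchronously inside
    // startStream(), so it must stay cheap.
    class LoggingListener extends ContinuousQueryListener {
      override def onQueryStarted(queryStarted: QueryStarted): Unit =
        println(s"started: ${queryStarted.query.name}")

      // Asynchronous: the query may already have moved on, or terminated,
      // by the time this runs.
      override def onQueryProgress(queryProgress: QueryProgress): Unit =
        println(s"progress: ${queryProgress.query.sourceStatuses.map(_.offset).mkString(", ")}")

      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
        println(s"terminated: ${queryTerminated.query.name}")
    }

    spark.streams.addListener(new LoggingListener)   // `spark.streams` assumed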

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
new file mode 100644
index 0000000..1bfdd2d
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Clock, SystemClock}
+
+/**
+ * :: Experimental ::
+ * A class to manage all the [[ContinuousQuery]]s active on a [[SparkSession]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+class ContinuousQueryManager private[sql] (sparkSession: SparkSession) {
+
+  private[sql] val stateStoreCoordinator =
+    StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
+  private val listenerBus = new 
ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus)
+  private val activeQueries = new mutable.HashMap[String, ContinuousQuery]
+  private val activeQueriesLock = new Object
+  private val awaitTerminationLock = new Object
+
+  private var lastTerminatedQuery: ContinuousQuery = null
+
+  /**
+   * Returns a list of active queries associated with this SQLContext
+   *
+   * @since 2.0.0
+   */
+  def active: Array[ContinuousQuery] = activeQueriesLock.synchronized {
+    activeQueries.values.toArray
+  }
+
+  /**
+   * Returns the active query in this SQLContext with the given name, or throws
+   * an exception if no active query has that name.
+   *
+   * @since 2.0.0
+   */
+  def get(name: String): ContinuousQuery = activeQueriesLock.synchronized {
+    activeQueries.getOrElse(name,
+      throw new IllegalArgumentException(s"There is no active query with name 
$name"))
+  }
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated 
since the
+   * creation of the context, or since `resetTerminated()` was called. If any 
query was terminated
+   * with an exception, then the exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
+   * return immediately (if the query was terminated by `query.stop()`),
+   * or throw the exception immediately (if the query was terminated with 
exception). Use
+   * `resetTerminated()` to clear past terminations and wait for new 
terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()`
+   * was called, if any query has terminated with an exception, then
+   * `awaitAnyTermination()` will throw any one of those exceptions. To correctly
+   * report exceptions across multiple queries, users need to stop all of them
+   * after any of them terminates with an exception, and then check
+   * `query.exception()` for each query.
+   *
+   * @throws ContinuousQueryException, if any query has terminated with an 
exception
+   *
+   * @since 2.0.0
+   */
+  def awaitAnyTermination(): Unit = {
+    awaitTerminationLock.synchronized {
+      while (lastTerminatedQuery == null) {
+        awaitTerminationLock.wait(10)
+      }
+      if (lastTerminatedQuery != null && 
lastTerminatedQuery.exception.nonEmpty) {
+        throw lastTerminatedQuery.exception.get
+      }
+    }
+  }
+
+  /**
+   * Wait until any of the queries on the associated SQLContext has terminated since the
+   * creation of the context, or since `resetTerminated()` was called, or until `timeoutMs`
+   * milliseconds have elapsed. Returns whether any query has terminated within the timeout
+   * (multiple may have terminated). If any query has terminated with an exception, then the
+   * exception will be thrown.
+   *
+   * If a query has terminated, then subsequent calls to 
`awaitAnyTermination()` will either
+   * return `true` immediately (if the query was terminated by `query.stop()`),
+   * or throw the exception immediately (if the query was terminated with 
exception). Use
+   * `resetTerminated()` to clear past terminations and wait for new 
terminations.
+   *
+   * In the case where multiple queries have terminated since `resetTerminated()`
+   * was called, if any query has terminated with an exception, then
+   * `awaitAnyTermination()` will throw any one of those exceptions. To correctly
+   * report exceptions across multiple queries, users need to stop all of them
+   * after any of them terminates with an exception, and then check
+   * `query.exception()` for each query.
+   *
+   * @throws ContinuousQueryException, if any query has terminated with an 
exception
+   *
+   * @since 2.0.0
+   */
+  def awaitAnyTermination(timeoutMs: Long): Boolean = {
+
+    val startTime = System.currentTimeMillis
+    def isTimedout = System.currentTimeMillis - startTime >= timeoutMs
+
+    awaitTerminationLock.synchronized {
+      while (!isTimedout && lastTerminatedQuery == null) {
+        awaitTerminationLock.wait(10)
+      }
+      if (lastTerminatedQuery != null && 
lastTerminatedQuery.exception.nonEmpty) {
+        throw lastTerminatedQuery.exception.get
+      }
+      lastTerminatedQuery != null
+    }
+  }
+
+  /**
+   * Forget about past terminated queries so that `awaitAnyTermination()` can 
be used again to
+   * wait for new terminations.
+   *
+   * @since 2.0.0
+   */
+  def resetTerminated(): Unit = {
+    awaitTerminationLock.synchronized {
+      lastTerminatedQuery = null
+    }
+  }
+
+  /**
+   * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle 
events of
+   * [[ContinuousQuery]].
+   *
+   * @since 2.0.0
+   */
+  def addListener(listener: ContinuousQueryListener): Unit = {
+    listenerBus.addListener(listener)
+  }
+
+  /**
+   * Deregister a [[ContinuousQueryListener]].
+   *
+   * @since 2.0.0
+   */
+  def removeListener(listener: ContinuousQueryListener): Unit = {
+    listenerBus.removeListener(listener)
+  }
+
+  /** Post a listener event */
+  private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): 
Unit = {
+    listenerBus.post(event)
+  }
+
+  /** Start a query */
+  private[sql] def startQuery(
+      name: String,
+      checkpointLocation: String,
+      df: DataFrame,
+      sink: Sink,
+      outputMode: OutputMode,
+      trigger: Trigger = ProcessingTime(0),
+      triggerClock: Clock = new SystemClock()): ContinuousQuery = {
+    activeQueriesLock.synchronized {
+      if (activeQueries.contains(name)) {
+        throw new IllegalArgumentException(
+          s"Cannot start query with name $name as a query with that name is 
already active")
+      }
+      val analyzedPlan = df.queryExecution.analyzed
+      df.queryExecution.assertAnalyzed()
+
+      if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) {
+        UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
+      }
+
+      var nextSourceId = 0L
+
+      val logicalPlan = analyzedPlan.transform {
+        case StreamingRelation(dataSource, _, output) =>
+          // Materialize source to avoid creating it in every batch
+          val metadataPath = s"$checkpointLocation/sources/$nextSourceId"
+          val source = dataSource.createSource(metadataPath)
+          nextSourceId += 1
+          // We still need to use the previous `output` instead of `source.schema`
+          // because attributes in "df.logicalPlan" already reference the attributes
+          // of the previous `output`.
+          StreamingExecutionRelation(source, output)
+      }
+      val query = new StreamExecution(
+        sparkSession,
+        name,
+        checkpointLocation,
+        logicalPlan,
+        sink,
+        trigger,
+        triggerClock,
+        outputMode)
+      query.start()
+      activeQueries.put(name, query)
+      query
+    }
+  }
+
+  /** Called by a [[ContinuousQuery]] to notify the manager that it has terminated. */
+  private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): 
Unit = {
+    activeQueriesLock.synchronized {
+      activeQueries -= terminatedQuery.name
+    }
+    awaitTerminationLock.synchronized {
+      if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) {
+        lastTerminatedQuery = terminatedQuery
+      }
+      awaitTerminationLock.notifyAll()
+    }
+  }
+}
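
Putting the manager's methods together, a supervising thread might wait for the
first termination, handle a failure, and then clear the terminated state before
waiting again. A sketch, again assuming the manager is exposed as
`spark.streams`:

    import org.apache.spark.sql.streaming.{ContinuousQueryException, ContinuousQueryManager}

    val manager: ContinuousQueryManager = spark.streams   // assumed accessor

    println(s"active queries: ${manager.active.map(_.name).mkString(", ")}")

    try {
      // Blocks until any active query terminates; rethrows its exception if it failed.
      manager.awaitAnyTermination()
    } catch {
      case _: ContinuousQueryException =>
        // Stop the remaining queries and inspect each one's exception individually,
        // as the scaladoc above recommends.
        manager.active.foreach { q =>
          q.stop()
          q.exception.foreach(e => println(s"${q.name} failed: ${e.message}"))
        }
    } finally {
      // Clear past terminations so the next awaitAnyTermination() waits for a new one.
      manager.resetTerminated()
    }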

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
new file mode 100644
index 0000000..79ddf01
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{Offset, Sink}
+
+/**
+ * :: Experimental ::
+ * Status and metrics of a streaming [[Sink]].
+ *
+ * @param description Description of the sink corresponding to this status
+ * @param offset      Current offset up to which data has been written by the 
sink
+ * @since 2.0.0
+ */
+@Experimental
+class SinkStatus private[sql](
+    val description: String,
+    val offset: Offset)

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
new file mode 100644
index 0000000..8fccd5b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.execution.streaming.{Offset, Source}
+
+/**
+ * :: Experimental ::
+ * Status and metrics of a streaming [[Source]].
+ *
+ * @param description     Description of the source corresponding to this 
status
+ * @param offset          Current offset of the source, if known
+ * @since 2.0.0
+ */
+@Experimental
+class SourceStatus private[sql] (
+    val description: String,
+    val offset: Option[Offset])
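
These two status classes are what `ContinuousQuery.sourceStatuses` and
`ContinuousQuery.sinkStatus` return, so monitoring code only ever reads them. A
sketch, assuming `query` is a running handle as in the earlier examples:

    // Per-source progress; the offset stays None until the source reports one.
    query.sourceStatuses.foreach { s =>
      println(s"source ${s.description}: offset=${s.offset.getOrElse("unknown")}")
    }

    // Sink progress: the offset up to which data has been written by the sink.
    val sink = query.sinkStatus
    println(s"sink ${sink.description}: committed up to ${sink.offset}")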

http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
new file mode 100644
index 0000000..d3fdbac
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * :: Experimental ::
+ * Used to indicate how often results should be produced by a 
[[ContinuousQuery]].
+ *
+ * @since 2.0.0
+ */
+@Experimental
+sealed trait Trigger
+
+/**
+ * :: Experimental ::
+ * A trigger that runs a query periodically based on the processing time. If 
`interval` is 0,
+ * the query will run as fast as possible.
+ *
+ * Scala Example:
+ * {{{
+ *   df.write.trigger(ProcessingTime("10 seconds"))
+ *
+ *   import scala.concurrent.duration._
+ *   df.write.trigger(ProcessingTime(10.seconds))
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ *   df.write.trigger(ProcessingTime.create("10 seconds"))
+ *
+ *   import java.util.concurrent.TimeUnit
+ *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ * }}}
+ *
+ * @since 2.0.0
+ */
+@Experimental
+case class ProcessingTime(intervalMs: Long) extends Trigger {
+  require(intervalMs >= 0, "the interval of trigger should not be negative")
+}
+
+/**
+ * :: Experimental ::
+ * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s.
+ *
+ * @since 2.0.0
+ */
+@Experimental
+object ProcessingTime {
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as 
fast as possible.
+   *
+   * Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime("10 seconds"))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def apply(interval: String): ProcessingTime = {
+    if (StringUtils.isBlank(interval)) {
+      throw new IllegalArgumentException(
+        "interval cannot be null or blank.")
+    }
+    val cal = if (interval.startsWith("interval")) {
+      CalendarInterval.fromString(interval)
+    } else {
+      CalendarInterval.fromString("interval " + interval)
+    }
+    if (cal == null) {
+      throw new IllegalArgumentException(s"Invalid interval: $interval")
+    }
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year 
interval: $interval")
+    }
+    new ProcessingTime(cal.microseconds / 1000)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as 
fast as possible.
+   *
+   * Example:
+   * {{{
+   *   import scala.concurrent.duration._
+   *   df.write.trigger(ProcessingTime(10.seconds))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def apply(interval: Duration): ProcessingTime = {
+    new ProcessingTime(interval.toMillis)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as 
fast as possible.
+   *
+   * Example:
+   * {{{
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def create(interval: String): ProcessingTime = {
+    apply(interval)
+  }
+
+  /**
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as 
fast as possible.
+   *
+   * Example:
+   * {{{
+   *   import java.util.concurrent.TimeUnit
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   * }}}
+   *
+   * @since 2.0.0
+   */
+  def create(interval: Long, unit: TimeUnit): ProcessingTime = {
+    new ProcessingTime(unit.toMillis(interval))
+  }
+}
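
All four ProcessingTime constructors above normalize to the same intervalMs,
which is what the TriggerExecutor consumes. A small sketch of that equivalence
(the 10-second figures follow directly from the conversion code above):

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration._

    import org.apache.spark.sql.streaming.ProcessingTime

    // Every form below resolves to intervalMs == 10000.
    assert(ProcessingTime("10 seconds").intervalMs == 10000L)
    assert(ProcessingTime(10.seconds).intervalMs == 10000L)
    assert(ProcessingTime.create("10 seconds").intervalMs == 10000L)
    assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs == 10000L)

    // A zero interval means micro-batches run as fast as possible.
    val asap = ProcessingTime(0)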

