[SPARK-15686][SQL] Move user-facing streaming classes into sql.streaming

## What changes were proposed in this pull request?

This patch moves all user-facing structured streaming classes into sql.streaming. As part of this, I also added since-version annotations to methods and classes that were missing them.
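For code that consumes these APIs, the change is purely a package-path move. A minimal sketch of the before/after imports, using the class names visible in the diff below (the commented usage line mirrors the example in the `ProcessingTime` scaladoc and is illustrative only, not part of this patch):

```scala
// Before this patch (pre-2.0 snapshots): user-facing streaming classes sat in sql / sql.util.
// import org.apache.spark.sql.{ContinuousQuery, OutputMode, ProcessingTime, Trigger}
// import org.apache.spark.sql.util.ContinuousQueryListener

// After this patch: the same classes are imported from sql.streaming.
import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryListener, OutputMode, ProcessingTime}

// Illustrative usage, adapted from the ProcessingTime scaladoc in the diff:
// df.write.trigger(ProcessingTime("10 seconds"))  // trigger type now resolves from sql.streaming
```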
## How was this patch tested?

Updated tests to reflect the moves.

Author: Reynold Xin <r...@databricks.com>

Closes #13429 from rxin/SPARK-15686.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a71d1364
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a71d1364
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a71d1364

Branch: refs/heads/master
Commit: a71d1364ae87aa388128da34dd0b9b02ff85e458
Parents: d5012c2
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Jun 1 10:14:40 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Jun 1 10:14:40 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py                 |   3 +-
 python/pyspark/sql/utils.py                     |   2 +-
 .../java/org/apache/spark/sql/OutputMode.java   |  54 --
 .../apache/spark/sql/streaming/OutputMode.java  |  55 ++
 .../apache/spark/sql/InternalOutputModes.scala  |   2 +
 .../analysis/UnsupportedOperationChecker.scala  |   3 +-
 .../apache/spark/sql/JavaOutputModeSuite.java   |  31 -
 .../sql/streaming/JavaOutputModeSuite.java      |  31 +
 .../analysis/UnsupportedOperationsSuite.scala   |   3 +-
 .../org/apache/spark/sql/ContinuousQuery.scala  | 109 ----
 .../spark/sql/ContinuousQueryException.scala    |  54 --
 .../spark/sql/ContinuousQueryManager.scala      | 232 --------
 .../org/apache/spark/sql/DataFrameWriter.scala  |   1 +
 .../scala/org/apache/spark/sql/Dataset.scala    |   1 +
 .../scala/org/apache/spark/sql/SQLContext.scala |   6 +-
 .../scala/org/apache/spark/sql/SinkStatus.scala |  34 --
 .../org/apache/spark/sql/SourceStatus.scala     |  34 --
 .../org/apache/spark/sql/SparkSession.scala     |   7 +-
 .../scala/org/apache/spark/sql/Trigger.scala    | 133 -----
 .../spark/sql/execution/SparkStrategies.scala   |   3 +-
 .../sql/execution/datasources/DataSource.scala  |   1 +
 .../streaming/ContinuousQueryListenerBus.scala  |  11 +-
 .../streaming/IncrementalExecution.scala        |   3 +-
 .../execution/streaming/StreamExecution.scala   |   5 +-
 .../execution/streaming/TriggerExecutor.scala   |   2 +-
 .../spark/sql/execution/streaming/console.scala |   3 +-
 .../spark/sql/execution/streaming/memory.scala  |   1 +
 .../spark/sql/internal/SessionState.scala       |   3 +-
 .../apache/spark/sql/sources/interfaces.scala   |   1 +
 .../spark/sql/streaming/ContinuousQuery.scala   | 110 ++++
 .../streaming/ContinuousQueryException.scala    |  54 ++
 .../sql/streaming/ContinuousQueryListener.scala |  95 ++++
 .../sql/streaming/ContinuousQueryManager.scala  | 231 ++++++++
 .../apache/spark/sql/streaming/SinkStatus.scala |  34 ++
 .../spark/sql/streaming/SourceStatus.scala      |  34 ++
 .../apache/spark/sql/streaming/Trigger.scala    | 147 +++++
 .../sql/util/ContinuousQueryListener.scala      |  75 ---
 .../apache/spark/sql/ProcessingTimeSuite.scala  |   1 +
 .../scala/org/apache/spark/sql/StreamTest.scala | 565 ------------------
 .../streaming/ProcessingTimeExecutorSuite.scala |   2 +-
 .../ContinuousQueryListenerSuite.scala          | 216 +++++++
 .../streaming/ContinuousQueryManagerSuite.scala |   5 +-
 .../sql/streaming/ContinuousQuerySuite.scala    |   5 +-
 .../sql/streaming/FileStreamSinkSuite.scala     |   2 +-
 .../sql/streaming/FileStreamSourceSuite.scala   |   4 +-
 .../spark/sql/streaming/FileStressSuite.scala   |   4 +-
 .../spark/sql/streaming/MemorySinkSuite.scala   |   2 +-
 .../sql/streaming/MemorySourceStressSuite.scala |   4 +-
 .../spark/sql/streaming/StreamSuite.scala       |   2 +-
 .../apache/spark/sql/streaming/StreamTest.scala | 567 +++++++++++++++++++
 .../streaming/StreamingAggregationSuite.scala   |   5 +-
 .../test/DataFrameReaderWriterSuite.scala       |   4 +-
.../sql/util/ContinuousQueryListenerSuite.scala | 217 ------- 53 files changed, 1630 insertions(+), 1583 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 8238b8e..cd75622 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -201,7 +201,8 @@ class ProcessingTime(Trigger): self.interval = interval def _to_java_trigger(self, sqlContext): - return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval) + return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create( + self.interval) def _test(): http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/python/pyspark/sql/utils.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 8c8768f..9ddaf78 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -71,7 +71,7 @@ def capture_sql_exception(f): raise AnalysisException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) - if s.startswith('org.apache.spark.sql.ContinuousQueryException: '): + if s.startswith('org.apache.spark.sql.streaming.ContinuousQueryException: '): raise ContinuousQueryException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.execution.QueryExecutionException: '): raise QueryExecutionException(s.split(': ', 1)[1], stackTrace) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java deleted file mode 100644 index 1936d53..0000000 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/OutputMode.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql; - -import org.apache.spark.annotation.Experimental; - -/** - * :: Experimental :: - * - * OutputMode is used to what data will be written to a streaming sink when there is - * new data available in a streaming DataFrame/Dataset. - * - * @since 2.0.0 - */ -@Experimental -public class OutputMode { - - /** - * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be - * written to the sink. 
This output mode can be only be used in queries that do not - * contain any aggregation. - * - * @since 2.0.0 - */ - public static OutputMode Append() { - return InternalOutputModes.Append$.MODULE$; - } - - /** - * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written - * to the sink every time these is some updates. This output mode can only be used in queries - * that contain aggregations. - * - * @since 2.0.0 - */ - public static OutputMode Complete() { - return InternalOutputModes.Complete$.MODULE$; - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java new file mode 100644 index 0000000..41e2582 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import org.apache.spark.annotation.Experimental; +import org.apache.spark.sql.InternalOutputModes; + +/** + * :: Experimental :: + * + * OutputMode is used to what data will be written to a streaming sink when there is + * new data available in a streaming DataFrame/Dataset. + * + * @since 2.0.0 + */ +@Experimental +public class OutputMode { + + /** + * OutputMode in which only the new rows in the streaming DataFrame/Dataset will be + * written to the sink. This output mode can be only be used in queries that do not + * contain any aggregation. + * + * @since 2.0.0 + */ + public static OutputMode Append() { + return InternalOutputModes.Append$.MODULE$; + } + + /** + * OutputMode in which all the rows in the streaming DataFrame/Dataset will be written + * to the sink every time these is some updates. This output mode can only be used in queries + * that contain aggregations. 
+ * + * @since 2.0.0 + */ + public static OutputMode Complete() { + return InternalOutputModes.Complete$.MODULE$; + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala index 8ef5d9a..153f9f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/InternalOutputModes.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.streaming.OutputMode + /** * Internal helper class to generate objects representing various [[OutputMode]]s, */ http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index f4c0347..8373fa3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.{AnalysisException, InternalOutputModes, OutputMode} +import org.apache.spark.sql.{AnalysisException, InternalOutputModes} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.OutputMode /** * Analyzes the presence of unsupported operations in a logical plan. http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java deleted file mode 100644 index 1764f33..0000000 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/JavaOutputModeSuite.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql; - -import org.junit.Test; - -public class JavaOutputModeSuite { - - @Test - public void testOutputModes() { - OutputMode o1 = OutputMode.Append(); - assert(o1.toString().toLowerCase().contains("append")); - OutputMode o2 = OutputMode.Complete(); - assert (o2.toString().toLowerCase().contains("complete")); - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java new file mode 100644 index 0000000..e0a54fe --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/streaming/JavaOutputModeSuite.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming; + +import org.junit.Test; + +public class JavaOutputModeSuite { + + @Test + public void testOutputModes() { + OutputMode o1 = OutputMode.Append(); + assert(o1.toString().toLowerCase().contains("append")); + OutputMode o2 = OutputMode.Complete(); + assert (o2.toString().toLowerCase().contains("complete")); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index c2e3d47..378cca3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.{AnalysisException, OutputMode} +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.InternalOutputModes._ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.IntegerType /** A dummy command for testing unsupported operations. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala deleted file mode 100644 index 4d5afe2..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.annotation.Experimental - -/** - * :: Experimental :: - * A handle to a query that is executing continuously in the background as new data arrives. - * All these methods are thread-safe. - * @since 2.0.0 - */ -@Experimental -trait ContinuousQuery { - - /** - * Returns the name of the query. - * @since 2.0.0 - */ - def name: String - - /** - * Returns the [[SparkSession]] associated with `this`. - * @since 2.0.0 - */ - def sparkSession: SparkSession - - /** - * Whether the query is currently active or not - * @since 2.0.0 - */ - def isActive: Boolean - - /** - * Returns the [[ContinuousQueryException]] if the query was terminated by an exception. - * @since 2.0.0 - */ - def exception: Option[ContinuousQueryException] - - /** - * Returns current status of all the sources. - * @since 2.0.0 - */ - def sourceStatuses: Array[SourceStatus] - - /** Returns current status of the sink. */ - def sinkStatus: SinkStatus - - /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. - * - * If the query has terminated, then all subsequent calls to this method will either return - * immediately (if the query was terminated by `stop()`), or throw the exception - * immediately (if the query has terminated with exception). - * - * @throws ContinuousQueryException, if `this` query has terminated with an exception. - * - * @since 2.0.0 - */ - def awaitTermination(): Unit - - /** - * Waits for the termination of `this` query, either by `query.stop()` or by an exception. - * If the query has terminated with an exception, then the exception will be thrown. - * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` - * milliseconds. - * - * If the query has terminated, then all subsequent calls to this method will either return - * `true` immediately (if the query was terminated by `stop()`), or throw the exception - * immediately (if the query has terminated with exception). 
- * - * @throws ContinuousQueryException, if `this` query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitTermination(timeoutMs: Long): Boolean - - /** - * Blocks until all available data in the source has been processed an committed to the sink. - * This method is intended for testing. Note that in the case of continually arriving data, this - * method may block forever. Additionally, this method is only guaranteed to block until data that - * has been synchronously appended data to a [[org.apache.spark.sql.execution.streaming.Source]] - * prior to invocation. (i.e. `getOffset` must immediately reflect the addition). - */ - def processAllAvailable(): Unit - - /** - * Stops the execution of this query if it is running. This method blocks until the threads - * performing execution has stopped. - * @since 2.0.0 - */ - def stop(): Unit -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala deleted file mode 100644 index fec3862..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryException.scala +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} - -/** - * :: Experimental :: - * Exception that stopped a [[ContinuousQuery]]. 
- * @param query Query that caused the exception - * @param message Message of this exception - * @param cause Internal cause of this exception - * @param startOffset Starting offset (if known) of the range of data in which exception occurred - * @param endOffset Ending offset (if known) of the range of data in exception occurred - * @since 2.0.0 - */ -@Experimental -class ContinuousQueryException private[sql]( - @transient val query: ContinuousQuery, - val message: String, - val cause: Throwable, - val startOffset: Option[Offset] = None, - val endOffset: Option[Offset] = None) - extends Exception(message, cause) { - - /** Time when the exception occurred */ - val time: Long = System.currentTimeMillis - - override def toString(): String = { - val causeStr = - s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" - s""" - |$causeStr - | - |${query.asInstanceOf[StreamExecution].toDebugString} - """.stripMargin - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala deleted file mode 100644 index c686400..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQueryManager.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import scala.collection.mutable - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.util.ContinuousQueryListener -import org.apache.spark.util.{Clock, SystemClock} - -/** - * :: Experimental :: - * A class to manage all the [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active - * on a [[SparkSession]]. 
- * - * @since 2.0.0 - */ -@Experimental -class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { - - private[sql] val stateStoreCoordinator = - StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) - private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus) - private val activeQueries = new mutable.HashMap[String, ContinuousQuery] - private val activeQueriesLock = new Object - private val awaitTerminationLock = new Object - - private var lastTerminatedQuery: ContinuousQuery = null - - /** - * Returns a list of active queries associated with this SQLContext - * - * @since 2.0.0 - */ - def active: Array[ContinuousQuery] = activeQueriesLock.synchronized { - activeQueries.values.toArray - } - - /** - * Returns an active query from this SQLContext or throws exception if bad name - * - * @since 2.0.0 - */ - def get(name: String): ContinuousQuery = activeQueriesLock.synchronized { - activeQueries.getOrElse(name, - throw new IllegalArgumentException(s"There is no active query with name $name")) - } - - /** - * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `resetTerminated()` was called. If any query was terminated - * with an exception, then the exception will be thrown. - * - * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either - * return immediately (if the query was terminated by `query.stop()`), - * or throw the exception immediately (if the query was terminated with exception). Use - * `resetTerminated()` to clear past terminations and wait for new terminations. - * - * In the case where multiple queries have terminated since `resetTermination()` was called, - * if any query has terminated with exception, then `awaitAnyTermination()` will - * throw any of the exception. For correctly documenting exceptions across multiple queries, - * users need to stop all of them after any of them terminates with exception, and then check the - * `query.exception()` for each query. - * - * @throws ContinuousQueryException, if any query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitAnyTermination(): Unit = { - awaitTerminationLock.synchronized { - while (lastTerminatedQuery == null) { - awaitTerminationLock.wait(10) - } - if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { - throw lastTerminatedQuery.exception.get - } - } - } - - /** - * Wait until any of the queries on the associated SQLContext has terminated since the - * creation of the context, or since `resetTerminated()` was called. Returns whether any query - * has terminated or not (multiple may have terminated). If any query has terminated with an - * exception, then the exception will be thrown. - * - * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either - * return `true` immediately (if the query was terminated by `query.stop()`), - * or throw the exception immediately (if the query was terminated with exception). Use - * `resetTerminated()` to clear past terminations and wait for new terminations. - * - * In the case where multiple queries have terminated since `resetTermination()` was called, - * if any query has terminated with exception, then `awaitAnyTermination()` will - * throw any of the exception. 
For correctly documenting exceptions across multiple queries, - * users need to stop all of them after any of them terminates with exception, and then check the - * `query.exception()` for each query. - * - * @throws ContinuousQueryException, if any query has terminated with an exception - * - * @since 2.0.0 - */ - def awaitAnyTermination(timeoutMs: Long): Boolean = { - - val startTime = System.currentTimeMillis - def isTimedout = System.currentTimeMillis - startTime >= timeoutMs - - awaitTerminationLock.synchronized { - while (!isTimedout && lastTerminatedQuery == null) { - awaitTerminationLock.wait(10) - } - if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { - throw lastTerminatedQuery.exception.get - } - lastTerminatedQuery != null - } - } - - /** - * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to - * wait for new terminations. - * - * @since 2.0.0 - */ - def resetTerminated(): Unit = { - awaitTerminationLock.synchronized { - lastTerminatedQuery = null - } - } - - /** - * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]]. - * - * @since 2.0.0 - */ - def addListener(listener: ContinuousQueryListener): Unit = { - listenerBus.addListener(listener) - } - - /** - * Deregister a [[ContinuousQueryListener]]. - * - * @since 2.0.0 - */ - def removeListener(listener: ContinuousQueryListener): Unit = { - listenerBus.removeListener(listener) - } - - /** Post a listener event */ - private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = { - listenerBus.post(event) - } - - /** Start a query */ - private[sql] def startQuery( - name: String, - checkpointLocation: String, - df: DataFrame, - sink: Sink, - outputMode: OutputMode, - trigger: Trigger = ProcessingTime(0), - triggerClock: Clock = new SystemClock()): ContinuousQuery = { - activeQueriesLock.synchronized { - if (activeQueries.contains(name)) { - throw new IllegalArgumentException( - s"Cannot start query with name $name as a query with that name is already active") - } - val analyzedPlan = df.queryExecution.analyzed - df.queryExecution.assertAnalyzed() - - if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { - UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) - } - - var nextSourceId = 0L - - val logicalPlan = analyzedPlan.transform { - case StreamingRelation(dataSource, _, output) => - // Materialize source to avoid creating it in every batch - val metadataPath = s"$checkpointLocation/sources/$nextSourceId" - val source = dataSource.createSource(metadataPath) - nextSourceId += 1 - // We still need to use the previous `output` instead of `source.schema` as attributes in - // "df.logicalPlan" has already used attributes of the previous `output`. 
- StreamingExecutionRelation(source, output) - } - val query = new StreamExecution( - sparkSession, - name, - checkpointLocation, - logicalPlan, - sink, - trigger, - triggerClock, - outputMode) - query.start() - activeQueries.put(name, query) - query - } - } - - /** Notify (by the ContinuousQuery) that the query has been terminated */ - private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = { - activeQueriesLock.synchronized { - activeQueries -= terminatedQuery.name - } - awaitTerminationLock.synchronized { - if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { - lastTerminatedQuery = terminatedQuery - } - awaitTerminationLock.notifyAll() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 291b825..25678e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingA import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils import org.apache.spark.sql.execution.streaming.{MemoryPlan, MemorySink, StreamExecution} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.{ContinuousQuery, OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Utils /** http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 7be49b1..3a6ec45 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython +import org.apache.spark.sql.streaming.ContinuousQuery import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0dc70c0..2e14c5d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -30,13 +30,11 @@ import org.apache.spark.internal.Logging import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ -import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution._ import 
org.apache.spark.sql.execution.command.ShowTablesCommand import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.streaming.ContinuousQueryManager import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ExecutionListenerManager @@ -645,7 +643,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) /** * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this` context. + * [[org.apache.spark.sql.streaming.ContinuousQuery ContinuousQueries]] active on `this` context. * * @since 2.0.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala deleted file mode 100644 index 5a98528..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/SinkStatus.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, Sink} - -/** - * :: Experimental :: - * Status and metrics of a streaming [[Sink]]. - * - * @param description Description of the source corresponding to this status - * @param offset Current offset up to which data has been written by the sink - * @since 2.0.0 - */ -@Experimental -class SinkStatus private[sql]( - val description: String, - val offset: Offset) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala deleted file mode 100644 index 2479e67..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/SourceStatus.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.execution.streaming.{Offset, Source} - -/** - * :: Experimental :: - * Status and metrics of a streaming [[Source]]. - * - * @param description Description of the source corresponding to this status - * @param offset Current offset of the source, if known - * @since 2.0.0 - */ -@Experimental -class SourceStatus private[sql] ( - val description: String, - val offset: Option[Offset]) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index dc4b72a..52bedf9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -25,7 +25,7 @@ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.JavaRDD import org.apache.spark.internal.Logging @@ -40,8 +40,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range} import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.ui.SQLListener -import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState, SQLConf} +import org.apache.spark.sql.internal.{CatalogImpl, SessionState, SharedState} import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.streaming._ import org.apache.spark.sql.types.{DataType, LongType, StructType} import org.apache.spark.sql.util.ExecutionListenerManager import org.apache.spark.util.Utils @@ -182,7 +183,7 @@ class SparkSession private( /** * :: Experimental :: * Returns a [[ContinuousQueryManager]] that allows managing all the - * [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`. + * [[ContinuousQuery ContinuousQueries]] active on `this`. * * @group basic * @since 2.0.0 http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala deleted file mode 100644 index 256e8a4..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration - -import org.apache.commons.lang3.StringUtils - -import org.apache.spark.annotation.Experimental -import org.apache.spark.unsafe.types.CalendarInterval - -/** - * :: Experimental :: - * Used to indicate how often results should be produced by a [[ContinuousQuery]]. - */ -@Experimental -sealed trait Trigger {} - -/** - * :: Experimental :: - * A trigger that runs a query periodically based on the processing time. If `interval` is 0, - * the query will run as fast as possible. - * - * Scala Example: - * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) - * - * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) - * }}} - * - * Java Example: - * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) - * - * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - */ -@Experimental -case class ProcessingTime(intervalMs: Long) extends Trigger { - require(intervalMs >= 0, "the interval of trigger should not be negative") -} - -/** - * :: Experimental :: - * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. - */ -@Experimental -object ProcessingTime { - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.write.trigger(ProcessingTime("10 seconds")) - * }}} - */ - def apply(interval: String): ProcessingTime = { - if (StringUtils.isBlank(interval)) { - throw new IllegalArgumentException( - "interval cannot be null or blank.") - } - val cal = if (interval.startsWith("interval")) { - CalendarInterval.fromString(interval) - } else { - CalendarInterval.fromString("interval " + interval) - } - if (cal == null) { - throw new IllegalArgumentException(s"Invalid interval: $interval") - } - if (cal.months > 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") - } - new ProcessingTime(cal.microseconds / 1000) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * import scala.concurrent.duration._ - * df.write.trigger(ProcessingTime(10.seconds)) - * }}} - */ - def apply(interval: Duration): ProcessingTime = { - new ProcessingTime(interval.toMillis) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.write.trigger(ProcessingTime.create("10 seconds")) - * }}} - */ - def create(interval: String): ProcessingTime = { - apply(interval) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. 
- * - * Example: - * {{{ - * import java.util.concurrent.TimeUnit - * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - */ - def create(interval: Long, unit: TimeUnit): ProcessingTime = { - new ProcessingTime(unit.toMillis(interval)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e405252..7e3e45e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.streaming.ContinuousQuery private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => @@ -201,7 +202,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { /** * Used to plan aggregation queries that are computed incrementally as part of a - * [[org.apache.spark.sql.ContinuousQuery]]. Currently this rule is injected into the planner + * [[ContinuousQuery]]. Currently this rule is injected into the planner * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 814880b..93f1ad0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.{CalendarIntervalType, StructType} import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala index b1d24b6..2a1be09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, 
SparkListenerEvent} -import org.apache.spark.sql.util.ContinuousQueryListener -import org.apache.spark.sql.util.ContinuousQueryListener._ +import org.apache.spark.sql.streaming.ContinuousQueryListener import org.apache.spark.util.ListenerBus /** @@ -30,7 +29,10 @@ import org.apache.spark.util.ListenerBus * dispatch them to ContinuousQueryListener. */ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) - extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { + extends SparkListener + with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] { + + import ContinuousQueryListener._ sparkListenerBus.addListener(this) @@ -74,7 +76,8 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus) * listener bus. */ private case class WrappedContinuousQueryListenerEvent( - streamingListenerEvent: ContinuousQueryListener.Event) extends SparkListenerEvent { + streamingListenerEvent: ContinuousQueryListener.Event) + extends SparkListenerEvent { // Do not log streaming events in event log as history server does not support these events. protected[spark] override def logEvent: Boolean = false http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 5c86049..bc0e443 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.execution.streaming -import org.apache.spark.sql.{InternalOutputModes, OutputMode, SparkSession} +import org.apache.spark.sql.{InternalOutputModes, SparkSession} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SparkPlanner, UnaryExecNode} +import org.apache.spark.sql.streaming.OutputMode /** * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]] http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index ab0900d..16d38a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -33,8 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.util.ContinuousQueryListener -import org.apache.spark.sql.util.ContinuousQueryListener._ +import org.apache.spark.sql.streaming._ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** @@ -54,6 +53,8 @@ class StreamExecution( val outputMode: 
OutputMode) extends ContinuousQuery with Logging { + import org.apache.spark.sql.streaming.ContinuousQueryListener._ + /** * A lock used to wait/notify when batches complete. Use a fair lock to avoid thread starvation. */ http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index 569907b..ac510df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.ProcessingTime +import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.util.{Clock, SystemClock} trait TriggerExecutor { http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 391f1e5..2ec2a3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -18,8 +18,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, OutputMode, SQLContext} +import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} +import org.apache.spark.sql.streaming.OutputMode class ConsoleSink(options: Map[String, String]) extends Sink with Logging { // Number of rows to display, by default 20 rows http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index e4a95e7..4496f41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LeafNode +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType object MemoryStream { http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index 4c7bbf0..b2db377 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.AnalyzeTableCommand import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreInsertCastAndRename, ResolveDataSource} +import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryManager} import org.apache.spark.sql.util.ExecutionListenerManager @@ -142,7 +143,7 @@ private[sql] class SessionState(sparkSession: SparkSession) { lazy val listenerManager: ExecutionListenerManager = new ExecutionListenerManager /** - * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s. + * Interface to start and stop [[ContinuousQuery]]s. */ lazy val continuousQueryManager: ContinuousQueryManager = { new ContinuousQueryManager(sparkSession) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 3d4edbb..d2077a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.streaming.{Sink, Source} +import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType /** http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala new file mode 100644 index 0000000..451cfd8 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.SparkSession + +/** + * :: Experimental :: + * A handle to a query that is executing continuously in the background as new data arrives. + * All these methods are thread-safe. 
+ * @since 2.0.0 + */ +@Experimental +trait ContinuousQuery { + + /** + * Returns the name of the query. + * @since 2.0.0 + */ + def name: String + + /** + * Returns the [[SparkSession]] associated with `this`. + * @since 2.0.0 + */ + def sparkSession: SparkSession + + /** + * Whether the query is currently active or not + * @since 2.0.0 + */ + def isActive: Boolean + + /** + * Returns the [[ContinuousQueryException]] if the query was terminated by an exception. + * @since 2.0.0 + */ + def exception: Option[ContinuousQueryException] + + /** + * Returns current status of all the sources. + * @since 2.0.0 + */ + def sourceStatuses: Array[SourceStatus] + + /** Returns current status of the sink. */ + def sinkStatus: SinkStatus + + /** + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. + * + * If the query has terminated, then all subsequent calls to this method will either return + * immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with exception). + * + * @throws ContinuousQueryException, if `this` query has terminated with an exception. + * + * @since 2.0.0 + */ + def awaitTermination(): Unit + + /** + * Waits for the termination of `this` query, either by `query.stop()` or by an exception. + * If the query has terminated with an exception, then the exception will be thrown. + * Otherwise, it returns whether the query has terminated or not within the `timeoutMs` + * milliseconds. + * + * If the query has terminated, then all subsequent calls to this method will either return + * `true` immediately (if the query was terminated by `stop()`), or throw the exception + * immediately (if the query has terminated with exception). + * + * @throws ContinuousQueryException, if `this` query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitTermination(timeoutMs: Long): Boolean + + /** + * Blocks until all available data in the source has been processed and committed to the sink. + * This method is intended for testing. Note that in the case of continually arriving data, this + * method may block forever. Additionally, this method is only guaranteed to block until data that + * has been synchronously appended to a [[org.apache.spark.sql.execution.streaming.Source]] + * prior to invocation has been processed. (i.e. `getOffset` must immediately reflect the addition). + */ + def processAllAvailable(): Unit + + /** + * Stops the execution of this query if it is running. This method blocks until the threads + * performing execution have stopped. + * @since 2.0.0 + */ + def stop(): Unit +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala new file mode 100644 index 0000000..5196c5a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryException.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution} + +/** + * :: Experimental :: + * Exception that stopped a [[ContinuousQuery]]. + * @param query Query that caused the exception + * @param message Message of this exception + * @param cause Internal cause of this exception + * @param startOffset Starting offset (if known) of the range of data in which exception occurred + * @param endOffset Ending offset (if known) of the range of data in exception occurred + * @since 2.0.0 + */ +@Experimental +class ContinuousQueryException private[sql]( + @transient val query: ContinuousQuery, + val message: String, + val cause: Throwable, + val startOffset: Option[Offset] = None, + val endOffset: Option[Offset] = None) + extends Exception(message, cause) { + + /** Time when the exception occurred */ + val time: Long = System.currentTimeMillis + + override def toString(): String = { + val causeStr = + s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}" + s""" + |$causeStr + | + |${query.asInstanceOf[StreamExecution].toDebugString} + """.stripMargin + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala new file mode 100644 index 0000000..6bdd513 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Interface for listening to events related to [[ContinuousQuery ContinuousQueries]]. + * @note The methods are not thread-safe as they may be called from different threads. 
+ * + * @since 2.0.0 + */ +@Experimental +abstract class ContinuousQueryListener { + + import ContinuousQueryListener._ + + /** + * Called when a query is started. + * @note This is called synchronously with + * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], + * that is, `onQueryStarted` will be called on all listeners before + * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please + * don't block this method as it will block your query. + * @since 2.0.0 + */ + def onQueryStarted(queryStarted: QueryStarted): Unit + + /** + * Called when there is some status update (ingestion rate updated, etc.) + * + * @note This method is asynchronous. The status in [[ContinuousQuery]] will always be + * the latest no matter when this method is called. Therefore, the status of [[ContinuousQuery]] + * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]] + * is terminated when you are processing [[QueryProgress]]. + * @since 2.0.0 + */ + def onQueryProgress(queryProgress: QueryProgress): Unit + + /** + * Called when a query is stopped, with or without error. + * @since 2.0.0 + */ + def onQueryTerminated(queryTerminated: QueryTerminated): Unit +} + + +/** + * :: Experimental :: + * Companion object of [[ContinuousQueryListener]] that defines the listener events. + * @since 2.0.0 + */ +@Experimental +object ContinuousQueryListener { + + /** + * Base type of [[ContinuousQueryListener]] events. + * @since 2.0.0 + */ + trait Event + + /** + * Event representing the start of a query. + * @since 2.0.0 + */ + class QueryStarted private[sql](val query: ContinuousQuery) extends Event + + /** + * Event representing any progress updates in a query. + * @since 2.0.0 + */ + class QueryProgress private[sql](val query: ContinuousQuery) extends Event + + /** + * Event representing the termination of a query. + * @since 2.0.0 + */ + class QueryTerminated private[sql](val query: ContinuousQuery) extends Event +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala new file mode 100644 index 0000000..1bfdd2d --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.sql.streaming + +import scala.collection.mutable + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Clock, SystemClock} + +/** + * :: Experimental :: + * A class to manage all the [[ContinuousQuery]] active on a [[SparkSession]]. + * + * @since 2.0.0 + */ +@Experimental +class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { + + private[sql] val stateStoreCoordinator = + StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) + private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus) + private val activeQueries = new mutable.HashMap[String, ContinuousQuery] + private val activeQueriesLock = new Object + private val awaitTerminationLock = new Object + + private var lastTerminatedQuery: ContinuousQuery = null + + /** + * Returns a list of active queries associated with this SQLContext + * + * @since 2.0.0 + */ + def active: Array[ContinuousQuery] = activeQueriesLock.synchronized { + activeQueries.values.toArray + } + + /** + * Returns an active query from this SQLContext or throws exception if bad name + * + * @since 2.0.0 + */ + def get(name: String): ContinuousQuery = activeQueriesLock.synchronized { + activeQueries.getOrElse(name, + throw new IllegalArgumentException(s"There is no active query with name $name")) + } + + /** + * Wait until any of the queries on the associated SQLContext has terminated since the + * creation of the context, or since `resetTerminated()` was called. If any query was terminated + * with an exception, then the exception will be thrown. + * + * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either + * return immediately (if the query was terminated by `query.stop()`), + * or throw the exception immediately (if the query was terminated with exception). Use + * `resetTerminated()` to clear past terminations and wait for new terminations. + * + * In the case where multiple queries have terminated since `resetTermination()` was called, + * if any query has terminated with exception, then `awaitAnyTermination()` will + * throw any of the exception. For correctly documenting exceptions across multiple queries, + * users need to stop all of them after any of them terminates with exception, and then check the + * `query.exception()` for each query. + * + * @throws ContinuousQueryException, if any query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitAnyTermination(): Unit = { + awaitTerminationLock.synchronized { + while (lastTerminatedQuery == null) { + awaitTerminationLock.wait(10) + } + if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { + throw lastTerminatedQuery.exception.get + } + } + } + + /** + * Wait until any of the queries on the associated SQLContext has terminated since the + * creation of the context, or since `resetTerminated()` was called. Returns whether any query + * has terminated or not (multiple may have terminated). If any query has terminated with an + * exception, then the exception will be thrown. 
+ * + * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either + * return `true` immediately (if the query was terminated by `query.stop()`), + * or throw the exception immediately (if the query was terminated with exception). Use + * `resetTerminated()` to clear past terminations and wait for new terminations. + * + * In the case where multiple queries have terminated since `resetTermination()` was called, + * if any query has terminated with exception, then `awaitAnyTermination()` will + * throw any of the exception. For correctly documenting exceptions across multiple queries, + * users need to stop all of them after any of them terminates with exception, and then check the + * `query.exception()` for each query. + * + * @throws ContinuousQueryException, if any query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitAnyTermination(timeoutMs: Long): Boolean = { + + val startTime = System.currentTimeMillis + def isTimedout = System.currentTimeMillis - startTime >= timeoutMs + + awaitTerminationLock.synchronized { + while (!isTimedout && lastTerminatedQuery == null) { + awaitTerminationLock.wait(10) + } + if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { + throw lastTerminatedQuery.exception.get + } + lastTerminatedQuery != null + } + } + + /** + * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to + * wait for new terminations. + * + * @since 2.0.0 + */ + def resetTerminated(): Unit = { + awaitTerminationLock.synchronized { + lastTerminatedQuery = null + } + } + + /** + * Register a [[ContinuousQueryListener]] to receive up-calls for life cycle events of + * [[ContinuousQuery]]. + * + * @since 2.0.0 + */ + def addListener(listener: ContinuousQueryListener): Unit = { + listenerBus.addListener(listener) + } + + /** + * Deregister a [[ContinuousQueryListener]]. + * + * @since 2.0.0 + */ + def removeListener(listener: ContinuousQueryListener): Unit = { + listenerBus.removeListener(listener) + } + + /** Post a listener event */ + private[sql] def postListenerEvent(event: ContinuousQueryListener.Event): Unit = { + listenerBus.post(event) + } + + /** Start a query */ + private[sql] def startQuery( + name: String, + checkpointLocation: String, + df: DataFrame, + sink: Sink, + outputMode: OutputMode, + trigger: Trigger = ProcessingTime(0), + triggerClock: Clock = new SystemClock()): ContinuousQuery = { + activeQueriesLock.synchronized { + if (activeQueries.contains(name)) { + throw new IllegalArgumentException( + s"Cannot start query with name $name as a query with that name is already active") + } + val analyzedPlan = df.queryExecution.analyzed + df.queryExecution.assertAnalyzed() + + if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { + UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) + } + + var nextSourceId = 0L + + val logicalPlan = analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => + // Materialize source to avoid creating it in every batch + val metadataPath = s"$checkpointLocation/sources/$nextSourceId" + val source = dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema` as attributes in + // "df.logicalPlan" has already used attributes of the previous `output`. 
+ StreamingExecutionRelation(source, output) + } + val query = new StreamExecution( + sparkSession, + name, + checkpointLocation, + logicalPlan, + sink, + trigger, + triggerClock, + outputMode) + query.start() + activeQueries.put(name, query) + query + } + } + + /** Notify (by the ContinuousQuery) that the query has been terminated */ + private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = { + activeQueriesLock.synchronized { + activeQueries -= terminatedQuery.name + } + awaitTerminationLock.synchronized { + if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { + lastTerminatedQuery = terminatedQuery + } + awaitTerminationLock.notifyAll() + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala new file mode 100644 index 0000000..79ddf01 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.execution.streaming.{Offset, Sink} + +/** + * :: Experimental :: + * Status and metrics of a streaming [[Sink]]. + * + * @param description Description of the source corresponding to this status + * @param offset Current offset up to which data has been written by the sink + * @since 2.0.0 + */ +@Experimental +class SinkStatus private[sql]( + val description: String, + val offset: Offset) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala new file mode 100644 index 0000000..8fccd5b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.execution.streaming.{Offset, Source} + +/** + * :: Experimental :: + * Status and metrics of a streaming [[Source]]. + * + * @param description Description of the source corresponding to this status + * @param offset Current offset of the source, if known + * @since 2.0.0 + */ +@Experimental +class SourceStatus private[sql] ( + val description: String, + val offset: Option[Offset]) http://git-wip-us.apache.org/repos/asf/spark/blob/a71d1364/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala new file mode 100644 index 0000000..d3fdbac --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + +import org.apache.commons.lang3.StringUtils + +import org.apache.spark.annotation.Experimental +import org.apache.spark.unsafe.types.CalendarInterval + +/** + * :: Experimental :: + * Used to indicate how often results should be produced by a [[ContinuousQuery]]. + * + * @since 2.0.0 + */ +@Experimental +sealed trait Trigger + +/** + * :: Experimental :: + * A trigger that runs a query periodically based on the processing time. If `interval` is 0, + * the query will run as fast as possible. + * + * Scala Example: + * {{{ + * df.write.trigger(ProcessingTime("10 seconds")) + * + * import scala.concurrent.duration._ + * df.write.trigger(ProcessingTime(10.seconds)) + * }}} + * + * Java Example: + * {{{ + * df.write.trigger(ProcessingTime.create("10 seconds")) + * + * import java.util.concurrent.TimeUnit + * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + */ +@Experimental +case class ProcessingTime(intervalMs: Long) extends Trigger { + require(intervalMs >= 0, "the interval of trigger should not be negative") +} + +/** + * :: Experimental :: + * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. 
+ * + * @since 2.0.0 + */ +@Experimental +object ProcessingTime { + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * df.write.trigger(ProcessingTime("10 seconds")) + * }}} + * + * @since 2.0.0 + */ + def apply(interval: String): ProcessingTime = { + if (StringUtils.isBlank(interval)) { + throw new IllegalArgumentException( + "interval cannot be null or blank.") + } + val cal = if (interval.startsWith("interval")) { + CalendarInterval.fromString(interval) + } else { + CalendarInterval.fromString("interval " + interval) + } + if (cal == null) { + throw new IllegalArgumentException(s"Invalid interval: $interval") + } + if (cal.months > 0) { + throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + } + new ProcessingTime(cal.microseconds / 1000) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * import scala.concurrent.duration._ + * df.write.trigger(ProcessingTime(10.seconds)) + * }}} + * + * @since 2.0.0 + */ + def apply(interval: Duration): ProcessingTime = { + new ProcessingTime(interval.toMillis) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * df.write.trigger(ProcessingTime.create("10 seconds")) + * }}} + * + * @since 2.0.0 + */ + def create(interval: String): ProcessingTime = { + apply(interval) + } + + /** + * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. + * + * Example: + * {{{ + * import java.util.concurrent.TimeUnit + * df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) + * }}} + * + * @since 2.0.0 + */ + def create(interval: Long, unit: TimeUnit): ProcessingTime = { + new ProcessingTime(unit.toMillis(interval)) + } +}
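For orientation, a few usage sketches against the relocated org.apache.spark.sql.streaming API follow. They are illustrative only and not part of the patch. This first sketch drives a single ContinuousQuery handle end to end. The `console` sink name, the checkpoint path, and the way `streamingDF` is obtained are assumptions for illustration; `DataFrameWriter.startStream()` and `trigger(...)` are the entry points referenced in the scaladoc above.

{{{
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{ContinuousQuery, ContinuousQueryException, ProcessingTime}

object QueryHandleSketch {
  // `streamingDF` stands for any streaming DataFrame; how it is created is out of scope here.
  def runForAWhile(streamingDF: DataFrame): Unit = {
    val query: ContinuousQuery = streamingDF.write
      .format("console")                          // hypothetical use of the console sink in this module
      .option("checkpointLocation", "/tmp/cq")    // hypothetical checkpoint directory
      .trigger(ProcessingTime("10 seconds"))      // fire a batch at most every 10 seconds
      .startStream()

    println(s"query '${query.name}' active=${query.isActive}")

    try {
      // Wait up to one minute; rethrows the query's exception if it failed in that window.
      val terminated = query.awaitTermination(60 * 1000)
      println(s"terminated within timeout: $terminated")
    } catch {
      case e: ContinuousQueryException =>
        println(s"query failed between offsets ${e.startOffset} and ${e.endOffset}: ${e.message}")
    } finally {
      query.stop()  // blocks until the execution threads have stopped
    }
  }
}
}}}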
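The new SourceStatus and SinkStatus classes can be polled from the handle between batches. A minimal sketch, using only the members shown in the diff:

{{{
import org.apache.spark.sql.streaming.ContinuousQuery

object StatusPollingSketch {
  // Print a one-line summary of where each source and the sink currently are.
  def printStatus(query: ContinuousQuery): Unit = {
    query.sourceStatuses.foreach { s =>
      println(s"source '${s.description}' at offset ${s.offset.getOrElse("unknown")}")
    }
    println(s"sink '${query.sinkStatus.description}' committed up to ${query.sinkStatus.offset}")
  }
}
}}}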
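ContinuousQueryListener is an abstract class, so callers subclass it and register the instance with the ContinuousQueryManager. A minimal sketch; the `spark.streams` accessor for the manager is assumed here rather than shown in this excerpt.

{{{
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

// A listener that just logs life-cycle events; none of the callbacks should block.
class LoggingListener extends ContinuousQueryListener {
  override def onQueryStarted(queryStarted: QueryStarted): Unit =
    println(s"started: ${queryStarted.query.name}")

  override def onQueryProgress(queryProgress: QueryProgress): Unit =
    println(s"progress: ${queryProgress.query.sourceStatuses.map(_.description).mkString(", ")}")

  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
    println(s"terminated: ${queryTerminated.query.exception.map(_.message).getOrElse("no error")}")
}

object ListenerRegistrationSketch {
  def register(spark: SparkSession): Unit = {
    spark.streams.addListener(new LoggingListener)  // assumed accessor for the ContinuousQueryManager
  }
}
}}}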
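ContinuousQueryManager also coordinates termination across all queries started on a session. A sketch of the documented awaitAnyTermination / resetTerminated pattern, again assuming the manager comes from `spark.streams` and using a hypothetical query name "events":

{{{
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.ContinuousQueryException

object ManagerCoordinationSketch {
  def waitForAnyTermination(spark: SparkSession): Unit = {
    val manager = spark.streams

    manager.active.foreach(q => println(s"active query: ${q.name}"))
    // manager.get("events") would throw IllegalArgumentException if no active query has that name.

    try {
      // Blocks until any query on this session terminates; rethrows its exception if it failed.
      manager.awaitAnyTermination()
    } catch {
      case e: ContinuousQueryException => println(s"a query failed: ${e.message}")
    } finally {
      // Forget past terminations so a later awaitAnyTermination() waits for new ones.
      manager.resetTerminated()
    }
  }
}
}}}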
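Finally, the four ProcessingTime factory methods above all normalize to the same millisecond interval; a small sketch of that equivalence:

{{{
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.sql.streaming.ProcessingTime

object TriggerEquivalenceSketch {
  def main(args: Array[String]): Unit = {
    val fromString   = ProcessingTime("10 seconds")            // parsed via CalendarInterval
    val fromDuration = ProcessingTime(10.seconds)               // scala.concurrent.duration
    val fromCreate   = ProcessingTime.create("10 seconds")      // Java-friendly variant of apply
    val fromUnits    = ProcessingTime.create(10, TimeUnit.SECONDS)

    // All four spellings of "every 10 seconds" carry the same interval in milliseconds.
    assert(Seq(fromString, fromDuration, fromCreate, fromUnits).forall(_.intervalMs == 10000L))

    // An interval of 0 asks the query to run batches as fast as possible.
    println(ProcessingTime(0).intervalMs)
  }
}
}}}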