Repository: spark Updated Branches: refs/heads/branch-2.0 d9db8a9c8 -> 97fe1d8ee
[SPARK-15889][SQL][STREAMING] Add a unique id to ContinuousQuery ## What changes were proposed in this pull request? ContinuousQueries have names that are unique across all the active ones. However, when queries are rapidly restarted with the same name, it causes race conditions with the listener. A listener event from a stopped query can arrive after the query has been restarted, leading to complexities in monitoring infrastructure. Along with this change, I have also consolidated all the messy code paths to start queries with different sinks. ## How was this patch tested? Added unit tests, and existing unit tests. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #13613 from tdas/SPARK-15889. (cherry picked from commit c654ae2140bc184adb407fd02072b653c5359ee5) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97fe1d8e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97fe1d8e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97fe1d8e Branch: refs/heads/branch-2.0 Commit: 97fe1d8ee10413e2dbff7b90a7e747636fcb2e6c Parents: d9db8a9 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Mon Jun 13 13:44:46 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Mon Jun 13 13:44:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/DataFrameWriter.scala | 69 +++++------------- .../execution/streaming/StreamExecution.scala | 8 ++- .../spark/sql/streaming/ContinuousQuery.scala | 11 ++- .../sql/streaming/ContinuousQueryInfo.scala | 5 +- .../sql/streaming/ContinuousQueryManager.scala | 74 ++++++++++++++++---- .../ContinuousQueryListenerSuite.scala | 17 ++++- .../streaming/ContinuousQueryManagerSuite.scala | 29 +++----- .../sql/streaming/ContinuousQuerySuite.scala | 43 +++++++++++- .../apache/spark/sql/streaming/StreamTest.scala | 12 ++-- 9 files changed, 
167 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index afae078..171b137 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -336,34 +336,23 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertStreaming("startStream() can only be called on continuous queries") if (source == "memory") { - val queryName = - extraOptions.getOrElse( - "queryName", throw new AnalysisException("queryName must be specified for memory sink")) - val checkpointLocation = getCheckpointLocation(queryName, failIfNotSet = false).getOrElse { - Utils.createTempDir(namePrefix = "memory.stream").getCanonicalPath - } - - // If offsets have already been created, we trying to resume a query. - val checkpointPath = new Path(checkpointLocation, "offsets") - val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) - if (fs.exists(checkpointPath)) { - throw new AnalysisException( - s"Unable to resume query written to memory sink. 
Delete $checkpointPath to start over.") - } else { - checkpointPath.toUri.toString + if (extraOptions.get("queryName").isEmpty) { + throw new AnalysisException("queryName must be specified for memory sink") } val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) - resultDf.createOrReplaceTempView(queryName) - val continuousQuery = df.sparkSession.sessionState.continuousQueryManager.startQuery( - queryName, - checkpointLocation, + val query = df.sparkSession.sessionState.continuousQueryManager.startQuery( + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), df, sink, outputMode, - trigger) - continuousQuery + useTempCheckpointLocation = true, + recoverFromCheckpointLocation = false, + trigger = trigger) + resultDf.createOrReplaceTempView(query.name) + query } else { val dataSource = DataSource( @@ -371,14 +360,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { className = source, options = extraOptions.toMap, partitionColumns = normalizedParCols.getOrElse(Nil)) - val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) df.sparkSession.sessionState.continuousQueryManager.startQuery( - queryName, - getCheckpointLocation(queryName, failIfNotSet = true).get, + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), df, dataSource.createSink(outputMode), outputMode, - trigger) + trigger = trigger) } } @@ -437,38 +425,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { assertStreaming( "foreach() can only be called on streaming Datasets/DataFrames.") - val queryName = extraOptions.getOrElse("queryName", StreamExecution.nextName) val sink = new ForeachSink[T](ds.sparkSession.sparkContext.clean(writer))(ds.exprEnc) df.sparkSession.sessionState.continuousQueryManager.startQuery( - queryName, - getCheckpointLocation(queryName, failIfNotSet = false).getOrElse { - Utils.createTempDir(namePrefix = 
"foreach.stream").getCanonicalPath - }, + extraOptions.get("queryName"), + extraOptions.get("checkpointLocation"), df, sink, outputMode, - trigger) - } - - /** - * Returns the checkpointLocation for a query. If `failIfNotSet` is `true` but the checkpoint - * location is not set, [[AnalysisException]] will be thrown. If `failIfNotSet` is `false`, `None` - * will be returned if the checkpoint location is not set. - */ - private def getCheckpointLocation(queryName: String, failIfNotSet: Boolean): Option[String] = { - val checkpointLocation = extraOptions.get("checkpointLocation").map { userSpecified => - new Path(userSpecified).toUri.toString - }.orElse { - df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location => - new Path(location, queryName).toUri.toString - } - } - if (failIfNotSet && checkpointLocation.isEmpty) { - throw new AnalysisException("checkpointLocation must be specified either " + - """through option("checkpointLocation", ...) or """ + - s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") - } - checkpointLocation + useTempCheckpointLocation = true, + trigger = trigger) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 954fc33..5095fe7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.{CountDownLatch, TimeUnit} -import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import 
java.util.concurrent.locks.ReentrantLock import scala.collection.mutable.ArrayBuffer @@ -44,6 +44,7 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} */ class StreamExecution( override val sparkSession: SparkSession, + override val id: Long, override val name: String, checkpointRoot: String, private[sql] val logicalPlan: LogicalPlan, @@ -492,6 +493,7 @@ class StreamExecution( private def toInfo: ContinuousQueryInfo = { new ContinuousQueryInfo( this.name, + this.id, this.sourceStatuses, this.sinkStatus) } @@ -503,7 +505,7 @@ class StreamExecution( } private[sql] object StreamExecution { - private val nextId = new AtomicInteger() + private val _nextId = new AtomicLong(0) - def nextName: String = s"query-${nextId.getAndIncrement}" + def nextId: Long = _nextId.getAndIncrement() } http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala index 3bbb0b8..1e0a47d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQuery.scala @@ -30,12 +30,21 @@ import org.apache.spark.sql.SparkSession trait ContinuousQuery { /** - * Returns the name of the query. + * Returns the name of the query. This name is unique across all active queries. This can be + * set in the[[org.apache.spark.sql.DataFrameWriter DataFrameWriter]] as + * `dataframe.write().queryName("query").startStream()`. * @since 2.0.0 */ def name: String /** + * Returns the unique id of this query. This id is automatically generated and is unique across + * all queries that have been started in the current process. 
+ * @since 2.0.0 + */ + def id: Long + + /** * Returns the [[SparkSession]] associated with `this`. * @since 2.0.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala index 57b718b..19f2270 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala @@ -23,12 +23,15 @@ import org.apache.spark.annotation.Experimental * :: Experimental :: * A class used to report information about the progress of a [[ContinuousQuery]]. * - * @param name The [[ContinuousQuery]] name. + * @param name The [[ContinuousQuery]] name. This name is unique across all active queries. + * @param id The [[ContinuousQuery]] id. This id is unique across + * all queries that have been started in the current process. * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources. * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink. 
*/ @Experimental class ContinuousQueryInfo private[sql]( val name: String, + val id: Long, val sourceStatuses: Seq[SourceStatus], val sinkStatus: SinkStatus) http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala index 1bfdd2d..0f4a9c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryManager.scala @@ -19,13 +19,15 @@ package org.apache.spark.sql.streaming import scala.collection.mutable +import org.apache.hadoop.fs.Path + import org.apache.spark.annotation.Experimental -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.util.{Clock, SystemClock} +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * :: Experimental :: @@ -39,7 +41,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { private[sql] val stateStoreCoordinator = StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) private val listenerBus = new ContinuousQueryListenerBus(sparkSession.sparkContext.listenerBus) - private val activeQueries = new mutable.HashMap[String, ContinuousQuery] + private val activeQueries = new mutable.HashMap[Long, ContinuousQuery] private val activeQueriesLock = new Object private val awaitTerminationLock = new Object @@ -55,13 +57,12 
@@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { } /** - * Returns an active query from this SQLContext or throws exception if bad name + * Returns the query if there is an active query with the given id, or null. * * @since 2.0.0 */ - def get(name: String): ContinuousQuery = activeQueriesLock.synchronized { - activeQueries.getOrElse(name, - throw new IllegalArgumentException(s"There is no active query with name $name")) + def get(id: Long): ContinuousQuery = activeQueriesLock.synchronized { + activeQueries.get(id).orNull } /** @@ -168,20 +169,66 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { listenerBus.post(event) } - /** Start a query */ + /** + * Start a [[ContinuousQuery]]. + * @param userSpecifiedName Query name optionally specified by the user. + * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. + * @param df Streaming DataFrame. + * @param sink Sink to write the streaming outputs. + * @param outputMode Output mode for the sink. + * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user + * has not specified one. If false, then error will be thrown. + * @param recoverFromCheckpointLocation Whether to recover query from the checkpoint location. + * If false and the checkpoint location exists, then error + * will be thrown. + * @param trigger [[Trigger]] for the query. + * @param triggerClock [[Clock]] to use for the triggering. 
+ */ private[sql] def startQuery( - name: String, - checkpointLocation: String, + userSpecifiedName: Option[String], + userSpecifiedCheckpointLocation: Option[String], df: DataFrame, sink: Sink, outputMode: OutputMode, + useTempCheckpointLocation: Boolean = false, + recoverFromCheckpointLocation: Boolean = true, trigger: Trigger = ProcessingTime(0), triggerClock: Clock = new SystemClock()): ContinuousQuery = { activeQueriesLock.synchronized { - if (activeQueries.contains(name)) { + val id = StreamExecution.nextId + val name = userSpecifiedName.getOrElse(s"query-$id") + if (activeQueries.values.exists(_.name == name)) { throw new IllegalArgumentException( s"Cannot start query with name $name as a query with that name is already active") } + val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => + new Path(userSpecified).toUri.toString + }.orElse { + df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location => + new Path(location, name).toUri.toString + } + }.getOrElse { + if (useTempCheckpointLocation) { + Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath + } else { + throw new AnalysisException( + "checkpointLocation must be specified either " + + """through option("checkpointLocation", ...) or """ + + s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") + } + } + + // If offsets have already been created, we trying to resume a query. + if (!recoverFromCheckpointLocation) { + val checkpointPath = new Path(checkpointLocation, "offsets") + val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) + if (fs.exists(checkpointPath)) { + throw new AnalysisException( + s"This query does not support recovering from checkpoint location. 
" + + s"Delete $checkpointPath to start over.") + } + } + val analyzedPlan = df.queryExecution.analyzed df.queryExecution.assertAnalyzed() @@ -203,6 +250,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { } val query = new StreamExecution( sparkSession, + id, name, checkpointLocation, logicalPlan, @@ -211,7 +259,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { triggerClock, outputMode) query.start() - activeQueries.put(name, query) + activeQueries.put(id, query) query } } @@ -219,7 +267,7 @@ class ContinuousQueryManager private[sql] (sparkSession: SparkSession) { /** Notify (by the ContinuousQuery) that the query has been terminated */ private[sql] def notifyQueryTermination(terminatedQuery: ContinuousQuery): Unit = { activeQueriesLock.synchronized { - activeQueries -= terminatedQuery.name + activeQueries -= terminatedQuery.id } awaitTerminationLock.synchronized { if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala index 9b59ab6..8e1de09 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala @@ -50,9 +50,11 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { withListenerAdded(listener) { testStream(input.toDS)( StartStream(), - Assert("Incorrect query status in onQueryStarted") { + AssertOnQuery("Incorrect query status in onQueryStarted") { query => val status = listener.startStatus assert(status != null) + 
assert(status.name === query.name) + assert(status.id === query.id) assert(status.sourceStatuses.size === 1) assert(status.sourceStatuses(0).description.contains("Memory")) @@ -67,13 +69,15 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { }, AddDataMemory(input, Seq(1, 2, 3)), CheckAnswer(1, 2, 3), - Assert("Incorrect query status in onQueryProgress") { + AssertOnQuery("Incorrect query status in onQueryProgress") { query => eventually(Timeout(streamingTimeout)) { // There should be only on progress event as batch has been processed assert(listener.progressStatuses.size === 1) val status = listener.progressStatuses.peek() assert(status != null) + assert(status.name === query.name) + assert(status.id === query.id) assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) @@ -82,12 +86,16 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { } }, StopStream, - Assert("Incorrect query status in onQueryTerminated") { + AssertOnQuery("Incorrect query status in onQueryTerminated") { query => eventually(Timeout(streamingTimeout)) { val status = listener.terminationStatus assert(status != null) + assert(status.name === query.name) + assert(status.id === query.id) assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) + assert(listener.terminationStackTrace.isEmpty) + assert(listener.terminationException === None) } listener.checkAsyncErrors() } @@ -161,6 +169,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryStarted serialization") { val queryStartedInfo = new ContinuousQueryInfo( "name", + 1, Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)), new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString)) val queryStarted = new 
ContinuousQueryListener.QueryStarted(queryStartedInfo) @@ -173,6 +182,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryProgress serialization") { val queryProcessInfo = new ContinuousQueryInfo( "name", + 1, Seq( new SourceStatus("source1", Some(LongOffset(0).toString)), new SourceStatus("source2", Some(LongOffset(1).toString))), @@ -187,6 +197,7 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { test("QueryTerminated serialization") { val queryTerminatedInfo = new ContinuousQueryInfo( "name", + 1, Seq( new SourceStatus("source1", Some(LongOffset(0).toString)), new SourceStatus("source2", Some(LongOffset(1).toString))), http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala index c1e4970..f81608b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala @@ -59,23 +59,15 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { assert(spark.streams.active.toSet === queries.toSet) val (q1, q2, q3) = (queries(0), queries(1), queries(2)) - assert(spark.streams.get(q1.name).eq(q1)) - assert(spark.streams.get(q2.name).eq(q2)) - assert(spark.streams.get(q3.name).eq(q3)) - intercept[IllegalArgumentException] { - spark.streams.get("non-existent-name") - } - + assert(spark.streams.get(q1.id).eq(q1)) + assert(spark.streams.get(q2.id).eq(q2)) + assert(spark.streams.get(q3.id).eq(q3)) + assert(spark.streams.get(-1) === null) // non-existent id q1.stop() assert(spark.streams.active.toSet === Set(q2, q3)) 
- val ex1 = withClue("no error while getting non-active query") { - intercept[IllegalArgumentException] { - spark.streams.get(q1.name) - } - } - assert(ex1.getMessage.contains(q1.name), "error does not contain name of query to be fetched") - assert(spark.streams.get(q2.name).eq(q2)) + assert(spark.streams.get(q1.id) === null) + assert(spark.streams.get(q2.id).eq(q2)) m2.addData(0) // q2 should terminate with error @@ -83,12 +75,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { require(!q2.isActive) require(q2.exception.isDefined) } - withClue("no error while getting non-active query") { - intercept[IllegalArgumentException] { - spark.streams.get(q2.name).eq(q2) - } - } - + assert(spark.streams.get(q2.id) === null) assert(spark.streams.active.toSet === Set(q3)) } } @@ -227,7 +214,7 @@ class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { } - /** Run a body of code by defining a query each on multiple datasets */ + /** Run a body of code by defining a query on each dataset */ private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = { failAfter(streamingTimeout) { val queries = withClue("Error starting queries") { http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala index 5542405..43a8857 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala @@ -17,15 +17,56 @@ package org.apache.spark.sql.streaming +import org.scalatest.BeforeAndAfter + import org.apache.spark.SparkException import 
org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution} +import org.apache.spark.util.Utils -class ContinuousQuerySuite extends StreamTest { +class ContinuousQuerySuite extends StreamTest with BeforeAndAfter { import AwaitTerminationTester._ import testImplicits._ + after { + sqlContext.streams.active.foreach(_.stop()) + } + + test("names unique across active queries, ids unique across all started queries") { + val inputData = MemoryStream[Int] + val mapped = inputData.toDS().map { 6 / _} + + def startQuery(queryName: String): ContinuousQuery = { + val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath + val writer = mapped.write + writer + .queryName(queryName) + .format("memory") + .option("checkpointLocation", metadataRoot) + .startStream() + } + + val q1 = startQuery("q1") + assert(q1.name === "q1") + + // Verify that another query with same name cannot be started + val e1 = intercept[IllegalArgumentException] { + startQuery("q1") + } + Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) } + + // Verify q1 was unaffected by the above exception and stop it + assert(q1.isActive) + q1.stop() + + // Verify another query can be started with name q1, but will have different id + val q2 = startQuery("q1") + assert(q2.name === "q1") + assert(q2.id !== q1.id) + q2.stop() + } + testQuietly("lifecycle states and awaitTermination") { val inputData = MemoryStream[Int] val mapped = inputData.toDS().map { 6 / _} http://git-wip-us.apache.org/repos/asf/spark/blob/97fe1d8e/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 7f1e5fe..cbfa6ff 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -188,8 +188,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { new AssertOnQuery(condition, message) } - def apply(message: String)(condition: StreamExecution => Boolean): AssertOnQuery = { - new AssertOnQuery(condition, message) + def apply(message: String)(condition: StreamExecution => Unit): AssertOnQuery = { + new AssertOnQuery(s => { condition; true }, message) } } @@ -305,13 +305,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { spark .streams .startQuery( - StreamExecution.nextName, - metadataRoot, + None, + Some(metadataRoot), stream, sink, outputMode, - trigger, - triggerClock) + trigger = trigger, + triggerClock = triggerClock) .asInstanceOf[StreamExecution] currentStream.microBatchThread.setUncaughtExceptionHandler( new UncaughtExceptionHandler { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org