http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala new file mode 100644 index 0000000..c43de58 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.annotation.Experimental +import org.apache.spark.scheduler.SparkListenerEvent + +/** + * :: Experimental :: + * Interface for listening to events related to [[StreamingQuery StreamingQueries]]. + * @note The methods are not thread-safe as they may be called from different threads. + * + * @since 2.0.0 + */ +@Experimental +abstract class StreamingQueryListener { + + import StreamingQueryListener._ + + /** + * Called when a query is started. + * @note This is called synchronously with + * [[org.apache.spark.sql.DataFrameWriter `DataFrameWriter.startStream()`]], + * that is, `onQueryStarted` will be called on all listeners before + * `DataFrameWriter.startStream()` returns the corresponding [[StreamingQuery]]. Please + * don't block in this method, as it will block your query. + * @since 2.0.0 + */ + def onQueryStarted(queryStarted: QueryStarted): Unit + + /** + * Called when there is some status update (ingestion rate updated, etc.). + * + * @note This method is asynchronous. The status in [[StreamingQuery]] will always be the + * latest no matter when this method is called. Therefore, the status of [[StreamingQuery]] + * may have changed before or while you process the event. E.g., you may find that the + * [[StreamingQuery]] has terminated while you are processing [[QueryProgress]]. + * @since 2.0.0 + */ + def onQueryProgress(queryProgress: QueryProgress): Unit + + /** + * Called when a query is stopped, with or without error. + * @since 2.0.0 + */ + def onQueryTerminated(queryTerminated: QueryTerminated): Unit +} + + +/** + * :: Experimental :: + * Companion object of [[StreamingQueryListener]] that defines the listener events. 
+ * @since 2.0.0 + */ +@Experimental +object StreamingQueryListener { + + /** + * :: Experimental :: + * Base type of [[StreamingQueryListener]] events + * @since 2.0.0 + */ + @Experimental + trait Event extends SparkListenerEvent + + /** + * :: Experimental :: + * Event representing the start of a query + * @since 2.0.0 + */ + @Experimental + class QueryStarted private[sql](val queryInfo: StreamingQueryInfo) extends Event + + /** + * :: Experimental :: + * Event representing any progress updates in a query + * @since 2.0.0 + */ + @Experimental + class QueryProgress private[sql](val queryInfo: StreamingQueryInfo) extends Event + + /** + * :: Experimental :: + * Event representing the termination of a query + * + * @param queryInfo Information about the status of the query. + * @param exception The exception message of the [[StreamingQuery]] if the query was terminated + * with an exception. Otherwise, it will be `None`. + * @param stackTrace The stack trace of the exception if the query was terminated with an + * exception. It will be empty if there was no error. + * @since 2.0.0 + */ + @Experimental + class QueryTerminated private[sql]( + val queryInfo: StreamingQueryInfo, + val exception: Option[String], + val stackTrace: Seq[StackTraceElement]) extends Event +}
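For context, here is a minimal sketch of how this listener API is meant to be wired up. The callback signatures, the event types, and the `spark.streams.addListener` / `awaitAnyTermination` calls follow the code added in this patch; the `LoggingListener` name and the local SparkSession setup are illustrative assumptions, not part of the commit:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

    // Hypothetical listener that logs each lifecycle event. The three callbacks
    // override the abstract methods of the StreamingQueryListener added above.
    class LoggingListener extends StreamingQueryListener {
      override def onQueryStarted(queryStarted: QueryStarted): Unit =
        println(s"Started: ${queryStarted.queryInfo.name}")
      override def onQueryProgress(queryProgress: QueryProgress): Unit =
        println(s"Progress: ${queryProgress.queryInfo.name}")
      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
        println(s"Terminated: ${queryTerminated.queryInfo.name}, " +
          s"exception = ${queryTerminated.exception.getOrElse("none")}")
    }

    val spark = SparkSession.builder.master("local[2]").appName("listener-demo").getOrCreate()
    spark.streams.addListener(new LoggingListener)
    // ... start one or more streaming queries here ...
    // Block until any active query terminates; this rethrows the query's
    // exception if it failed (see StreamingQueryManager below).
    spark.streams.awaitAnyTermination()

Per the docs above, onQueryStarted fires synchronously while the query is being started, whereas progress and termination events are delivered asynchronously through the listener bus, so the query may already be in a later state by the time a listener processes an event.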
http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala new file mode 100644 index 0000000..bae7f56 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import scala.collection.mutable + +import org.apache.hadoop.fs.Path + +import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Clock, SystemClock, Utils} + +/** + * :: Experimental :: + * A class to manage all the [[StreamingQuery]]s active on a [[SparkSession]]. + * + * @since 2.0.0 + */ +@Experimental +class StreamingQueryManager private[sql] (sparkSession: SparkSession) { + + private[sql] val stateStoreCoordinator = + StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env) + private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus) + private val activeQueries = new mutable.HashMap[Long, StreamingQuery] + private val activeQueriesLock = new Object + private val awaitTerminationLock = new Object + + private var lastTerminatedQuery: StreamingQuery = null + + /** + * Returns a list of active queries associated with this SQLContext. + * + * @since 2.0.0 + */ + def active: Array[StreamingQuery] = activeQueriesLock.synchronized { + activeQueries.values.toArray + } + + /** + * Returns the query if there is an active query with the given id, or null. + * + * @since 2.0.0 + */ + def get(id: Long): StreamingQuery = activeQueriesLock.synchronized { + activeQueries.get(id).orNull + } + + /** + * Wait until any of the queries on the associated SQLContext has terminated since the + * creation of the context, or since `resetTerminated()` was called. If any query was terminated + * with an exception, then the exception will be thrown. + * + * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either + * return immediately (if the query was terminated by `query.stop()`), + * or throw the exception immediately (if the query was terminated with an exception). 
Use + * `resetTerminated()` to clear past terminations and wait for new terminations. + * + * In the case where multiple queries have terminated since `resetTerminated()` was called, + * if any query has terminated with an exception, then `awaitAnyTermination()` will + * throw any one of the exceptions. To correctly handle exceptions across multiple queries, + * users need to stop all of them after any of them terminates with an exception, and then check + * `query.exception()` for each query. + * + * @throws StreamingQueryException if any query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitAnyTermination(): Unit = { + awaitTerminationLock.synchronized { + while (lastTerminatedQuery == null) { + awaitTerminationLock.wait(10) + } + if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { + throw lastTerminatedQuery.exception.get + } + } + } + + /** + * Wait until any of the queries on the associated SQLContext has terminated since the + * creation of the context, or since `resetTerminated()` was called. Returns whether any query + * has terminated or not (multiple may have terminated). If any query has terminated with an + * exception, then the exception will be thrown. + * + * If a query has terminated, then subsequent calls to `awaitAnyTermination()` will either + * return `true` immediately (if the query was terminated by `query.stop()`), + * or throw the exception immediately (if the query was terminated with an exception). Use + * `resetTerminated()` to clear past terminations and wait for new terminations. + * + * In the case where multiple queries have terminated since `resetTerminated()` was called, + * if any query has terminated with an exception, then `awaitAnyTermination()` will + * throw any one of the exceptions. To correctly handle exceptions across multiple queries, + * users need to stop all of them after any of them terminates with an exception, and then check + * `query.exception()` for each query. + * + * @throws StreamingQueryException if any query has terminated with an exception + * + * @since 2.0.0 + */ + def awaitAnyTermination(timeoutMs: Long): Boolean = { + + val startTime = System.currentTimeMillis + def isTimedout = System.currentTimeMillis - startTime >= timeoutMs + + awaitTerminationLock.synchronized { + while (!isTimedout && lastTerminatedQuery == null) { + awaitTerminationLock.wait(10) + } + if (lastTerminatedQuery != null && lastTerminatedQuery.exception.nonEmpty) { + throw lastTerminatedQuery.exception.get + } + lastTerminatedQuery != null + } + } + + /** + * Forget about past terminated queries so that `awaitAnyTermination()` can be used again to + * wait for new terminations. + * + * @since 2.0.0 + */ + def resetTerminated(): Unit = { + awaitTerminationLock.synchronized { + lastTerminatedQuery = null + } + } + + /** + * Register a [[StreamingQueryListener]] to receive up-calls for lifecycle events of + * [[StreamingQuery]]. + * + * @since 2.0.0 + */ + def addListener(listener: StreamingQueryListener): Unit = { + listenerBus.addListener(listener) + } + + /** + * Deregister a [[StreamingQueryListener]]. + * + * @since 2.0.0 + */ + def removeListener(listener: StreamingQueryListener): Unit = { + listenerBus.removeListener(listener) + } + + /** Post a listener event */ + private[sql] def postListenerEvent(event: StreamingQueryListener.Event): Unit = { + listenerBus.post(event) + } + + /** + * Start a [[StreamingQuery]]. + * @param userSpecifiedName Query name optionally specified by the user. 
+ * @param userSpecifiedCheckpointLocation Checkpoint location optionally specified by the user. + * @param df Streaming DataFrame. + * @param sink Sink to write the streaming outputs. + * @param outputMode Output mode for the sink. + * @param useTempCheckpointLocation Whether to use a temporary checkpoint location when the user + * has not specified one. If false, then an error will be thrown. + * @param recoverFromCheckpointLocation Whether to recover the query from the checkpoint location. + * If false and the checkpoint location exists, then an error + * will be thrown. + * @param trigger [[Trigger]] for the query. + * @param triggerClock [[Clock]] to use for the triggering. + */ + private[sql] def startQuery( + userSpecifiedName: Option[String], + userSpecifiedCheckpointLocation: Option[String], + df: DataFrame, + sink: Sink, + outputMode: OutputMode, + useTempCheckpointLocation: Boolean = false, + recoverFromCheckpointLocation: Boolean = true, + trigger: Trigger = ProcessingTime(0), + triggerClock: Clock = new SystemClock()): StreamingQuery = { + activeQueriesLock.synchronized { + val id = StreamExecution.nextId + val name = userSpecifiedName.getOrElse(s"query-$id") + if (activeQueries.values.exists(_.name == name)) { + throw new IllegalArgumentException( + s"Cannot start query with name $name as a query with that name is already active") + } + val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified => + new Path(userSpecified).toUri.toString + }.orElse { + df.sparkSession.conf.get(SQLConf.CHECKPOINT_LOCATION).map { location => + new Path(location, name).toUri.toString + } + }.getOrElse { + if (useTempCheckpointLocation) { + Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath + } else { + throw new AnalysisException( + "checkpointLocation must be specified either " + + """through option("checkpointLocation", ...) or """ + + s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""") + } + } + + // If offsets have already been created, we are trying to resume a query. + if (!recoverFromCheckpointLocation) { + val checkpointPath = new Path(checkpointLocation, "offsets") + val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf()) + if (fs.exists(checkpointPath)) { + throw new AnalysisException( + s"This query does not support recovering from checkpoint location. " + + s"Delete $checkpointPath to start over.") + } + } + + val analyzedPlan = df.queryExecution.analyzed + df.queryExecution.assertAnalyzed() + + if (sparkSession.conf.get(SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED)) { + UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) + } + + var nextSourceId = 0L + + val logicalPlan = analyzedPlan.transform { + case StreamingRelation(dataSource, _, output) => + // Materialize source to avoid creating it in every batch + val metadataPath = s"$checkpointLocation/sources/$nextSourceId" + val source = dataSource.createSource(metadataPath) + nextSourceId += 1 + // We still need to use the previous `output` instead of `source.schema`, because + // attributes in "df.logicalPlan" have already been resolved against the previous `output`. 
+ StreamingExecutionRelation(source, output) + } + val query = new StreamExecution( + sparkSession, + id, + name, + checkpointLocation, + logicalPlan, + sink, + trigger, + triggerClock, + outputMode) + query.start() + activeQueries.put(id, query) + query + } + } + + /** Notify (by the StreamingQuery) that the query has been terminated */ + private[sql] def notifyQueryTermination(terminatedQuery: StreamingQuery): Unit = { + activeQueriesLock.synchronized { + activeQueries -= terminatedQuery.id + } + awaitTerminationLock.synchronized { + if (lastTerminatedQuery == null || terminatedQuery.exception.nonEmpty) { + lastTerminatedQuery = terminatedQuery + } + awaitTerminationLock.notifyAll() + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala index d3fdbac..55be7a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/Trigger.scala @@ -28,7 +28,7 @@ import org.apache.spark.unsafe.types.CalendarInterval /** * :: Experimental :: - * Used to indicate how often results should be produced by a [[ContinuousQuery]]. + * Used to indicate how often results should be produced by a [[StreamingQuery]]. * * @since 2.0.0 */ @@ -65,7 +65,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger { /** * :: Experimental :: - * Used to create [[ProcessingTime]] triggers for [[ContinuousQuery]]s. + * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. * * @since 2.0.0 */ http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala deleted file mode 100644 index 8e1de09..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala +++ /dev/null @@ -1,304 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.streaming - -import java.util.concurrent.ConcurrentLinkedQueue - -import org.scalatest.BeforeAndAfter -import org.scalatest.PrivateMethodTester._ -import org.scalatest.concurrent.AsyncAssertions.Waiter -import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.SparkException -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.JsonProtocol - - -class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter { - - import testImplicits._ - import ContinuousQueryListener._ - - after { - spark.streams.active.foreach(_.stop()) - assert(spark.streams.active.isEmpty) - assert(addedListeners.isEmpty) - // Make sure we don't leak any events to the next test - spark.sparkContext.listenerBus.waitUntilEmpty(10000) - } - - test("single listener") { - val listener = new QueryStatusCollector - val input = MemoryStream[Int] - withListenerAdded(listener) { - testStream(input.toDS)( - StartStream(), - AssertOnQuery("Incorrect query status in onQueryStarted") { query => - val status = listener.startStatus - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses.size === 1) - assert(status.sourceStatuses(0).description.contains("Memory")) - - // The source and sink offsets must be None as this must be called before the - // batches have started - assert(status.sourceStatuses(0).offsetDesc === None) - assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString) - - // No progress events or termination events - assert(listener.progressStatuses.isEmpty) - assert(listener.terminationStatus === null) - }, - AddDataMemory(input, Seq(1, 2, 3)), - CheckAnswer(1, 2, 3), - AssertOnQuery("Incorrect query status in onQueryProgress") { query => - eventually(Timeout(streamingTimeout)) { - - // There should be only on progress event as batch has been processed - assert(listener.progressStatuses.size === 1) - val status = listener.progressStatuses.peek() - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) - assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) - - // No termination events - assert(listener.terminationStatus === null) - } - }, - StopStream, - AssertOnQuery("Incorrect query status in onQueryTerminated") { query => - eventually(Timeout(streamingTimeout)) { - val status = listener.terminationStatus - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) - assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) - assert(listener.terminationStackTrace.isEmpty) - assert(listener.terminationException === None) - } - listener.checkAsyncErrors() - } - ) - } - } - - test("adding and removing listener") { - def isListenerActive(listener: QueryStatusCollector): Boolean = { - listener.reset() - testStream(MemoryStream[Int].toDS)( - StartStream(), - StopStream - ) - listener.startStatus != null - } - - try { - val listener1 = new QueryStatusCollector - val listener2 = new QueryStatusCollector - - spark.streams.addListener(listener1) - assert(isListenerActive(listener1) === true) - assert(isListenerActive(listener2) === false) - 
spark.streams.addListener(listener2) - assert(isListenerActive(listener1) === true) - assert(isListenerActive(listener2) === true) - spark.streams.removeListener(listener1) - assert(isListenerActive(listener1) === false) - assert(isListenerActive(listener2) === true) - } finally { - addedListeners.foreach(spark.streams.removeListener) - } - } - - test("event ordering") { - val listener = new QueryStatusCollector - withListenerAdded(listener) { - for (i <- 1 to 100) { - listener.reset() - require(listener.startStatus === null) - testStream(MemoryStream[Int].toDS)( - StartStream(), - Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), - StopStream, - Assert { listener.checkAsyncErrors() } - ) - } - } - } - - test("exception should be reported in QueryTerminated") { - val listener = new QueryStatusCollector - withListenerAdded(listener) { - val input = MemoryStream[Int] - testStream(input.toDS.map(_ / 0))( - StartStream(), - AddData(input, 1), - ExpectFailure[SparkException](), - Assert { - spark.sparkContext.listenerBus.waitUntilEmpty(10000) - assert(listener.terminationStatus !== null) - assert(listener.terminationException.isDefined) - assert(listener.terminationException.get.contains("java.lang.ArithmeticException")) - assert(listener.terminationStackTrace.nonEmpty) - } - ) - } - } - - test("QueryStarted serialization") { - val queryStartedInfo = new ContinuousQueryInfo( - "name", - 1, - Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)), - new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString)) - val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo) - val json = JsonProtocol.sparkEventToJson(queryStarted) - val newQueryStarted = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[ContinuousQueryListener.QueryStarted] - assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo) - } - - test("QueryProgress serialization") { - val queryProcessInfo = new ContinuousQueryInfo( - "name", - 1, - Seq( - new SourceStatus("source1", Some(LongOffset(0).toString)), - new SourceStatus("source2", Some(LongOffset(1).toString))), - new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) - val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo) - val json = JsonProtocol.sparkEventToJson(queryProcess) - val newQueryProcess = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[ContinuousQueryListener.QueryProgress] - assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo) - } - - test("QueryTerminated serialization") { - val queryTerminatedInfo = new ContinuousQueryInfo( - "name", - 1, - Seq( - new SourceStatus("source1", Some(LongOffset(0).toString)), - new SourceStatus("source2", Some(LongOffset(1).toString))), - new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) - val exception = new RuntimeException("exception") - val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated( - queryTerminatedInfo, - Some(exception.getMessage), - exception.getStackTrace) - val json = - JsonProtocol.sparkEventToJson(queryQueryTerminated) - val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) - .asInstanceOf[ContinuousQueryListener.QueryTerminated] - assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo) - assert(queryQueryTerminated.exception === newQueryTerminated.exception) - } - - private def 
assertContinuousQueryInfoEquals( - expected: ContinuousQueryInfo, - actual: ContinuousQueryInfo): Unit = { - assert(expected.name === actual.name) - assert(expected.sourceStatuses.size === actual.sourceStatuses.size) - expected.sourceStatuses.zip(actual.sourceStatuses).foreach { - case (expectedSource, actualSource) => - assertSourceStatus(expectedSource, actualSource) - } - assertSinkStatus(expected.sinkStatus, actual.sinkStatus) - } - - private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = { - assert(expected.description === actual.description) - assert(expected.offsetDesc === actual.offsetDesc) - } - - private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = { - assert(expected.description === actual.description) - assert(expected.offsetDesc === actual.offsetDesc) - } - - private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = { - try { - failAfter(1 minute) { - spark.streams.addListener(listener) - body - } - } finally { - spark.streams.removeListener(listener) - } - } - - private def addedListeners(): Array[ContinuousQueryListener] = { - val listenerBusMethod = - PrivateMethod[ContinuousQueryListenerBus]('listenerBus) - val listenerBus = spark.streams invokePrivate listenerBusMethod() - listenerBus.listeners.toArray.map(_.asInstanceOf[ContinuousQueryListener]) - } - - class QueryStatusCollector extends ContinuousQueryListener { - // to catch errors in the async listener events - @volatile private var asyncTestWaiter = new Waiter - - @volatile var startStatus: ContinuousQueryInfo = null - @volatile var terminationStatus: ContinuousQueryInfo = null - @volatile var terminationException: Option[String] = null - @volatile var terminationStackTrace: Seq[StackTraceElement] = null - - val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo] - - def reset(): Unit = { - startStatus = null - terminationStatus = null - progressStatuses.clear() - asyncTestWaiter = new Waiter - } - - def checkAsyncErrors(): Unit = { - asyncTestWaiter.await(timeout(streamingTimeout)) - } - - - override def onQueryStarted(queryStarted: QueryStarted): Unit = { - asyncTestWaiter { - startStatus = queryStarted.queryInfo - } - } - - override def onQueryProgress(queryProgress: QueryProgress): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryProgress called before onQueryStarted") - progressStatuses.add(queryProgress.queryInfo) - } - } - - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = queryTerminated.queryInfo - terminationException = queryTerminated.exception - terminationStackTrace = queryTerminated.stackTrace - } - asyncTestWaiter.dismiss() - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala deleted file mode 100644 index ef2fcbf..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryManagerSuite.scala +++ /dev/null @@ -1,299 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import scala.concurrent.Future -import scala.util.Random -import scala.util.control.NonFatal - -import org.scalatest.BeforeAndAfter -import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.Span -import org.scalatest.time.SpanSugar._ - -import org.apache.spark.SparkException -import org.apache.spark.sql.Dataset -import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.Utils - -class ContinuousQueryManagerSuite extends StreamTest with BeforeAndAfter { - - import AwaitTerminationTester._ - import testImplicits._ - - override val streamingTimeout = 20.seconds - - before { - assert(spark.streams.active.isEmpty) - spark.streams.resetTerminated() - } - - after { - assert(spark.streams.active.isEmpty) - spark.streams.resetTerminated() - } - - testQuietly("listing") { - val (m1, ds1) = makeDataset - val (m2, ds2) = makeDataset - val (m3, ds3) = makeDataset - - withQueriesOn(ds1, ds2, ds3) { queries => - require(queries.size === 3) - assert(spark.streams.active.toSet === queries.toSet) - val (q1, q2, q3) = (queries(0), queries(1), queries(2)) - - assert(spark.streams.get(q1.id).eq(q1)) - assert(spark.streams.get(q2.id).eq(q2)) - assert(spark.streams.get(q3.id).eq(q3)) - assert(spark.streams.get(-1) === null) // non-existent id - q1.stop() - - assert(spark.streams.active.toSet === Set(q2, q3)) - assert(spark.streams.get(q1.id) === null) - assert(spark.streams.get(q2.id).eq(q2)) - - m2.addData(0) // q2 should terminate with error - - eventually(Timeout(streamingTimeout)) { - require(!q2.isActive) - require(q2.exception.isDefined) - } - assert(spark.streams.get(q2.id) === null) - assert(spark.streams.active.toSet === Set(q3)) - } - } - - testQuietly("awaitAnyTermination without timeout and resetTerminated") { - val datasets = Seq.fill(5)(makeDataset._2) - withQueriesOn(datasets: _*) { queries => - require(queries.size === datasets.size) - assert(spark.streams.active.toSet === queries.toSet) - - // awaitAnyTermination should be blocking - testAwaitAnyTermination(ExpectBlocked) - - // Stop a query asynchronously and see if it is reported through awaitAnyTermination - val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false) - testAwaitAnyTermination(ExpectNotBlocked) - require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned - - // All subsequent calls to awaitAnyTermination should be non-blocking - testAwaitAnyTermination(ExpectNotBlocked) - - // Resetting termination should make awaitAnyTermination() blocking again - spark.streams.resetTerminated() - testAwaitAnyTermination(ExpectBlocked) - - // Terminate a query asynchronously with exception and see awaitAnyTermination throws - // the exception - val q2 = 
stopRandomQueryAsync(100 milliseconds, withError = true) - testAwaitAnyTermination(ExpectException[SparkException]) - require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned - - // All subsequent calls to awaitAnyTermination should throw the exception - testAwaitAnyTermination(ExpectException[SparkException]) - - // Resetting termination should make awaitAnyTermination() blocking again - spark.streams.resetTerminated() - testAwaitAnyTermination(ExpectBlocked) - - // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws - // the exception - val q3 = stopRandomQueryAsync(10 milliseconds, withError = false) - testAwaitAnyTermination(ExpectNotBlocked) - require(!q3.isActive) - val q4 = stopRandomQueryAsync(10 milliseconds, withError = true) - eventually(Timeout(streamingTimeout)) { require(!q4.isActive) } - // After q4 terminates with exception, awaitAnyTerm should start throwing exception - testAwaitAnyTermination(ExpectException[SparkException]) - } - } - - testQuietly("awaitAnyTermination with timeout and resetTerminated") { - val datasets = Seq.fill(6)(makeDataset._2) - withQueriesOn(datasets: _*) { queries => - require(queries.size === datasets.size) - assert(spark.streams.active.toSet === queries.toSet) - - // awaitAnyTermination should be blocking or non-blocking depending on timeout values - testAwaitAnyTermination( - ExpectBlocked, - awaitTimeout = 4 seconds, - expectedReturnedValue = false, - testBehaviorFor = 2 seconds) - - testAwaitAnyTermination( - ExpectNotBlocked, - awaitTimeout = 50 milliseconds, - expectedReturnedValue = false, - testBehaviorFor = 1 second) - - // Stop a query asynchronously within timeout and awaitAnyTerm should be unblocked - val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false) - testAwaitAnyTermination( - ExpectNotBlocked, - awaitTimeout = 2 seconds, - expectedReturnedValue = true, - testBehaviorFor = 4 seconds) - require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned - - // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high - testAwaitAnyTermination( - ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true) - - // Resetting termination should make awaitAnyTermination() blocking again - spark.streams.resetTerminated() - testAwaitAnyTermination( - ExpectBlocked, - awaitTimeout = 4 seconds, - expectedReturnedValue = false, - testBehaviorFor = 1 second) - - // Terminate a query asynchronously with exception within timeout, awaitAnyTermination should - // throws the exception - val q2 = stopRandomQueryAsync(100 milliseconds, withError = true) - testAwaitAnyTermination( - ExpectException[SparkException], - awaitTimeout = 4 seconds, - testBehaviorFor = 6 seconds) - require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned - - // All subsequent calls to awaitAnyTermination should throw the exception - testAwaitAnyTermination( - ExpectException[SparkException], - awaitTimeout = 2 seconds, - testBehaviorFor = 4 seconds) - - // Terminate a query asynchronously outside the timeout, awaitAnyTerm should be blocked - spark.streams.resetTerminated() - val q3 = stopRandomQueryAsync(2 seconds, withError = true) - testAwaitAnyTermination( - ExpectNotBlocked, - awaitTimeout = 100 milliseconds, - expectedReturnedValue = false, - testBehaviorFor = 4 seconds) - - // After that query is stopped, awaitAnyTerm should throw exception - eventually(Timeout(streamingTimeout)) { 
require(!q3.isActive) } // wait for query to stop - testAwaitAnyTermination( - ExpectException[SparkException], - awaitTimeout = 100 milliseconds, - testBehaviorFor = 4 seconds) - - - // Terminate multiple queries, one with failure and see whether awaitAnyTermination throws - // the exception - spark.streams.resetTerminated() - - val q4 = stopRandomQueryAsync(10 milliseconds, withError = false) - testAwaitAnyTermination( - ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true) - require(!q4.isActive) - val q5 = stopRandomQueryAsync(10 milliseconds, withError = true) - eventually(Timeout(streamingTimeout)) { require(!q5.isActive) } - // After q5 terminates with exception, awaitAnyTerm should start throwing exception - testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds) - } - } - - - /** Run a body of code by defining a query on each dataset */ - private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[ContinuousQuery] => Unit): Unit = { - failAfter(streamingTimeout) { - val queries = withClue("Error starting queries") { - datasets.zipWithIndex.map { case (ds, i) => - @volatile var query: StreamExecution = null - try { - val df = ds.toDF - val metadataRoot = - Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath - query = - df.writeStream - .format("memory") - .queryName(s"query$i") - .option("checkpointLocation", metadataRoot) - .outputMode("append") - .start() - .asInstanceOf[StreamExecution] - } catch { - case NonFatal(e) => - if (query != null) query.stop() - throw e - } - query - } - } - try { - body(queries) - } finally { - queries.foreach(_.stop()) - } - } - } - - /** Test the behavior of awaitAnyTermination */ - private def testAwaitAnyTermination( - expectedBehavior: ExpectedBehavior, - expectedReturnedValue: Boolean = false, - awaitTimeout: Span = null, - testBehaviorFor: Span = 4 seconds - ): Unit = { - - def awaitTermFunc(): Unit = { - if (awaitTimeout != null && awaitTimeout.toMillis > 0) { - val returnedValue = spark.streams.awaitAnyTermination(awaitTimeout.toMillis) - assert(returnedValue === expectedReturnedValue, "Returned value does not match expected") - } else { - spark.streams.awaitAnyTermination() - } - } - - AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, testBehaviorFor) - } - - /** Stop a random active query either with `stop()` or with an error */ - private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): ContinuousQuery = { - - import scala.concurrent.ExecutionContext.Implicits.global - - val activeQueries = spark.streams.active - val queryToStop = activeQueries(Random.nextInt(activeQueries.length)) - Future { - Thread.sleep(stopAfter.toMillis) - if (withError) { - logDebug(s"Terminating query ${queryToStop.name} with error") - queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect { - case StreamingExecutionRelation(source, _) => - source.asInstanceOf[MemoryStream[Int]].addData(0) - } - } else { - logDebug(s"Stopping query ${queryToStop.name}") - queryToStop.stop() - } - } - queryToStop - } - - private def makeDataset: (MemoryStream[Int], Dataset[Int]) = { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS.map(6 / _) - (inputData, mapped) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala deleted file mode 100644 index ad6bc27..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import org.scalatest.BeforeAndAfter - -import org.apache.spark.SparkException -import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution} -import org.apache.spark.util.Utils - - -class ContinuousQuerySuite extends StreamTest with BeforeAndAfter { - - import AwaitTerminationTester._ - import testImplicits._ - - after { - sqlContext.streams.active.foreach(_.stop()) - } - - test("names unique across active queries, ids unique across all started queries") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map { 6 / _} - - def startQuery(queryName: String): ContinuousQuery = { - val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath - val writer = mapped.writeStream - writer - .queryName(queryName) - .format("memory") - .option("checkpointLocation", metadataRoot) - .start() - } - - val q1 = startQuery("q1") - assert(q1.name === "q1") - - // Verify that another query with same name cannot be started - val e1 = intercept[IllegalArgumentException] { - startQuery("q1") - } - Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) } - - // Verify q1 was unaffected by the above exception and stop it - assert(q1.isActive) - q1.stop() - - // Verify another query can be started with name q1, but will have different id - val q2 = startQuery("q1") - assert(q2.name === "q1") - assert(q2.id !== q1.id) - q2.stop() - } - - testQuietly("lifecycle states and awaitTermination") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map { 6 / _} - - testStream(mapped)( - AssertOnQuery(_.isActive === true), - AssertOnQuery(_.exception.isEmpty), - AddData(inputData, 1, 2), - CheckAnswer(6, 3), - TestAwaitTermination(ExpectBlocked), - TestAwaitTermination(ExpectBlocked, timeoutMs = 2000), - TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = false), - StopStream, - AssertOnQuery(_.isActive === false), - AssertOnQuery(_.exception.isEmpty), - TestAwaitTermination(ExpectNotBlocked), - TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true), - TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true), - StartStream(), - AssertOnQuery(_.isActive === true), - AddData(inputData, 0), - ExpectFailure[SparkException], - 
AssertOnQuery(_.isActive === false), - TestAwaitTermination(ExpectException[SparkException]), - TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000), - TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10), - AssertOnQuery( - q => - q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)), - "incorrect start offset on exception") - ) - } - - testQuietly("source and sink statuses") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(6 / _) - - testStream(mapped)( - AssertOnQuery(_.sourceStatuses.length === 1), - AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === None), - AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString), - AddData(inputData, 1, 2), - CheckAnswer(6, 3), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString), - AddData(inputData, 1, 2), - CheckAnswer(6, 3, 6, 3), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), - AddData(inputData, 0), - ExpectFailure[SparkException], - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)), - AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString) - ) - } - - /** - * A [[StreamAction]] to test the behavior of `ContinuousQuery.awaitTermination()`. - * - * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown) - * @param timeoutMs Timeout in milliseconds - * When timeoutMs <= 0, awaitTermination() is tested (i.e. w/o timeout) - * When timeoutMs > 0, awaitTermination(timeoutMs) is tested - * @param expectedReturnValue Expected return value when awaitTermination(timeoutMs) is used - */ - case class TestAwaitTermination( - expectedBehavior: ExpectedBehavior, - timeoutMs: Int = -1, - expectedReturnValue: Boolean = false - ) extends AssertOnQuery( - TestAwaitTermination.assertOnQueryCondition(expectedBehavior, timeoutMs, expectedReturnValue), - "Error testing awaitTermination behavior" - ) { - override def toString(): String = { - s"TestAwaitTermination($expectedBehavior, timeoutMs = $timeoutMs, " + - s"expectedReturnValue = $expectedReturnValue)" - } - } - - object TestAwaitTermination { - - /** - * Tests the behavior of `ContinuousQuery.awaitTermination`. - * - * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown) - * @param timeoutMs Timeout in milliseconds - * When timeoutMs <= 0, awaitTermination() is tested (i.e. 
w/o timeout) - * When timeoutMs > 0, awaitTermination(timeoutMs) is tested - * @param expectedReturnValue Expected return value when awaitTermination(timeoutMs) is used - */ - def assertOnQueryCondition( - expectedBehavior: ExpectedBehavior, - timeoutMs: Int, - expectedReturnValue: Boolean - )(q: StreamExecution): Boolean = { - - def awaitTermFunc(): Unit = { - if (timeoutMs <= 0) { - q.awaitTermination() - } else { - val returnedValue = q.awaitTermination(timeoutMs) - assert(returnedValue === expectedReturnValue, "Returned value does not match expected") - } - } - AwaitTerminationTester.test(expectedBehavior, awaitTermFunc) - true // If the control reached here, then everything worked as expected - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala index a5acc97..9d0a2b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala @@ -124,7 +124,7 @@ class FileStreamSinkSuite extends StreamTest { val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath - var query: ContinuousQuery = null + var query: StreamingQuery = null try { query = @@ -156,7 +156,7 @@ class FileStreamSinkSuite extends StreamTest { val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath - var query: ContinuousQuery = null + var query: StreamingQuery = null try { query = @@ -240,7 +240,7 @@ class FileStreamSinkSuite extends StreamTest { val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath - var query: ContinuousQuery = null + var query: StreamingQuery = null try { val writer = http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala index 0e157cf..f9e236c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStressSuite.scala @@ -57,7 +57,7 @@ class FileStressSuite extends StreamTest { @volatile var continue = true @volatile - var stream: ContinuousQuery = null + var stream: StreamingQuery = null val writer = new Thread("stream writer") { override def run(): Unit = { @@ -100,7 +100,7 @@ class FileStressSuite extends StreamTest { val input = spark.readStream.format("text").load(inputDir) - def startStream(): ContinuousQuery = { + def startStream(): StreamingQuery = { val output = input .repartition(5) .as[String] @@ -139,7 +139,7 @@ class FileStressSuite extends StreamTest { try { stream.awaitTermination() } catch { - case ce: ContinuousQueryException => + case ce: StreamingQueryException => failures += 
1 } } http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index cbfa6ff..720ffaf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -353,7 +353,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case ef: ExpectFailure[_] => verify(currentStream != null, "can not expect failure when stream is not running") try failAfter(streamingTimeout) { - val thrownException = intercept[ContinuousQueryException] { + val thrownException = intercept[StreamingQueryException] { currentStream.awaitTermination() } eventually("microbatch thread not stopped after termination with failure") { @@ -563,7 +563,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case e: ExpectException[_] => val thrownException = withClue(s"Did not throw ${e.t.runtimeClass.getSimpleName} when expected.") { - intercept[ContinuousQueryException] { + intercept[StreamingQueryException] { failAfter(testTimeout) { awaitTermFunc() } http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 7f44227..8681199 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -40,8 +40,6 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { import testImplicits._ - - test("simple count, update mode") { val inputData = MemoryStream[Int] http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala new file mode 100644 index 0000000..7f4d28c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.util.concurrent.ConcurrentLinkedQueue + +import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester._ +import org.scalatest.concurrent.AsyncAssertions.Waiter +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout +import org.scalatest.time.SpanSugar._ +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.util.JsonProtocol + + +class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { + + import testImplicits._ + import StreamingQueryListener._ + + after { + spark.streams.active.foreach(_.stop()) + assert(spark.streams.active.isEmpty) + assert(addedListeners.isEmpty) + // Make sure we don't leak any events to the next test + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + } + + test("single listener") { + val listener = new QueryStatusCollector + val input = MemoryStream[Int] + withListenerAdded(listener) { + testStream(input.toDS)( + StartStream(), + AssertOnQuery("Incorrect query status in onQueryStarted") { query => + val status = listener.startStatus + assert(status != null) + assert(status.name === query.name) + assert(status.id === query.id) + assert(status.sourceStatuses.size === 1) + assert(status.sourceStatuses(0).description.contains("Memory")) + + // The source and sink offsets must be None as this must be called before the + // batches have started + assert(status.sourceStatuses(0).offsetDesc === None) + assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString) + + // No progress events or termination events + assert(listener.progressStatuses.isEmpty) + assert(listener.terminationStatus === null) + }, + AddDataMemory(input, Seq(1, 2, 3)), + CheckAnswer(1, 2, 3), + AssertOnQuery("Incorrect query status in onQueryProgress") { query => + eventually(Timeout(streamingTimeout)) { + + // There should be only one progress event as the batch has been processed + assert(listener.progressStatuses.size === 1) + val status = listener.progressStatuses.peek() + assert(status != null) + assert(status.name === query.name) + assert(status.id === query.id) + assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) + assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) + + // No termination events + assert(listener.terminationStatus === null) + } + }, + StopStream, + AssertOnQuery("Incorrect query status in onQueryTerminated") { query => + eventually(Timeout(streamingTimeout)) { + val status = listener.terminationStatus + assert(status != null) + assert(status.name === query.name) + assert(status.id === query.id) + assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) + assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) + assert(listener.terminationStackTrace.isEmpty) + assert(listener.terminationException === None) + } + listener.checkAsyncErrors() + } + ) + } + } + + test("adding and removing listener") { + def isListenerActive(listener: QueryStatusCollector): Boolean = { + listener.reset() + testStream(MemoryStream[Int].toDS)( + StartStream(), + StopStream + ) + listener.startStatus != null + } + + try { + val listener1 = new QueryStatusCollector + val listener2 = new QueryStatusCollector + + spark.streams.addListener(listener1) + 
assert(isListenerActive(listener1) === true) + assert(isListenerActive(listener2) === false) + spark.streams.addListener(listener2) + assert(isListenerActive(listener1) === true) + assert(isListenerActive(listener2) === true) + spark.streams.removeListener(listener1) + assert(isListenerActive(listener1) === false) + assert(isListenerActive(listener2) === true) + } finally { + addedListeners.foreach(spark.streams.removeListener) + } + } + + test("event ordering") { + val listener = new QueryStatusCollector + withListenerAdded(listener) { + for (i <- 1 to 100) { + listener.reset() + require(listener.startStatus === null) + testStream(MemoryStream[Int].toDS)( + StartStream(), + Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), + StopStream, + Assert { listener.checkAsyncErrors() } + ) + } + } + } + + test("exception should be reported in QueryTerminated") { + val listener = new QueryStatusCollector + withListenerAdded(listener) { + val input = MemoryStream[Int] + testStream(input.toDS.map(_ / 0))( + StartStream(), + AddData(input, 1), + ExpectFailure[SparkException](), + Assert { + spark.sparkContext.listenerBus.waitUntilEmpty(10000) + assert(listener.terminationStatus !== null) + assert(listener.terminationException.isDefined) + assert(listener.terminationException.get.contains("java.lang.ArithmeticException")) + assert(listener.terminationStackTrace.nonEmpty) + } + ) + } + } + + test("QueryStarted serialization") { + val queryStartedInfo = new StreamingQueryInfo( + "name", + 1, + Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)), + new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString)) + val queryStarted = new StreamingQueryListener.QueryStarted(queryStartedInfo) + val json = JsonProtocol.sparkEventToJson(queryStarted) + val newQueryStarted = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[StreamingQueryListener.QueryStarted] + assertStreamingQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo) + } + + test("QueryProgress serialization") { + val queryProcessInfo = new StreamingQueryInfo( + "name", + 1, + Seq( + new SourceStatus("source1", Some(LongOffset(0).toString)), + new SourceStatus("source2", Some(LongOffset(1).toString))), + new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) + val queryProcess = new StreamingQueryListener.QueryProgress(queryProcessInfo) + val json = JsonProtocol.sparkEventToJson(queryProcess) + val newQueryProcess = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[StreamingQueryListener.QueryProgress] + assertStreamingQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo) + } + + test("QueryTerminated serialization") { + val queryTerminatedInfo = new StreamingQueryInfo( + "name", + 1, + Seq( + new SourceStatus("source1", Some(LongOffset(0).toString)), + new SourceStatus("source2", Some(LongOffset(1).toString))), + new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) + val exception = new RuntimeException("exception") + val queryQueryTerminated = new StreamingQueryListener.QueryTerminated( + queryTerminatedInfo, + Some(exception.getMessage), + exception.getStackTrace) + val json = + JsonProtocol.sparkEventToJson(queryQueryTerminated) + val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) + .asInstanceOf[StreamingQueryListener.QueryTerminated] + assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo) + 
+
+  private def assertStreamingQueryInfoEquals(
+      expected: StreamingQueryInfo,
+      actual: StreamingQueryInfo): Unit = {
+    assert(expected.name === actual.name)
+    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
+    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
+      case (expectedSource, actualSource) =>
+        assertSourceStatus(expectedSource, actualSource)
+    }
+    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
+  }
+
+  private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+
+  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+
+  private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
+    try {
+      failAfter(1 minute) {
+        spark.streams.addListener(listener)
+        body
+      }
+    } finally {
+      spark.streams.removeListener(listener)
+    }
+  }
+
+  private def addedListeners(): Array[StreamingQueryListener] = {
+    val listenerBusMethod =
+      PrivateMethod[StreamingQueryListenerBus]('listenerBus)
+    val listenerBus = spark.streams invokePrivate listenerBusMethod()
+    listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
+  }
+
+  class QueryStatusCollector extends StreamingQueryListener {
+    // To catch errors in the async listener events
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startStatus: StreamingQueryInfo = null
+    @volatile var terminationStatus: StreamingQueryInfo = null
+    @volatile var terminationException: Option[String] = null
+    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
+
+    val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
+
+    def reset(): Unit = {
+      startStatus = null
+      terminationStatus = null
+      progressStatuses.clear()
+      asyncTestWaiter = new Waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(streamingTimeout))
+    }
+
+    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+      asyncTestWaiter {
+        startStatus = queryStarted.queryInfo
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
+        progressStatuses.add(queryProgress.queryInfo)
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
+        terminationStatus = queryTerminated.queryInfo
+        terminationException = queryTerminated.exception
+        terminationStackTrace = queryTerminated.stackTrace
+      }
+      asyncTestWaiter.dismiss()
+    }
+  }
+}
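
[Editor's note] QueryStatusCollector relies on ScalaTest's Waiter to move assertion failures from listener threads onto the test thread. The pattern in isolation, as a hedged sketch (Waiter lives at this import path in the ScalaTest 2.x line this branch uses; later ScalaTest versions rename the trait to Waiters, and the sketch assumes it runs inside a ScalaTest suite):

import org.scalatest.concurrent.AsyncAssertions.Waiter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

val waiter = new Waiter

// On some callback thread: record assertions instead of throwing there.
new Thread(new Runnable {
  override def run(): Unit = {
    waiter { assert(1 + 1 == 2) } // a failure is captured, not thrown on this thread
    waiter.dismiss()              // mark that the callback ran
  }
}).start()

// On the test thread: block until dismissed, rethrowing any captured failure.
waiter.await(Timeout(10.seconds))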
http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
new file mode 100644
index 0000000..41ffd56
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.concurrent.Future
+import scala.util.Random
+import scala.util.control.NonFatal
+
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.Utils
+
+class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+
+  import AwaitTerminationTester._
+  import testImplicits._
+
+  override val streamingTimeout = 20.seconds
+
+  before {
+    assert(spark.streams.active.isEmpty)
+    spark.streams.resetTerminated()
+  }
+
+  after {
+    assert(spark.streams.active.isEmpty)
+    spark.streams.resetTerminated()
+  }
+
+  testQuietly("listing") {
+    val (m1, ds1) = makeDataset
+    val (m2, ds2) = makeDataset
+    val (m3, ds3) = makeDataset
+
+    withQueriesOn(ds1, ds2, ds3) { queries =>
+      require(queries.size === 3)
+      assert(spark.streams.active.toSet === queries.toSet)
+      val (q1, q2, q3) = (queries(0), queries(1), queries(2))
+
+      assert(spark.streams.get(q1.id).eq(q1))
+      assert(spark.streams.get(q2.id).eq(q2))
+      assert(spark.streams.get(q3.id).eq(q3))
+      assert(spark.streams.get(-1) === null) // non-existent id
+      q1.stop()
+
+      assert(spark.streams.active.toSet === Set(q2, q3))
+      assert(spark.streams.get(q1.id) === null)
+      assert(spark.streams.get(q2.id).eq(q2))
+
+      m2.addData(0) // q2 should terminate with error
+
+      eventually(Timeout(streamingTimeout)) {
+        require(!q2.isActive)
+        require(q2.exception.isDefined)
+      }
+      assert(spark.streams.get(q2.id) === null)
+      assert(spark.streams.active.toSet === Set(q3))
+    }
+  }
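
[Editor's note] The listing test nails down the manager's bookkeeping: active reflects only running queries, and get(id) resolves an active id or returns null once the query stops or fails. As application-side code, a sketch (spark is an existing SparkSession with queries already started):

// Enumerate active queries and look one up by id.
val active = spark.streams.active
active.foreach { q => println(s"id=${q.id} name=${q.name} active=${q.isActive}") }

val id = active.head.id
val q = spark.streams.get(id)   // null once the query has stopped or failed
if (q != null) {
  q.stop()                      // after this, spark.streams.get(id) returns null
}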
+
+  testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+    val datasets = Seq.fill(5)(makeDataset._2)
+    withQueriesOn(datasets: _*) { queries =>
+      require(queries.size === datasets.size)
+      assert(spark.streams.active.toSet === queries.toSet)
+
+      // awaitAnyTermination should be blocking
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Stop a query asynchronously and see if it is reported through awaitAnyTermination
+      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+      testAwaitAnyTermination(ExpectNotBlocked)
+      require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should be non-blocking
+      testAwaitAnyTermination(ExpectNotBlocked)
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Terminate a query asynchronously with exception and see whether awaitAnyTermination
+      // throws the exception
+      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      testAwaitAnyTermination(ExpectException[SparkException])
+      require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should throw the exception
+      testAwaitAnyTermination(ExpectException[SparkException])
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(ExpectBlocked)
+
+      // Terminate multiple queries, one with failure, and see whether awaitAnyTermination
+      // throws the exception
+      val q3 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      testAwaitAnyTermination(ExpectNotBlocked)
+      require(!q3.isActive)
+      val q4 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      eventually(Timeout(streamingTimeout)) { require(!q4.isActive) }
+      // After q4 terminates with exception, awaitAnyTerm should start throwing exception
+      testAwaitAnyTermination(ExpectException[SparkException])
+    }
+  }
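
[Editor's note] The semantics pinned down here suggest the usual driver-side wait loop. A hedged sketch (assumes spark is a SparkSession with running queries; a failed query's error is rethrown by every awaitAnyTermination call until resetTerminated is invoked):

import scala.util.control.NonFatal

var done = false
while (!done) {
  try {
    spark.streams.awaitAnyTermination()  // blocks until some query terminates
    // A query stopped cleanly; clear the terminated set so the next call blocks again.
    spark.streams.resetTerminated()
    done = spark.streams.active.isEmpty
  } catch {
    case NonFatal(e) =>
      spark.streams.resetTerminated()    // otherwise the same error is rethrown forever
      throw e                            // or log and continue, depending on the application
  }
}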
+
+  testQuietly("awaitAnyTermination with timeout and resetTerminated") {
+    val datasets = Seq.fill(6)(makeDataset._2)
+    withQueriesOn(datasets: _*) { queries =>
+      require(queries.size === datasets.size)
+      assert(spark.streams.active.toSet === queries.toSet)
+
+      // awaitAnyTermination should be blocking or non-blocking depending on timeout values
+      testAwaitAnyTermination(
+        ExpectBlocked,
+        awaitTimeout = 4 seconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 2 seconds)
+
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 50 milliseconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 1 second)
+
+      // Stop a query asynchronously within the timeout and awaitAnyTerm should be unblocked
+      val q1 = stopRandomQueryAsync(stopAfter = 100 milliseconds, withError = false)
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 2 seconds,
+        expectedReturnedValue = true,
+        testBehaviorFor = 4 seconds)
+      require(!q1.isActive) // should be inactive by the time the prev awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should be non-blocking even if timeout is high
+      testAwaitAnyTermination(
+        ExpectNotBlocked, awaitTimeout = 4 seconds, expectedReturnedValue = true)
+
+      // Resetting termination should make awaitAnyTermination() blocking again
+      spark.streams.resetTerminated()
+      testAwaitAnyTermination(
+        ExpectBlocked,
+        awaitTimeout = 4 seconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 1 second)
+
+      // Terminate a query asynchronously with exception within the timeout; awaitAnyTermination
+      // should throw the exception
+      val q2 = stopRandomQueryAsync(100 milliseconds, withError = true)
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 4 seconds,
+        testBehaviorFor = 6 seconds)
+      require(!q2.isActive) // should be inactive by the time the prev awaitAnyTerm returned
+
+      // All subsequent calls to awaitAnyTermination should throw the exception
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 2 seconds,
+        testBehaviorFor = 4 seconds)
+
+      // Terminate a query asynchronously outside the timeout; awaitAnyTerm should be blocked
+      spark.streams.resetTerminated()
+      val q3 = stopRandomQueryAsync(2 seconds, withError = true)
+      testAwaitAnyTermination(
+        ExpectNotBlocked,
+        awaitTimeout = 100 milliseconds,
+        expectedReturnedValue = false,
+        testBehaviorFor = 4 seconds)
+
+      // After that query is stopped, awaitAnyTerm should throw the exception
+      eventually(Timeout(streamingTimeout)) { require(!q3.isActive) } // wait for query to stop
+      testAwaitAnyTermination(
+        ExpectException[SparkException],
+        awaitTimeout = 100 milliseconds,
+        testBehaviorFor = 4 seconds)
+
+      // Terminate multiple queries, one with failure, and see whether awaitAnyTermination
+      // throws the exception
+      spark.streams.resetTerminated()
+
+      val q4 = stopRandomQueryAsync(10 milliseconds, withError = false)
+      testAwaitAnyTermination(
+        ExpectNotBlocked, awaitTimeout = 2 seconds, expectedReturnedValue = true)
+      require(!q4.isActive)
+      val q5 = stopRandomQueryAsync(10 milliseconds, withError = true)
+      eventually(Timeout(streamingTimeout)) { require(!q5.isActive) }
+      // After q5 terminates with exception, awaitAnyTerm should start throwing exception
+      testAwaitAnyTermination(ExpectException[SparkException], awaitTimeout = 2 seconds)
+    }
+  }
+
+  /** Run a body of code by defining a query on each dataset */
+  private def withQueriesOn(datasets: Dataset[_]*)(body: Seq[StreamingQuery] => Unit): Unit = {
+    failAfter(streamingTimeout) {
+      val queries = withClue("Error starting queries") {
+        datasets.zipWithIndex.map { case (ds, i) =>
+          @volatile var query: StreamExecution = null
+          try {
+            val df = ds.toDF
+            val metadataRoot =
+              Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+            query =
+              df.writeStream
+                .format("memory")
+                .queryName(s"query$i")
+                .option("checkpointLocation", metadataRoot)
+                .outputMode("append")
+                .start()
+                .asInstanceOf[StreamExecution]
+          } catch {
+            case NonFatal(e) =>
+              if (query != null) query.stop()
+              throw e
+          }
+          query
+        }
+      }
+      try {
+        body(queries)
+      } finally {
+        queries.foreach(_.stop())
+      }
+    }
+  }
+
+  /** Test the behavior of awaitAnyTermination */
+  private def testAwaitAnyTermination(
+      expectedBehavior: ExpectedBehavior,
+      expectedReturnedValue: Boolean = false,
+      awaitTimeout: Span = null,
+      testBehaviorFor: Span = 4 seconds): Unit = {
+
+    def awaitTermFunc(): Unit = {
+      if (awaitTimeout != null && awaitTimeout.toMillis > 0) {
+        val returnedValue = spark.streams.awaitAnyTermination(awaitTimeout.toMillis)
+        assert(returnedValue === expectedReturnedValue, "Returned value does not match expected")
+      } else {
+        spark.streams.awaitAnyTermination()
+      }
+    }
+
+    AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, testBehaviorFor)
+  }
+
+  /** Stop a random active query either with `stop()` or with an error */
+  private def stopRandomQueryAsync(stopAfter: Span, withError: Boolean): StreamingQuery = {
+
+    import scala.concurrent.ExecutionContext.Implicits.global
+
+    val activeQueries = spark.streams.active
+    val queryToStop = activeQueries(Random.nextInt(activeQueries.length))
+    Future {
+      Thread.sleep(stopAfter.toMillis)
+      if (withError) {
+        logDebug(s"Terminating query ${queryToStop.name} with error")
+        queryToStop.asInstanceOf[StreamExecution].logicalPlan.collect {
+          case StreamingExecutionRelation(source, _) =>
+            source.asInstanceOf[MemoryStream[Int]].addData(0)
+        }
+      } else {
+        logDebug(s"Stopping query ${queryToStop.name}")
+        queryToStop.stop()
+      }
+    }
+    queryToStop
+  }
+
+  private def makeDataset: (MemoryStream[Int], Dataset[Int]) = {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS.map(6 / _)
+    (inputData, mapped)
+  }
+}
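
[Editor's note] withQueriesOn captures the boilerplate for spinning up memory-sink queries. Outside Spark's test tree the same start-up pattern looks roughly like the sketch below; Files.createTempDirectory stands in for the internal Utils.createTempDir helper, and ds is any streaming Dataset:

import java.nio.file.Files
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

def startMemoryQuery(ds: Dataset[_], name: String): StreamingQuery = {
  // Each query needs its own checkpoint directory.
  val checkpoint = Files.createTempDirectory("streaming.checkpoint").toString
  ds.toDF.writeStream
    .format("memory")                       // results land in an in-memory table named after the query
    .queryName(name)
    .option("checkpointLocation", checkpoint)
    .outputMode("append")
    .start()
}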
http://git-wip-us.apache.org/repos/asf/spark/blob/9a507199/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
new file mode 100644
index 0000000..9d58315
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
+import org.apache.spark.util.Utils
+
+
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
+
+  import AwaitTerminationTester._
+  import testImplicits._
+
+  after {
+    sqlContext.streams.active.foreach(_.stop())
+  }
+
+  test("names unique across active queries, ids unique across all started queries") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _ }
+
+    def startQuery(queryName: String): StreamingQuery = {
+      val metadataRoot = Utils.createTempDir(namePrefix = "streaming.checkpoint").getCanonicalPath
+      val writer = mapped.writeStream
+      writer
+        .queryName(queryName)
+        .format("memory")
+        .option("checkpointLocation", metadataRoot)
+        .start()
+    }
+
+    val q1 = startQuery("q1")
+    assert(q1.name === "q1")
+
+    // Verify that another query with the same name cannot be started
+    val e1 = intercept[IllegalArgumentException] {
+      startQuery("q1")
+    }
+    Seq("q1", "already active").foreach { s => assert(e1.getMessage.contains(s)) }
+
+    // Verify q1 was unaffected by the above exception and stop it
+    assert(q1.isActive)
+    q1.stop()
+
+    // Verify another query can be started with name q1, but it will have a different id
+    val q2 = startQuery("q1")
+    assert(q2.name === "q1")
+    assert(q2.id !== q1.id)
+    q2.stop()
+  }
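
[Editor's note] Since the name-uniqueness rule surfaces as an IllegalArgumentException, callers that may race on names can guard start-up as in this sketch (startMemoryQuery is the hypothetical helper from the earlier sketch):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

def startUnlessRunning(ds: Dataset[_], name: String): Option[StreamingQuery] = {
  try {
    Some(startMemoryQuery(ds, name))
  } catch {
    // The message is asserted above to mention the name and "already active"
    case e: IllegalArgumentException if e.getMessage.contains("already active") =>
      None
  }
}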
+
+  testQuietly("lifecycle states and awaitTermination") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map { 6 / _ }
+
+    testStream(mapped)(
+      AssertOnQuery(_.isActive === true),
+      AssertOnQuery(_.exception.isEmpty),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      TestAwaitTermination(ExpectBlocked),
+      TestAwaitTermination(ExpectBlocked, timeoutMs = 2000),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = false),
+      StopStream,
+      AssertOnQuery(_.isActive === false),
+      AssertOnQuery(_.exception.isEmpty),
+      TestAwaitTermination(ExpectNotBlocked),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 2000, expectedReturnValue = true),
+      TestAwaitTermination(ExpectNotBlocked, timeoutMs = 10, expectedReturnValue = true),
+      StartStream(),
+      AssertOnQuery(_.isActive === true),
+      AddData(inputData, 0),
+      ExpectFailure[SparkException](),
+      AssertOnQuery(_.isActive === false),
+      TestAwaitTermination(ExpectException[SparkException]),
+      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
+      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
+      AssertOnQuery(
+        q =>
+          q.exception.get.startOffset.get === q.committedOffsets.toCompositeOffset(Seq(inputData)),
+        "incorrect start offset on exception")
+    )
+  }
+
+  testQuietly("source and sink statuses") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(6 / _)
+
+    testStream(mapped)(
+      AssertOnQuery(_.sourceStatuses.length === 1),
+      AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
+      AssertOnQuery(_.sinkStatus.description.contains("Memory")),
+      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3, 6, 3),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+      AddData(inputData, 0),
+      ExpectFailure[SparkException](),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
+    )
+  }
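
[Editor's note] The status assertions above translate directly into monitoring code. A sketch of reading a live query's positions (assumes spark is a SparkSession with an active query; field shapes are as introduced in this patch):

import org.apache.spark.sql.streaming.StreamingQuery

val query: StreamingQuery = spark.streams.active.head

query.sourceStatuses.foreach { s =>
  // offsetDesc is None until the first batch is committed, then Some(<offset string>)
  println(s"source ${s.description}: ${s.offsetDesc.getOrElse("<no data processed yet>")}")
}
// The sink offset is a composite over all sources, rendered as a plain string.
println(s"sink ${query.sinkStatus.description}: ${query.sinkStatus.offsetDesc}")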
+
+  /**
+   * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
+   *
+   * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
+   * @param timeoutMs Timeout in milliseconds
+   *                  When timeoutMs <= 0, awaitTermination() is tested (i.e. without timeout)
+   *                  When timeoutMs > 0, awaitTermination(timeoutMs) is tested
+   * @param expectedReturnValue Expected return value when awaitTermination(timeoutMs) is used
+   */
+  case class TestAwaitTermination(
+      expectedBehavior: ExpectedBehavior,
+      timeoutMs: Int = -1,
+      expectedReturnValue: Boolean = false)
+    extends AssertOnQuery(
+      TestAwaitTermination.assertOnQueryCondition(expectedBehavior, timeoutMs, expectedReturnValue),
+      "Error testing awaitTermination behavior") {
+    override def toString(): String = {
+      s"TestAwaitTermination($expectedBehavior, timeoutMs = $timeoutMs, " +
+        s"expectedReturnValue = $expectedReturnValue)"
+    }
+  }
+
+  object TestAwaitTermination {
+
+    /**
+     * Tests the behavior of `StreamingQuery.awaitTermination`.
+     *
+     * @param expectedBehavior Expected behavior (not blocked, blocked, or exception thrown)
+     * @param timeoutMs Timeout in milliseconds
+     *                  When timeoutMs <= 0, awaitTermination() is tested (i.e. without timeout)
+     *                  When timeoutMs > 0, awaitTermination(timeoutMs) is tested
+     * @param expectedReturnValue Expected return value when awaitTermination(timeoutMs) is used
+     */
+    def assertOnQueryCondition(
+        expectedBehavior: ExpectedBehavior,
+        timeoutMs: Int,
+        expectedReturnValue: Boolean)(q: StreamExecution): Boolean = {
+
+      def awaitTermFunc(): Unit = {
+        if (timeoutMs <= 0) {
+          q.awaitTermination()
+        } else {
+          val returnedValue = q.awaitTermination(timeoutMs)
+          assert(returnedValue === expectedReturnValue, "Returned value does not match expected")
+        }
+      }
+      AwaitTerminationTester.test(expectedBehavior, awaitTermFunc)
+      true // If control reached here, then everything worked as expected
+    }
+  }
+}
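
[Editor's note] The contract TestAwaitTermination encodes for the timeout variant: awaitTermination(ms) returns true if the query already terminated, false if the timeout expired first, and rethrows the query's error if it failed. In application form, a sketch (assumes spark is a SparkSession with an active query):

val q = spark.streams.active.head
val terminated: Boolean = q.awaitTermination(500)   // wait at most 500 ms
if (terminated) {
  println(s"${q.name} finished within the timeout")
} else {
  println(s"${q.name} is still running")            // returned false; nothing thrown
}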