Repository: spark Updated Branches: refs/heads/branch-2.0 a0d9015b3 -> 881e0eb05
http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala index 984b84f..06f1bd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala @@ -74,6 +74,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth // Verify state after updating put(store, "a", 1) + assert(store.numKeys() === 1) intercept[IllegalStateException] { store.iterator() } @@ -85,7 +86,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth // Make updates, commit and then verify state put(store, "b", 2) put(store, "aa", 3) + assert(store.numKeys() === 3) remove(store, _.startsWith("a")) + assert(store.numKeys() === 1) assert(store.commit() === 1) assert(store.hasCommitted) @@ -107,7 +110,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth val reloadedProvider = new HDFSBackedStateStoreProvider( store.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration) val reloadedStore = reloadedProvider.getStore(1) + assert(reloadedStore.numKeys() === 1) put(reloadedStore, "c", 4) + assert(reloadedStore.numKeys() === 2) assert(reloadedStore.commit() === 2) assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4)) assert(getDataFromFiles(provider) === Set("b" -> 2, "c" -> 4)) http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 55c95ae..c18d843 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -899,6 +899,20 @@ class FileStreamSourceSuite extends FileStreamSourceTest { } } } + + test("input row metrics") { + withTempDirs { case (src, tmp) => + val input = spark.readStream.format("text").load(src.getCanonicalPath) + testStream(input)( + AddTextFileData("100", src, tmp), + CheckAnswer("100"), + AssertOnLastQueryStatus { status => + assert(status.triggerDetails.get("numRows.input.total") === "1") + assert(status.sourceStatuses(0).processingRate > 0.0) + } + ) + } + } } class FileStreamSourceStressTestSuite extends FileStreamSourceTest { http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index d4c64e4..be24cbb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -28,6 +28,8 @@ import scala.util.control.NonFatal import org.scalatest.Assertions import org.scalatest.concurrent.{Eventually, Timeouts} +import org.scalatest.concurrent.AsyncAssertions.Waiter +import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.Span @@ -38,6 +40,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.streaming._ +import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -193,6 +196,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } + case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit) + extends StreamAction + + /** * Executes the specified actions on the given streaming DataFrame and provides helpful * error messages in the case of failures or incorrect answers. @@ -294,9 +301,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val testThread = Thread.currentThread() val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath + val statusCollector = new QueryStatusCollector try { + spark.streams.addListener(statusCollector) startedTest.foreach { action => + logInfo(s"Processing test stream action: $action") action match { case StartStream(trigger, triggerClock) => verify(currentStream == null, "stream already running") @@ -394,6 +404,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val streamToAssert = Option(currentStream).getOrElse(lastStream) verify({ a.run(); true }, s"Assert failed: ${a.message}") + case a: AssertOnLastQueryStatus => + Eventually.eventually(timeout(streamingTimeout)) { + require(statusCollector.lastTriggerStatus.nonEmpty) + } + val status = statusCollector.lastTriggerStatus.get + verify({ a.condition(status); true }, "Assert on last query status failed") + case a: AddData => try { // Add data and get the source where it was added, and the expected offset of the @@ -465,6 +482,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { if (currentStream != null && currentStream.microBatchThread.isAlive) { currentStream.stop() } + spark.streams.removeListener(statusCollector) } } @@ -598,4 +616,58 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } } + + + class QueryStatusCollector extends StreamingQueryListener { + // to catch errors in the async listener events + @volatile private var asyncTestWaiter = new Waiter + + @volatile var startStatus: StreamingQueryStatus = null + @volatile var terminationStatus: StreamingQueryStatus = null + @volatile var terminationException: Option[String] = null + + private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus] + + /** Get the info of the last trigger that processed data */ + def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized { + progressStatuses.filter { i => + i.triggerDetails.get("isTriggerActive").toBoolean == false && + i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true + }.lastOption + } + + def reset(): Unit = { + startStatus = null + terminationStatus = null + progressStatuses.clear() + asyncTestWaiter = new Waiter + } + + def checkAsyncErrors(): Unit = { + asyncTestWaiter.await(timeout(10 seconds)) + } + + + override def onQueryStarted(queryStarted: QueryStarted): Unit = { + asyncTestWaiter { + startStatus = queryStarted.queryStatus + } + } + + override def onQueryProgress(queryProgress: QueryProgress): Unit = { + asyncTestWaiter { + assert(startStatus != null, "onQueryProgress called before onQueryStarted") + synchronized { progressStatuses += queryProgress.queryStatus } + } + } + + override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { + asyncTestWaiter { + assert(startStatus != null, "onQueryTerminated called before onQueryStarted") + terminationStatus = queryTerminated.queryStatus + terminationException = queryTerminated.exception + } + asyncTestWaiter.dismiss() + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala index 8681199..e59b549 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.InternalOutputModes._ +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.state.StateStore import org.apache.spark.sql.expressions.scalalang.typed @@ -129,6 +130,59 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll { ) } + test("state metrics") { + val inputData = MemoryStream[Int] + + val aggregated = + inputData.toDS() + .flatMap(x => Seq(x, x + 1)) + .toDF("value") + .groupBy($"value") + .agg(count("*")) + .as[(Int, Long)] + + implicit class RichStreamExecution(query: StreamExecution) { + def stateNodes: Seq[SparkPlan] = { + query.lastExecution.executedPlan.collect { + case p if p.isInstanceOf[StateStoreSaveExec] => p + } + } + } + + // Test with Update mode + testStream(aggregated, Update)( + AddData(inputData, 1), + CheckLastBatch((1, 1), (2, 1)), + AssertOnQuery { _.stateNodes.size === 1 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 2 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 2 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 2 }, + AddData(inputData, 2, 3), + CheckLastBatch((2, 2), (3, 2), (4, 1)), + AssertOnQuery { _.stateNodes.size === 1 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 3 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 3 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 4 } + ) + + // Test with Complete mode + inputData.reset() + testStream(aggregated, Complete)( + AddData(inputData, 1), + CheckLastBatch((1, 1), (2, 1)), + AssertOnQuery { _.stateNodes.size === 1 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 2 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 2 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 2 }, + AddData(inputData, 2, 3), + CheckLastBatch((1, 1), (2, 2), (3, 2), (4, 1)), + AssertOnQuery { _.stateNodes.size === 1 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 4 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 3 }, + AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 4 } + ) + } + test("multiple keys") { val inputData = MemoryStream[Int] http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 831543a..9e0eefb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -17,92 +17,97 @@ package org.apache.spark.sql.streaming -import java.util.concurrent.ConcurrentLinkedQueue - +import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester._ -import org.scalatest.concurrent.AsyncAssertions.Waiter -import org.scalatest.concurrent.Eventually._ -import org.scalatest.concurrent.PatienceConfiguration.Timeout -import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkException +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.JsonProtocol +import org.apache.spark.sql.functions._ +import org.apache.spark.util.{JsonProtocol, ManualClock} class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { import testImplicits._ - import StreamingQueryListener._ + import StreamingQueryListenerSuite._ + + // To make === between double tolerate inexact values + implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) after { spark.streams.active.foreach(_.stop()) assert(spark.streams.active.isEmpty) assert(addedListeners.isEmpty) // Make sure we don't leak any events to the next test - spark.sparkContext.listenerBus.waitUntilEmpty(10000) } - test("single listener") { - val listener = new QueryStatusCollector - val input = MemoryStream[Int] - withListenerAdded(listener) { - testStream(input.toDS)( - StartStream(), - AssertOnQuery("Incorrect query status in onQueryStarted") { query => - val status = listener.startStatus - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses.size === 1) - assert(status.sourceStatuses(0).description.contains("Memory")) - - // The source and sink offsets must be None as this must be called before the - // batches have started - assert(status.sourceStatuses(0).offsetDesc === None) - assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString) - - // No progress events or termination events - assert(listener.progressStatuses.isEmpty) - assert(listener.terminationStatus === null) - true - }, - AddDataMemory(input, Seq(1, 2, 3)), - CheckAnswer(1, 2, 3), - AssertOnQuery("Incorrect query status in onQueryProgress") { query => - eventually(Timeout(streamingTimeout)) { + ignore("single listener, check trigger statuses") { + import StreamingQueryListenerSuite._ + clock = new ManualClock() + + /** Custom MemoryStream that waits for manual clock to reach a time */ + val inputData = new MemoryStream[Int](0, sqlContext) { + // Wait for manual clock to be 100 first time there is data + override def getOffset: Option[Offset] = { + val offset = super.getOffset + if (offset.nonEmpty) { + clock.waitTillTime(100) + } + offset + } - // There should be only on progress event as batch has been processed - assert(listener.progressStatuses.size === 1) - val status = listener.progressStatuses.peek() - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) - assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) + // Wait for manual clock to be 300 first time there is data + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + clock.waitTillTime(300) + super.getBatch(start, end) + } + } - // No termination events - assert(listener.terminationStatus === null) - } - true - }, - StopStream, - AssertOnQuery("Incorrect query status in onQueryTerminated") { query => - eventually(Timeout(streamingTimeout)) { - val status = listener.terminationStatus - assert(status != null) - assert(status.name === query.name) - assert(status.id === query.id) - assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)) - assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString) - assert(listener.terminationException === None) - } - listener.checkAsyncErrors() - true - } - ) + // This is to make sure thatquery waits for manual clock to be 600 first time there is data + val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x => + clock.waitTillTime(600) + x } + + testStream(mapped, OutputMode.Complete)( + StartStream(triggerClock = clock), + AddData(inputData, 1, 2), + AdvanceManualClock(100), // unblock getOffset, will block on getBatch + AdvanceManualClock(200), // unblock getBatch, will block on computation + AdvanceManualClock(300), // unblock computation + AssertOnQuery { _ => clock.getTimeMillis() === 600 }, + AssertOnLastQueryStatus { status: StreamingQueryStatus => + // Check the correctness of the trigger info of the last completed batch reported by + // onQueryProgress + assert(status.triggerDetails.get("triggerId") == "0") + assert(status.triggerDetails.get("isTriggerActive") === "false") + assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") + + assert(status.triggerDetails.get("timestamp.triggerStart") === "0") + assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100") + assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300") + assert(status.triggerDetails.get("timestamp.triggerFinish") === "600") + + assert(status.triggerDetails.get("latency.getOffset.total") === "100") + assert(status.triggerDetails.get("latency.getBatch.total") === "200") + assert(status.triggerDetails.get("latency.optimizer") === "0") + assert(status.triggerDetails.get("latency.offsetLogWrite") === "0") + assert(status.triggerDetails.get("latency.fullTrigger") === "600") + + assert(status.triggerDetails.get("numRows.input.total") === "2") + assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1") + assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") + + assert(status.sourceStatuses.length === 1) + assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") + assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") + assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") + assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2") + }, + CheckAnswer(2) + ) } test("adding and removing listener") { @@ -172,56 +177,37 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("QueryStarted serialization") { - val queryStartedInfo = new StreamingQueryInfo( - "name", - 1, - Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)), - new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString)) - val queryStarted = new StreamingQueryListener.QueryStarted(queryStartedInfo) + val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryStarted] - assertStreamingQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo) + assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) } test("QueryProgress serialization") { - val queryProcessInfo = new StreamingQueryInfo( - "name", - 1, - Seq( - new SourceStatus("source1", Some(LongOffset(0).toString)), - new SourceStatus("source2", Some(LongOffset(1).toString))), - new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) - val queryProcess = new StreamingQueryListener.QueryProgress(queryProcessInfo) + val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus) val json = JsonProtocol.sparkEventToJson(queryProcess) val newQueryProcess = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryProgress] - assertStreamingQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo) + assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) } test("QueryTerminated serialization") { - val queryTerminatedInfo = new StreamingQueryInfo( - "name", - 1, - Seq( - new SourceStatus("source1", Some(LongOffset(0).toString)), - new SourceStatus("source2", Some(LongOffset(1).toString))), - new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString)) val exception = new RuntimeException("exception") val queryQueryTerminated = new StreamingQueryListener.QueryTerminated( - queryTerminatedInfo, + StreamingQueryStatus.testStatus, Some(exception.getMessage)) val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryTerminated] - assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo) + assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) assert(queryQueryTerminated.exception === newQueryTerminated.exception) } private def assertStreamingQueryInfoEquals( - expected: StreamingQueryInfo, - actual: StreamingQueryInfo): Unit = { + expected: StreamingQueryStatus, + actual: StreamingQueryStatus): Unit = { assert(expected.name === actual.name) assert(expected.sourceStatuses.size === actual.sourceStatuses.size) expected.sourceStatuses.zip(actual.sourceStatuses).foreach { @@ -243,7 +229,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = { try { - failAfter(1 minute) { + failAfter(streamingTimeout) { spark.streams.addListener(listener) body } @@ -258,49 +244,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val listenerBus = spark.streams invokePrivate listenerBusMethod() listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener]) } +} - class QueryStatusCollector extends StreamingQueryListener { - // to catch errors in the async listener events - @volatile private var asyncTestWaiter = new Waiter - - @volatile var startStatus: StreamingQueryInfo = null - @volatile var terminationStatus: StreamingQueryInfo = null - @volatile var terminationException: Option[String] = null - - val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo] - - def reset(): Unit = { - startStatus = null - terminationStatus = null - progressStatuses.clear() - asyncTestWaiter = new Waiter - } - - def checkAsyncErrors(): Unit = { - asyncTestWaiter.await(timeout(streamingTimeout)) - } - - - override def onQueryStarted(queryStarted: QueryStarted): Unit = { - asyncTestWaiter { - startStatus = queryStarted.queryInfo - } - } - - override def onQueryProgress(queryProgress: QueryProgress): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryProgress called before onQueryStarted") - progressStatuses.add(queryProgress.queryInfo) - } - } - - override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = queryTerminated.queryInfo - terminationException = queryTerminated.exception - } - asyncTestWaiter.dismiss() - } - } +object StreamingQueryListenerSuite { + // Singleton reference to clock that does not get serialized in task closures + @volatile var clock: ManualClock = null } http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 88f1f18..9f8e2db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -17,18 +17,27 @@ package org.apache.spark.sql.streaming +import org.scalactic.TolerantNumerics +import org.scalatest.concurrent.Eventually._ import org.scalatest.BeforeAndAfter +import org.apache.spark.internal.Logging +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.streaming.StreamingQueryListener._ +import org.apache.spark.sql.types.StructType import org.apache.spark.SparkException -import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution} +import org.apache.spark.sql.execution.streaming._ import org.apache.spark.util.Utils -class StreamingQuerySuite extends StreamTest with BeforeAndAfter { +class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { import AwaitTerminationTester._ import testImplicits._ + // To make === between double tolerate inexact values + implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) + after { sqlContext.streams.active.foreach(_.stop()) } @@ -100,31 +109,145 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } - testQuietly("source and sink statuses") { + testQuietly("query statuses") { val inputData = MemoryStream[Int] val mapped = inputData.toDS().map(6 / _) - testStream(mapped)( - AssertOnQuery(_.sourceStatuses.length === 1), + AssertOnQuery(q => q.status.name === q.name), + AssertOnQuery(q => q.status.id === q.id), + AssertOnQuery(_.status.timestamp <= System.currentTimeMillis), + AssertOnQuery(_.status.inputRate === 0.0), + AssertOnQuery(_.status.processingRate === 0.0), + AssertOnQuery(_.status.sourceStatuses.length === 1), + AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"), + AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), + AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), + AssertOnQuery(_.status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString), AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === None), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"), + AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.sinkStatus.description.contains("Memory")), AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString), + AddData(inputData, 1, 2), CheckAnswer(6, 3), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)), + AssertOnQuery(_.status.timestamp <= System.currentTimeMillis), + AssertOnQuery(_.status.inputRate >= 0.0), + AssertOnQuery(_.status.processingRate >= 0.0), + AssertOnQuery(_.status.sourceStatuses.length === 1), + AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).toString), + AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0), + AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0), + AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), + AssertOnQuery(_.status.sinkStatus.offsetDesc === + CompositeOffset.fill(LongOffset(0)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).toString), + AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0), + AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0), AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString), + AddData(inputData, 1, 2), CheckAnswer(6, 3, 6, 3), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + AssertOnQuery(_.status.sinkStatus.offsetDesc === + CompositeOffset.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString), AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), + + StopStream, + AssertOnQuery(_.status.inputRate === 0.0), + AssertOnQuery(_.status.processingRate === 0.0), + AssertOnQuery(_.status.sourceStatuses.length === 1), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), + AssertOnQuery(_.status.sinkStatus.offsetDesc === + CompositeOffset.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString), + AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), + AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString), + AssertOnQuery(_.status.triggerDetails.isEmpty), + + StartStream(), AddData(inputData, 0), ExpectFailure[SparkException], - AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)), + AssertOnQuery(_.status.inputRate === 0.0), + AssertOnQuery(_.status.processingRate === 0.0), + AssertOnQuery(_.status.sourceStatuses.length === 1), + AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).toString), + AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), + AssertOnQuery(_.status.sinkStatus.offsetDesc === + CompositeOffset.fill(LongOffset(1)).toString), + AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).toString), + AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), + AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString) ) } + test("codahale metrics") { + val inputData = MemoryStream[Int] + + /** Whether metrics of a query is registered for reporting */ + def isMetricsRegistered(query: StreamingQuery): Boolean = { + val sourceName = s"StructuredStreaming.${query.name}" + val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName) + require(sources.size <= 1) + sources.nonEmpty + } + // Disabled by default + assert(spark.conf.get("spark.sql.streaming.metricsEnabled").toBoolean === false) + + withSQLConf("spark.sql.streaming.metricsEnabled" -> "false") { + testStream(inputData.toDF)( + AssertOnQuery { q => !isMetricsRegistered(q) }, + StopStream, + AssertOnQuery { q => !isMetricsRegistered(q) } + ) + } + + // Registered when enabled + withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") { + testStream(inputData.toDF)( + AssertOnQuery { q => isMetricsRegistered(q) }, + StopStream, + AssertOnQuery { q => !isMetricsRegistered(q) } + ) + } + } + + test("input row calculation with mixed batch and streaming sources") { + val streamingTriggerDF = spark.createDataset(1 to 10).toDF + val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value") + val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue") + + // Trigger input has 10 rows, static input has 2 rows, + // therefore after the first trigger, the calculated input rows should be 10 + val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value")) + assert(status.triggerDetails.get("numRows.input.total") === "10") + assert(status.sourceStatuses.size === 1) + assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10") + } + + test("input row calculation with trigger DF having multiple leaves") { + val streamingTriggerDF = + spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF) + require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1) + val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF) + + // After the first trigger, the calculated input rows should be 10 + val status = getFirstTriggerStatus(streamingInputDF) + assert(status.triggerDetails.get("numRows.input.total") === "10") + assert(status.sourceStatuses.size === 1) + assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10") + } + testQuietly("StreamExecution metadata garbage collection") { val inputData = MemoryStream[Int] val mapped = inputData.toDS().map(6 / _) @@ -149,6 +272,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter { ) } + /** Create a streaming DF that only execute one batch in which it returns the given static DF */ + private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = { + require(!triggerDF.isStreaming) + // A streaming Source that generate only on trigger and returns the given Dataframe as batch + val source = new Source() { + override def schema: StructType = triggerDF.schema + override def getOffset: Option[Offset] = Some(LongOffset(0)) + override def getBatch(start: Option[Offset], end: Offset): DataFrame = triggerDF + override def stop(): Unit = {} + } + StreamingExecutionRelation(source) + } + + /** Returns the query status at the end of the first trigger of streaming DF */ + private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = { + // A StreamingQueryListener that gets the query status after the first completed trigger + val listener = new StreamingQueryListener { + @volatile var firstStatus: StreamingQueryStatus = null + override def onQueryStarted(queryStarted: QueryStarted): Unit = { } + override def onQueryProgress(queryProgress: QueryProgress): Unit = { + if (firstStatus == null) firstStatus = queryProgress.queryStatus + } + override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { } + } + + try { + spark.streams.addListener(listener) + val q = streamingDF.writeStream.format("memory").queryName("test").start() + q.processAllAvailable() + eventually(timeout(streamingTimeout)) { + assert(listener.firstStatus != null) + } + listener.firstStatus + } finally { + spark.streams.active.map(_.stop()) + spark.streams.removeListener(listener) + } + } + /** * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`. * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org