Repository: spark Updated Branches: refs/heads/master 9a02f6821 -> c3d08e2f2
http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala new file mode 100644 index 0000000..7129fa4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import java.{util => ju} +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.util.control.NonFatal + +import org.apache.jute.compiler.JLong +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. + */ +@Experimental +class StateOperatorProgress private[sql]( + val numRowsTotal: Long, + val numRowsUpdated: Long) { + private[sql] def jsonValue: JValue = { + ("numRowsTotal" -> JInt(numRowsTotal)) ~ + ("numRowsUpdated" -> JInt(numRowsUpdated)) + } +} + +/** + * :: Experimental :: + * Information about progress made in the execution of a [[StreamingQuery]] during + * a trigger. Each event relates to processing done for a single trigger of the streaming + * query. Events are emitted even when no new data is available to be processed. + * + * @param id A unique id of the query. + * @param name Name of the query. This name is unique across all active queries. + * @param timestamp Timestamp (ms) of the beginning of the trigger. + * @param batchId A unique id for the current batch of data being processed. Note that in the + * case of retries after a failure a given batchId may be executed more than once. + * Similarly, when there is no data to be processed, the batchId will not be + * incremented. + * @param durationMs The amount of time taken to perform various operations in milliseconds. + * @param currentWatermark The current event time watermark in milliseconds. + * @param stateOperators Information about operators in the query that store state. + * @param sources Detailed statistics on data being read from each of the streaming sources. 
+ * @since 2.1.0 + */ +@Experimental +class StreamingQueryProgress private[sql]( + val id: UUID, + val name: String, + val timestamp: Long, + val batchId: Long, + val durationMs: ju.Map[String, java.lang.Long], + val currentWatermark: Long, + val stateOperators: Array[StateOperatorProgress], + val sources: Array[SourceProgress], + val sink: SinkProgress) { + + /** The aggregate (across all sources) number of records processed in a trigger. */ + def numInputRows: Long = sources.map(_.numInputRows).sum + + /** The aggregate (across all sources) rate of data arriving. */ + def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum + + /** The aggregate (across all sources) rate at which Spark is processing data. */ + def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum + + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + + override def toString: String = prettyJson + + private[sql] def jsonValue: JValue = { + def safeDoubleToJValue(value: Double): JValue = { + if (value.isNaN || value.isInfinity) JNothing else JDouble(value) + } + + ("id" -> JString(id.toString)) ~ + ("name" -> JString(name)) ~ + ("timestamp" -> JInt(timestamp)) ~ + ("numInputRows" -> JInt(numInputRows)) ~ + ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ + ("durationMs" -> durationMs + .asScala + .map { case (k, v) => k -> JInt(v.toLong): JObject } + .reduce(_ ~ _)) ~ + ("currentWatermark" -> JInt(currentWatermark)) ~ + ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~ + ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~ + ("sink" -> sink.jsonValue) + + } +} + +/** + * :: Experimental :: + * Information about progress made for a source in the execution of a [[StreamingQuery]] + * during a trigger. See [[StreamingQueryProgress]] for more information. + * + * @param description Description of the source. + * @param startOffset The starting offset for data being read. + * @param endOffset The ending offset for data being read. + * @param numInputRows The number of records read from this source. + * @param inputRowsPerSecond The rate at which data is arriving from this source. + * @param processedRowsPerSecond The rate at which data from this source is being processed by + * Spark. + * @since 2.1.0 + */ +@Experimental +class SourceProgress protected[sql]( + val description: String, + val startOffset: String, + val endOffset: String, + val numInputRows: Long, + val inputRowsPerSecond: Double, + val processedRowsPerSecond: Double) { + + /** The compact JSON representation of this progress. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this progress. 
*/ + def prettyJson: String = pretty(render(jsonValue)) + + override def toString: String = prettyJson + + private[sql] def jsonValue: JValue = { + def safeDoubleToJValue(value: Double): JValue = { + if (value.isNaN || value.isInfinity) JNothing else JDouble(value) + } + + ("description" -> JString(description)) ~ + ("startOffset" -> tryParse(startOffset)) ~ + ("endOffset" -> tryParse(endOffset)) ~ + ("numInputRows" -> JInt(numInputRows)) ~ + ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ + ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) + } + + private def tryParse(json: String) = try { + parse(json) + } catch { + case NonFatal(e) => JString(json) + } +} + +/** + * :: Experimental :: + * Information about progress made for a sink in the execution of a [[StreamingQuery]] + * during a trigger. See [[StreamingQueryProgress]] for more information. + * + * @param description Description of the source corresponding to this status. + * @since 2.1.0 + */ +@Experimental +class SinkProgress protected[sql]( + val description: String) { + + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + + override def toString: String = prettyJson + + private[sql] def jsonValue: JValue = { + ("description" -> JString(description)) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala deleted file mode 100644 index 38c4ece..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.streaming - -import org.scalactic.TolerantNumerics - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.util.ManualClock - -class StreamMetricsSuite extends SparkFunSuite { - import StreamMetrics._ - - // To make === between double tolerate inexact values - implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) - - test("rates, latencies, trigger details - basic life cycle") { - val sm = newStreamMetrics(source) - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 0.0) - assert(sm.currentSourceInputRate(source) === 0.0) - assert(sm.currentSourceProcessingRate(source) === 0.0) - assert(sm.currentLatency() === None) - assert(sm.currentTriggerDetails().isEmpty) - - // When trigger started, the rates should not change, but should return - // reported trigger details - sm.reportTriggerStarted(1) - sm.reportTriggerDetail("key", "value") - sm.reportSourceTriggerDetail(source, "key2", "value2") - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 0.0) - assert(sm.currentSourceInputRate(source) === 0.0) - assert(sm.currentSourceProcessingRate(source) === 0.0) - assert(sm.currentLatency() === None) - assert(sm.currentTriggerDetails() === - Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true", - START_TIMESTAMP -> "0", "key" -> "value")) - assert(sm.currentSourceTriggerDetails(source) === - Map(BATCH_ID -> "1", "key2" -> "value2")) - - // Finishing the trigger should calculate the rates, except input rate which needs - // to have another trigger interval - sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output rows - clock.advance(1000) - sm.reportTriggerFinished() - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 100.0) // 100 input rows processed in 1 sec - assert(sm.currentSourceInputRate(source) === 0.0) - assert(sm.currentSourceProcessingRate(source) === 100.0) - assert(sm.currentLatency() === None) - assert(sm.currentTriggerDetails() === - Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false", - START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000", - NUM_INPUT_ROWS -> "100", "key" -> "value")) - assert(sm.currentSourceTriggerDetails(source) === - Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2")) - - // After another trigger starts, the rates and latencies should not change until - // new rows are reported - clock.advance(1000) - sm.reportTriggerStarted(2) - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 100.0) - assert(sm.currentSourceInputRate(source) === 0.0) - assert(sm.currentSourceProcessingRate(source) === 100.0) - assert(sm.currentLatency() === None) - - // Reporting new rows should update the rates and latencies - sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows - clock.advance(500) - sm.reportTriggerFinished() - assert(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts - assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec - assert(sm.currentSourceInputRate(source) === 100.0) - assert(sm.currentSourceProcessingRate(source) === 400.0) - assert(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms - - // Rates should be set to 0 after stop - sm.stop() - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 0.0) - assert(sm.currentSourceInputRate(source) === 
0.0) - assert(sm.currentSourceProcessingRate(source) === 0.0) - assert(sm.currentLatency() === None) - assert(sm.currentTriggerDetails().isEmpty) - } - - test("rates and latencies - after trigger with no data") { - val sm = newStreamMetrics(source) - // Trigger 1 with data - sm.reportTriggerStarted(1) - sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows - clock.advance(1000) - sm.reportTriggerFinished() - - // Trigger 2 with data - clock.advance(1000) - sm.reportTriggerStarted(2) - sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows - clock.advance(500) - sm.reportTriggerFinished() - - // Make sure that all rates are set - require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts - require(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec - require(sm.currentSourceInputRate(source) === 100.0) - require(sm.currentSourceProcessingRate(source) === 400.0) - require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms - - // Trigger 3 with data - clock.advance(500) - sm.reportTriggerStarted(3) - clock.advance(500) - sm.reportTriggerFinished() - - // Rates are set to zero and latency is set to None - assert(sm.currentInputRate() === 0.0) - assert(sm.currentProcessingRate() === 0.0) - assert(sm.currentSourceInputRate(source) === 0.0) - assert(sm.currentSourceProcessingRate(source) === 0.0) - assert(sm.currentLatency() === None) - sm.stop() - } - - test("rates - after trigger with multiple sources, and one source having no info") { - val source1 = TestSource(1) - val source2 = TestSource(2) - val sm = newStreamMetrics(source1, source2) - // Trigger 1 with data - sm.reportTriggerStarted(1) - sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L)) - clock.advance(1000) - sm.reportTriggerFinished() - - // Trigger 2 with data - clock.advance(1000) - sm.reportTriggerStarted(2) - sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L)) - clock.advance(500) - sm.reportTriggerFinished() - - // Make sure that all rates are set - assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts - assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec - assert(sm.currentSourceInputRate(source1) === 100.0) - assert(sm.currentSourceInputRate(source2) === 100.0) - assert(sm.currentSourceProcessingRate(source1) === 400.0) - assert(sm.currentSourceProcessingRate(source2) === 400.0) - - // Trigger 3 with data - clock.advance(500) - sm.reportTriggerStarted(3) - clock.advance(500) - sm.reportNumInputRows(Map(source1 -> 200L)) - sm.reportTriggerFinished() - - // Rates are set to zero and latency is set to None - assert(sm.currentInputRate() === 200.0) - assert(sm.currentProcessingRate() === 400.0) - assert(sm.currentSourceInputRate(source1) === 200.0) - assert(sm.currentSourceInputRate(source2) === 0.0) - assert(sm.currentSourceProcessingRate(source1) === 400.0) - assert(sm.currentSourceProcessingRate(source2) === 0.0) - sm.stop() - } - - test("registered Codahale metrics") { - import scala.collection.JavaConverters._ - val sm = newStreamMetrics(source) - val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala - - // so that all metrics are considered as a single metric group in Ganglia - assert(!gaugeNames.exists(_.contains("."))) - assert(gaugeNames === Set( - "inputRate-total", - "inputRate-source0", - "processingRate-total", - "processingRate-source0", - "latency")) - } - - private def newStreamMetrics(sources: Source*): StreamMetrics = { - new 
StreamMetrics(sources.toSet, clock, "test") - } - - private val clock = new ManualClock() - private val source = TestSource(0) - - case class TestSource(id: Int) extends Source { - override def schema: StructType = StructType(Array.empty[StructField]) - override def getOffset: Option[Offset] = Some(new LongOffset(0)) - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null } - override def stop() {} - override def toString(): String = s"source$id" - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index bad6642..8256c63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1006,9 +1006,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest { testStream(input)( AddTextFileData("100", src, tmp), CheckAnswer("100"), - AssertOnLastQueryStatus { status => - assert(status.triggerDetails.get("numRows.input.total") === "1") - assert(status.sourceStatuses(0).processingRate > 0.0) + AssertOnQuery { query => + val actualProgress = query.recentProgresses + .find(_.numInputRows > 0) + .getOrElse(sys.error("Could not find records with data.")) + assert(actualProgress.numInputRows === 1) + assert(actualProgress.sources(0).processedRowsPerSecond > 0.0) + true } ) } http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index a6b2d4b..a2629f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -28,7 +28,6 @@ import scala.util.control.NonFatal import org.scalatest.Assertions import org.scalatest.concurrent.{Eventually, Timeouts} -import org.scalatest.concurrent.AsyncAssertions.Waiter import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.exceptions.TestFailedDueToTimeoutException @@ -202,10 +201,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } - case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit) - extends StreamAction - - class StreamManualClock(time: Long = 0L) extends ManualClock(time) { + class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable { private var waitStartTime: Option[Long] = None override def waitTillTime(targetTime: Long): Long = synchronized { @@ -325,10 +321,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val testThread = Thread.currentThread() val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath - val statusCollector = new QueryStatusCollector var manualClockExpectedTime = -1L try { - spark.streams.addListener(statusCollector) startedTest.foreach { action => logInfo(s"Processing test stream action: $action") action match { @@ -375,10 +369,12 @@ 
trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { s"can not advance clock of type ${currentStream.triggerClock.getClass}") val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock] assert(manualClockExpectedTime >= 0) + // Make sure we don't advance ManualClock too early. See SPARK-16002. eventually("StreamManualClock has not yet entered the waiting state") { assert(clock.isStreamWaitingAt(manualClockExpectedTime)) } + clock.advance(timeToAdd) manualClockExpectedTime += timeToAdd verify(clock.getTimeMillis() === manualClockExpectedTime, @@ -447,13 +443,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val streamToAssert = Option(currentStream).getOrElse(lastStream) verify({ a.run(); true }, s"Assert failed: ${a.message}") - case a: AssertOnLastQueryStatus => - Eventually.eventually(timeout(streamingTimeout)) { - require(statusCollector.lastTriggerStatus.nonEmpty) - } - val status = statusCollector.lastTriggerStatus.get - verify({ a.condition(status); true }, "Assert on last query status failed") - case a: AddData => try { // Add data and get the source where it was added, and the expected offset of the @@ -528,7 +517,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { if (currentStream != null && currentStream.microBatchThread.isAlive) { currentStream.stop() } - spark.streams.removeListener(statusCollector) // Rollback prev configuration values resetConfValues.foreach { @@ -614,7 +602,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { testStream(ds)(actions: _*) } - object AwaitTerminationTester { trait ExpectedBehavior @@ -668,58 +655,4 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { } } } - - - class QueryStatusCollector extends StreamingQueryListener { - // to catch errors in the async listener events - @volatile private var asyncTestWaiter = new Waiter - - @volatile var startStatus: StreamingQueryStatus = null - @volatile var terminationStatus: StreamingQueryStatus = null - @volatile var terminationException: Option[String] = null - - private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus] - - /** Get the info of the last trigger that processed data */ - def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized { - progressStatuses.filter { i => - i.triggerDetails.get("isTriggerActive").toBoolean == false && - i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true - }.lastOption - } - - def reset(): Unit = { - startStatus = null - terminationStatus = null - progressStatuses.clear() - asyncTestWaiter = new Waiter - } - - def checkAsyncErrors(): Unit = { - asyncTestWaiter.await(timeout(10 seconds)) - } - - - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { - asyncTestWaiter { - startStatus = queryStarted.queryStatus - } - } - - override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryProgress called before onQueryStarted") - synchronized { progressStatuses += queryProgress.queryStatus } - } - } - - override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { - asyncTestWaiter { - assert(startStatus != null, "onQueryTerminated called before onQueryStarted") - terminationStatus = queryTerminated.queryStatus - terminationException = queryTerminated.exception - } - asyncTestWaiter.dismiss() - } - } } 
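With QueryStatusCollector and AssertOnLastQueryStatus gone from StreamTest, suites now assert on a query's progress objects directly through AssertOnQuery, as the FileStreamSourceSuite hunk above already does. A minimal sketch of that pattern, assuming a MemoryStream-backed query run under the StreamTest helpers (the mapping function and the expected values below are illustrative, not taken from the patch):

  val input = MemoryStream[Int]
  testStream(input.toDS.map(_ + 1))(
    AddData(input, 1, 2, 3),
    CheckAnswer(2, 3, 4),
    AssertOnQuery { query =>
      // Triggers with no data also emit progress, so pick one that processed rows.
      val progress = query.recentProgresses
        .find(_.numInputRows > 0)
        .getOrElse(sys.error("Could not find a trigger that processed data."))
      assert(progress.numInputRows === 3)
      assert(progress.sources(0).processedRowsPerSecond > 0.0)
      true
    }
  )
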
http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 98f3bec..c68f953 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -17,24 +17,26 @@ package org.apache.spark.sql.streaming +import java.util.UUID + import scala.collection.mutable import org.scalactic.TolerantNumerics +import org.scalatest.concurrent.AsyncAssertions.Waiter +import org.scalatest.concurrent.Eventually._ +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.BeforeAndAfter import org.scalatest.PrivateMethodTester._ import org.apache.spark.SparkException import org.apache.spark.scheduler._ -import org.apache.spark.sql.DataFrame import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.functions._ -import org.apache.spark.util.{JsonProtocol, ManualClock} - +import org.apache.spark.sql.streaming.StreamingQueryListener._ +import org.apache.spark.util.JsonProtocol class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { import testImplicits._ - import StreamingQueryListenerSuite._ // To make === between double tolerate inexact values implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01) @@ -46,86 +48,86 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - test("single listener, check trigger statuses") { - import StreamingQueryListenerSuite._ - clock = new StreamManualClock - - /** Custom MemoryStream that waits for manual clock to reach a time */ - val inputData = new MemoryStream[Int](0, sqlContext) { - // Wait for manual clock to be 100 first time there is data - override def getOffset: Option[Offset] = { - val offset = super.getOffset - if (offset.nonEmpty) { - clock.waitTillTime(100) + testQuietly("single listener, check trigger events are generated correctly") { + val clock = new StreamManualClock + val inputData = new MemoryStream[Int](0, sqlContext) + val df = inputData.toDS().as[Long].map { 10 / _ } + val listener = new EventCollector + try { + // No events until started + spark.streams.addListener(listener) + assert(listener.startEvent === null) + assert(listener.progressEvents.isEmpty) + assert(listener.terminationEvent === null) + + testStream(df, OutputMode.Append)( + + // Start event generated when query started + StartStream(ProcessingTime(100), triggerClock = clock), + AssertOnQuery { query => + assert(listener.startEvent !== null) + assert(listener.startEvent.id === query.id) + assert(listener.startEvent.name === query.name) + assert(listener.progressEvents.isEmpty) + assert(listener.terminationEvent === null) + true + }, + + // Progress event generated when data processed + AddData(inputData, 1, 2), + AdvanceManualClock(100), + CheckAnswer(10, 5), + AssertOnQuery { query => + assert(listener.progressEvents.nonEmpty) + assert(listener.progressEvents.last.json === query.lastProgress.json) + assert(listener.terminationEvent === null) + true + }, + + // Termination event generated when stopped cleanly + StopStream, + AssertOnQuery { query 
=> + eventually(Timeout(streamingTimeout)) { + assert(listener.terminationEvent !== null) + assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.exception === None) + } + listener.checkAsyncErrors() + listener.reset() + true + }, + + // Termination event generated with exception message when stopped with error + StartStream(ProcessingTime(100), triggerClock = clock), + AddData(inputData, 0), + AdvanceManualClock(100), + ExpectFailure[SparkException], + AssertOnQuery { query => + assert(listener.terminationEvent !== null) + assert(listener.terminationEvent.id === query.id) + assert(listener.terminationEvent.exception.nonEmpty) + listener.checkAsyncErrors() + true } - offset - } - - // Wait for manual clock to be 300 first time there is data - override def getBatch(start: Option[Offset], end: Offset): DataFrame = { - clock.waitTillTime(300) - super.getBatch(start, end) - } - } - - // This is to make sure thatquery waits for manual clock to be 600 first time there is data - val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x => - clock.waitTillTime(600) - x + ) + } finally { + spark.streams.removeListener(listener) } - - testStream(mapped, OutputMode.Complete)( - StartStream(triggerClock = clock), - AddData(inputData, 1, 2), - AdvanceManualClock(100), // unblock getOffset, will block on getBatch - AdvanceManualClock(200), // unblock getBatch, will block on computation - AdvanceManualClock(300), // unblock computation - AssertOnQuery { _ => clock.getTimeMillis() === 600 }, - AssertOnLastQueryStatus { status: StreamingQueryStatus => - // Check the correctness of the trigger info of the last completed batch reported by - // onQueryProgress - assert(status.triggerDetails.containsKey("batchId")) - assert(status.triggerDetails.get("isTriggerActive") === "false") - assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") - - assert(status.triggerDetails.get("timestamp.triggerStart") === "0") - assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100") - assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300") - assert(status.triggerDetails.get("timestamp.triggerFinish") === "600") - - assert(status.triggerDetails.get("latency.getOffset.total") === "100") - assert(status.triggerDetails.get("latency.getBatch.total") === "200") - assert(status.triggerDetails.get("latency.optimizer") === "0") - assert(status.triggerDetails.get("latency.offsetLogWrite") === "0") - assert(status.triggerDetails.get("latency.fullTrigger") === "600") - - assert(status.triggerDetails.get("numRows.input.total") === "2") - assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1") - assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") - - assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId")) - assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") - assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") - assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2") - }, - CheckAnswer(2) - ) } test("adding and removing listener") { - def isListenerActive(listener: QueryStatusCollector): Boolean = { + def isListenerActive(listener: EventCollector): Boolean = { listener.reset() testStream(MemoryStream[Int].toDS)( StartStream(), StopStream ) - listener.startStatus != null + listener.startEvent != null } try { - val listener1 = new 
QueryStatusCollector - val listener2 = new QueryStatusCollector + val listener1 = new EventCollector + val listener2 = new EventCollector spark.streams.addListener(listener1) assert(isListenerActive(listener1) === true) @@ -142,14 +144,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } test("event ordering") { - val listener = new QueryStatusCollector + val listener = new EventCollector withListenerAdded(listener) { for (i <- 1 to 100) { listener.reset() - require(listener.startStatus === null) + require(listener.startEvent === null) testStream(MemoryStream[Int].toDS)( StartStream(), - Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"), + Assert(listener.startEvent !== null, "onQueryStarted not called before query returned"), StopStream, Assert { listener.checkAsyncErrors() } ) @@ -158,7 +160,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } testQuietly("exception should be reported in QueryTerminated") { - val listener = new QueryStatusCollector + val listener = new EventCollector withListenerAdded(listener) { val input = MemoryStream[Int] testStream(input.toDS.map(_ / 0))( @@ -167,49 +169,46 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { ExpectFailure[SparkException](), Assert { spark.sparkContext.listenerBus.waitUntilEmpty(10000) - assert(listener.terminationStatus !== null) - assert(listener.terminationException.isDefined) + assert(listener.terminationEvent !== null) + assert(listener.terminationEvent.exception.nonEmpty) // Make sure that the exception message reported through listener // contains the actual exception and relevant stack trace - assert(!listener.terminationException.get.contains("StreamingQueryException")) - assert(listener.terminationException.get.contains("java.lang.ArithmeticException")) - assert(listener.terminationException.get.contains("StreamingQueryListenerSuite")) + assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException")) + assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException")) + assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite")) } ) } } - test("QueryStarted serialization") { - val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus) + test("QueryStartedEvent serialization") { + val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name") val json = JsonProtocol.sparkEventToJson(queryStarted) val newQueryStarted = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryStartedEvent] - assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus) } - test("QueryProgress serialization") { - val queryProcess = new StreamingQueryListener.QueryProgressEvent( - StreamingQueryStatus.testStatus) - val json = JsonProtocol.sparkEventToJson(queryProcess) - val newQueryProcess = JsonProtocol.sparkEventFromJson(json) + test("QueryProgressEvent serialization") { + val event = new StreamingQueryListener.QueryProgressEvent( + StreamingQueryProgressSuite.testProgress) + val json = JsonProtocol.sparkEventToJson(event) + val newEvent = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryProgressEvent] - assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus) + assert(event.progress.json === newEvent.progress.json) } - test("QueryTerminated serialization") { + 
test("QueryTerminatedEvent serialization") { val exception = new RuntimeException("exception") val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent( - StreamingQueryStatus.testStatus, - Some(exception.getMessage)) - val json = - JsonProtocol.sparkEventToJson(queryQueryTerminated) + UUID.randomUUID, Some(exception.getMessage)) + val json = JsonProtocol.sparkEventToJson(queryQueryTerminated) val newQueryTerminated = JsonProtocol.sparkEventFromJson(json) .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent] - assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus) + assert(queryQueryTerminated.id === newQueryTerminated.id) assert(queryQueryTerminated.exception === newQueryTerminated.exception) } - test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { // query-event-logs-version-2.0.0.txt has all types of events generated by // Structured Streaming in Spark 2.0.0. // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it @@ -217,7 +216,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt") } - test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") { // query-event-logs-version-2.0.1.txt has all types of events generated by // Structured Streaming in Spark 2.0.1. // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it @@ -248,28 +247,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def assertStreamingQueryInfoEquals( - expected: StreamingQueryStatus, - actual: StreamingQueryStatus): Unit = { - assert(expected.name === actual.name) - assert(expected.sourceStatuses.size === actual.sourceStatuses.size) - expected.sourceStatuses.zip(actual.sourceStatuses).foreach { - case (expectedSource, actualSource) => - assertSourceStatus(expectedSource, actualSource) - } - assertSinkStatus(expected.sinkStatus, actual.sinkStatus) - } - - private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = { - assert(expected.description === actual.description) - assert(expected.offsetDesc === actual.offsetDesc) - } - - private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = { - assert(expected.description === actual.description) - assert(expected.offsetDesc === actual.offsetDesc) - } - private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = { try { failAfter(streamingTimeout) { @@ -287,9 +264,51 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { val listenerBus = spark.streams invokePrivate listenerBusMethod() listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener]) } -} -object StreamingQueryListenerSuite { - // Singleton reference to clock that does not get serialized in task closures - @volatile var clock: ManualClock = null + /** Collects events from the StreamingQueryListener for testing */ + class EventCollector extends StreamingQueryListener { + // to catch errors in the async listener events + @volatile private var asyncTestWaiter = new Waiter + + @volatile var startEvent: QueryStartedEvent = null + @volatile var terminationEvent: QueryTerminatedEvent = null + + private val _progressEvents 
= new mutable.Queue[StreamingQueryProgress] + + def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized { + _progressEvents.filter(_.numInputRows > 0) + } + + def reset(): Unit = { + startEvent = null + terminationEvent = null + _progressEvents.clear() + asyncTestWaiter = new Waiter + } + + def checkAsyncErrors(): Unit = { + asyncTestWaiter.await(timeout(streamingTimeout)) + } + + override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { + asyncTestWaiter { + startEvent = queryStarted + } + } + + override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { + asyncTestWaiter { + assert(startEvent != null, "onQueryProgress called before onQueryStarted") + _progressEvents.synchronized { _progressEvents += queryProgress.progress } + } + } + + override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { + asyncTestWaiter { + assert(startEvent != null, "onQueryTerminated called before onQueryStarted") + terminationEvent = queryTerminated + } + asyncTestWaiter.dismiss() + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 41ffd56..268b8ff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -62,7 +62,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { assert(spark.streams.get(q1.id).eq(q1)) assert(spark.streams.get(q2.id).eq(q2)) assert(spark.streams.get(q3.id).eq(q3)) - assert(spark.streams.get(-1) === null) // non-existent id + assert(spark.streams.get(java.util.UUID.randomUUID()) === null) // non-existent id q1.stop() assert(spark.streams.active.toSet === Set(q2, q3)) http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala new file mode 100644 index 0000000..45d29f6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.streaming + +import java.util.UUID + +import scala.collection.JavaConverters._ + +import org.json4s._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._ + + +class StreamingQueryProgressSuite extends SparkFunSuite { + + test("prettyJson") { + val json = testProgress.prettyJson + assert(json === + s""" + |{ + | "id" : "${testProgress.id.toString}", + | "name" : "name", + | "timestamp" : 1, + | "numInputRows" : 678, + | "inputRowsPerSecond" : 10.0, + | "durationMs" : { + | "total" : 0 + | }, + | "currentWatermark" : 3, + | "stateOperators" : [ { + | "numRowsTotal" : 0, + | "numRowsUpdated" : 1 + | } ], + | "sources" : [ { + | "description" : "source", + | "startOffset" : 123, + | "endOffset" : 456, + | "numInputRows" : 678, + | "inputRowsPerSecond" : 10.0 + | } ], + | "sink" : { + | "description" : "sink" + | } + |} + """.stripMargin.trim) + assert(compact(parse(json)) === testProgress.json) + + } + + test("json") { + assert(compact(parse(testProgress.json)) === testProgress.json) + } + + test("toString") { + assert(testProgress.toString === testProgress.prettyJson) + } +} + +object StreamingQueryProgressSuite { + val testProgress = new StreamingQueryProgress( + id = UUID.randomUUID(), + name = "name", + timestamp = 1L, + batchId = 2L, + durationMs = Map("total" -> 0L).mapValues(long2Long).asJava, + currentWatermark = 3L, + stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)), + sources = Array( + new SourceProgress( + description = "source", + startOffset = "123", + endOffset = "456", + numInputRows = 678, + inputRowsPerSecond = 10.0, + processedRowsPerSecond = Double.PositiveInfinity // should not be present in the json + ) + ), + sink = new SinkProgress("sink") + ) +} + http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala deleted file mode 100644 index 50a7d92..0000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.streaming - -import org.apache.spark.SparkFunSuite - -class StreamingQueryStatusSuite extends SparkFunSuite { - test("toString") { - assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString === - """ - |Status of source MySource1 - | Available offset: 0 - | Input rate: 15.5 rows/sec - | Processing rate: 23.5 rows/sec - | Trigger details: - | numRows.input.source: 100 - | latency.getOffset.source: 10 - | latency.getBatch.source: 20 - """.stripMargin.trim, "SourceStatus.toString does not match") - - assert(StreamingQueryStatus.testStatus.sinkStatus.toString === - """ - |Status of sink MySink - | Committed offsets: [1, -] - """.stripMargin.trim, "SinkStatus.toString does not match") - - assert(StreamingQueryStatus.testStatus.toString === - """ - |Status of query 'query' - | Query id: 1 - | Status timestamp: 123 - | Input rate: 15.5 rows/sec - | Processing rate 23.5 rows/sec - | Latency: 345.0 ms - | Trigger details: - | batchId: 5 - | isDataPresentInTrigger: true - | isTriggerActive: true - | latency.getBatch.total: 20 - | latency.getOffset.total: 10 - | numRows.input.total: 100 - | Source statuses [1 source]: - | Source 1 - MySource1 - | Available offset: 0 - | Input rate: 15.5 rows/sec - | Processing rate: 23.5 rows/sec - | Trigger details: - | numRows.input.source: 100 - | latency.getOffset.source: 10 - | latency.getBatch.source: 20 - | Sink status - MySink - | Committed offsets: [1, -] - """.stripMargin.trim, "StreamingQueryStatus.toString does not match") - - } - - test("json") { - assert(StreamingQueryStatus.testStatus.json === - """ - |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5, - |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20", - |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5", - |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"}, - |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5, - |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100", - |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}], - |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}} - """.stripMargin.replace("\n", "").trim) - } - - test("prettyJson") { - assert( - StreamingQueryStatus.testStatus.prettyJson === - """ - |{ - | "name" : "query", - | "id" : 1, - | "timestamp" : 123, - | "inputRate" : 15.5, - | "processingRate" : 23.5, - | "latency" : 345.0, - | "triggerDetails" : { - | "latency.getBatch.total" : "20", - | "numRows.input.total" : "100", - | "isTriggerActive" : "true", - | "batchId" : "5", - | "latency.getOffset.total" : "10", - | "isDataPresentInTrigger" : "true" - | }, - | "sourceStatuses" : [ { - | "description" : "MySource1", - | "offsetDesc" : "0", - | "inputRate" : 15.5, - | "processingRate" : 23.5, - | "triggerDetails" : { - | "numRows.input.source" : "100", - | "latency.getOffset.source" : "10", - | "latency.getBatch.source" : "20" - | } - | } ], - | "sinkStatus" : { - | "description" : "MySink", - | "offsetDesc" : "[1, -]" - | } - |} - """.stripMargin.trim) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 8ecb33c..4f3b4a2 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -20,14 +20,15 @@ package org.apache.spark.sql.streaming import org.scalactic.TolerantNumerics import org.scalatest.concurrent.Eventually._ import org.scalatest.BeforeAndAfter +import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.apache.spark.internal.Logging import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.streaming.StreamingQueryListener._ import org.apache.spark.sql.types.StructType import org.apache.spark.SparkException import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.util.Utils +import org.apache.spark.sql.functions._ +import org.apache.spark.util.{ManualClock, Utils} class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { @@ -109,85 +110,139 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { ) } - testQuietly("query statuses") { - val inputData = MemoryStream[Int] - val mapped = inputData.toDS().map(6 / _) - testStream(mapped)( - AssertOnQuery(q => q.status.name === q.name), - AssertOnQuery(q => q.status.id === q.id), - AssertOnQuery(_.status.timestamp <= System.currentTimeMillis), - AssertOnQuery(_.status.inputRate === 0.0), - AssertOnQuery(_.status.processingRate === 0.0), - AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"), - AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString), - AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"), - AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString), + testQuietly("query statuses and progresses") { + import StreamingQuerySuite._ + clock = new StreamManualClock + + /** Custom MemoryStream that waits for manual clock to reach a time */ + val inputData = new MemoryStream[Int](0, sqlContext) { + // Wait for manual clock to be 100 first time there is data + override def getOffset: Option[Offset] = { + val offset = super.getOffset + if (offset.nonEmpty) { + clock.waitTillTime(300) + } + offset + } - AddData(inputData, 1, 2), - CheckAnswer(6, 3), - AssertOnQuery(_.status.timestamp <= System.currentTimeMillis), - AssertOnQuery(_.status.inputRate >= 0.0), - AssertOnQuery(_.status.processingRate >= 0.0), - AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json), - AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0), - AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0), - AssertOnQuery(_.status.sinkStatus.description.contains("Memory")), - AssertOnQuery(_.status.sinkStatus.offsetDesc === - OffsetSeq.fill(LongOffset(0)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json), - AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0), - 
AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString), + // Wait for manual clock to be 300 first time there is data + override def getBatch(start: Option[Offset], end: Offset): DataFrame = { + clock.waitTillTime(600) + super.getBatch(start, end) + } + } - AddData(inputData, 1, 2), - CheckAnswer(6, 3, 6, 3), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json), - AssertOnQuery(_.status.sinkStatus.offsetDesc === - OffsetSeq.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json), - AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString), + // This is to make sure thatquery waits for manual clock to be 600 first time there is data + val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x => + clock.waitTillTime(1100) + x + } - StopStream, - AssertOnQuery(_.status.inputRate === 0.0), - AssertOnQuery(_.status.processingRate === 0.0), - AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json), - AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.status.sinkStatus.offsetDesc === - OffsetSeq.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json), - AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString), - AssertOnQuery(_.status.triggerDetails.isEmpty), + case class AssertStreamExecThreadToWaitForClock() + extends AssertOnQuery(q => { + eventually(Timeout(streamingTimeout)) { + if (q.exception.isEmpty) { + assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis)) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + }, "") + + testStream(mapped, OutputMode.Complete)( + StartStream(ProcessingTime(100), triggerClock = clock), + AssertStreamExecThreadToWaitForClock(), + AssertOnQuery(_.status.isDataAvailable === false), + AssertOnQuery(_.status.isTriggerActive === false), + // TODO: test status.message before trigger has started + // AssertOnQuery(_.lastProgress === null) // there is an empty trigger as soon as started + AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + + // Test status while offset is being fetched + AddData(inputData, 1, 2), + AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset + AssertStreamExecThreadToWaitForClock(), + AssertOnQuery(_.status.isDataAvailable === false), + AssertOnQuery(_.status.isTriggerActive === true), + AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")), + AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + + // Test status while batch is being fetched + AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch + AssertStreamExecThreadToWaitForClock(), + AssertOnQuery(_.status.isDataAvailable === true), + AssertOnQuery(_.status.isTriggerActive === true), + AssertOnQuery(_.status.message === "Processing new data"), + AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + + // Test status while batch is being processed + AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job + 
AssertOnQuery(_.status.isDataAvailable === true), + AssertOnQuery(_.status.isTriggerActive === true), + AssertOnQuery(_.status.message === "Processing new data"), + AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0), + + // Test status while batch processing has completed + AdvanceManualClock(500), // time = 1100 to unblock job + AssertOnQuery { _ => clock.getTimeMillis() === 1100 }, + CheckAnswer(2), + AssertOnQuery(_.status.isDataAvailable === true), + AssertOnQuery(_.status.isTriggerActive === false), + AssertOnQuery(_.status.message === "Waiting for next trigger"), + AssertOnQuery { query => + assert(query.lastProgress != null) + assert(query.recentProgresses.exists(_.numInputRows > 0)) + assert(query.recentProgresses.last.eq(query.lastProgress)) + + val progress = query.lastProgress + assert(progress.id === query.id) + assert(progress.name === query.name) + assert(progress.batchId === 0) + assert(progress.timestamp === 100) + assert(progress.numInputRows === 2) + assert(progress.processedRowsPerSecond === 2.0) + + assert(progress.durationMs.get("getOffset") === 200) + assert(progress.durationMs.get("getBatch") === 300) + assert(progress.durationMs.get("queryPlanning") === 0) + assert(progress.durationMs.get("walCommit") === 0) + assert(progress.durationMs.get("triggerExecution") === 1000) + + assert(progress.sources.length === 1) + assert(progress.sources(0).description contains "MemoryStream") + assert(progress.sources(0).startOffset === null) + assert(progress.sources(0).endOffset !== null) + assert(progress.sources(0).processedRowsPerSecond === 2.0) + + assert(progress.stateOperators.length === 1) + assert(progress.stateOperators(0).numRowsUpdated === 1) + assert(progress.stateOperators(0).numRowsTotal === 1) + + assert(progress.sink.description contains "MemorySink") + true + }, - StartStream(), - AddData(inputData, 0), - ExpectFailure[SparkException], - AssertOnQuery(_.status.inputRate === 0.0), - AssertOnQuery(_.status.processingRate === 0.0), - AssertOnQuery(_.status.sourceStatuses.length === 1), - AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json), - AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.status.sinkStatus.offsetDesc === - OffsetSeq.fill(LongOffset(1)).toString), - AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json), - AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0), - AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0), - AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString) + AddData(inputData, 1, 2), + AdvanceManualClock(100), // allow another trigger + CheckAnswer(4), + AssertOnQuery(_.status.isDataAvailable === true), + AssertOnQuery(_.status.isTriggerActive === false), + AssertOnQuery(_.status.message === "Waiting for next trigger"), + AssertOnQuery { query => + assert(query.recentProgresses.last.eq(query.lastProgress)) + assert(query.lastProgress.batchId === 1) + assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818) + true + }, + + // Test status after data is not available for a trigger + AdvanceManualClock(100), // allow another trigger + AssertStreamExecThreadToWaitForClock(), + AssertOnQuery(_.status.isDataAvailable === false), + AssertOnQuery(_.status.isTriggerActive === false), + AssertOnQuery(_.status.message === "Waiting for next trigger") ) } @@ -196,7 +251,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { /** 
Whether metrics of a query is registered for reporting */ def isMetricsRegistered(query: StreamingQuery): Boolean = { - val sourceName = s"StructuredStreaming.${query.name}" + val sourceName = s"spark.streaming.${query.name}" val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName) require(sources.size <= 1) sources.nonEmpty @@ -229,23 +284,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { // Trigger input has 10 rows, static input has 2 rows, // therefore after the first trigger, the calculated input rows should be 10 - val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value")) - assert(status.triggerDetails.get("numRows.input.total") === "10") - assert(status.sourceStatuses.size === 1) - assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10") + val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value")) + assert(progress.numInputRows === 10) + assert(progress.sources.size === 1) + assert(progress.sources(0).numInputRows === 10) } - test("input row calculation with trigger DF having multiple leaves") { + test("input row calculation with trigger input DF having multiple leaves") { val streamingTriggerDF = spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF) require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1) val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF) // After the first trigger, the calculated input rows should be 10 - val status = getFirstTriggerStatus(streamingInputDF) - assert(status.triggerDetails.get("numRows.input.total") === "10") - assert(status.sourceStatuses.size === 1) - assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10") + val progress = getFirstProgress(streamingInputDF) + assert(progress.numInputRows === 10) + assert(progress.sources.size === 1) + assert(progress.sources(0).numInputRows === 10) } testQuietly("StreamExecution metadata garbage collection") { @@ -285,34 +340,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { StreamingExecutionRelation(source) } - /** Returns the query status at the end of the first trigger of streaming DF */ - private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = { - // A StreamingQueryListener that gets the query status after the first completed trigger - val listener = new StreamingQueryListener { - @volatile var firstStatus: StreamingQueryStatus = null - @volatile var queryStartedEvent = 0 - override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { - queryStartedEvent += 1 - } - override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = { - if (firstStatus == null) firstStatus = queryProgress.queryStatus - } - override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { } - } - + /** Returns the query progress at the end of the first trigger of streaming DF */ + private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = { try { - spark.streams.addListener(listener) val q = streamingDF.writeStream.format("memory").queryName("test").start() q.processAllAvailable() - eventually(timeout(streamingTimeout)) { - assert(listener.firstStatus != null) - // test if QueryStartedEvent callback is called for only once - assert(listener.queryStartedEvent === 1) - } - listener.firstStatus + q.recentProgresses.head } finally { spark.streams.active.map(_.stop()) - spark.streams.removeListener(listener) } } @@ 
-369,3 +404,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging { } } } + +object StreamingQuerySuite { + // Singleton reference to clock that does not get serialized in task closures + var clock: ManualClock = null +} http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala index 3e9488c..12f3c3e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala @@ -51,6 +51,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { test("watermark metric") { + val inputData = MemoryStream[Int] val windowedAggregation = inputData.toDF() @@ -62,16 +63,19 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging { testStream(windowedAggregation)( AddData(inputData, 15), - AssertOnLastQueryStatus { status => - status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000" + CheckAnswer(), + AssertOnQuery { query => + query.lastProgress.currentWatermark === 5000 }, AddData(inputData, 15), - AssertOnLastQueryStatus { status => - status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000" + CheckAnswer(), + AssertOnQuery { query => + query.lastProgress.currentWatermark === 5000 }, AddData(inputData, 25), - AssertOnLastQueryStatus { status => - status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000" + CheckAnswer(), + AssertOnQuery { query => + query.lastProgress.currentWatermark === 15000 } ) }
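Outside of the test helpers, the same progress information is what user code sees through the listener and query APIs introduced above. A rough sketch, assuming an already-defined streaming DataFrame named streamingDF and a SparkSession named spark (both hypothetical here); the event classes, progress accessors, and the memory-sink start call are the ones added in this patch:

  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.spark.sql.streaming.StreamingQueryListener._

  // The listener receives the new event types; QueryProgressEvent carries a StreamingQueryProgress.
  spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit =
      println(s"started: id=${event.id} name=${event.name}")
    override def onQueryProgress(event: QueryProgressEvent): Unit =
      println(event.progress.json)  // one compact JSON document per trigger
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
      println(s"terminated: id=${event.id} exception=${event.exception}")
  })

  val query = streamingDF.writeStream.format("memory").queryName("test").start()
  query.processAllAvailable()
  println(query.lastProgress.prettyJson)                     // latest trigger's progress
  println(query.recentProgresses.count(_.numInputRows > 0))  // triggers that actually read data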