Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51483601

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.lang.Thread.UncaughtExceptionHandler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.execution.streaming._
+
+/**
+ * A framework for implementing tests for streaming queries and sources.
+ *
+ * A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
+ * blocking as necessary to let the stream catch up. For example, the following adds some data to
+ * a stream, blocking until it can verify that the correct values are eventually produced.
+ *
+ * {{{
+ *   val inputData = MemoryStream[Int]
+ *   val mapped = inputData.toDS().map(_ + 1)
+ *
+ *   testStream(mapped)(
+ *     AddData(inputData, 1, 2, 3),
+ *     CheckAnswer(2, 3, 4))
+ * }}}
+ *
+ * Note that while we do sleep to allow the other thread to progress without spinning,
+ * `StreamAction` checks should not depend on the amount of time spent sleeping. Instead they
+ * should check the actual progress of the stream before verifying the required test condition.
+ *
+ * Currently it is assumed that all streaming queries will eventually complete within 10 seconds
+ * to avoid hanging forever in the case of failures. However, individual suites can change this
+ * by overriding `streamingTimeout`.
+ */
+trait StreamTest extends QueryTest with Timeouts {
+
+  implicit class RichSource(s: Source) {
+    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
+  }
+
+  /** How long to wait for an active stream to catch up when checking a result. */
+  val streamingTimeout = 10.seconds
+
+  /** A trait for actions that can be performed while testing a streaming DataFrame. */
+  trait StreamAction
+
+  /** A trait to mark actions that require the stream to be actively running. */
+  trait StreamMustBeRunning
+
+  /**
+   * Adds the given data to the stream. Subsequent check answers will block until this data has
+   * been processed.
+   */
+  object AddData {
+    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
+      AddDataMemory(source, data)
+  }
+
+  /** A trait that can be extended when testing other sources. */
+  trait AddData extends StreamAction {
+    def source: Source
+
+    /**
+     * Called to trigger adding the data. Should return the offset that will denote when this
+     * new data has been processed.
+     */
+    def addData(): Offset
+  }
+
+  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
+    override def toString: String = s"AddData to $source: ${data.mkString(",")}"
+
+    override def addData(): Offset = {
+      source.addData(data)
+    }
+  }
+
+  /**
+   * Checks to make sure that the current data stored in the sink matches the `expectedAnswer`.
+   * This operation automatically blocks until all added data has been processed.
+   */
+  object CheckAnswer {
+    def apply[A : Encoder](data: A*): CheckAnswerRows = {
+      val encoder = encoderFor[A]
+      val toExternalRow = RowEncoder(encoder.schema)
+      CheckAnswerRows(data.map(d => toExternalRow.fromRow(encoder.toRow(d))))
+    }
+
+    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
+  }
+
+  case class CheckAnswerRows(expectedAnswer: Seq[Row])
+      extends StreamAction with StreamMustBeRunning {
+    override def toString: String = s"CheckAnswer: ${expectedAnswer.mkString(",")}"
+  }
+
+  /** Signals that `num` batches should be dropped from the sink. */
+  case class DropBatches(num: Int) extends StreamAction
+
+  /** Stops the stream. It must currently be running. */
+  case object StopStream extends StreamAction
+
+  /** Starts the stream, resuming if data has already been processed. It must not be running. */
+  case object StartStream extends StreamAction
+
+  /** Signals that a failure is expected and should not kill the test. */
+  case object ExpectFailure extends StreamAction
+
+  /** A helper for running actions on a Streaming Dataset. See `checkAnswer(DataFrame)`. */
+  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
+    testStream(stream.toDF())(actions: _*)
+
+  /**
+   * Executes the specified actions on the given streaming DataFrame and provides helpful
+   * error messages in the case of failures or incorrect answers.
+   *
+   * Note that if the stream is not explicitly started before an action that requires it to be
+   * running then it will be automatically started before performing any other actions.
+   */
+  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
+    var pos = 0
+    var currentPlan: LogicalPlan = stream.logicalPlan
+    var currentStream: StreamExecution = null
+    val awaiting = new mutable.HashMap[Source, Offset]()
+    val sink = new MemorySink(stream.schema)
+
+    @volatile
+    var streamDeathCause: Throwable = null
+
+    // If the test doesn't manually start the stream, we do it automatically at the beginning.
+    val startedManually =
+      actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
+    val startedTest = if (startedManually) actions else StartStream +: actions
+
+    def testActions = actions.zipWithIndex.map {
+      case (a, i) =>
+        if ((pos == i && startedManually) || (pos == (i + 1) && !startedManually)) {
+          "=> " + a.toString
+        } else {
+          "   " + a.toString
+        }
+    }.mkString("\n")
+
+    def currentOffsets =
+      if (currentStream != null) currentStream.currentOffsets.toString else "not started"
+
+    def threadState =
+      if (currentStream != null && currentStream.microBatchThread.isAlive) "alive" else "dead"
+
+    def testState =
+      s"""
+         |== Progress ==
+         |$testActions
+         |
+         |== Stream ==
+         |Stream state: $currentOffsets
+         |Thread state: $threadState
+         |${if (streamDeathCause != null) stackTraceToString(streamDeathCause) else ""}
+         |
+         |== Sink ==
+         |$sink
+         |
+         |== Plan ==
+         |${if (currentStream != null) currentStream.lastExecution else ""}
+       """
+
+    def checkState(check: Boolean, error: String) = if (!check) {
+      fail(
+        s"""
+           |Invalid State: $error
+           |$testState
+         """.stripMargin)
+    }
+
+    val testThread = Thread.currentThread()
+
+    try {
+      startedTest.foreach { action =>
+        action match {
+          case StartStream =>
+            checkState(currentStream == null, "stream already running")
+
+            currentStream = new StreamExecution(sqlContext, stream.logicalPlan, sink)
+            currentStream.microBatchThread.setUncaughtExceptionHandler(
--- End diff --

An exception thrown between `microBatchThread.start()` and this call to `microBatchThread.setUncaughtExceptionHandler` will be missed. How about adding an `UncaughtExceptionHandler` parameter to `StreamExecution` and passing the handler in, so that it can be installed before `microBatchThread.start()` is called?
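
For illustration, here is a minimal sketch of that suggestion. The constructor shape and the `runBatches` placeholder are assumptions made for the example, not the actual patch:

    import java.lang.Thread.UncaughtExceptionHandler

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.streaming.Sink

    class StreamExecution(
        sqlContext: SQLContext,
        logicalPlan: LogicalPlan,
        sink: Sink,
        exceptionHandler: UncaughtExceptionHandler) {

      val microBatchThread = new Thread("stream execution thread") {
        override def run(): Unit = runBatches()
      }

      // Install the handler before starting the thread, so nothing thrown by the
      // thread can slip through the window between start() and a later
      // setUncaughtExceptionHandler call made by the caller.
      microBatchThread.setUncaughtExceptionHandler(exceptionHandler)
      microBatchThread.start()

      private def runBatches(): Unit = {
        // ... run micro-batches until the stream is stopped ...
      }
    }

The test harness would then pass its handler at construction time, e.g. `new StreamExecution(sqlContext, stream.logicalPlan, sink, handler)`, instead of setting it after the thread is already running.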