Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11006#discussion_r51501233
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql
    +
    +import java.lang.Thread.UncaughtExceptionHandler
    +
    +import scala.collection.mutable
    +import scala.collection.mutable.ArrayBuffer
    +import scala.util.Random
    +
    +import org.scalatest.concurrent.Timeouts
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.sql.catalyst.encoders.{encoderFor, 
ExpressionEncoder, RowEncoder}
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.util._
    +import org.apache.spark.sql.execution.streaming._
    +
    +/**
    + * A framework for implementing tests for streaming queries and sources.
    + *
    + * A test consists of a set of steps (expressed as a `StreamAction`) that 
are executed in order,
    + * blocking as necessary to let the stream catch up.  For example, the 
following adds some data to
    + * a stream, blocking until it can verify that the correct values are 
eventually produced.
    + *
    + * {{{
    + *  val inputData = MemoryStream[Int]
    +    val mapped = inputData.toDS().map(_ + 1)
    +
    +    testStream(mapped)(
    +      AddData(inputData, 1, 2, 3),
    +      CheckAnswer(2, 3, 4))
    + * }}}
    + *
    + * Note that while we do sleep to allow the other thread to progress 
without spinning,
    + * `StreamAction` checks should not depend on the amount of time spent 
sleeping.  Instead they
    + * should check the actual progress of the stream before verifying the 
required test condition.
    + *
    + * Currently it is assumed that all streaming queries will eventually 
complete in 10 seconds to
    + * avoid hanging forever in the case of failures. However, individual 
suites can change this
    + * by overriding `streamingTimeout`.
    + */
    +trait StreamTest extends QueryTest with Timeouts {
    +
    +  implicit class RichSource(s: Source) {
    +    def toDF(): DataFrame = new DataFrame(sqlContext, StreamingRelation(s))
    +  }
    +
    +  /** How long to wait for an active stream to catch up when checking a 
result. */
    +  val streamingTimout = 10.seconds
    +
    +  /** A trait for actions that can be performed while testing a streaming 
DataFrame. */
    +  trait StreamAction
    +
    +  /** A trait to mark actions that require the stream to be actively 
running. */
    +  trait StreamMustBeRunning
    +
    +  /**
    +   * Adds the given data to the stream.  Subsequent check answers will 
block until this data has
    +   * been processed.
    +   */
    +  object AddData {
    +    def apply[A](source: MemoryStream[A], data: A*): AddDataMemory[A] =
    +      AddDataMemory(source, data)
    +  }
    +
    +  /** A trait that can be extended when testing other sources. */
    +  trait AddData extends StreamAction {
    +    def source: Source
    +
    +    /**
    +     * Called to trigger adding the data.  Should return the offset that 
will denote when this
    +     * new data has been processed.
    +     */
    +    def addData(): Offset
    +  }
    +
    +  case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) 
extends AddData {
    +    override def toString: String = s"AddData to $source: 
${data.mkString(",")}"
    +
    +    override def addData(): Offset = {
    +      source.addData(data)
    +    }
    +  }
    +
    +  /**
    +   * Checks to make sure that the current data stored in the sink matches 
the `expectedAnswer`.
    +   * This operation automatically blocks until all added data has been 
processed.
    +   */
    +  object CheckAnswer {
    +    def apply[A : Encoder](data: A*): CheckAnswerRows = {
    +      val encoder = encoderFor[A]
    +      val toExternalRow = RowEncoder(encoder.schema)
    +      CheckAnswerRows(data.map(d => 
toExternalRow.fromRow(encoder.toRow(d))))
    +    }
    +
    +    def apply(rows: Row*): CheckAnswerRows = CheckAnswerRows(rows)
    +  }
    +
    +  case class CheckAnswerRows(expectedAnswer: Seq[Row])
    +      extends StreamAction with StreamMustBeRunning {
    +    override def toString: String = s"CheckAnswer: 
${expectedAnswer.mkString(",")}"
    +  }
    +
    +  case class DropBatches(num: Int) extends StreamAction
    +
    +  /** Stops the stream.  It must currently be running. */
    +  case object StopStream extends StreamAction
    +
    +  /** Starts the stream, resuming if data has already been processed.  It 
must not be running. */
    +  case object StartStream extends StreamAction
    +
    +  /** Signals that a failure is expected and should not kill the test. */
    +  case object ExpectFailure extends StreamAction
    +
    +  /** A helper for running actions on a Streaming Dataset. See 
`checkAnswer(DataFrame)`. */
    +  def testStream(stream: Dataset[_])(actions: StreamAction*): Unit =
    +    testStream(stream.toDF())(actions: _*)
    +
    +  /**
    +   * Executes the specified actions on the given streaming DataFrame 
and provides helpful
    +   * error messages in the case of failures or incorrect answers.
    +   *
    +   * Note that if the stream is not explicitly started before an action 
that requires it to be
    +   * running then it will be automatically started before performing any 
other actions.
    +   */
    +  def testStream(stream: DataFrame)(actions: StreamAction*): Unit = {
    +    var pos = 0
    +    var currentPlan: LogicalPlan = stream.logicalPlan
    +    var currentStream: StreamExecution = null
    +    val awaiting = new mutable.HashMap[Source, Offset]()
    +    val sink = new MemorySink(stream.schema)
    +
    +    @volatile
    +    var streamDeathCause: Throwable = null
    +
    +    // If the test doesn't manually start the stream, we do it 
automatically at the beginning.
    +    val startedManually =
    +      
actions.takeWhile(_.isInstanceOf[StreamMustBeRunning]).contains(StartStream)
    +    val startedTest = if (startedManually) actions else StartStream +: 
actions
    +
    +    def testActions = actions.zipWithIndex.map {
    +      case (a, i) =>
    +        if ((pos == i && startedManually) || (pos == (i + 1) && 
!startedManually)) {
    +          "=> " + a.toString
    +        } else {
    +          "   " + a.toString
    +        }
    +    }.mkString("\n")
    +
    +    def currentOffsets =
    +      if (currentStream != null) currentStream.currentOffsets.toString 
else "not started"
    +
    +    def threadState =
    +      if (currentStream != null && currentStream.microBatchThread.isAlive) 
"alive" else "dead"
    +    def testState =
    +      s"""
    +         |== Progress ==
    +         |$testActions
    +         |
    +         |== Stream ==
    +         |Stream state: $currentOffsets
    +         |Thread state: $threadState
    +         |${if (streamDeathCause != null) 
stackTraceToString(streamDeathCause) else ""}
    +         |
    +         |== Sink ==
    +         |$sink
    +         |
    +         |== Plan ==
    +         |${if (currentStream != null) currentStream.lastExecution else ""}
    +         """
    +
    +    def checkState(check: Boolean, error: String) = if (!check) {
    +      fail(
    +        s"""
    +           |Invalid State: $error
    +           |$testState
    +         """.stripMargin)
    +    }
    +
    +    val testThread = Thread.currentThread()
    +
    +    try {
    +      startedTest.foreach { action =>
    +        action match {
    +          case StartStream =>
    +            checkState(currentStream == null, "stream already running")
    +
    +            currentStream = new StreamExecution(sqlContext, 
stream.logicalPlan, sink)
    +            currentStream.microBatchThread.setUncaughtExceptionHandler(
    --- End diff --
    
    I think you are right, but I would probably defer this to the management 
API that TD is working on.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to