Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21222#discussion_r207881161
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -513,6 +515,125 @@ class StreamSuite extends StreamTest {
         }
       }
     
    +  test("explain-continuous") {
    +    val inputData = ContinuousMemoryStream[Int]
    +    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
    +
    +    // Test `df.explain`
    +    val explain = ExplainCommand(df.queryExecution.logical, extended = 
false)
    +    val explainString =
    +      spark.sessionState
    +        .executePlan(explain)
    +        .executedPlan
    +        .executeCollect()
    +        .map(_.getString(0))
    +        .mkString("\n")
    +    assert(explainString.contains("Filter"))
    +    assert(explainString.contains("MapElements"))
    +    assert(!explainString.contains("LocalTableScan"))
    +
    +    // Test StreamingQuery.display
    +    val q = df.writeStream.queryName("memory_continuous_explain")
    +      .outputMode(OutputMode.Update()).format("memory")
    +      .trigger(Trigger.Continuous("1 seconds"))
    +      .start()
    +      .asInstanceOf[StreamingQueryWrapper]
    +      .streamingQuery
    +    try {
    +      // in continuous mode, the query will be run even there's no data
    +      // sleep a bit to ensure initialization
    +      eventually(timeout(2.seconds), interval(100.milliseconds)) {
    +        assert(q.lastExecution != null)
    +      }
    +
    +      val explainWithoutExtended = q.explainInternal(false)
    +
    +      // `extended = false` only displays the physical plan.
    +      assert("Streaming RelationV2 ContinuousMemoryStream".r
    +        .findAllMatchIn(explainWithoutExtended).size === 0)
    +      assert("ScanV2 ContinuousMemoryStream".r
    +        .findAllMatchIn(explainWithoutExtended).size === 1)
    +
    +      val explainWithExtended = q.explainInternal(true)
    +      // `extended = true` displays 3 logical plans 
(Parsed/Optimized/Optimized) and 1 physical
    +      // plan.
    +      assert("Streaming RelationV2 ContinuousMemoryStream".r
    +        .findAllMatchIn(explainWithExtended).size === 3)
    +      assert("ScanV2 ContinuousMemoryStream".r
    +        .findAllMatchIn(explainWithExtended).size === 1)
    +    } finally {
    +      q.stop()
    +    }
    +  }
    +
    +  test("codegen-microbatch") {
    +    import org.apache.spark.sql.execution.debug._
    +
    +    val inputData = MemoryStream[Int]
    +    val df = inputData.toDS().map(_ * 2).filter(_ > 5)
    +
    +    // Test StreamingQuery.codegen
    +    val q = df.writeStream.queryName("memory_microbatch_codegen")
    +      .outputMode(OutputMode.Update)
    +      .format("memory")
    +      .trigger(Trigger.ProcessingTime("1 seconds"))
    +      .start()
    +
    +    try {
    +      assert("No physical plan. Waiting for data." === codegenString(q))
    +      assert(codegenStringSeq(q).isEmpty)
    +
    +      inputData.addData(1, 2, 3, 4, 5)
    +      q.processAllAvailable()
    +
    +      val codegenStr = codegenString(q)
    --- End diff --
    
    Nice finding. Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to