Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r207881161 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -513,6 +515,125 @@ class StreamSuite extends StreamTest { } } + test("explain-continuous") { + val inputData = ContinuousMemoryStream[Int] + val df = inputData.toDS().map(_ * 2).filter(_ > 5) + + // Test `df.explain` + val explain = ExplainCommand(df.queryExecution.logical, extended = false) + val explainString = + spark.sessionState + .executePlan(explain) + .executedPlan + .executeCollect() + .map(_.getString(0)) + .mkString("\n") + assert(explainString.contains("Filter")) + assert(explainString.contains("MapElements")) + assert(!explainString.contains("LocalTableScan")) + + // Test StreamingQuery.display + val q = df.writeStream.queryName("memory_continuous_explain") + .outputMode(OutputMode.Update()).format("memory") + .trigger(Trigger.Continuous("1 seconds")) + .start() + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery + try { + // in continuous mode, the query will be run even if there's no data + // sleep a bit to ensure initialization + eventually(timeout(2.seconds), interval(100.milliseconds)) { + assert(q.lastExecution != null) + } + + val explainWithoutExtended = q.explainInternal(false) + + // `extended = false` only displays the physical plan. + assert("Streaming RelationV2 ContinuousMemoryStream".r + .findAllMatchIn(explainWithoutExtended).size === 0) + assert("ScanV2 ContinuousMemoryStream".r + .findAllMatchIn(explainWithoutExtended).size === 1) + + val explainWithExtended = q.explainInternal(true) + // `extended = true` displays 3 logical plans (Parsed/Analyzed/Optimized) and 1 physical + // plan. 
+ assert("Streaming RelationV2 ContinuousMemoryStream".r + .findAllMatchIn(explainWithExtended).size === 3) + assert("ScanV2 ContinuousMemoryStream".r + .findAllMatchIn(explainWithExtended).size === 1) + } finally { + q.stop() + } + } + + test("codegen-microbatch") { + import org.apache.spark.sql.execution.debug._ + + val inputData = MemoryStream[Int] + val df = inputData.toDS().map(_ * 2).filter(_ > 5) + + // Test StreamingQuery.codegen + val q = df.writeStream.queryName("memory_microbatch_codegen") + .outputMode(OutputMode.Update) + .format("memory") + .trigger(Trigger.ProcessingTime("1 seconds")) + .start() + + try { + assert("No physical plan. Waiting for data." === codegenString(q)) + assert(codegenStringSeq(q).isEmpty) + + inputData.addData(1, 2, 3, 4, 5) + q.processAllAvailable() + + val codegenStr = codegenString(q) --- End diff -- Nice finding. Will address.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org