Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r205917365 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala --- @@ -88,23 +100,70 @@ package object debug { } } + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String + * + * @param query the streaming query for codegen + * @return single String containing all WholeStageCodegen subtrees and corresponding codegen + */ + def codegenString(query: StreamingQuery): String = { + val msg = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution => + if (w.lastExecution != null) { + codegenString(w.lastExecution.executedPlan) + } else { + "No physical plan. Waiting for data." + } + + case _ => "Only supported for StreamExecution." + } + msg + } + + /** + * Get WholeStageCodegenExec subtrees and the codegen in a query plan + * + * @param query the streaming query for codegen + * @return Sequence of WholeStageCodegen subtrees and corresponding codegen + */ + def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = { + val planAndCodes = unwrapStreamingQueryWrapper(query) match { + case w: StreamExecution if w.lastExecution != null => + codegenStringSeq(w.lastExecution.executedPlan) + + case _ => Seq.empty + } + planAndCodes + } + + /* Helper function to reuse duplicated code block between batch and streaming. */ + private def debugInternal(plan: SparkPlan): Unit = { + val visited = new collection.mutable.HashSet[TreeNodeRef]() + val debugPlan = plan transform { + case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) => + visited += new TreeNodeRef(s) + DebugExec(s) + } + debugPrint(s"Results returned: ${debugPlan.execute().count()}") + debugPlan.foreach { + case d: DebugExec => d.dumpStats() + case _ => + } + } + + private def unwrapStreamingQueryWrapper(query: StreamingQuery): StreamingQuery = { + query match { + case wrapper: StreamingQueryWrapper => wrapper.streamingQuery + case _ => query --- End diff -- I prefer to throw an exception here and make this method return `StreamExecution`.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org