Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21222#discussion_r205917365
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
    @@ -88,23 +100,70 @@ package object debug {
         }
       }
     
    +  /**
    +   * Get WholeStageCodegenExec subtrees and the codegen in a query plan into one String
    +   *
    +   * @param query the streaming query for codegen
    +   * @return single String containing all WholeStageCodegen subtrees and corresponding codegen
    +   */
    +  def codegenString(query: StreamingQuery): String = {
    +    val msg = unwrapStreamingQueryWrapper(query) match {
    +      case w: StreamExecution =>
    +        if (w.lastExecution != null) {
    +          codegenString(w.lastExecution.executedPlan)
    +        } else {
    +          "No physical plan. Waiting for data."
    +        }
    +
    +      case _ => "Only supported for StreamExecution."
    +    }
    +    msg
    +  }
    +
    +  /**
    +   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
    +   *
    +   * @param query the streaming query for codegen
    +   * @return Sequence of WholeStageCodegen subtrees and corresponding codegen
    +   */
    +  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
    +    val planAndCodes = unwrapStreamingQueryWrapper(query) match {
    +      case w: StreamExecution if w.lastExecution != null =>
    +        codegenStringSeq(w.lastExecution.executedPlan)
    +
    +      case _ => Seq.empty
    +    }
    +    planAndCodes
    +  }
    +
    +  /* Helper function to reuse duplicated code block between batch and streaming. */
    +  private def debugInternal(plan: SparkPlan): Unit = {
    +    val visited = new collection.mutable.HashSet[TreeNodeRef]()
    +    val debugPlan = plan transform {
    +      case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
    +        visited += new TreeNodeRef(s)
    +        DebugExec(s)
    +    }
    +    debugPrint(s"Results returned: ${debugPlan.execute().count()}")
    +    debugPlan.foreach {
    +      case d: DebugExec => d.dumpStats()
    +      case _ =>
    +    }
    +  }
    +
    +  private def unwrapStreamingQueryWrapper(query: StreamingQuery): StreamingQuery = {
    +    query match {
    +      case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
    +      case _ => query
    --- End diff --
    
    I prefer to throw an exception here and make this method return `StreamExecution`.
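    
    For illustration, a minimal sketch of that suggestion (the exact exception type and message here are assumptions, not taken from the patch; imports are as in the patched file):
    
    ```scala
    // Unwrap the user-facing wrapper and return the underlying StreamExecution,
    // failing fast for any other StreamingQuery implementation.
    private def unwrapStreamingQueryWrapper(query: StreamingQuery): StreamExecution = {
      query match {
        case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
        case se: StreamExecution => se
        case _ =>
          // Assumed exception type; any suitable error would do here.
          throw new IllegalArgumentException(
            s"Unexpected StreamingQuery implementation: ${query.getClass.getName}")
      }
    }
    ```
    
    Callers such as `codegenString(query: StreamingQuery)` could then match on `StreamExecution` directly and drop the `case _ =>` fallback.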


---
