Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21222#discussion_r205961813
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala ---
    @@ -88,23 +100,70 @@ package object debug {
         }
       }
     
    +  /**
    +   * Get WholeStageCodegenExec subtrees and the codegen in a query plan 
into one String
    +   *
    +   * @param query the streaming query for codegen
    +   * @return single String containing all WholeStageCodegen subtrees and 
corresponding codegen
    +   */
    +  def codegenString(query: StreamingQuery): String = {
    +    val msg = unwrapStreamingQueryWrapper(query) match {
    +      case w: StreamExecution =>
    +        if (w.lastExecution != null) {
    +          codegenString(w.lastExecution.executedPlan)
    +        } else {
    +          "No physical plan. Waiting for data."
    +        }
    +
    +      case _ => "Only supported for StreamExecution."
    +    }
    +    msg
    +  }
    +
    +  /**
    +   * Get WholeStageCodegenExec subtrees and the codegen in a query plan
    +   *
    +   * @param query the streaming query for codegen
    +   * @return Sequence of WholeStageCodegen subtrees and corresponding 
codegen
    +   */
    +  def codegenStringSeq(query: StreamingQuery): Seq[(String, String)] = {
    +    val planAndCodes = unwrapStreamingQueryWrapper(query) match {
    +      case w: StreamExecution if w.lastExecution != null =>
    +        codegenStringSeq(w.lastExecution.executedPlan)
    +
    +      case _ => Seq.empty
    +    }
    +    planAndCodes
    +  }
    +
    +  /* Helper function to reuse duplicated code block between batch and 
streaming. */
    +  private def debugInternal(plan: SparkPlan): Unit = {
    +    val visited = new collection.mutable.HashSet[TreeNodeRef]()
    +    val debugPlan = plan transform {
    +      case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
    +        visited += new TreeNodeRef(s)
    +        DebugExec(s)
    +    }
    +    debugPrint(s"Results returned: ${debugPlan.execute().count()}")
    +    debugPlan.foreach {
    +      case d: DebugExec => d.dumpStats()
    +      case _ =>
    +    }
    +  }
    +
    +  private def unwrapStreamingQueryWrapper(query: StreamingQuery): 
StreamingQuery = {
    +    query match {
    +      case wrapper: StreamingQueryWrapper => wrapper.streamingQuery
    +      case _ => query
    --- End diff --
    
    OK. I can apply your suggestion, but I'm not 100% sure about the intention: 
should the exception propagate to the caller side (user code), or should the 
debug package catch and handle it internally? 
    I'll apply the latter, but please let me know if you intended the former.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to