This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4704af4  [SPARK-27449] Move WholeStageCodegen.limitNotReachedCond class checks into separate methods.
4704af4 is described below

commit 4704af4c2639017278f2816b6db2fc38a9676b6b
Author: herman <her...@databricks.com>
AuthorDate: Sun Apr 14 15:54:20 2019 +0800

    [SPARK-27449] Move WholeStageCodegen.limitNotReachedCond class checks into separate methods.
    
    ## What changes were proposed in this pull request?
    This PR moves the checks done in `WholeStageCodegen.limitNotReachedCond` into a separate protected method. This makes it easier to introduce new leaf or blocking nodes.
    
    ## How was this patch tested?
    Existing tests.
    
    Closes #24358 from hvanhovell/SPARK-27449.
    
    Authored-by: herman <her...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/WholeStageCodegenExec.scala       | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 5e28cfb..027885b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -382,13 +382,16 @@ trait CodegenSupport extends SparkPlan {
   def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks
 
   /**
+   * Check if the node is supposed to produce limit not reached checks.
+   */
+  protected def canCheckLimitNotReached: Boolean = children.isEmpty
+
+  /**
    * A helper method to generate the data producing loop condition according to the
    * limit-not-reached checks.
    */
   final def limitNotReachedCond: String = {
-    // InputAdapter is also a leaf node.
-    val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter]
-    if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen]) {
+    if (!canCheckLimitNotReached) {
       val errMsg = "Only leaf nodes and blocking nodes need to call 
'limitNotReachedCond' " +
         "in its data producing loop."
       if (Utils.isTesting) {
@@ -426,6 +429,9 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   // that upstream operators will not generate useless conditions (which are always evaluated to
   // false) for the Limit operators after this blocking operator.
   override def limitNotReachedChecks: Seq[String] = Nil
+
+  // This is a blocking node so the node can produce these checks
+  override protected def canCheckLimitNotReached: Boolean = true
 }
 
 /**
@@ -500,6 +506,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
 
   override def inputRDD: RDD[InternalRow] = child.execute()
 
+  // This is a leaf node so the node can produce limit not reached checks.
+  override protected def canCheckLimitNotReached: Boolean = true
+
   // InputAdapter does not need UnsafeProjection.
   protected val createUnsafeProjection: Boolean = false
 

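For context, the sketch below is a minimal, self-contained Scala model of the extension-point pattern this diff introduces. The names here (`CodegenNode`, `SourceNode`, `BlockingNode`) are hypothetical stand-ins rather than Spark's real `CodegenSupport` hierarchy, and the propagation of the limit checks is simplified; it only illustrates how a new leaf or blocking node would override `canCheckLimitNotReached` instead of being special-cased with `isInstanceOf` tests inside `limitNotReachedCond`.

```scala
// Simplified stand-ins for the pattern in this commit; not Spark's actual classes.
object CanCheckLimitNotReachedSketch {

  trait CodegenNode {
    def children: Seq[CodegenNode]

    // Limit checks visible to this node (hard-coded here; Spark derives them
    // from upstream Limit operators via the parent chain).
    def limitNotReachedChecks: Seq[String] = Seq("!stopEarly()")

    // The new hook: by default only leaf nodes may emit the checks; nodes with
    // special semantics opt in by overriding this method.
    protected def canCheckLimitNotReached: Boolean = children.isEmpty

    final def limitNotReachedCond: String = {
      // Spark logs or throws depending on Utils.isTesting; a plain require
      // keeps this sketch short.
      require(canCheckLimitNotReached,
        "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond'")
      if (limitNotReachedChecks.isEmpty) ""
      else limitNotReachedChecks.mkString("", " && ", " && ")
    }
  }

  // Hypothetical leaf source: qualifies via the default (children.isEmpty).
  final class SourceNode extends CodegenNode {
    override def children: Seq[CodegenNode] = Nil
  }

  // Hypothetical blocking operator: it has a child, so it opts in explicitly,
  // and it reports no checks, mirroring the limitNotReachedChecks = Nil
  // override on BlockingOperatorWithCodegen in the diff above.
  final class BlockingNode(child: CodegenNode) extends CodegenNode {
    override def children: Seq[CodegenNode] = Seq(child)
    override def limitNotReachedChecks: Seq[String] = Nil
    override protected def canCheckLimitNotReached: Boolean = true
  }

  def main(args: Array[String]): Unit = {
    val source = new SourceNode
    println(s"while (${source.limitNotReachedCond}input.hasNext()) { ... }")
    // -> while (!stopEarly() && input.hasNext()) { ... }

    val blocking = new BlockingNode(source)
    println(s"while (${blocking.limitNotReachedCond}input.hasNext()) { ... }")
    // -> while (input.hasNext()) { ... }   (no useless limit condition)
  }
}
```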

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
