This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4704af4 [SPARK-27449] Move WholeStageCodegen.limitNotReachedCond class checks into separate methods. 4704af4 is described below commit 4704af4c2639017278f2816b6db2fc38a9676b6b Author: herman <her...@databricks.com> AuthorDate: Sun Apr 14 15:54:20 2019 +0800 [SPARK-27449] Move WholeStageCodegen.limitNotReachedCond class checks into separate methods. ## What changes were proposed in this pull request? This PR moves the checks done in `WholeStageCodegen.limitNotReachedCond` into a separate protected method. This makes it easier to introduce new leaf or blocking nodes. ## How was this patch tested? Existing tests. Closes #24358 from hvanhovell/SPARK-27449. Authored-by: herman <her...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/execution/WholeStageCodegenExec.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 5e28cfb..027885b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -382,13 +382,16 @@ trait CodegenSupport extends SparkPlan { def limitNotReachedChecks: Seq[String] = parent.limitNotReachedChecks /** + * Check if the node is supposed to produce limit not reached checks. + */ + protected def canCheckLimitNotReached: Boolean = children.isEmpty + + /** * A helper method to generate the data producing loop condition according to the * limit-not-reached checks. */ final def limitNotReachedCond: String = { - // InputAdapter is also a leaf node. - val isLeafNode = children.isEmpty || this.isInstanceOf[InputAdapter] - if (!isLeafNode && !this.isInstanceOf[BlockingOperatorWithCodegen]) { + if (!canCheckLimitNotReached) { val errMsg = "Only leaf nodes and blocking nodes need to call 'limitNotReachedCond' " + "in its data producing loop." if (Utils.isTesting) { @@ -426,6 +429,9 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { // that upstream operators will not generate useless conditions (which are always evaluated to // false) for the Limit operators after this blocking operator. override def limitNotReachedChecks: Seq[String] = Nil + + // This is a blocking node so the node can produce these checks + override protected def canCheckLimitNotReached: Boolean = true } /** @@ -500,6 +506,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod override def inputRDD: RDD[InternalRow] = child.execute() + // This is a leaf node so the node can produce limit not reached checks. + override protected def canCheckLimitNotReached: Boolean = true + // InputAdapter does not need UnsafeProjection. protected val createUnsafeProjection: Boolean = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org