Revert "[SPARK-22600][SQL] Fix 64kb limit for deeply nested expressions under wholestage codegen"
This reverts commit c7d0148615c921dca782ee3785b5d0cd59e42262. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a29a60d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a29a60d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a29a60d Branch: refs/heads/master Commit: 2a29a60da32a5ccd04cc7c5c22c4075b159389e3 Parents: bc7e4a9 Author: Wenchen Fan <wenc...@databricks.com> Authored: Thu Dec 14 11:22:23 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Dec 14 11:22:23 2017 +0800 ---------------------------------------------------------------------- .../sql/catalyst/expressions/Expression.scala | 37 +-- .../expressions/codegen/CodeGenerator.scala | 44 +-- .../expressions/codegen/ExpressionCodegen.scala | 269 ------------------- .../codegen/ExpressionCodegenSuite.scala | 220 --------------- .../spark/sql/execution/ColumnarBatchScan.scala | 5 +- .../sql/execution/WholeStageCodegenSuite.scala | 23 +- 6 files changed, 13 insertions(+), 585 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 329ea5d..743782a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -105,12 +105,6 @@ abstract class Expression extends TreeNode[Expression] { val isNull = ctx.freshName("isNull") val value = ctx.freshName("value") val eval = doGenCode(ctx, ExprCode("", isNull, value)) - eval.isNull = if (this.nullable) eval.isNull else "false" - - // Records current input row and variables of this expression. - eval.inputRow = ctx.INPUT_ROW - eval.inputVars = findInputVars(ctx, eval) - reduceCodeSize(ctx, eval) if (eval.code.nonEmpty) { // Add `this` in the comment. @@ -121,29 +115,9 @@ abstract class Expression extends TreeNode[Expression] { } } - /** - * Returns the input variables to this expression. - */ - private def findInputVars(ctx: CodegenContext, eval: ExprCode): Seq[ExprInputVar] = { - if (ctx.currentVars != null) { - this.collect { - case b @ BoundReference(ordinal, _, _) if ctx.currentVars(ordinal) != null => - ExprInputVar(exprCode = ctx.currentVars(ordinal), - dataType = b.dataType, nullable = b.nullable) - } - } else { - Seq.empty - } - } - - /** - * In order to prevent 64kb compile error, reducing the size of generated codes by - * separating it into a function if the size exceeds a threshold. - */ private def reduceCodeSize(ctx: CodegenContext, eval: ExprCode): Unit = { - lazy val funcParams = ExpressionCodegen.getExpressionInputParams(ctx, this) - - if (eval.code.trim.length > 1024 && funcParams.isDefined) { + // TODO: support whole stage codegen too + if (eval.code.trim.length > 1024 && ctx.INPUT_ROW != null && ctx.currentVars == null) { val setIsNull = if (eval.isNull != "false" && eval.isNull != "true") { val globalIsNull = ctx.freshName("globalIsNull") ctx.addMutableState(ctx.JAVA_BOOLEAN, globalIsNull) @@ -158,12 +132,9 @@ abstract class Expression extends TreeNode[Expression] { val newValue = ctx.freshName("value") val funcName = ctx.freshName(nodeName) - val callParams = funcParams.map(_._1.mkString(", ")).get - val declParams = funcParams.map(_._2.mkString(", ")).get - val funcFullName = ctx.addNewFunction(funcName, s""" - |private $javaType $funcName($declParams) { + |private $javaType $funcName(InternalRow ${ctx.INPUT_ROW}) { | ${eval.code.trim} | $setIsNull | return ${eval.value}; @@ -171,7 +142,7 @@ abstract class Expression extends TreeNode[Expression] { """.stripMargin) eval.value = newValue - eval.code = s"$javaType $newValue = $funcFullName($callParams);" + eval.code = s"$javaType $newValue = $funcFullName(${ctx.INPUT_ROW});" } } http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index b1d9311..3a03a65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -55,24 +55,8 @@ import org.apache.spark.util.{ParentClassLoader, Utils} * to null. * @param value A term for a (possibly primitive) value of the result of the evaluation. Not * valid if `isNull` is set to `true`. - * @param inputRow A term that holds the input row name when generating this code. - * @param inputVars A list of [[ExprInputVar]] that holds input variables when generating this code. */ -case class ExprCode( - var code: String, - var isNull: String, - var value: String, - var inputRow: String = null, - var inputVars: Seq[ExprInputVar] = Seq.empty) - -/** - * Represents an input variable [[ExprCode]] to an evaluation of an [[Expression]]. - * - * @param exprCode The [[ExprCode]] that represents the evaluation result for the input variable. - * @param dataType The data type of the input variable. - * @param nullable Whether the input variable can be null or not. - */ -case class ExprInputVar(exprCode: ExprCode, dataType: DataType, nullable: Boolean) +case class ExprCode(var code: String, var isNull: String, var value: String) /** * State used for subexpression elimination. @@ -1028,25 +1012,16 @@ class CodegenContext { commonExprs.foreach { e => val expr = e.head val fnName = freshName("evalExpr") - val isNull = if (expr.nullable) { - s"${fnName}IsNull" - } else { - "" - } + val isNull = s"${fnName}IsNull" val value = s"${fnName}Value" // Generate the code for this expression tree and wrap it in a function. val eval = expr.genCode(this) - val assignIsNull = if (expr.nullable) { - s"$isNull = ${eval.isNull};" - } else { - "" - } val fn = s""" |private void $fnName(InternalRow $INPUT_ROW) { | ${eval.code.trim} - | $assignIsNull + | $isNull = ${eval.isNull}; | $value = ${eval.value}; |} """.stripMargin @@ -1064,17 +1039,12 @@ class CodegenContext { // 2. Less code. // Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with // at least two nodes) as the cost of doing it is expected to be low. - if (expr.nullable) { - addMutableState(JAVA_BOOLEAN, isNull) - } - addMutableState(javaType(expr.dataType), value) + addMutableState(JAVA_BOOLEAN, isNull, s"$isNull = false;") + addMutableState(javaType(expr.dataType), value, + s"$value = ${defaultValue(expr.dataType)};") subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);" - val state = if (expr.nullable) { - SubExprEliminationState(isNull, value) - } else { - SubExprEliminationState("false", value) - } + val state = SubExprEliminationState(isNull, value) e.foreach(subExprEliminationExprs.put(_, state)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala deleted file mode 100644 index a2dda48..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegen.scala +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions.codegen - -import scala.collection.mutable - -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.DataType - -/** - * Defines util methods used in expression code generation. - */ -object ExpressionCodegen { - - /** - * Given an expression, returns the all necessary parameters to evaluate it, so the generated - * code of this expression can be split in a function. - * The 1st string in returned tuple is the parameter strings used to call the function. - * The 2nd string in returned tuple is the parameter strings used to declare the function. - * - * Returns `None` if it can't produce valid parameters. - * - * Params to include: - * 1. Evaluated columns referred by this, children or deferred expressions. - * 2. Rows referred by this, children or deferred expressions. - * 3. Eliminated subexpressions referred by children expressions. - */ - def getExpressionInputParams( - ctx: CodegenContext, - expr: Expression): Option[(Seq[String], Seq[String])] = { - val subExprs = getSubExprInChildren(ctx, expr) - val subExprCodes = getSubExprCodes(ctx, subExprs) - val subVars = subExprs.zip(subExprCodes).map { case (subExpr, subExprCode) => - ExprInputVar(subExprCode, subExpr.dataType, subExpr.nullable) - } - val paramsFromSubExprs = prepareFunctionParams(ctx, subVars) - - val inputVars = getInputVarsForChildren(ctx, expr) - val paramsFromColumns = prepareFunctionParams(ctx, inputVars) - - val inputRows = ctx.INPUT_ROW +: getInputRowsForChildren(ctx, expr) - val paramsFromRows = inputRows.distinct.filter(_ != null).map { row => - (row, s"InternalRow $row") - } - - val paramsLength = getParamLength(ctx, inputVars ++ subVars) + paramsFromRows.length - // Maximum allowed parameter number for Java's method descriptor. - if (paramsLength > 255) { - None - } else { - val allParams = (paramsFromRows ++ paramsFromColumns ++ paramsFromSubExprs).unzip - val callParams = allParams._1.distinct - val declParams = allParams._2.distinct - Some((callParams, declParams)) - } - } - - /** - * Returns the eliminated subexpressions in the children expressions. - */ - def getSubExprInChildren(ctx: CodegenContext, expr: Expression): Seq[Expression] = { - expr.children.flatMap { child => - child.collect { - case e if ctx.subExprEliminationExprs.contains(e) => e - } - }.distinct - } - - /** - * A small helper function to return `ExprCode`s that represent subexpressions. - */ - def getSubExprCodes(ctx: CodegenContext, subExprs: Seq[Expression]): Seq[ExprCode] = { - subExprs.map { subExpr => - val state = ctx.subExprEliminationExprs(subExpr) - ExprCode(code = "", value = state.value, isNull = state.isNull) - } - } - - /** - * Retrieves previous input rows referred by children and deferred expressions. - */ - def getInputRowsForChildren(ctx: CodegenContext, expr: Expression): Seq[String] = { - expr.children.flatMap(getInputRows(ctx, _)).distinct - } - - /** - * Given a child expression, retrieves previous input rows referred by it or deferred expressions - * which are needed to evaluate it. - */ - def getInputRows(ctx: CodegenContext, child: Expression): Seq[String] = { - child.flatMap { - // An expression directly evaluates on current input row. - case BoundReference(ordinal, _, _) if ctx.currentVars == null || - ctx.currentVars(ordinal) == null => - Seq(ctx.INPUT_ROW) - - // An expression which is not evaluated yet. Tracks down to find input rows. - case BoundReference(ordinal, _, _) if !isEvaluated(ctx.currentVars(ordinal)) => - trackDownRow(ctx, ctx.currentVars(ordinal)) - - case _ => Seq.empty - }.distinct - } - - /** - * Tracks down input rows referred by the generated code snippet. - */ - def trackDownRow(ctx: CodegenContext, exprCode: ExprCode): Seq[String] = { - val exprCodes = mutable.Queue[ExprCode](exprCode) - val inputRows = mutable.ArrayBuffer.empty[String] - - while (exprCodes.nonEmpty) { - val curExprCode = exprCodes.dequeue() - if (curExprCode.inputRow != null) { - inputRows += curExprCode.inputRow - } - curExprCode.inputVars.foreach { inputVar => - if (!isEvaluated(inputVar.exprCode)) { - exprCodes.enqueue(inputVar.exprCode) - } - } - } - inputRows - } - - /** - * Retrieves previously evaluated columns referred by children and deferred expressions. - * Returned tuple contains the list of expressions and the list of generated codes. - */ - def getInputVarsForChildren( - ctx: CodegenContext, - expr: Expression): Seq[ExprInputVar] = { - expr.children.flatMap(getInputVars(ctx, _)).distinct - } - - /** - * Given a child expression, retrieves previously evaluated columns referred by it or - * deferred expressions which are needed to evaluate it. - */ - def getInputVars(ctx: CodegenContext, child: Expression): Seq[ExprInputVar] = { - if (ctx.currentVars == null) { - return Seq.empty - } - - child.flatMap { - // An evaluated variable. - case b @ BoundReference(ordinal, _, _) if ctx.currentVars(ordinal) != null && - isEvaluated(ctx.currentVars(ordinal)) => - Seq(ExprInputVar(ctx.currentVars(ordinal), b.dataType, b.nullable)) - - // An input variable which is not evaluated yet. Tracks down to find any evaluated variables - // in the expression path. - // E.g., if this expression is "d = c + 1" and "c" is not evaluated. We need to track to - // "c = a + b" and see if "a" and "b" are evaluated. If they are, we need to return them so - // to include them into parameters, if not, we track down further. - case BoundReference(ordinal, _, _) if ctx.currentVars(ordinal) != null => - trackDownVar(ctx, ctx.currentVars(ordinal)) - - case _ => Seq.empty - }.distinct - } - - /** - * Tracks down previously evaluated columns referred by the generated code snippet. - */ - def trackDownVar(ctx: CodegenContext, exprCode: ExprCode): Seq[ExprInputVar] = { - val exprCodes = mutable.Queue[ExprCode](exprCode) - val inputVars = mutable.ArrayBuffer.empty[ExprInputVar] - - while (exprCodes.nonEmpty) { - exprCodes.dequeue().inputVars.foreach { inputVar => - if (isEvaluated(inputVar.exprCode)) { - inputVars += inputVar - } else { - exprCodes.enqueue(inputVar.exprCode) - } - } - } - inputVars - } - - /** - * Helper function to calculate the size of an expression as function parameter. - */ - def calculateParamLength(ctx: CodegenContext, input: ExprInputVar): Int = { - (if (input.nullable) 1 else 0) + ctx.javaType(input.dataType) match { - case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => 2 - case _ => 1 - } - } - - /** - * In Java, a method descriptor is valid only if it represents method parameters with a total - * length of 255 or less. `this` contributes one unit and a parameter of type long or double - * contributes two units. - */ - def getParamLength(ctx: CodegenContext, inputs: Seq[ExprInputVar]): Int = { - // Initial value is 1 for `this`. - 1 + inputs.map(calculateParamLength(ctx, _)).sum - } - - /** - * Given the lists of input attributes and variables to this expression, returns the strings of - * funtion parameters. The first is the variable names used to call the function, the second is - * the parameters used to declare the function in generated code. - */ - def prepareFunctionParams( - ctx: CodegenContext, - inputVars: Seq[ExprInputVar]): Seq[(String, String)] = { - inputVars.flatMap { inputVar => - val params = mutable.ArrayBuffer.empty[(String, String)] - val ev = inputVar.exprCode - - // Only include the expression value if it is not a literal. - if (!isLiteral(ev)) { - val argType = ctx.javaType(inputVar.dataType) - params += ((ev.value, s"$argType ${ev.value}")) - } - - // If it is a nullable expression and `isNull` is not a literal. - if (inputVar.nullable && ev.isNull != "true" && ev.isNull != "false") { - params += ((ev.isNull, s"boolean ${ev.isNull}")) - } - - params - }.distinct - } - - /** - * Only applied to the `ExprCode` in `ctx.currentVars`. - * Returns true if this value is a literal. - */ - def isLiteral(exprCode: ExprCode): Boolean = { - assert(exprCode.value.nonEmpty, "ExprCode.value can't be empty string.") - - if (exprCode.value == "true" || exprCode.value == "false" || exprCode.value == "null") { - true - } else { - // The valid characters for the first character of a Java variable is [a-zA-Z_$]. - exprCode.value.head match { - case v if v >= 'a' && v <= 'z' => false - case v if v >= 'A' && v <= 'Z' => false - case '_' | '$' => false - case _ => true - } - } - } - - /** - * Only applied to the `ExprCode` in `ctx.currentVars`. - * The code is emptied after evaluation. - */ - def isEvaluated(exprCode: ExprCode): Boolean = exprCode.code == "" -} http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegenSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegenSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegenSuite.scala deleted file mode 100644 index 39d58ca..0000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/ExpressionCodegenSuite.scala +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.expressions.codegen - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.IntegerType - -class ExpressionCodegenSuite extends SparkFunSuite { - - test("Returns eliminated subexpressions for expression") { - val ctx = new CodegenContext() - val subExpr = Add(Literal(1), Literal(2)) - val exprs = Seq(Add(subExpr, Literal(3)), Add(subExpr, Literal(4))) - - ctx.generateExpressions(exprs, doSubexpressionElimination = true) - val subexpressions = ExpressionCodegen.getSubExprInChildren(ctx, exprs(0)) - assert(subexpressions.length == 1 && subexpressions(0) == subExpr) - } - - test("Gets parameters for subexpressions") { - val ctx = new CodegenContext() - val subExprs = Seq( - Add(Literal(1), AttributeReference("a", IntegerType, nullable = false)()), // non-nullable - Add(Literal(2), AttributeReference("b", IntegerType, nullable = true)())) // nullable - - ctx.subExprEliminationExprs.put(subExprs(0), SubExprEliminationState("false", "value1")) - ctx.subExprEliminationExprs.put(subExprs(1), SubExprEliminationState("isNull2", "value2")) - - val subExprCodes = ExpressionCodegen.getSubExprCodes(ctx, subExprs) - val subVars = subExprs.zip(subExprCodes).map { case (expr, exprCode) => - ExprInputVar(exprCode, expr.dataType, expr.nullable) - } - val params = ExpressionCodegen.prepareFunctionParams(ctx, subVars) - assert(params.length == 3) - assert(params(0) == Tuple2("value1", "int value1")) - assert(params(1) == Tuple2("value2", "int value2")) - assert(params(2) == Tuple2("isNull2", "boolean isNull2")) - } - - test("Returns input variables for expression: current variables") { - val ctx = new CodegenContext() - val currentVars = Seq( - ExprCode("", isNull = "false", value = "value1"), // evaluated - ExprCode("", isNull = "isNull2", value = "value2"), // evaluated - ExprCode("fake code;", isNull = "isNull3", value = "value3")) // not evaluated - ctx.currentVars = currentVars - ctx.INPUT_ROW = null - - val expr = If(Literal(false), - Add(BoundReference(0, IntegerType, nullable = false), - BoundReference(1, IntegerType, nullable = true)), - BoundReference(2, IntegerType, nullable = true)) - - val inputVars = ExpressionCodegen.getInputVarsForChildren(ctx, expr) - // Only two evaluated variables included. - assert(inputVars.length == 2) - assert(inputVars(0).dataType == IntegerType && inputVars(0).nullable == false) - assert(inputVars(1).dataType == IntegerType && inputVars(1).nullable == true) - assert(inputVars(0).exprCode == currentVars(0)) - assert(inputVars(1).exprCode == currentVars(1)) - - val params = ExpressionCodegen.prepareFunctionParams(ctx, inputVars) - assert(params.length == 3) - assert(params(0) == Tuple2("value1", "int value1")) - assert(params(1) == Tuple2("value2", "int value2")) - assert(params(2) == Tuple2("isNull2", "boolean isNull2")) - } - - test("Returns input variables for expression: deferred variables") { - val ctx = new CodegenContext() - - // The referred column is not evaluated yet. But it depends on an evaluated column from - // other operator. - val currentVars = Seq(ExprCode("fake code;", isNull = "isNull1", value = "value1")) - - // currentVars(0) depends on this evaluated column. - currentVars(0).inputVars = Seq(ExprInputVar(ExprCode("", isNull = "isNull2", value = "value2"), - dataType = IntegerType, nullable = true)) - ctx.currentVars = currentVars - ctx.INPUT_ROW = null - - val expr = Add(Literal(1), BoundReference(0, IntegerType, nullable = false)) - val inputVars = ExpressionCodegen.getInputVarsForChildren(ctx, expr) - assert(inputVars.length == 1) - assert(inputVars(0).dataType == IntegerType && inputVars(0).nullable == true) - - val params = ExpressionCodegen.prepareFunctionParams(ctx, inputVars) - assert(params.length == 2) - assert(params(0) == Tuple2("value2", "int value2")) - assert(params(1) == Tuple2("isNull2", "boolean isNull2")) - } - - test("Returns input rows for expression") { - val ctx = new CodegenContext() - ctx.currentVars = null - ctx.INPUT_ROW = "i" - - val expr = Add(BoundReference(0, IntegerType, nullable = false), - BoundReference(1, IntegerType, nullable = true)) - val inputRows = ExpressionCodegen.getInputRowsForChildren(ctx, expr) - assert(inputRows.length == 1) - assert(inputRows(0) == "i") - } - - test("Returns input rows for expression: deferred expression") { - val ctx = new CodegenContext() - - // The referred column is not evaluated yet. But it depends on an input row from - // other operator. - val currentVars = Seq(ExprCode("fake code;", isNull = "isNull1", value = "value1")) - currentVars(0).inputRow = "inputadaptor_row1" - ctx.currentVars = currentVars - ctx.INPUT_ROW = null - - val expr = Add(Literal(1), BoundReference(0, IntegerType, nullable = false)) - val inputRows = ExpressionCodegen.getInputRowsForChildren(ctx, expr) - assert(inputRows.length == 1) - assert(inputRows(0) == "inputadaptor_row1") - } - - test("Returns both input rows and variables for expression") { - val ctx = new CodegenContext() - // 5 input variables in currentVars: - // 1 evaluated variable (value1). - // 3 not evaluated variables. - // value2 depends on an evaluated column from other operator. - // value3 depends on an input row from other operator. - // value4 depends on a not evaluated yet column from other operator. - // 1 null indicating to use input row "i". - val currentVars = Seq( - ExprCode("", isNull = "false", value = "value1"), - ExprCode("fake code;", isNull = "isNull2", value = "value2"), - ExprCode("fake code;", isNull = "isNull3", value = "value3"), - ExprCode("fake code;", isNull = "isNull4", value = "value4"), - null) - // value2 depends on this evaluated column. - currentVars(1).inputVars = Seq(ExprInputVar(ExprCode("", isNull = "isNull5", value = "value5"), - dataType = IntegerType, nullable = true)) - // value3 depends on an input row "inputadaptor_row1". - currentVars(2).inputRow = "inputadaptor_row1" - // value4 depends on another not evaluated yet column. - currentVars(3).inputVars = Seq(ExprInputVar(ExprCode("fake code;", - isNull = "isNull6", value = "value6"), dataType = IntegerType, nullable = true)) - ctx.currentVars = currentVars - ctx.INPUT_ROW = "i" - - // expr: if (false) { value1 + value2 } else { (value3 + value4) + i[5] } - val expr = If(Literal(false), - Add(BoundReference(0, IntegerType, nullable = false), - BoundReference(1, IntegerType, nullable = true)), - Add(Add(BoundReference(2, IntegerType, nullable = true), - BoundReference(3, IntegerType, nullable = true)), - BoundReference(4, IntegerType, nullable = true))) // this is based on input row "i". - - // input rows: "i", "inputadaptor_row1". - val inputRows = ExpressionCodegen.getInputRowsForChildren(ctx, expr) - assert(inputRows.length == 2) - assert(inputRows(0) == "inputadaptor_row1") - assert(inputRows(1) == "i") - - // input variables: value1 and value5 - val inputVars = ExpressionCodegen.getInputVarsForChildren(ctx, expr) - assert(inputVars.length == 2) - - // value1 has inlined isNull "false", so don't need to include it in the params. - val inputVarParams = ExpressionCodegen.prepareFunctionParams(ctx, inputVars) - assert(inputVarParams.length == 3) - assert(inputVarParams(0) == Tuple2("value1", "int value1")) - assert(inputVarParams(1) == Tuple2("value5", "int value5")) - assert(inputVarParams(2) == Tuple2("isNull5", "boolean isNull5")) - } - - test("isLiteral: literals") { - val literals = Seq( - ExprCode("", "", "true"), - ExprCode("", "", "false"), - ExprCode("", "", "1"), - ExprCode("", "", "-1"), - ExprCode("", "", "1L"), - ExprCode("", "", "-1L"), - ExprCode("", "", "1.0f"), - ExprCode("", "", "-1.0f"), - ExprCode("", "", "0.1f"), - ExprCode("", "", "-0.1f"), - ExprCode("", "", """"string""""), - ExprCode("", "", "(byte)-1"), - ExprCode("", "", "(short)-1"), - ExprCode("", "", "null")) - - literals.foreach(l => assert(ExpressionCodegen.isLiteral(l) == true)) - } - - test("isLiteral: non literals") { - val variables = Seq( - ExprCode("", "", "var1"), - ExprCode("", "", "_var2"), - ExprCode("", "", "$var3"), - ExprCode("", "", "v1a2r3"), - ExprCode("", "", "_1v2a3r"), - ExprCode("", "", "$1v2a3r")) - - variables.foreach(v => assert(ExpressionCodegen.isLiteral(v) == false)) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala index 05186c4..a9bfb63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala @@ -108,10 +108,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { |}""".stripMargin) ctx.currentVars = null - // `rowIdx` isn't in `ctx.currentVars`. If the expressions are split later, we can't track it. - // So making it as global variable. val rowidx = ctx.freshName("rowIdx") - ctx.addMutableState(ctx.JAVA_INT, rowidx) val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } @@ -131,7 +128,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { | int $numRows = $batch.numRows(); | int $localEnd = $numRows - $idx; | for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) { - | $rowidx = $idx + $localIdx; + | int $rowidx = $idx + $localIdx; | ${consume(ctx, columnsBatchInput).trim} | $shouldStop | } http://git-wip-us.apache.org/repos/asf/spark/blob/2a29a60d/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 1281169..bc05dca 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,8 +17,7 @@ package org.apache.spark.sql.execution -import org.apache.spark.sql.{Column, QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.{QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec @@ -237,24 +236,4 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { } } } - - test("SPARK-22551: Fix 64kb limit for deeply nested expressions under wholestage codegen") { - import testImplicits._ - withTempPath { dir => - val path = dir.getCanonicalPath - val df = Seq(("abc", 1)).toDF("key", "int") - df.write.parquet(path) - - var strExpr: Expression = col("key").expr - for (_ <- 1 to 150) { - strExpr = Decode(Encode(strExpr, Literal("utf-8")), Literal("utf-8")) - } - val expressions = Seq(If(EqualTo(strExpr, strExpr), strExpr, strExpr)) - - val df2 = spark.read.parquet(path).select(expressions.map(Column(_)): _*) - val plan = df2.queryExecution.executedPlan - assert(plan.find(_.isInstanceOf[WholeStageCodegenExec]).isDefined) - df2.collect() - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org