[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18931

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163752999

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
     ctx.INPUT_ROW = null
     ctx.freshNamePrefix = parent.variablePrefix
     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+    // Under certain conditions, we can put the logic to consume the rows of this operator into
+    // another function. So we can prevent a generated function too long to be optimized by JIT.
+    // The conditions:
+    // 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is enabled.
+    // 2. `inputVars` are all materialized. That is guaranteed to be true if the parent plan uses
+    //    all variables in output (see `requireAllOutput`).
+    // 3. The number of output variables must less than maximum number of parameters in Java method
+    //    declaration.
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&
+          ctx.isValidParamLength(output)) {
+        constructDoConsumeFunction(ctx, inputVars, row)
+      } else {
+        parent.doConsume(ctx, inputVars, rowVar)
+      }
     s"""
        |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
        |$evaluated
-       |${parent.doConsume(ctx, inputVars, rowVar)}
+       |$consumeFunc
      """.stripMargin
   }

+  /**
+   * To prevent concatenated function growing too long to be optimized by JIT. We can separate the
+   * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode],
+      row: String): String = {
+    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, output, inputVars, row)
+    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
+
+    val doConsume = ctx.freshName("doConsume")
+    ctx.currentVars = inputVarsInFunc
+    ctx.INPUT_ROW = null
+
+    val doConsumeFuncName = ctx.addNewFunction(doConsume,
+      s"""
+         | private void $doConsume(${params.mkString(", ")}) throws java.io.IOException {
+         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
+         | }
+       """.stripMargin)

+    s"""
+       | $doConsumeFuncName(${args.mkString(", ")});
+     """.stripMargin
+  }

+  /**
+   * Returns arguments for calling method and method definition parameters of the consume function.
+   * And also returns the list of `ExprCode` for the parameters.
+   */
+  private def constructConsumeParameters(
+      ctx: CodegenContext,
+      attributes: Seq[Attribute],
+      variables: Seq[ExprCode],
+      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
+    val arguments = mutable.ArrayBuffer[String]()
+    val parameters = mutable.ArrayBuffer[String]()
+    val paramVars = mutable.ArrayBuffer[ExprCode]()
+
+    if (row != null) {
+      arguments += row

--- End diff --

Added an extra unit for `row` if needed.
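The mechanism in the diff above is easier to see in the shape of the Java it generates. The sketch below is illustrative only (the class, method names, and operator logic are made up, not Spark's actual generated code; real generated methods also declare `throws java.io.IOException`): each operator's consume logic moves into its own small method, keeping every method below HotSpot's huge-method threshold, past which the JIT declines to compile a method at all.

```java
// Hypothetical shape of whole-stage-generated code after the split.
// Real Spark codegen emits different names and passes InternalRow columns;
// this only illustrates the one-method-per-operator structure.
class SplitConsumeShape {
    private long count = 0;

    // Before the split, the bodies of filter_doConsume and agg_doConsume
    // would be inlined here, so a deep operator chain produced one huge method.
    void processNext(int value) {
        boolean isNull = false;
        filter_doConsume(value, isNull);
    }

    // One small method per operator; output variables become parameters,
    // which is why they must all be materialized and bounded in number.
    private void filter_doConsume(int value, boolean isNull) {
        if (!isNull && value > 0) {
            agg_doConsume(value, isNull);
        }
    }

    private void agg_doConsume(int value, boolean isNull) {
        count += value;
    }

    long result() {
        return count;
    }
}
```

Since each operator's variables are passed as explicit method parameters, the split is only possible when every input variable is already evaluated at the call site, matching condition 2 in the quoted comment.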
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163747141

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
[...]
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && requireAllOutput &&

--- End diff --

super nit:
```
val confEnabled = SQLConf.get.wholeStageSplitConsumeFuncByOperator
if (confEnabled && ...)
```
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163746698

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
[...]
+    if (row != null) {
+      arguments += row

--- End diff --

we should probably have two methods: one for calculating the param length and one for checking the param length limitation.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163746605

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
[...]
+    if (row != null) {
+      arguments += row

--- End diff --

we need to update `ctx.isValidParamLength` to count this.
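For reference, the limit being checked: a JVM method's parameter list is capped at 255 slots (JVM spec §4.3.3), where `long` and `double` each occupy two slots and the implicit `this` of an instance method occupies one. The sketch below is a hypothetical calculate/check pair in the spirit of the suggestion, not Spark's actual `CodeGenerator` API, and it counts the extra row argument the way this comment asks:

```java
import java.util.List;

// Hypothetical sketch of splitting the check into a "calculate" method and a
// "validate" method; names are illustrative, not Spark's CodeGenerator API.
class ParamLengthCheck {
    // JVM limit on a method's parameter slots (JVM spec §4.3.3).
    static final int MAX_JVM_METHOD_PARAMS_LENGTH = 255;

    static int calculateParamLength(List<Class<?>> paramTypes, boolean hasRowParam) {
        int length = 1; // implicit `this` of the generated instance method
        if (hasRowParam) {
            length += 1; // count the extra row argument when it is passed
        }
        for (Class<?> t : paramTypes) {
            // long and double occupy two slots each; everything else one
            length += (t == long.class || t == double.class) ? 2 : 1;
        }
        return length;
    }

    static boolean isValidParamLength(List<Class<?>> paramTypes, boolean hasRowParam) {
        return calculateParamLength(paramTypes, hasRowParam) <= MAX_JVM_METHOD_PARAMS_LENGTH;
    }
}
```

For example, a `long` column plus an `int` column with a row argument costs 1 (this) + 1 (row) + 2 + 1 = 5 slots.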
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163743504

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,15 @@ object SQLConf {
     .intConf
     .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

+  val WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR =
+    buildConf("spark.sql.codegen.splitConsumeFuncByOperator")
+      .internal()
+      .doc("When true, whole stage codegen would put the logic of consuming rows of each " +
+        "physical operator into individual methods, instead of a single big method. This can be " +
+        "used to avoid oversized function that can miss the opportunity of JIT optimization.")
+      .booleanConf
+      .createWithDefault(true)

--- End diff --

Set to true by default. If there is an objection, I can change it to false.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723759

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {

   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

+  def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)

--- End diff --

Sure. Done.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723731

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
     ctx.INPUT_ROW = null
     ctx.freshNamePrefix = parent.variablePrefix
     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+    // Under certain conditions, we can put the logic to consume the rows of this operator into
+    // another function. So we can prevent a generated function too long to be optimized by JIT.
+    // The conditions:
+    // 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled.
+    // 2. The parent uses all variables in output. we can't defer variable evaluation when consume
+    //    in another function.
+    // 3. The output variables are not empty. If it's empty, we don't bother to do that.
+    // 4. We don't use row variable. The construction of row uses deferred variable evaluation. We
+    //    can't do it.
+    // 5. The number of output variables must less than maximum number of parameters in Java method
+    //    declaration.
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+          requireAllOutput && ctx.isValidParamLength(output)) {
+        constructDoConsumeFunction(ctx, inputVars)
+      } else {
+        parent.doConsume(ctx, inputVars, rowVar)
+      }
     s"""
        |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
        |$evaluated
-       |${parent.doConsume(ctx, inputVars, rowVar)}
+       |$consumeFunc
      """.stripMargin
   }

+  /**
+   * To prevent concatenated function growing too long to be optimized by JIT. We can separate the
+   * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call.
+   */
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =

--- End diff --

Sounds cleaner. I need to change it a little because the arguments and parameters are not the same. Some variables cannot be parameterized, e.g., constants or statements.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723249

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    val consumeFunc =
+      if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+          requireAllOutput && ctx.isValidParamLength(output)) {
+        constructDoConsumeFunction(ctx, inputVars)

--- End diff --

Good point.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723212

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    // 3. The output variables are not empty. If it's empty, we don't bother to do that.
+    // 4. We don't use row variable. The construction of row uses deferred variable evaluation. We
+    //    can't do it.

--- End diff --

Seems to me `outputVars != null` isn't necessary either. When it is null, `row` can't be null; `inputVars` will be bound to `row`'s columns and evaluated before calling the created method.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723254

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    // Don't need to copy the variables because they're already evaluated before entering function.
+    ctx.INPUT_ROW = null
+    ctx.currentVars = inputVarsInFunc
+    val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+    val rowVar = ExprCode(ev.code.trim, "false", ev.value)
+
+    val doConsume = ctx.freshName("doConsume")

--- End diff --

The `freshName` here will add `variablePrefix` before `doConsume`, so it already has the operator name.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723195

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    // 2. The parent uses all variables in output. we can't defer variable evaluation when consume
+    //    in another function.
+    // 3. The output variables are not empty. If it's empty, we don't bother to do that.

--- End diff --

Sounds correct to me, logically, although I have no clear idea of what the actual operator can be.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163723102

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,14 @@ object SQLConf {
     .intConf
     .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

+  val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume")

--- End diff --

Looks good to me.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163621072

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1263,6 +1271,8 @@ class SQLConf extends Serializable with Logging {

   def hugeMethodLimit: Int = getConf(WHOLESTAGE_HUGE_METHOD_LIMIT)

+  def decoupleOperatorConsumeFuncs: Boolean = getConf(DECOUPLE_OPERATOR_CONSUME_FUNCTIONS)

--- End diff --

Add the `wholeStage` prefix for such flag names.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163620741

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -661,6 +661,14 @@ object SQLConf {
     .intConf
     .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT)

+  val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume")

--- End diff --

`DECOUPLE_OPERATOR_CONSUME_FUNCTIONS` -> `WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR`
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163518761

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+  private def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =

--- End diff --

I feel it's cleaner to return `paramNames, paramTypes, paramVars`, then we can simply do
```
void $doConsume(paramTypes.zip(paramNames).map(i => i._1 + " " + i._2).mkString(", "))
```
and
```
doConsumeFuncName(paramNames.mkString(", "))
```
inside `constructConsumeParameters` we can just create 3 mutable collections and go through `variables` to fill these collections.
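The suggested shape can be sketched outside of Spark. Assuming plain string lists for the parallel `paramTypes`/`paramNames` (hypothetical helpers, not Spark's API), zipping types with names yields the method declaration, and the names alone yield the call site:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative sketch of the reviewer's suggestion: build the generated
// method's declaration and its call from parallel name/type lists.
class SignatureBuilder {
    // Pair each type with its name: "int value, boolean isNull"
    static String declaration(String funcName, List<String> paramTypes, List<String> paramNames) {
        String params = IntStream.range(0, paramNames.size())
            .mapToObj(i -> paramTypes.get(i) + " " + paramNames.get(i))
            .collect(Collectors.joining(", "));
        return "private void " + funcName + "(" + params + ")";
    }

    // The call site only needs the names: "doConsume_0(value, isNull);"
    static String call(String funcName, List<String> paramNames) {
        return funcName + "(" + String.join(", ", paramNames) + ");";
    }
}
```

Keeping names and types in separate parallel lists (rather than pre-joined strings) is what makes both the declaration and the call derivable from the same data.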
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163516654

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    // Don't need to copy the variables because they're already evaluated before entering function.
+    ctx.INPUT_ROW = null
+    ctx.currentVars = inputVarsInFunc
+    val ev = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+    val rowVar = ExprCode(ev.code.trim, "false", ev.value)
+
+    val doConsume = ctx.freshName("doConsume")

--- End diff --

shall we put the operator name in this function name?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163514909

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan {
[...]
+    val consumeFunc =
+      if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty &&
+          requireAllOutput && ctx.isValidParamLength(output)) {
+        constructDoConsumeFunction(ctx, inputVars)

--- End diff --

maybe we should create a method for generating `rowVar`, so that we can use it in both `consume` and `constructDoConsumeFunction`
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163513927 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan { ctx.INPUT_ROW = null ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled. +// 2. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 3. The output variables are not empty. If it's empty, we don't bother to do that. +// 4. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 5. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (SQLConf.get.decoupleOperatorConsumeFuncs && row == null && outputVars.nonEmpty && + requireAllOutput && ctx.isValidParamLength(output)) { +constructDoConsumeFunction(ctx, inputVars) --- End diff -- we should pass `row` to this function; if it's non-null, we can save a projection.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163512972 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan { ctx.INPUT_ROW = null ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled. +// 2. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 3. The output variables are not empty. If it's empty, we don't bother to do that. +// 4. We don't use row variable. The construction of row uses deferred variable evaluation. We --- End diff -- I think what we need is `inputVars` are all materialized, which can be guaranteed by `requireAllOutput` and `outputVars != null` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
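The conditions debated in the messages above gate whether the parent's consume logic is split into its own method. A minimal Python sketch of that decision, with hypothetical names (the real logic lives in `CodegenSupport.consume` in Scala):

```python
# Split the parent's doConsume into a separate method only when all of the
# gating conditions from the review thread hold. Names are illustrative.
def should_split_consume(conf_enabled, row, output_vars, used_inputs,
                         output, param_length_ok):
    # The parent uses every output variable, so all inputVars are already
    # materialized and nothing relies on deferred evaluation.
    require_all_output = all(attr in used_inputs for attr in output)
    return (conf_enabled            # the SQLConf flag is on
            and row is None         # no row variable (deferred row build)
            and bool(output_vars)   # there is something to pass along
            and require_all_output
            and param_length_ok)    # fits in a JVM method descriptor

assert should_split_consume(True, None, ["a"], {"a"}, ["a"], True)
# A non-null row variable disables the split:
assert not should_split_consume(True, "row", ["a"], {"a"}, ["a"], True)
```

When any condition fails, codegen simply inlines `parent.doConsume` as before, so the split is a pure optimization toggle.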
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163512277 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -156,13 +156,94 @@ trait CodegenSupport extends SparkPlan { ctx.INPUT_ROW = null ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The config "SQLConf.DECOUPLE_OPERATOR_CONSUME_FUNCTIONS" is enabled. +// 2. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 3. The output variables are not empty. If it's empty, we don't bother to do that. --- End diff -- why this? I feel an operator can still have complex consume method even if it doesn't have output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163511790 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -661,6 +661,14 @@ object SQLConf { .intConf .createWithDefault(CodeGenerator.DEFAULT_JVM_HUGE_METHOD_LIMIT) + val DECOUPLE_OPERATOR_CONSUME_FUNCTIONS = buildConf("spark.sql.codegen.decoupleOperatorConsume") --- End diff -- `decoupleOperatorConsume` looks weird, how about `splitConsumeMethodByOperator`?
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163477806 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1313,6 +1331,9 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + // The max valid length of method parameters in JVM. + val MAX_JVM_METHOD_PARAMS_LENGTH = 255 --- End diff -- Added `final` to all constants here.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163472676 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -1313,6 +1331,9 @@ object CodeGenerator extends Logging { // This is the value of HugeMethodLimit in the OpenJDK JVM settings val DEFAULT_JVM_HUGE_METHOD_LIMIT = 8000 + // The max valid length of method parameters in JVM. + val MAX_JVM_METHOD_PARAMS_LENGTH = 255 --- End diff -- make it `final`? I think we can add final to all the constants here.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163462731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc """.stripMargin } + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { --- End diff -- Yes. Put it in `CodegenContext`. 
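The 255-slot rule that `isValidParamLength` implements above can be sketched as follows. This is an illustrative Python model under a simplified attribute representation (pairs of Java type and nullability, not Spark's `Attribute` class): a JVM method descriptor may cover at most 255 parameter slots, `this` takes one slot, `long`/`double` take two slots each, and every nullable column needs an extra boolean slot for its null flag.

```python
# Mirror of the parameter-length check discussed in the review thread.
MAX_JVM_METHOD_PARAMS_LENGTH = 255

def is_valid_param_length(attrs):
    """attrs: list of (java_type, nullable) pairs."""
    length = 1  # slot for the implicit `this` parameter
    for java_type, nullable in attrs:
        # long and double occupy two descriptor slots; everything else one
        length += 2 if java_type in ("long", "double") else 1
        if nullable:
            length += 1  # extra boolean slot for the isNull flag
    return length <= MAX_JVM_METHOD_PARAMS_LENGTH

# A nullable long costs 3 slots: 2 for the value, 1 for the null flag.
assert is_valid_param_length([("long", True)] * 84)      # 1 + 84*3 = 253
assert not is_valid_param_length([("long", True)] * 85)  # 1 + 85*3 = 256
```

This is why the split is skipped for very wide schemas: the call could not be expressed as a single Java method.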
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163462688 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc """.stripMargin } + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { +// Start value is 1 for `this`. 
+output.foldLeft(1) { case (curLength, attr) => + ctx.javaType(attr.dataType) match { +case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => curLength + 2 +case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3 +case _ if !attr.nullable => curLength + 1 +case _ => curLength + 2 + } +} <= 255 + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. We can separate the + * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. + */ + private def constructDoConsumeFunction( + ctx: CodegenContext, + inputVars: Seq[ExprCode]): String = { +val (callingParams, arguList, inputVarsInFunc) = + constructConsumeParameters(ctx, output, inputVars) +val rowVar = ExprCode("", "false", "unsafeRow") +val doConsume = ctx.freshName("doConsume") +val doConsumeFuncName = ctx.addNewFunction(doConsume, + s""" + | private void $doConsume($arguList) throws java.io.IOException { + | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} + | } + """.stripMargin) + +s""" + | $doConsumeFuncName($callingParams); + """.stripMargin + } + + /** + * Returns source code for calling consume function and the argument list of the consume function + * and also the `ExprCode` for the argument list. + */ + private def constructConsumeParameters( + ctx: CodegenContext, + attributes: Seq[Attribute], + variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { +val params = variables.zipWithIndex.map { case (ev, i) => + val arguName = ctx.freshName(s"expr_$i") + val arguType = ctx.javaType(attributes(i).dataType) + + val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) { +// When the argument is not nullable, we don't need to pass in `isNull` param for it and +// simply give a `false`. 
+val arguIsNull = "false" +(ev.value, s"$arguType $arguName", arguIsNull) + } else { +val arguIsNull = ctx.freshName(s"exprIsNull_$i") +(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull) + } + (callingParam, funcParams, ExprCode("", arguIsNull, arguName)) +}.unzip3 +(params._1.mkString(", "), + params._2.mkString(", "), + params._3) --- End diff -- Done. --- - To unsu
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163462698 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. --- End diff -- Added a config for it so we can turn it off. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163317809 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. --- End diff -- or introduce a config so that users can turn it off. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163316730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. --- End diff -- Maybe we can be super safe and only do this for certain operators, like `HashAggregateExec`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163316509 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc """.stripMargin } + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { +// Start value is 1 for `this`. 
+output.foldLeft(1) { case (curLength, attr) => + ctx.javaType(attr.dataType) match { +case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => curLength + 2 +case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3 +case _ if !attr.nullable => curLength + 1 +case _ => curLength + 2 + } +} <= 255 + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. We can separate the + * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. + */ + private def constructDoConsumeFunction( + ctx: CodegenContext, + inputVars: Seq[ExprCode]): String = { +val (callingParams, arguList, inputVarsInFunc) = + constructConsumeParameters(ctx, output, inputVars) +val rowVar = ExprCode("", "false", "unsafeRow") +val doConsume = ctx.freshName("doConsume") +val doConsumeFuncName = ctx.addNewFunction(doConsume, + s""" + | private void $doConsume($arguList) throws java.io.IOException { + | ${parent.doConsume(ctx, inputVarsInFunc, rowVar)} + | } + """.stripMargin) + +s""" + | $doConsumeFuncName($callingParams); + """.stripMargin + } + + /** + * Returns source code for calling consume function and the argument list of the consume function + * and also the `ExprCode` for the argument list. + */ + private def constructConsumeParameters( + ctx: CodegenContext, + attributes: Seq[Attribute], + variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = { +val params = variables.zipWithIndex.map { case (ev, i) => + val arguName = ctx.freshName(s"expr_$i") + val arguType = ctx.javaType(attributes(i).dataType) + + val (callingParam, funcParams, arguIsNull) = if (!attributes(i).nullable) { +// When the argument is not nullable, we don't need to pass in `isNull` param for it and +// simply give a `false`. 
+val arguIsNull = "false" +(ev.value, s"$arguType $arguName", arguIsNull) + } else { +val arguIsNull = ctx.freshName(s"exprIsNull_$i") +(ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean $arguIsNull", arguIsNull) + } + (callingParam, funcParams, ExprCode("", arguIsNull, arguName)) +}.unzip3 +(params._1.mkString(", "), + params._2.mkString(", "), + params._3) --- End diff -- the above 3 lines can be one line? --- -
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163315858 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. --- End diff -- My only concern is if we have a bunch of simple operators and we create a lot of small methods here. Maybe it's fine as optimizer would prevent such cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r163315164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && isValidParamLength(ctx)) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc """.stripMargin } + /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. 
+ */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { --- End diff -- shall we put it into `CodegenContext` as a util function so that we can use it in other places in the future? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r144279986 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -175,6 +175,25 @@ trait CodegenSupport extends SparkPlan { } /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { +var paramLength = 1 // for `this` parameter. +output.foreach { attr => --- End diff -- Thanks. Will follow it.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user a10y commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r144025706 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -175,6 +175,25 @@ trait CodegenSupport extends SparkPlan { } /** + * In Java, a method descriptor is valid only if it represents method parameters with a total + * length of 255 or less. `this` contributes one unit and a parameter of type long or double + * contributes two units. Besides, for nullable parameters, we also need to pass a boolean + * for the null status. + */ + private def isValidParamLength(ctx: CodegenContext): Boolean = { +var paramLength = 1 // for `this` parameter. +output.foreach { attr => --- End diff -- (nit: This could be written as a `foldLeft` and then you can eliminate the `var`) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r143321347 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val codeWithShortFunctions = genGroupByCode(3) val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions) assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) -val codeWithLongFunctions = genGroupByCode(20) +val codeWithLongFunctions = genGroupByCode(50) --- End diff -- In my PR, I changed the code to just check that long functions have a larger max code size: https://github.com/apache/spark/pull/19082/files#diff-0314224342bb8c30143ab784b3805d19R185 but just increasing the value seems better.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r143321131 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -181,7 +181,7 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { val codeWithShortFunctions = genGroupByCode(3) val (_, maxCodeSize1) = CodeGenerator.compile(codeWithShortFunctions) assert(maxCodeSize1 < SQLConf.WHOLESTAGE_HUGE_METHOD_LIMIT.defaultValue.get) -val codeWithLongFunctions = genGroupByCode(20) +val codeWithLongFunctions = genGroupByCode(50) --- End diff -- We reduce the length of the generated code, so to make this test work we increase the number of expressions.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r140931214 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala --- @@ -89,6 +89,8 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- The good news is that the just-merged #19324 simplifies the usage of `continue` in codegen. I'm now testing whether it lets me remove this tricky handling of `continue`.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136776281 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume --- End diff -- thanks for the kind explanation! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136746833 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala --- @@ -89,6 +89,8 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- yea, probably we might need to describe more about exceptional cases we can't use this optimization like `HashAggregateExec` in https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136746468 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala --- @@ -177,6 +177,8 @@ case class SortExec( """.stripMargin.trim } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- yea, other guys might give good suggestions on the naming...
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136743435 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala --- @@ -177,6 +177,8 @@ case class SortExec( """.stripMargin.trim } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- I revised this variable name several times, but didn't find a good name that conveys its meaning.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136742515 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala --- @@ -89,6 +89,8 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- The `doConsume` produces something like:

|for (int $i = 0; $i < ${projections.length}; $i ++) {
|  switch ($i) {
|    ${cases.mkString("\n").trim}
|  }
|  $numOutput.add(1);
|  ${consume(ctx, outputColumns)}
|}

So the consume logic of its parent node is actually wrapped in a local for-loop. It has the same effect as not chaining the next consume.
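The loop shape described in the comment above can be sketched as plain Java (all class and variable names here are illustrative, not Spark's actual generated code): the parent's consume logic is enclosed in a local for-loop over the projections, inside the outer produce loop.

```java
// Simplified, hypothetical sketch of the loop structure ExpandExec generates.
// The parent's consume logic runs inside a local for-loop, so a `continue`
// emitted there would only advance the inner loop, never the outer one.
public class ExpandLoopSketch {
    static int numOutput = 0;

    // Stands in for the parent operator's inlined consume logic.
    static void parentConsume(int row, int projection) {
        numOutput++;
    }

    public static void main(String[] args) {
        int[] inputRows = {10, 20};
        int numProjections = 3;
        for (int row : inputRows) {                    // outer produce loop
            for (int i = 0; i < numProjections; i++) { // local loop from ExpandExec
                parentConsume(row, i);                 // parent consume enclosed here
            }
        }
        System.out.println("rows emitted: " + numOutput); // 2 inputs x 3 projections = 6
    }
}
```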
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136742331 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc + """.stripMargin + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, + * we may put the consume logic of parent operator into a function and set this flag to `true`. + * The parent operator can know if its consume logic is inlined or in separated function. + */ + private var doConsumeInFunc: Boolean = false + + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. 
If this is `true`, this operator shouldn't use `continue` statement to + * continue on next row, because its generated codes aren't enclosed in main while-loop. + * + * For example, we have generated codes for a query plan like: + * Op1Exec + * Op2Exec + * Op3Exec + * + * If we put the consume code of Op2Exec into a separated function, the generated codes are like: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * } + * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. + * + * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions + * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return + * `false`. + */ + protected def doConsumeInChainOfFunc: Boolean = { +val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) +doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) + } + + /** + * The actual java statement this operator should use if there is a need to continue on next row + * in its `doConsume` codes. + * + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * continue; // Wrong. We can't use continue with the while-loop. + * } + * In above code, we can't use `continue` in `Op2Exec_doConsume`. + * + * Instead, we do something like: + * while (...) { + * ... // logic of Op3Exec. + * boolean continueForLoop = Op2Exec_doConsume(...); + * if (continueForLoop) continue; + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * return true; // When we need to do continue, we return true. + * } + */ + protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { +"return true;"; --- End diff -- Thanks. I'll fix it. 
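The `continue` vs `return true` translation described in the quoted doc comment can be sketched as plain Java (names are illustrative, not the actual generated code): once the consume logic is split into its own method, `continue` no longer compiles there, so the method returns a boolean that the caller turns back into a `continue`.

```java
// Hypothetical standalone sketch of splitting an operator's consume logic
// out of the main produce loop.
public class ConsumeSplitSketch {
    // Stands in for Op2Exec's split-out consume logic. Returning true signals
    // the caller to `continue` to the next row; we cannot write `continue`
    // here because this method body is no longer inside the while/for-loop.
    static boolean op2DoConsume(int row) {
        if (row % 2 == 0) {
            return true; // was `continue;` when this code was inlined
        }
        System.out.println("consumed " + row);
        return false;
    }

    public static void main(String[] args) {
        int consumed = 0;
        for (int row = 0; row < 6; row++) { // stands in for the main produce loop
            boolean continueForLoop = op2DoConsume(row);
            if (continueForLoop) continue;
            consumed++;
        }
        System.out.println("total consumed: " + consumed); // odd rows only: 3
    }
}
```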
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136742282 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) --- End diff -- I was thinking about checking that. But whole-stage codegen is a non-breaking process in which produce/consume calls are embedded together, so there is no break point here at which to check the function length. Actually, I think always splitting consume functions should have no negative effect; the benchmark numbers suggest it does no harm to normal queries.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136742019 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We --- End diff -- The same reason as above: the variables used to evaluate the row can be out of scope, because row construction is deferred until the row is actually used.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136741637 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume --- End diff -- E.g., a `ProjectExec` node doesn't necessarily evaluate all its output variables before continuing to the `doConsume` of its parent node; it can defer the evaluation until the variables are needed in the parent node's consume logic. Once a variable's evaluation is deferred and we create a consume function, the variable will be evaluated inside that function, but the references this variable needs are then out of scope.
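The scoping problem described above can be sketched as plain Java (hypothetical names, not Spark's generated code): a method split out of the loop can only see values that were materialized in the loop body and passed in as parameters, which is why the split requires all output variables to be evaluated up front.

```java
// Hypothetical sketch of why split-out consume functions need all input
// variables materialized first: a deferred expression evaluated inside the
// extracted method could not see the caller's locals, so each value must be
// computed in the loop body and handed over as a parameter.
public class MaterializeSketch {
    // Split-out consume: it can only use what is passed in; any local of the
    // caller's loop body is out of scope here.
    static long doConsume(long projectedValue) {
        return projectedValue + 1;
    }

    public static void main(String[] args) {
        long result = 0;
        for (long input = 0; input < 3; input++) {
            // Materialize the projected value *before* the call; deferring
            // `input * 2` into doConsume would require capturing `input`,
            // which is not visible there.
            long projectedValue = input * 2;
            result += doConsume(projectedValue);
        }
        System.out.println(result); // (0+1) + (2+1) + (4+1) = 9
    }
}
```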
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136711099 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala --- @@ -177,6 +177,8 @@ case class SortExec( """.stripMargin.trim } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- I think it's better to put the reason why we explicitly set false in each plan, like https://github.com/apache/spark/pull/18931/files#diff-28cb12941b992ff680c277c651b59aa0R204 Btw, can we find a better name for this? E.g., `canPipeline` or something, because IIUC this optimisation can be applied to `pipelining` operators (`pipelining` is a database term: https://link.springer.com/referenceworkentry/10.1007%2F978-0-387-39940-9_872).
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710928 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc + """.stripMargin + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, + * we may put the consume logic of parent operator into a function and set this flag to `true`. + * The parent operator can know if its consume logic is inlined or in separated function. + */ + private var doConsumeInFunc: Boolean = false + + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. 
If this is `true`, this operator shouldn't use `continue` statement to + * continue on next row, because its generated codes aren't enclosed in main while-loop. + * + * For example, we have generated codes for a query plan like: + * Op1Exec + * Op2Exec + * Op3Exec + * + * If we put the consume code of Op2Exec into a separated function, the generated codes are like: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * } + * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. + * + * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions + * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return + * `false`. + */ + protected def doConsumeInChainOfFunc: Boolean = { +val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) +doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) + } + + /** + * The actual java statement this operator should use if there is a need to continue on next row + * in its `doConsume` codes. + * + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * continue; // Wrong. We can't use continue with the while-loop. + * } + * In above code, we can't use `continue` in `Op2Exec_doConsume`. + * + * Instead, we do something like: + * while (...) { + * ... // logic of Op3Exec. + * boolean continueForLoop = Op2Exec_doConsume(...); + * if (continueForLoop) continue; + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * return true; // When we need to do continue, we return true. 
+ * } + */ + protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { +"return true;"; + } else { +"continue;" + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. We can separate the + * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. + */ + protected def constructDoConsumeFunction
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710824 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) --- End diff -- Do we always split consume functions? Don't we need to check whether this consume function is too long?
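Condition 4 in the quoted comment (`outputVars.length < 255`) approximates the JVM's limit of 255 parameter slots per method descriptor, where `long` and `double` each take two slots and an instance method's `this` takes one. A rough sketch of such a guard, in plain Java with illustrative names (this is not Spark's actual `isValidParamLength` implementation):

```java
// Hypothetical guard mirroring the kind of check discussed here: before
// splitting out a consume function, verify the would-be parameter list fits
// within the JVM's 255-slot limit for a method descriptor.
public class ParamLimitSketch {
    // long and double occupy two parameter slots; everything else one.
    static int slotsFor(String javaType) {
        return ("long".equals(javaType) || "double".equals(javaType)) ? 2 : 1;
    }

    static boolean isValidParamLength(String[] paramTypes) {
        int slots = 1; // reserve one slot for `this` on an instance method
        for (String t : paramTypes) {
            slots += slotsFor(t);
        }
        return slots <= 255;
    }

    public static void main(String[] args) {
        System.out.println(isValidParamLength(new String[]{"int", "long"})); // prints "true"
    }
}
```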
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710234 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc + """.stripMargin + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, + * we may put the consume logic of parent operator into a function and set this flag to `true`. + * The parent operator can know if its consume logic is inlined or in separated function. + */ + private var doConsumeInFunc: Boolean = false + + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. 
If this is `true`, this operator shouldn't use `continue` statement to + * continue on next row, because its generated codes aren't enclosed in main while-loop. + * + * For example, we have generated codes for a query plan like: + * Op1Exec + * Op2Exec + * Op3Exec + * + * If we put the consume code of Op2Exec into a separated function, the generated codes are like: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * } + * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. + * + * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions + * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return + * `false`. + */ + protected def doConsumeInChainOfFunc: Boolean = { +val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) +doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) + } + + /** + * The actual java statement this operator should use if there is a need to continue on next row + * in its `doConsume` codes. + * + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * continue; // Wrong. We can't use continue with the while-loop. + * } + * In above code, we can't use `continue` in `Op2Exec_doConsume`. + * + * Instead, we do something like: + * while (...) { + * ... // logic of Op3Exec. + * boolean continueForLoop = Op2Exec_doConsume(...); + * if (continueForLoop) continue; + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * return true; // When we need to do continue, we return true. 
+ * } + */ + protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { +"return true;"; --- End diff -- Remove `;`
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710741 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We --- End diff -- ditto; I'd like to see a concrete example, too.
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710667 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume --- End diff -- What's a concrete example where this condition prevents consume functions from being split?
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710909 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --- @@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan { ctx.freshNamePrefix = parent.variablePrefix val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs) + +// Under certain conditions, we can put the logic to consume the rows of this operator into +// another function. So we can prevent a generated function too long to be optimized by JIT. +// The conditions: +// 1. The parent uses all variables in output. we can't defer variable evaluation when consume +//in another function. +// 2. The output variables are not empty. If it's empty, we don't bother to do that. +// 3. We don't use row variable. The construction of row uses deferred variable evaluation. We +//can't do it. +// 4. The number of output variables must less than maximum number of parameters in Java method +//declaration. +val requireAllOutput = output.forall(parent.usedInputs.contains(_)) +val consumeFunc = + if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) { +constructDoConsumeFunction(ctx, inputVars) + } else { +parent.doConsume(ctx, inputVars, rowVar) + } s""" |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} |$evaluated - |${parent.doConsume(ctx, inputVars, rowVar)} + |$consumeFunc + """.stripMargin + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. Instead of inlining, + * we may put the consume logic of parent operator into a function and set this flag to `true`. + * The parent operator can know if its consume logic is inlined or in separated function. + */ + private var doConsumeInFunc: Boolean = false + + /** + * Returning true means we have at least one consume logic from child operator or this operator is + * separated in a function. 
If this is `true`, this operator shouldn't use `continue` statement to + * continue on next row, because its generated codes aren't enclosed in main while-loop. + * + * For example, we have generated codes for a query plan like: + * Op1Exec + * Op2Exec + * Op3Exec + * + * If we put the consume code of Op2Exec into a separated function, the generated codes are like: + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * } + * For now, `doConsumeInChainOfFunc` of Op2Exec will be `true`. + * + * Notice for some operators like `HashAggregateExec`, it doesn't chain previous consume functions + * but begins with its produce framework. We should override `doConsumeInChainOfFunc` to return + * `false`. + */ + protected def doConsumeInChainOfFunc: Boolean = { +val codegenChildren = children.map(_.asInstanceOf[CodegenSupport]) +doConsumeInFunc || codegenChildren.exists(_.doConsumeInChainOfFunc) + } + + /** + * The actual java statement this operator should use if there is a need to continue on next row + * in its `doConsume` codes. + * + * while (...) { + * ... // logic of Op3Exec. + * Op2Exec_doConsume(...); + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * continue; // Wrong. We can't use continue with the while-loop. + * } + * In above code, we can't use `continue` in `Op2Exec_doConsume`. + * + * Instead, we do something like: + * while (...) { + * ... // logic of Op3Exec. + * boolean continueForLoop = Op2Exec_doConsume(...); + * if (continueForLoop) continue; + * } + * private boolean Op2Exec_doConsume(...) { + * ... // logic of Op2Exec to consume rows. + * return true; // When we need to do continue, we return true. 
+ * } + */ + protected def continueStatementInDoConsume: String = if (doConsumeInChainOfFunc) { +"return true;"; + } else { +"continue;" + } + + /** + * To prevent concatenated function growing too long to be optimized by JIT. We can separate the + * parent's `doConsume` codes of a `CodegenSupport` operator into a function to call. + */ + protected def constructDoConsumeFunction
[GitHub] spark pull request #18931: [SPARK-21717][SQL] Decouple consume functions of ...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136711183 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala --- @@ -89,6 +89,8 @@ case class ExpandExec( child.asInstanceOf[CodegenSupport].inputRDDs() } + override protected def doConsumeInChainOfFunc: Boolean = false --- End diff -- I might not be 100% sure about your intention, but I feel this is a little confusing, because `ExpandExec` consume functions can be chained in the generated code, right?
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136710872

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -149,14 +149,146 @@ trait CodegenSupport extends SparkPlan {
     ctx.freshNamePrefix = parent.variablePrefix
     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+    // Under certain conditions, we can put the logic that consumes this operator's rows into
+    // another function, so we can prevent a generated function from growing too long to be
+    // optimized by JIT. The conditions:
+    // 1. The parent uses all variables in `output`; we can't defer variable evaluation when
+    //    consuming in another function.
+    // 2. The output variables are not empty; if they are, we don't bother.
+    // 3. We don't use the row variable; constructing the row uses deferred variable evaluation,
+    //    which we can't do here.
+    // 4. The number of output variables must be less than the maximum number of parameters in a
+    //    Java method declaration.
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (row == null && outputVars.nonEmpty && requireAllOutput && outputVars.length < 255) {
+        constructDoConsumeFunction(ctx, inputVars)
+      } else {
+        parent.doConsume(ctx, inputVars, rowVar)
+      }
     s"""
        |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
        |$evaluated
-       |${parent.doConsume(ctx, inputVars, rowVar)}
+       |$consumeFunc
      """.stripMargin
   }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by JIT, instead of
+   * inlining, we may put the parent operator's consume logic into a function and set this flag
+   * to `true`. The parent operator can then know whether its consume logic is inlined or in a
+   * separate function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning `true` means at least one consume function from a child operator, or this operator
+   * itself, is separated into a function.
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to
+   * call.
+   */
+  protected def constructDoConsumeFunction
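Condition 4 in the hunk above comes from the JVM's limit on method parameters: a method's parameter list is capped at 255 slots, where `long` and `double` each occupy two slots and `this` occupies one for instance methods. A hedged sketch of such a check follows; this is a hypothetical illustration, not Spark's actual code (the merged version of the PR checks this via `ctx.isValidParamLength(output)`):

```java
public class ParamSlots {
    // Counts JVM parameter slots for a method: `long` and `double` take two
    // slots each, everything else one, plus one slot for `this` on instance
    // methods. Hypothetical helper, not Spark's implementation.
    static int slotCount(String[] paramTypes, boolean isInstanceMethod) {
        int slots = isInstanceMethod ? 1 : 0;
        for (String t : paramTypes) {
            slots += (t.equals("long") || t.equals("double")) ? 2 : 1;
        }
        return slots;
    }

    static boolean isValidParamLength(String[] paramTypes, boolean isInstanceMethod) {
        return slotCount(paramTypes, isInstanceMethod) <= 255;
    }
}
```

Note that the simple `outputVars.length < 255` guard in this revision of the diff is conservative for slot counting, since each wide primitive output variable actually consumes two slots.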
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136492055

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -149,14 +149,144 @@ trait CodegenSupport extends SparkPlan {
     ctx.freshNamePrefix = parent.variablePrefix
     val evaluated = evaluateRequiredVariables(output, inputVars, parent.usedInputs)
+
+    // Under certain conditions, we can put the logic that consumes this operator's rows into
+    // another function, so we can prevent a generated function from growing too long to be
+    // optimized by JIT. The conditions:
+    // 1. The parent uses all variables in `output`; we can't defer variable evaluation when
+    //    consuming in another function.
+    // 2. The output variables are not empty; if they are, we don't bother.
+    // 3. We don't use the row variable; constructing the row uses deferred variable evaluation,
+    //    which we can't do here.
+    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
+    val consumeFunc =
+      if (row == null && outputVars.nonEmpty && requireAllOutput) {
+        constructDoConsumeFunction(ctx, inputVars)
+      } else {
+        parent.doConsume(ctx, inputVars, rowVar)
+      }
     s"""
        |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
        |$evaluated
-       |${parent.doConsume(ctx, inputVars, rowVar)}
+       |$consumeFunc
      """.stripMargin
   }
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by JIT, instead of
+   * inlining, we may put the parent operator's consume logic into a function and set this flag
+   * to `true`. The parent operator can then know whether its consume logic is inlined or in a
+   * separate function.
+   */
+  private var doConsumeInFunc: Boolean = false
+
+  /**
+   * Returning `true` means at least one consume function from a child operator, or this operator
+   * itself, is separated into a function. If this is `true`, this operator shouldn't use a
+   * `continue` statement to continue on to the next row, because its generated code isn't
+   * enclosed in the main while-loop.
+
+  /**
+   * To prevent a concatenated function from growing too long to be optimized by JIT, we can
+   * separate the parent's `doConsume` code of a `CodegenSupport` operator into a function to
+   * call.
+   */
+  protected def constructDoConsumeFunction(
+      ctx: CodegenContext,
+      inputVars: Seq[ExprCode]): String = {
+    val (callingParams, arguList, inputVarsInFunc) =

--- End diff --
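The `constructDoConsumeFunction` signature above assembles, among other things, the parameter list for the generated function's declaration and the matching argument list for its call site. A toy Java sketch of those two pieces is below; all names (`SplitConsume`, the variable names in the tests) are illustrative, not the PR's code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SplitConsume {
    // Builds the declaration of a separated, boolean-returning consume
    // function: pairs each parameter type with its name and wraps the body,
    // which returns false when no `continue` is needed.
    static String declaration(String funcName, List<String> types, List<String> names, String body) {
        String params = IntStream.range(0, types.size())
                .mapToObj(i -> types.get(i) + " " + names.get(i))
                .collect(Collectors.joining(", "));
        return "private boolean " + funcName + "(" + params + ") {\n"
                + body + "\n  return false;\n}";
    }

    // Builds the call site, turning the returned flag back into `continue`
    // inside the main while-loop.
    static String callSite(String funcName, List<String> names) {
        return "if (" + funcName + "(" + String.join(", ", names) + ")) continue;";
    }
}
```

This mirrors why the conditions on `consume` matter: every input variable must already be materialized so it can be passed as a plain argument at the call site.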
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18931#discussion_r136491920

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
+    val (callingParams, arguList, inputVarsInFunc) =

--- End diff --