Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18931#discussion_r163752999
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
    @@ -156,13 +162,96 @@ trait CodegenSupport extends SparkPlan {
         ctx.INPUT_ROW = null
         ctx.freshNamePrefix = parent.variablePrefix
         val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
    +
    +    // Under certain conditions, we can put the logic to consume the rows 
of this operator into
    +    // another function. This prevents a generated function from growing too long 
to be optimized by JIT.
    +    // The conditions:
    +    // 1. The config "spark.sql.codegen.splitConsumeFuncByOperator" is 
enabled.
    +    // 2. `inputVars` are all materialized. That is guaranteed to be true 
if the parent plan uses
    +    //    all variables in output (see `requireAllOutput`).
    +    // 3. The number of output variables must be less than the maximum number of 
parameters in a Java method
    +    //    declaration.
    +    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
    +    val consumeFunc =
    +      if (SQLConf.get.wholeStageSplitConsumeFuncByOperator && 
requireAllOutput &&
    +          ctx.isValidParamLength(output)) {
    +        constructDoConsumeFunction(ctx, inputVars, row)
    +      } else {
    +        parent.doConsume(ctx, inputVars, rowVar)
    +      }
         s"""
            |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
            |$evaluated
    -       |${parent.doConsume(ctx, inputVars, rowVar)}
    +       |$consumeFunc
    +     """.stripMargin
    +  }
    +
    +  /**
    +   * To prevent the concatenated function from growing too long to be optimized by 
JIT, we can separate the
    +   * parent's `doConsume` code of a `CodegenSupport` operator into a 
function to call.
    +   */
    +  private def constructDoConsumeFunction(
    +      ctx: CodegenContext,
    +      inputVars: Seq[ExprCode],
    +      row: String): String = {
    +    val (args, params, inputVarsInFunc) = constructConsumeParameters(ctx, 
output, inputVars, row)
    +    val rowVar = prepareRowVar(ctx, row, inputVarsInFunc)
    +
    +    val doConsume = ctx.freshName("doConsume")
    +    ctx.currentVars = inputVarsInFunc
    +    ctx.INPUT_ROW = null
    +
    +    val doConsumeFuncName = ctx.addNewFunction(doConsume,
    +      s"""
    +         | private void $doConsume(${params.mkString(", ")}) throws 
java.io.IOException {
    +         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
    +         | }
    +       """.stripMargin)
    +
    +    s"""
    +       | $doConsumeFuncName(${args.mkString(", ")});
          """.stripMargin
       }
     
    +  /**
    +   * Returns arguments for calling method and method definition parameters 
of the consume function.
    +   * And also returns the list of `ExprCode` for the parameters.
    +   */
    +  private def constructConsumeParameters(
    +      ctx: CodegenContext,
    +      attributes: Seq[Attribute],
    +      variables: Seq[ExprCode],
    +      row: String): (Seq[String], Seq[String], Seq[ExprCode]) = {
    +    val arguments = mutable.ArrayBuffer[String]()
    +    val parameters = mutable.ArrayBuffer[String]()
    +    val paramVars = mutable.ArrayBuffer[ExprCode]()
    +
    +    if (row != null) {
    +      arguments += row
    --- End diff --
    
    Added an extra unit for `row` if needed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to