Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18931#discussion_r163462688
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
    @@ -149,13 +149,100 @@ trait CodegenSupport extends SparkPlan {
     
         ctx.freshNamePrefix = parent.variablePrefix
         val evaluated = evaluateRequiredVariables(output, inputVars, 
parent.usedInputs)
    +
    +    // Under certain conditions, we can put the logic to consume the rows 
of this operator into
    +    // another function. This prevents a generated function from growing too long 
to be optimized by JIT.
    +    // The conditions:
    +    // 1. The parent uses all variables in output. We can't defer variable 
evaluation when consuming
    +    //    in another function.
    +    // 2. The output variables are not empty. If it's empty, we don't 
bother to do that.
    +    // 3. We don't use row variable. The construction of row uses deferred 
variable evaluation. We
    +    //    can't do it.
    +    // 4. The number of output variables must not exceed the maximum number of 
parameters in a Java method
    +    //    declaration.
    +    val requireAllOutput = output.forall(parent.usedInputs.contains(_))
    +    val consumeFunc =
    +      if (row == null && outputVars.nonEmpty && requireAllOutput && 
isValidParamLength(ctx)) {
    +        constructDoConsumeFunction(ctx, inputVars)
    +      } else {
    +        parent.doConsume(ctx, inputVars, rowVar)
    +      }
         s"""
            |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")}
            |$evaluated
    -       |${parent.doConsume(ctx, inputVars, rowVar)}
    +       |$consumeFunc
          """.stripMargin
       }
     
    +  /**
    +   * In Java, a method descriptor is valid only if it represents method 
parameters with a total
    +   * length of 255 or less. `this` contributes one unit and a parameter of 
type long or double
    +   * contributes two units. Besides, for nullable parameters, we also need 
to pass a boolean
    +   * for the null status.
    +   */
    +  private def isValidParamLength(ctx: CodegenContext): Boolean = {
    +    // Start value is 1 for `this`.
    +    output.foldLeft(1) { case (curLength, attr) =>
    +      ctx.javaType(attr.dataType) match {
    +        case (ctx.JAVA_LONG | ctx.JAVA_DOUBLE) if !attr.nullable => 
curLength + 2
    +        case ctx.JAVA_LONG | ctx.JAVA_DOUBLE => curLength + 3
    +        case _ if !attr.nullable => curLength + 1
    +        case _ => curLength + 2
    +      }
    +    } <= 255
    +  }
    +
    +  /**
    +   * To prevent the concatenated function from growing too long to be optimized by 
JIT, we can separate the
    +   * parent's `doConsume` code of a `CodegenSupport` operator into a 
function to call.
    +   */
    +  private def constructDoConsumeFunction(
    +      ctx: CodegenContext,
    +      inputVars: Seq[ExprCode]): String = {
    +    val (callingParams, arguList, inputVarsInFunc) =
    +      constructConsumeParameters(ctx, output, inputVars)
    +    val rowVar = ExprCode("", "false", "unsafeRow")
    +    val doConsume = ctx.freshName("doConsume")
    +    val doConsumeFuncName = ctx.addNewFunction(doConsume,
    +      s"""
    +         | private void $doConsume($arguList) throws java.io.IOException {
    +         |   ${parent.doConsume(ctx, inputVarsInFunc, rowVar)}
    +         | }
    +       """.stripMargin)
    +
    +    s"""
    +       | $doConsumeFuncName($callingParams);
    +     """.stripMargin
    +  }
    +
    +  /**
    +   * Returns source code for calling consume function and the argument 
list of the consume function
    +   * and also the `ExprCode` for the argument list.
    +   */
    +  private def constructConsumeParameters(
    +      ctx: CodegenContext,
    +      attributes: Seq[Attribute],
    +      variables: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
    +    val params = variables.zipWithIndex.map { case (ev, i) =>
    +      val arguName = ctx.freshName(s"expr_$i")
    +      val arguType = ctx.javaType(attributes(i).dataType)
    +
    +      val (callingParam, funcParams, arguIsNull) = if 
(!attributes(i).nullable) {
    +        // When the argument is not nullable, we don't need to pass in 
`isNull` param for it and
    +        // simply give a `false`.
    +        val arguIsNull = "false"
    +        (ev.value, s"$arguType $arguName", arguIsNull)
    +      } else {
    +        val arguIsNull = ctx.freshName(s"exprIsNull_$i")
    +        (ev.value + ", " + ev.isNull, s"$arguType $arguName, boolean 
$arguIsNull", arguIsNull)
    +      }
    +      (callingParam, funcParams, ExprCode("", arguIsNull, arguName))
    +    }.unzip3
    +    (params._1.mkString(", "),
    +      params._2.mkString(", "),
    +      params._3)
    --- End diff --
    
    Done.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to