Github user hvanhovell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13065#discussion_r82944265
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala ---
    @@ -99,5 +102,182 @@ case class GenerateExec(
           }
         }
       }
    -}
     
    +  override def inputRDDs(): Seq[RDD[InternalRow]] = {
    +    child.asInstanceOf[CodegenSupport].inputRDDs()
    +  }
    +
    +  protected override def doProduce(ctx: CodegenContext): String = {
    +    // We need to add some code here for terminating generators.
    +    child.asInstanceOf[CodegenSupport].produce(ctx, this)
    +  }
    +
    +  override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
    +    ctx.currentVars = input
    +    ctx.copyResult = true
    +
    +    // Add input rows to the values when we are joining
    +    val values = if (join) {
    +      input
    +    } else {
    +      Seq.empty
    +    }
    +
    +    // Generate the driving expression.
    +    val data = boundGenerator.genCode(ctx)
    +
    +    boundGenerator match {
    +      case e: CollectionGenerator => codeGenCollection(ctx, e, values, 
data, row)
    +      case g => codeGenTraversableOnce(ctx, g, values, data, row)
    +    }
    +  }
    +
    +  /**
    +   * Generate code for [[CollectionGenerator]] expressions.
    +   */
    +  private def codeGenCollection(
    +      ctx: CodegenContext,
    +      e: CollectionGenerator,
    +      input: Seq[ExprCode],
    +      data: ExprCode,
    +      row: ExprCode): String = {
    +
    +    // Generate looping variables.
    +    val index = ctx.freshName("index")
    +
    +    // Add a check if the generate outer flag is true.
    +    val checks = optionalCode(outer, data.isNull)
    +
    +    // Add position
    +    val position = if (e.position) {
    +      Seq(ExprCode("", "false", index))
    +    } else {
    +      Seq.empty
    +    }
    +
    +    // Generate code for either ArrayData or MapData
    +    val (initMapData, updateRowData, values) = e.collectionSchema match {
    +      case ArrayType(st: StructType, nullable) if e.inline =>
    +        val row = codeGenAccessor(ctx, data.value, "col", index, st, 
nullable, checks)
    +        val fieldChecks = checks ++ optionalCode(nullable, row.isNull)
    +        val columns = st.fields.toSeq.zipWithIndex.map { case (f, i) =>
    +          codeGenAccessor(ctx, row.value, f.name, i.toString, f.dataType, 
f.nullable, fieldChecks)
    +        }
    +        ("", row.code, columns)
    +
    +      case ArrayType(dataType, nullable) =>
    +        ("", "", Seq(codeGenAccessor(ctx, data.value, "col", index, 
dataType, nullable, checks)))
    +
    +      case MapType(keyType, valueType, valueContainsNull) =>
    +        // Materialize the key and the value arrays before we enter the 
loop.
    +        val keyArray = ctx.freshName("keyArray")
    +        val valueArray = ctx.freshName("valueArray")
    +        val initArrayData =
    +          s"""
    +             |ArrayData $keyArray = ${data.isNull} ? null : 
${data.value}.keyArray();
    +             |ArrayData $valueArray = ${data.isNull} ? null : 
${data.value}.valueArray();
    +           """.stripMargin
    +        val values = Seq(
    +          codeGenAccessor(ctx, keyArray, "key", index, keyType, nullable = 
false, checks),
    +          codeGenAccessor(ctx, valueArray, "value", index, valueType, 
valueContainsNull, checks))
    +        (initArrayData, "", values)
    +    }
    +
    +    // In case of outer we need to make sure the loop is executed at-least 
once when the array/map
    +    // contains no input. We do this by setting the looping index to -1 if 
there is no input,
    +    // evaluation of the array is prevented by a check in the accessor 
code.
    +    val numElements = ctx.freshName("numElements")
    +    val init = if (outer) s"$numElements == 0 ? -1 : 0" else "0"
    +    val numOutput = metricTerm(ctx, "numOutputRows")
    +    s"""
    +       |${data.code}
    +       |$initMapData
    +       |int $numElements = ${data.isNull} ? 0 : 
${data.value}.numElements();
    +       |for (int $index = $init; $index < $numElements; $index++) {
    +       |  $numOutput.add(1);
    +       |  $updateRowData
    +       |  ${consume(ctx, input ++ position ++ values)}
    +       |}
    +     """.stripMargin
    +  }
    +
    +  /**
    +   * Generate code for a regular [[TraversableOnce]] returning 
[[Generator]].
    +   */
    +  private def codeGenTraversableOnce(
    +      ctx: CodegenContext,
    +      e: Expression,
    +      input: Seq[ExprCode],
    +      data: ExprCode,
    +      row: ExprCode): String = {
    +
    +    // Generate looping variables.
    +    val iterator = ctx.freshName("iterator")
    +    val hasNext = ctx.freshName("hasNext")
    +    val current = ctx.freshName("row")
    +
    +    // Add a check if the generate outer flag is true.
    +    val checks = optionalCode(outer, s"!$hasNext")
    +    val values = e.dataType match {
    --- End diff --
    
    The elementType is not going to work, since we also need the array element's
`nullable` flag.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to