G Muciaccia created SPARK-37855:
-----------------------------------

             Summary: IllegalStateException when transforming an array inside a nested struct
                 Key: SPARK-37855
                 URL: https://issues.apache.org/jira/browse/SPARK-37855
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.2.0
         Environment: OS: Ubuntu 20.04.3 LTS

Scala version: 2.12.12

            Reporter: G Muciaccia


*NOTE*: this bug is only present in version {{3.2.0}}. Downgrading to {{3.1.2}} solves the problem.

h3. Prerequisites to reproduce the bug

# use Spark version 3.2.0
# create a DataFrame with an array field, which contains a struct field with a nested array field
# *apply a limit* to the DataFrame
# transform the outer array, renaming one of its fields
# transform the inner array too, which requires two {{getField}} calls in sequence

h3. Example that reproduces the bug

This is a minimal example (as minimal as I could make it) to reproduce the bug:

{code}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
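
// NOTE: meant to be run in spark-shell, where `spark` (a SparkSession) is
// already in scope.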

def makeInput(): DataFrame = {
    val innerElement1 = Row(3, 3.12)
    val innerElement2 = Row(4, 2.1)
    val innerElement3 = Row(1, 985.2)
    val innerElement4 = Row(10, 757548.0)
    val innerElement5 = Row(1223, 0.665)

    val outerElement1 = Row(1, Row(List(innerElement1, innerElement2)))
    val outerElement2 = Row(2, Row(List(innerElement3)))
    val outerElement3 = Row(3, Row(List(innerElement4, innerElement5)))

    val data = Seq(
        Row("row1", List(outerElement1)),
        Row("row2", List(outerElement2, outerElement3)),
    )

    val schema = new StructType()
        .add("name", StringType)
        .add("outer_array", ArrayType(new StructType()
            .add("id", IntegerType)
            .add("inner_array_struct", new StructType()
                .add("inner_array", ArrayType(new StructType()
                    .add("id", IntegerType)
                    .add("value", DoubleType)
                ))
            )
        ))

    spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
}

// val df = makeInput()                   // works: no limit applied
val df = makeInput().limit(2)             // fails: limit applied
// val df = makeInput().limit(2).cache()  // works: cached after the limit

val res = df.withColumn("extracted", transform(
    col("outer_array"),
    c1 => {
        struct(
            c1.getField("id").alias("outer_id"),
            transform(
                c1.getField("inner_array_struct").getField("inner_array"),
                c2 => {
                    struct(
                        c2.getField("value").alias("inner_value")
                    )
                }
            )
        )
    }
))

res.printSchema()
res.show(false)
{code}

h4. Executing the example code

When executed as-is, the code fails on the {{show}} statement with:

{code}
java.lang.IllegalStateException: Couldn't find _extract_inner_array#23 in [name#2,outer_array#3]
{code}

However, *if the limit is not applied, or if the DataFrame is cached after the limit, everything works* (you can uncomment the corresponding lines in the example to try it).
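
For convenience, here is a minimal sketch of the two workarounds, reusing {{makeInput()}} from the example above:

{code}
// Workaround 1: do not apply the limit at all.
val dfNoLimit = makeInput()

// Workaround 2: cache immediately after the limit; materializing the
// DataFrame this way appears to sidestep the failure.
val dfCached = makeInput().limit(2).cache()
{code}

Running the same {{transform}} pipeline on either variant prints the schema and rows without the {{IllegalStateException}}.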


