[ https://issues.apache.org/jira/browse/SPARK-35213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-35213:
------------------------------------

    Assignee:     (was: Apache Spark)

> Corrupt DataFrame for certain withField patterns
> ------------------------------------------------
>
>                 Key: SPARK-35213
>                 URL: https://issues.apache.org/jira/browse/SPARK-35213
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1
>            Reporter: Adam Binford
>            Priority: Major
>
> We encountered a very strange bug while heavily using withField in production 
> on Spark 3.1.1. Jobs were dying with a lot of odd JVM crashes (such as 
> jshort_disjoint_arraycopy during a copyMemory call) and occasional 
> NegativeArraySizeException errors. We eventually found a workaround by 
> ordering our withField calls in a certain way, and I was able to create some 
> minimal examples that reproduce similar weird/broken behavior.
> It seems to stem from the optimizations added in 
> https://github.com/apache/spark/pull/29812. Because the same optimization was 
> also added as an analyzer rule, there seem to be two different ways this 
> issue can crop up: once at analysis time and once at runtime.
> While these examples might seem odd, they reflect how we've written a helper 
> function that can create columns at arbitrary nested paths even if the 
> intermediate fields don't exist yet (roughly sketched below).
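> A minimal sketch of that helper (illustrative only, not our actual code; a 
> production version would also consult the DataFrame schema to seed missing 
> intermediate structs with F.struct(), which is omitted here):
> {code:python}
> import pyspark.sql.functions as F
>
> def with_nested_field(col, path, value):
>     # Set `value` at the dotted `path` inside struct column `col`,
>     # recursing one path segment at a time via withField.
>     head, _, rest = path.partition('.')
>     if not rest:
>         return col.withField(head, value)
>     # Assumes the intermediate struct already exists; a real version
>     # would create it with F.struct() first when it does not.
>     return col.withField(head, with_nested_field(col.getField(head), rest, value))
>
> # e.g. with_nested_field(F.col('data'), 'a.aa', F.lit('aa'))
> {code}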
> Example of what I assume is an issue during analysis:
> {code:python}
> import pyspark.sql.functions as F 
> df = spark.range(1).withColumn('data', F.struct()
>     .withField('a', F.struct())
>     .withField('b', F.struct())
>     .withField('a.aa', F.lit('aa'))
>     .withField('b.ba', F.lit('ba'))
>     .withField('a.ab', F.lit('ab'))
>     .withField('b.bb', F.lit('bb'))
>     .withField('a.ac', F.lit('ac'))
> )
> df.printSchema()
> {code}
> Output schema:
> {code}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  |    |-- b: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |    |-- bb: string (nullable = false)
>  |    |-- a: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |    |-- ac: string (nullable = false)
> {code}
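> For comparison, the schema I would expect from those calls is data.a with 
> aa/ab/ac and data.b with ba/bb:
> {code}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  |    |-- a: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |    |-- ac: string (nullable = false)
>  |    |-- b: struct (nullable = false)
>  |    |    |-- ba: string (nullable = false)
>  |    |    |-- bb: string (nullable = false)
> {code}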
> And an example of a runtime data issue:
> {code:python}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
> )
> df.printSchema()
> df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show()
> {code}
> Output:
> {code}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  |    |-- a: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |-- b: struct (nullable = false)
>  |    |    |-- ba: string (nullable = false)
>  |    |    |-- bb: string (nullable = false)
> +---+---+---+---+-----+
> | aa| ab| ba| bb|count|
> +---+---+---+---+-----+
> | ba| bb| aa| ab|    1|
> +---+---+---+---+-----+
> {code}
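> For comparison, given the literals above, the expected output would be:
> {code}
> +---+---+---+---+-----+
> | aa| ab| ba| bb|count|
> +---+---+---+---+-----+
> | aa| ab| ba| bb|    1|
> +---+---+---+---+-----+
> {code}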
> The columns have the wrong data in them even though the schema is correct: 
> the values written under data.a come back under data.b and vice versa. 
> Additionally, if you add another field you get an exception:
> {code:python}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>  .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
>
> java.lang.ArrayIndexOutOfBoundsException: 2
>   at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
>   at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
>   at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> {code}
> But if you reorder the withField expressions, you get correct behavior:
> {code:python}
> df = (spark.range(1)
>  .withColumn('data', F.struct()
>    .withField('a', F.struct().withField('aa', F.lit('aa')))
>    .withField('b', F.struct().withField('ba', F.lit('ba')))
>  )
>  .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>  .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
>  .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|count|
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|    1|
> +---+---+---+---+---+-----+
> {code}
> I think this has to do with the double ".reverse" used to dedupe expressions 
> in OptimizeUpdateFields (see the sketch below). I'm working on a PR to try to 
> fix these issues.
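> To illustrate the idiom I mean (a rough Python rendition for intuition only; 
> the actual rule is Scala inside OptimizeUpdateFields, and the names here are 
> hypothetical), reversing, dropping already-seen names, and reversing again 
> keeps only the last write per field name:
> {code:python}
> def dedupe_with_fields(fields):
>     # fields: (name, value) pairs in the order the withField calls
>     # were made. Keep only the last write for each name.
>     seen = set()
>     kept = []
>     for name, value in reversed(fields):
>         if name not in seen:
>             seen.add(name)
>             kept.append((name, value))
>     kept.reverse()
>     return kept
>
> print(dedupe_with_fields([('a', 1), ('b', 2), ('a', 3)]))
> # [('b', 2), ('a', 3)] -- 'a' now sorts after 'b', which is the kind of
> # reordering visible in the broken schema above
> {code}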


