Adam Binford created SPARK-35213:
------------------------------------

             Summary: Corrupt DataFrame for certain withField patterns
                 Key: SPARK-35213
                 URL: https://issues.apache.org/jira/browse/SPARK-35213
             Project: Spark
          Issue Type: Bug
          Components: Optimizer, SQL
    Affects Versions: 3.1.1
            Reporter: Adam Binford


We encountered a very weird bug while heavily using withField in production with 
Spark 3.1.1. Jobs were dying with a lot of strange JVM crashes (such as 
jshort_disjoint_arraycopy during a copyMemory call) and occasional 
NegativeArraySizeException errors. We eventually found a workaround by ordering our 
withField calls in a certain way, and I was able to create some minimal examples 
that reproduce similar broken behavior.

It seems to stem from the optimizations added in 
[https://github.com/apache/spark/pull/29812]. Because the same optimization 
was also added as an analyzer rule, there seem to be two different ways this issue 
can crop up: once at analysis time and once at runtime.

While these examples might seem odd, they represent how we've built a helper 
function that can create columns at arbitrary nested field paths even if the 
intermediate fields don't exist yet (see the sketch below).
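For context, here's a minimal sketch of the kind of helper I mean. It is illustrative only: the name with_nested_column and the schema-walking logic are not our actual code, just one way to build this on top of withField.
{code:java}
import pyspark.sql.functions as F
from pyspark.sql.types import StructType

def with_nested_column(df, path, value):
    # Illustrative helper only: set `value` at a dotted path like 'data.a.aa',
    # creating empty intermediate structs for any parts of the path that are
    # missing from the current schema. Assumes at least one nested level.
    parts = path.split('.')
    top = parts[0]
    if top in df.columns:
        col, schema = F.col(top), df.schema[top].dataType
    else:
        col, schema = F.struct(), StructType()
    # Walk the intermediate fields, adding an empty struct wherever one
    # doesn't exist yet so the final withField has a parent struct to land in.
    for depth in range(1, len(parts) - 1):
        name = parts[depth]
        if isinstance(schema, StructType) and name in schema.fieldNames():
            schema = schema[name].dataType
        else:
            col = col.withField('.'.join(parts[1:depth + 1]), F.struct())
            schema = StructType()
    return df.withColumn(top, col.withField('.'.join(parts[1:]), value))
{code}
Calling a helper like that repeatedly for different leaf paths is what produces withField chains like the ones below.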

Example of what I assume is an issue during analysis:
{code:java}
import pyspark.sql.functions as F 
df = spark.range(1).withColumn('data', F.struct()
    .withField('a', F.struct())
    .withField('b', F.struct())
    .withField('a.aa', F.lit('aa'))
    .withField('b.ba', F.lit('ba'))
    .withField('a.ab', F.lit('ab'))
    .withField('b.bb', F.lit('bb'))
    .withField('a.ac', F.lit('ac'))
)
df.printSchema(){code}
Output schema:
{code:java}
root
 |-- id: long (nullable = false)
 |-- data: struct (nullable = false)
 |    |-- b: struct (nullable = false)
 |    |    |-- aa: string (nullable = false)
 |    |    |-- ab: string (nullable = false)
 |    |    |-- bb: string (nullable = false)
 |    |-- a: struct (nullable = false)
 |    |    |-- aa: string (nullable = false)
 |    |    |-- ab: string (nullable = false)
 |    |    |-- ac: string (nullable = false){code}
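For comparison, the schema I would expect from those withField calls is a keeping aa/ab/ac and b keeping ba/bb:
{code:java}
root
 |-- id: long (nullable = false)
 |-- data: struct (nullable = false)
 |    |-- a: struct (nullable = false)
 |    |    |-- aa: string (nullable = false)
 |    |    |-- ab: string (nullable = false)
 |    |    |-- ac: string (nullable = false)
 |    |-- b: struct (nullable = false)
 |    |    |-- ba: string (nullable = false)
 |    |    |-- bb: string (nullable = false)
{code}
Instead, b.ba disappears entirely and b picks up copies of a's aa and ab fields.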
And an example of a runtime data issue:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
)
df.printSchema()
df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show()
{code}
 Output:
{code:java}
root
 |-- id: long (nullable = false)
 |-- data: struct (nullable = false)
 |    |-- a: struct (nullable = false)
 |    |    |-- aa: string (nullable = false)
 |    |    |-- ab: string (nullable = false)
 |    |-- b: struct (nullable = false)
 |    |    |-- ba: string (nullable = false)
 |    |    |-- bb: string (nullable = false)
+---+---+---+---+-----+
| aa| ab| ba| bb|count|
+---+---+---+---+-----+
| ba| bb| aa| ab|    1|
+---+---+---+---+-----+
{code}
The columns have the wrong data in them even though the schema is correct: the 
a.* columns show the values that belong to b.* and vice versa. Additionally, if 
you add another nested field you get an exception:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
 .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
 .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
)
df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()

java.lang.ArrayIndexOutOfBoundsException: 2
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
  at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
{code}
 But if you reorder the withField expressions, you get correct behavior:
{code:java}
df = (spark.range(1)
 .withColumn('data', F.struct()
   .withField('a', F.struct().withField('aa', F.lit('aa')))
   .withField('b', F.struct().withField('ba', F.lit('ba')))
 )
 .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
 .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
 .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
)
df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
+---+---+---+---+---+-----+
| aa| ab| ac| ba| bb|count|
+---+---+---+---+---+-----+
| aa| ab| ac| ba| bb|    1|
+---+---+---+---+---+-----+
{code}
I think this has to do with the double ".reverse" used to dedupe expressions 
in OptimizeUpdateFields (a sketch of that pattern is below). I'm working on a PR to try to fix these issues.
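For anyone not familiar with that rule, here's a rough Python illustration of the pattern I mean (dedupe the withField writes by field name, keeping only the last write, via reverse / filter / reverse). This is not the actual Scala in OptimizeUpdateFields, just the shape of it:
{code:java}
def dedupe_keep_last(field_ops):
    # field_ops is an ordered list of (field_name, value) pairs, i.e. the
    # flattened withField calls. Walk it in reverse so the last write to a
    # given name wins, then reverse again to restore relative order.
    seen = set()
    kept = []
    for name, value in reversed(field_ops):
        if name not in seen:
            seen.add(name)
            kept.append((name, value))
    return list(reversed(kept))

# e.g. [('a', 1), ('b', 2), ('a', 3)] -> [('b', 2), ('a', 3)]
{code}
The real rule operates on WithField operations inside Catalyst UpdateFields expressions, and my guess is that the reordering/deduping there is what ends up misaligning the struct values with the schema in the examples above.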

 

 


