[ https://issues.apache.org/jira/browse/SPARK-35213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
L. C. Hsieh resolved SPARK-35213.
---------------------------------
    Fix Version/s: 3.1.2
                   3.2.0
       Resolution: Fixed

Issue resolved by pull request 32338
[https://github.com/apache/spark/pull/32338]

> Corrupt DataFrame for certain withField patterns
> ------------------------------------------------
>
>                 Key: SPARK-35213
>                 URL: https://issues.apache.org/jira/browse/SPARK-35213
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.1
>            Reporter: Adam Binford
>            Assignee: Adam Binford
>            Priority: Major
>             Fix For: 3.2.0, 3.1.2
>
>
> We encountered a very strange bug while heavily using withField in production with Spark 3.1.1. Jobs were dying with a number of odd JVM crashes (such as jshort_disjoint_arraycopy during a copyMemory call) and occasional NegativeArraySizeException errors. We eventually found a workaround by ordering our withField calls in a certain way, and I was able to create some minimal examples that reproduce similar broken behavior.
> It seems to stem from the optimizations added in [https://github.com/apache/spark/pull/29812]. Because the same new optimization was also added as an analyzer rule, there seem to be two different ways this issue can crop up: once at analysis time and once at runtime.
> While these examples might seem odd, they represent how we've created a helper function that can create columns in arbitrary nested fields even if the intermediate fields don't exist yet (a rough sketch of such a helper is included after the second example below).
> Example of what I assume is an issue during analysis:
> {code:java}
> import pyspark.sql.functions as F
> df = spark.range(1).withColumn('data', F.struct()
>     .withField('a', F.struct())
>     .withField('b', F.struct())
>     .withField('a.aa', F.lit('aa'))
>     .withField('b.ba', F.lit('ba'))
>     .withField('a.ab', F.lit('ab'))
>     .withField('b.bb', F.lit('bb'))
>     .withField('a.ac', F.lit('ac'))
> )
> df.printSchema(){code}
> Output schema (note that b incorrectly contains aa and ab, while ba is missing entirely):
> {code:java}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  |    |-- b: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |    |-- bb: string (nullable = false)
>  |    |-- a: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |    |-- ac: string (nullable = false){code}
> And an example of a runtime data issue:
> {code:java}
> df = (spark.range(1)
>     .withColumn('data', F.struct()
>         .withField('a', F.struct().withField('aa', F.lit('aa')))
>         .withField('b', F.struct().withField('ba', F.lit('ba')))
>     )
>     .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>     .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
> )
> df.printSchema()
> df.groupBy('data.a.aa', 'data.a.ab', 'data.b.ba', 'data.b.bb').count().show()
> {code}
> Output:
> {code:java}
> root
>  |-- id: long (nullable = false)
>  |-- data: struct (nullable = false)
>  |    |-- a: struct (nullable = false)
>  |    |    |-- aa: string (nullable = false)
>  |    |    |-- ab: string (nullable = false)
>  |    |-- b: struct (nullable = false)
>  |    |    |-- ba: string (nullable = false)
>  |    |    |-- bb: string (nullable = false)
>
> +---+---+---+---+-----+
> | aa| ab| ba| bb|count|
> +---+---+---+---+-----+
> | ba| bb| aa| ab|    1|
> +---+---+---+---+-----+
> {code}
> The columns have the wrong data in them, even though the schema is correct.
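>
> For illustration, here is a minimal sketch of the kind of helper mentioned above. The name with_nested_field and its exact logic are simplified assumptions rather than the actual production helper, but it shows why the withField calls end up chained in these patterns:
> {code:java}
> import pyspark.sql.functions as F
> from pyspark.sql.types import StructType
>
> def with_nested_field(df, column, path, value):
>     """Set `column`.`path` (dot-separated) to `value`, creating any
>     missing intermediate struct fields along the way."""
>     parts = path.split('.')
>     schema = df.schema[column].dataType
>     col = F.col(column)
>     prefix = []
>     # Create each missing intermediate struct first, so the later
>     # dotted withField call has a parent struct to descend into.
>     for part in parts[:-1]:
>         if isinstance(schema, StructType) and part in schema.fieldNames():
>             schema = schema[part].dataType
>         else:
>             col = col.withField('.'.join(prefix + [part]), F.struct())
>             schema = StructType()
>         prefix.append(part)
>     # Finally set the leaf field itself.
>     col = col.withField(path, value)
>     return df.withColumn(column, col)
> {code}
> Calling this repeatedly, e.g. with_nested_field(df, 'data', 'a.aa', F.lit('aa')) followed by with_nested_field(df, 'data', 'b.ba', F.lit('ba')), produces exactly the kind of interleaved withField chains used in these examples.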
> Additionally, if you add another nested column, you get an exception:
> {code:java}
> df = (spark.range(1)
>     .withColumn('data', F.struct()
>         .withField('a', F.struct().withField('aa', F.lit('aa')))
>         .withField('b', F.struct().withField('ba', F.lit('ba')))
>     )
>     .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>     .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
>     .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
>
> java.lang.ArrayIndexOutOfBoundsException: 2
>     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.genericGet(rows.scala:201)
>     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getAs(rows.scala:35)
>     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
>     at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
>     at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doConsume_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
> {code}
> But if you reorder the withField expressions, you get correct behavior:
> {code:java}
> df = (spark.range(1)
>     .withColumn('data', F.struct()
>         .withField('a', F.struct().withField('aa', F.lit('aa')))
>         .withField('b', F.struct().withField('ba', F.lit('ba')))
>     )
>     .withColumn('data', F.col('data').withField('a.ab', F.lit('ab')))
>     .withColumn('data', F.col('data').withField('a.ac', F.lit('ac')))
>     .withColumn('data', F.col('data').withField('b.bb', F.lit('bb')))
> )
> df.groupBy('data.a.aa', 'data.a.ab', 'data.a.ac', 'data.b.ba', 'data.b.bb').count().show()
>
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|count|
> +---+---+---+---+---+-----+
> | aa| ab| ac| ba| bb|    1|
> +---+---+---+---+---+-----+
> {code}
> I think this has to do with the double ".reverse" used to dedupe expressions in OptimizeUpdateFields (a rough illustration of that dedup idea is sketched below). I'm working on a PR to try to fix these issues.
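>
> For anyone following along, here is a rough Python illustration of the "dedupe via double reverse" idea referred to above: keep only the last write to each field name by reversing the list, dropping later duplicates, and reversing back. This is only a conceptual sketch (the function name dedup_last_write_wins is made up), not the actual OptimizeUpdateFields implementation:
> {code:java}
> def dedup_last_write_wins(ops):
>     """Collapse repeated writes to the same field name, keeping only the
>     last write: reverse the list, keep the first occurrence of each name,
>     then reverse again to restore the order of the surviving writes."""
>     seen = set()
>     kept = []
>     for name, value in reversed(ops):
>         if name not in seen:
>             seen.add(name)
>             kept.append((name, value))
>     return list(reversed(kept))
>
> # 'a.x' is written twice; only the later write survives, in its later position.
> print(dedup_last_write_wins([('a.x', 1), ('b.y', 2), ('a.x', 3)]))
> # [('b.y', 2), ('a.x', 3)]
> {code}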