[ https://issues.apache.org/jira/browse/SPARK-26352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-26352:
-------------------------------
    Labels: correctness  (was: )

> join reordering should not change the order of output attributes
> ----------------------------------------------------------------
>
>                 Key: SPARK-26352
>                 URL: https://issues.apache.org/jira/browse/SPARK-26352
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 2.4.0
>            Reporter: Kris Mok
>            Assignee: Kris Mok
>            Priority: Major
>              Labels: correctness
>             Fix For: 2.3.3, 2.4.1, 3.0.0
>
>
> The optimizer rule {{org.apache.spark.sql.catalyst.optimizer.ReorderJoin}}
> performs join reordering on inner joins. It was introduced by SPARK-12032
> in December 2015.
> After reordering the joins, though, the rule didn't check whether the
> column order (in terms of the {{output}} attribute list) was still the same
> as before. Thus, it's possible to have a mismatch between the reordered
> column order and the schema that a DataFrame thinks it has.
> This can be demonstrated with the following example:
> {code:none}
> spark.sql("create table table_a (x int, y int) using parquet")
> spark.sql("create table table_b (i int, j int) using parquet")
> spark.sql("create table table_c (a int, b int) using parquet")
> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> {code}
> Here's what the DataFrame thinks:
> {code:none}
> scala> df.printSchema
> root
>  |-- x: integer (nullable = true)
>  |-- y: integer (nullable = true)
>  |-- i: integer (nullable = true)
>  |-- j: integer (nullable = true)
>  |-- a: integer (nullable = true)
>  |-- b: integer (nullable = true)
> {code}
> Here's what the optimized plan thinks, after join reordering:
> {code:none}
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- a: integer
> |-- b: integer
> |-- i: integer
> |-- j: integer
> {code}
> If we exclude the {{ReorderJoin}} rule (using Spark 2.4's optimizer rule
> exclusion feature), it's back to normal:
> {code:none}
> scala> spark.conf.set("spark.sql.optimizer.excludedRules", "org.apache.spark.sql.catalyst.optimizer.ReorderJoin")
> scala> val df = spark.sql("with df1 as (select * from table_a cross join table_b) select * from df1 join table_c on a = x and b = i")
> df: org.apache.spark.sql.DataFrame = [x: int, y: int ... 4 more fields]
> scala> df.queryExecution.optimizedPlan.output.foreach(a => println(s"|-- ${a.name}: ${a.dataType.typeName}"))
> |-- x: integer
> |-- y: integer
> |-- i: integer
> |-- j: integer
> |-- a: integer
> |-- b: integer
> {code}
> Note that this column ordering problem leads to data corruption, and can
> manifest itself in various symptoms:
> * Silent data corruption: if the reordered columns happen to have either
> matching types or sufficiently compatible types (e.g. all fixed-length
> primitive types are considered "sufficiently compatible" in an UnsafeRow),
> the resulting data is wrong but might not trigger any alarms immediately.
> * Weird Java-level exceptions like {{java.lang.NegativeArraySizeException}},
> or even SIGSEGVs.
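
The mismatch described in the issue can be checked mechanically. The sketch below is only an illustration in plain Scala, with no Spark dependency: the object `ColumnOrderCheck` and its two methods are hypothetical names, not part of Spark, and it compares columns by name, which assumes names are unique (Catalyst itself identifies attributes by expression ID, not by name). It reports the positions where two column lists disagree and computes the indices a projection would need to select to restore the expected order.

```scala
object ColumnOrderCheck {
  // Positions where the two column-name lists disagree,
  // as (index, expected name, actual name).
  def mismatches(expected: Seq[String], actual: Seq[String]): Seq[(Int, String, String)] =
    expected
      .zipAll(actual, "<missing>", "<missing>") // pad the shorter list
      .zipWithIndex
      .collect { case ((e, a), i) if e != a => (i, e, a) }

  // For each expected column, its index in `actual` (-1 if absent).
  // Selecting these indices from the reordered output yields the expected order.
  // Assumes column names are unique.
  def restoringProjection(expected: Seq[String], actual: Seq[String]): Seq[Int] =
    expected.map(name => actual.indexOf(name))
}

object ColumnOrderCheckDemo {
  def main(args: Array[String]): Unit = {
    // Column orders taken from the example in the issue description.
    val schemaOrder    = Seq("x", "y", "i", "j", "a", "b") // df.printSchema
    val optimizedOrder = Seq("x", "y", "a", "b", "i", "j") // optimizedPlan.output

    ColumnOrderCheck.mismatches(schemaOrder, optimizedOrder).foreach {
      case (i, e, a) => println(s"position $i: schema has '$e', optimized plan has '$a'")
    }
    println(ColumnOrderCheck.restoringProjection(schemaOrder, optimizedOrder))
  }
}
```

In a Spark shell, the two inputs could be taken from `df.schema.fieldNames.toSeq` and `df.queryExecution.optimizedPlan.output.map(_.name)`. On an affected version the first list returned should be non-empty for the query above.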