[ https://issues.apache.org/jira/browse/SPARK-28103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-28103:
----------------------------
    Target Version/s: 3.0.0

> Cannot infer filters from union table with empty local relation table properly
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-28103
>                 URL: https://issues.apache.org/jira/browse/SPARK-28103
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2, 2.4.1
>            Reporter: William Wong
>            Priority: Major
>
> Basically, the constraints of a union table can become empty if any of its
> subtables is turned into an empty local relation. The side effect is that
> filters cannot be inferred correctly (by InferFiltersFromConstraints).
>
> We can reproduce the issue with the following setup:
>
> 1) Prepare two tables:
> {code:java}
> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");{code}
>
> 2) Create a union view on table1:
> {code:java}
> spark.sql("""
>   | CREATE VIEW partitioned_table_1 AS
>   | SELECT * FROM table1 WHERE id = 'a'
>   | UNION ALL
>   | SELECT * FROM table1 WHERE id = 'b'
>   | UNION ALL
>   | SELECT * FROM table1 WHERE id = 'c'
>   | UNION ALL
>   | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>   | """.stripMargin){code}
>
> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be
> inferred. We can see that the constraints of the left table are empty.
> {code:java}
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :     +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
> {code}
>
> 4) Modify the query to avoid the empty local relations. The filter
> t2.id IN ('a','b','c','d') is then inferred properly, and the constraints
> of the left table are no longer empty.
> {code:java}
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
> :     +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))
> {code}
>
> One possible workaround is to create a rule that removes all empty local
> relations from a union. Alternatively, when we convert a relation into an
> empty local relation, we could preserve its constraints in the empty local
> relation as well.
>
> A side note: expressions in the optimized plan are not well optimized.
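The first suggested workaround can be sketched as follows (a toy model in Python for brevity, not Catalyst's actual `LogicalPlan` API; all class and function names here are illustrative). Since a union's constraint set is, in effect, only what is shared by every child, one empty child with no constraints erases it; a rule that drops empty local relations from a union, collapsing the union when a single child survives, would sidestep that:

```python
from dataclasses import dataclass
from typing import Tuple

# Toy stand-ins for Catalyst plan nodes; illustrative only, not Spark's API.
@dataclass(frozen=True)
class Relation:
    output: Tuple[str, ...]

@dataclass(frozen=True)
class Filter:
    condition: str
    child: object

@dataclass(frozen=True)
class EmptyLocalRelation:
    output: Tuple[str, ...]

@dataclass(frozen=True)
class Union:
    children: Tuple[object, ...]

def prune_empty_union_children(plan):
    """Sketch of the proposed rule: drop empty local relations from a union,
    collapsing the Union node when zero or one child survives."""
    if not isinstance(plan, Union):
        return plan
    kept = tuple(c for c in plan.children
                 if not isinstance(c, EmptyLocalRelation))
    if not kept:           # every child was empty: keep a single empty relation
        return plan.children[0]
    if len(kept) == 1:     # a union of one child is just that child
        return kept[0]
    return Union(kept)
```

Applied to the plan from step 3, the two `LocalRelation <empty>` children would disappear, leaving only the two Filter branches, whose shared constraints (such as `isnotnull(id#0)`) would survive for `InferFiltersFromConstraints` to use.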
> For example, the filter on the last union branch,
> {code:java}
> ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0)){code}
> could be further optimized into
> {code:java}
> (isnotnull(id#0) && (id#0 = d)){code}
> We may implement another rule to:
> 1) convert all 'equal' operators into 'in' operators, and group all
>    expressions by attribute reference;
> 2) merge all those 'in' (and 'not in') operators; and
> 3) revert an 'in' operator to 'equal' if only one element remains in the set.

-- 
This message was sent by Atlassian JIRA
(v7.6.3#76005)
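The merging steps proposed above can be sketched for a single attribute's conjunctive predicates (again a toy Python model with hypothetical helper names, not a Catalyst rule, which would operate on `Expression` trees): each equality becomes a one-element IN set, IN sets are intersected, NOT IN sets are subtracted, and a one-element result is rendered back as an equality.

```python
def merge_membership(in_sets, not_in_sets):
    """Fold conjunctive membership predicates for one attribute.
    Assumes at least one IN set (an equality contributes a singleton set)."""
    allowed = set.intersection(*in_sets)   # conjunction of IN predicates
    for excluded in not_in_sets:           # subtract every NOT IN predicate
        allowed = allowed - excluded
    return allowed

def render(attr, values):
    """Render the merged set back as a predicate string."""
    if not values:
        return "false"                     # contradictory predicates
    if len(values) == 1:
        (v,) = values
        return f"{attr} = '{v}'"           # singleton IN reverts to '='
    return f"{attr} IN ({', '.join(repr(v) for v in sorted(values))})"

# The last union branch's filter, NOT id IN (a,b,c) AND id IN (a,b,c,d):
merged = merge_membership([{"a", "b", "c", "d"}], [{"a", "b", "c"}])
# render("id", merged) -> "id = 'd'"
```

This reproduces the example above: {a,b,c,d} minus {a,b,c} leaves only {d}, which step 3 reverts to the equality `id = 'd'`.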