Re: Filter cannot be pushed via a Join
Hi Xiao,

Just reported this in JIRA as SPARK-28103:
https://issues.apache.org/jira/browse/SPARK-28103

Thanks and regards,
William

On Wed, 19 Jun 2019 at 1:35 AM, Xiao Li wrote:
Re: Filter cannot be pushed via a Join
Hi, William,

Thanks for reporting it. Could you open a JIRA?

Cheers,
Xiao

On Tue, 18 Jun 2019 at 8:57 AM, William Wong wrote:
Re: Filter cannot be pushed via a Join
BTW, I noticed a workaround is creating a custom rule to remove 'empty
local relation' children from a union table. However, I am not 100% sure if
it is the right approach.

On Tue, Jun 18, 2019 at 11:53 PM William Wong wrote:

> Dear all,
>
> I am not sure whether this is expected behavior, and whether I should
> report it as a bug. Basically, the constraints of a union table can become
> empty if any subtable is reduced to an empty local relation. The side
> effect is that filters cannot be inferred correctly (by
> InferFiltersFromConstraints).
>
> We can reproduce the issue with the following setup:
>
> 1) Prepare two tables:
> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");
>
> 2) Create a union view on table1:
> * spark.sql("""
>      | CREATE VIEW partitioned_table_1 AS
>      | SELECT * FROM table1 WHERE id = 'a'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'b'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'c'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>      | """.stripMargin)
>
> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be
> inferred. We can see that the constraints of the left table are empty.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  :- LocalRelation <empty>, [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :     +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>
> 4) Modify the query to avoid the empty local relations. The filter t2.id
> IN ('a','b','c','d') is then inferred properly, and the constraints of the
> left table are no longer empty.
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
> :     +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) || (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>    +- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) || (id#0 = c)) || NOT id#0 IN (a,b,c)))
>
> Thanks and regards,
> William
>
> On Sat, Jun 15, 2019 at 1:13 AM William Wong wrote:
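For context on why the constraints vanish in step 3: Catalyst derives a
Union's constraints by rewriting each child's constraints in terms of the
first child's output attributes and then pairwise merging them, keeping the
constraints common to both sides and loosening differing single-attribute
constraints into a disjunction. The Scala sketch below only approximates
that merge logic (names and details are from memory, not the exact Spark
source); the key point is that an empty LocalRelation is a leaf with an
empty constraint set, so merging anything with it yields the empty set.

import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}

// Approximate sketch of how a Union combines two children's constraints:
// keep constraints shared by both children, and loosen differing
// single-attribute constraints into a disjunction.
def merge(a: Set[Expression], b: Set[Expression]): Set[Expression] = {
  val common = a.intersect(b)
  val restA = a.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
  val restB = b.diff(common).filter(_.references.size == 1).groupBy(_.references.head)
  // e.g. {id = 'a'} merged with {id = 'b'} becomes {(id = 'a') || (id = 'b')}
  val loosened = restA.keySet.intersect(restB.keySet).map { attr =>
    Or(restA(attr).reduceLeft(And), restB(attr).reduceLeft(And))
  }
  common ++ loosened
}
// A Union folds this merge over all of its children. If one child is an
// empty LocalRelation, its constraint set is empty, so both `common` and
// `loosened` are empty: merge(anything, Set()) == Set(), and the Union's
// constraints collapse, which is exactly what res40 shows.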
Re: Filter cannot be pushed via a Join
Hi,

Can I ask which version of Spark you are using, and in what environment?

Regards,
Gourav

On Fri, Jun 14, 2019 at 5:14 PM William Wong wrote:
Re: Filter cannot be pushed via a Join
Hi all,

I'd appreciate it if any expert could help with this strange behavior.

It is interesting that when I implemented a custom rule to remove empty
LocalRelation children under Union and ran the same query, the filter
id = 'a' was inferred for table2 and pushed through the join:

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(3) Project [id#4, val#5]
      +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
         +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema: struct<id:string,val:string>

Thanks and regards,
William

On Sat, Jun 15, 2019 at 12:13 AM William Wong wrote:
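The custom rule itself is not shown anywhere in the thread, so here is a
minimal, hypothetical sketch of what such a rule could look like against
Spark 2.4 (where Union takes only its children). The rule name, its exact
shape, and the registration through spark.experimental.extraOptimizations
are my assumptions, not William's actual code; the registration would also
explain the fresh spark2 session used in the transcript above.

import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: drop empty LocalRelation children from a Union so
// they cannot wipe out the Union's constraint set.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case u @ Union(children) =>
      val nonEmpty = children.filterNot {
        case l: LocalRelation => l.data.isEmpty  // empty local relation
        case _ => false
      }
      nonEmpty match {
        case Seq()        => LocalRelation(u.output)  // every child was empty
        case Seq(single)  => single                   // a one-child Union is redundant
        case kept if kept.size < children.size => Union(kept)
        case _            => u                        // nothing removed; keep as-is
      }
  }
}

// Hypothetical registration in a fresh session:
val spark2 = spark.newSession()
spark2.experimental.extraOptimizations = Seq(RemoveEmptyUnionChildren)

One caveat: extraOptimizations run as a late, user-provided optimizer
batch, so whether the earlier InferFiltersFromConstraints pass can benefit
may depend on the Spark version and on where the rule lands in the batch
order.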
Filter cannot be pushed via a Join
Dear all,

I created two tables:

scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
19/06/14 23:49:10 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/06/14 23:49:11 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");
res2: org.apache.spark.sql.DataFrame = []

Here is the plan of a join on the id column. It looks good to me, as the
filter id = 'a' is pushed to both sides as expected:

scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id AND t1.id ='a'").explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
:- *(2) Project [id#23, val#24]
:  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
:     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(1) Project [id#68, val#69]
      +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
         +- *(1) FileScan parquet default.table2[id#68,val#69] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters: [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema: struct<id:string,val:string>

Then we created a view on table1 by unioning a few partitions, like this:

scala> spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)
res7: org.apache.spark.sql.DataFrame = []

In theory, selecting data via this view 'partitioned_table_1' should be the
same as selecting via the table 'table1'.

This query can also push the filter id IN ('a','b','c','d') to table2 as
expected:
scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain == Physical Plan == *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight :- Union : :- *(1) Project [id#0, val#1] : : +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d)) : : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id, [a,b,c,d])], ReadSchema: struct : :- *(2) Project [id#0, val#1] : : +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d)) : : +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id, [a,b,c,d])], ReadSchema: struct : :- *(3) Project [id#0, val#1] : : +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d)) : : +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id, [a,b,c,d])], ReadSchema: struct : +- *(4) Project [id#0, val#1] : +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0)) :+- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id, [a,b,c,d]), IsNotNull(id)], ReadSchema: struct +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true])) +- *(5) Project [id#23, val#24] +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) && isnotnull(id#23)) +- *(5) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]), Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I..., *ReadSchema: struct scala> However, if we change the filter to 'id ='a', something strange happened. The filter 'id = 'a' cannot be pushed via table2... scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain == Physical Plan == *(4) BroadcastHashJoin [id#0], [id#23],