Hi all,

I would appreciate it if any expert could help with this strange behavior.
Interestingly, after I implemented a custom rule to remove empty LocalRelation children under Union and reran the same query, the filter id = 'a' was inferred for table2 and pushed down through the join.

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
   +- *(3) Project [id#4, val#5]
      +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
         +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema: struct<id:string,val:string>

scala>

Thanks and regards,
William

On Sat, Jun 15, 2019 at 12:13 AM William Wong <william1...@gmail.com> wrote:

> Dear all,
>
> I created two tables.
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string) USING PARQUET");
> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
> res1: org.apache.spark.sql.DataFrame = []
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string) USING PARQUET");
> res2: org.apache.spark.sql.DataFrame = []
>
> Here is the plan of a join on the ID column. It looks good to me, as the filter id = 'a' is pushed to both sides as expected.
>
> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id AND t1.id ='a'").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
> :- *(2) Project [id#23, val#24]
> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    +- *(1) Project [id#68, val#69]
>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], *PartitionFilters: [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema: struct<id:string,val:string>
>
> We then created a view on table1 by unioning a few partitions, like this:
>
> scala> spark.sql("""
>      | CREATE VIEW partitioned_table_1 AS
>      | SELECT * FROM table1 WHERE id = 'a'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'b'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'c'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>      | """.stripMargin)
> res7: org.apache.spark.sql.DataFrame = []
>
> In theory, selecting data via this view 'partitioned_table_1' should be the same as selecting via the table 'table1'.
>
> The following query can also push the filter id IN ('a','b','c','d') down to table2 as expected.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
> == Physical Plan ==
> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  :- *(2) Project [id#0, val#1]
> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  :- *(3) Project [id#0, val#1]
> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id, [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  +- *(4) Project [id#0, val#1]
> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) && isnotnull(id#0))
> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id, [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    +- *(5) Project [id#23, val#24]
>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) && isnotnull(id#23))
>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]), Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,* ReadSchema: struct<id:string,val:string>
>
> scala>
>
> However, if we change the filter to id = 'a', something strange happens. The filter id = 'a' is not pushed down to table2...
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE t1.id = t2.id AND t1.id = 'a'").explain
> == Physical Plan ==
> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
> :  :- LocalTableScan <empty>, [id#0, val#1]
> :  :- LocalTableScan <empty>, [id#0, val#1]
> :  +- *(2) Project [id#0, val#1]
> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1], PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])), EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
>    +- *(3) Project [id#23, val#24]
>       +- *(3) Filter isnotnull(id#23)
>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:string,val:string>
>
> I would appreciate it if anyone has an idea about this. Many thanks.
>
> Best regards,
> William
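P.S. For anyone curious, the custom rule I mention at the top of this message is along these lines. This is a simplified sketch, not my exact implementation: the object name is just illustrative, and it assumes the Spark 2.x Catalyst API (spark-catalyst on the classpath).

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch: drop Union children that the optimizer has already reduced to
// empty LocalRelations (they show up as "LocalTableScan <empty>" in plans).
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {

  private def isEmptyLocal(plan: LogicalPlan): Boolean = plan match {
    case l: LocalRelation => l.data.isEmpty
    case _                => false
  }

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case u @ Union(children) if children.exists(isEmptyLocal) =>
      children.filterNot(isEmptyLocal) match {
        case Nil           => u        // all children empty: leave the node untouched
        case single :: Nil => single   // one child left: the Union is redundant
        case remaining     => Union(remaining)
      }
  }
}

// Hooked into the optimizer via the experimental extension point:
// spark.experimental.extraOptimizations ++= Seq(RemoveEmptyUnionChildren)
```

My understanding is that the empty LocalRelation children contribute no useful constraints, so the Union's intersected constraint set loses id = 'a' and InferFiltersFromConstraints can no longer derive it for table2; once the empty children are pruned, the inference works again, as the plan above shows.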