Hi all,

I would appreciate any expert help with this strange behavior.

Interestingly, I implemented a custom rule that removes empty
LocalRelation children under Union and ran the same query again. With the
rule in place, the filter id = 'a' is inferred for table2 and pushed down
through the join.
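
For illustration, a rule of this kind could look roughly like the sketch
below. It is a minimal sketch, not necessarily the exact rule behind the plan
in this message. It assumes the Spark 2.4 Catalyst API, and the names
RemoveEmptyUnionChildren and EmptyUnionChildrenDemo are hypothetical. It only
drops empty children after the first one, because Union takes its output
attributes from its first child.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule (not part of Spark): drops empty LocalRelation children of a Union.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {

  // True for a non-streaming LocalRelation that holds no rows.
  private def isEmptyLocal(plan: LogicalPlan): Boolean = plan match {
    case lr: LocalRelation => lr.data.isEmpty && !lr.isStreaming
    case _                 => false
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case u: Union if u.children.drop(1).exists(isEmptyLocal) =>
      // Always keep the first child so the Union's output attribute IDs stay the same.
      val kept = u.children.head +: u.children.drop(1).filterNot(isEmptyLocal)
      // A Union needs at least two children; with one left, return that child directly.
      if (kept.size == 1) kept.head else u.copy(children = kept)
  }
}

object EmptyUnionChildrenDemo {
  // One possible way to wire the rule in (an assumption): inject it through
  // SparkSessionExtensions when a new session is built.
  def sessionWithRule(): SparkSession =
    SparkSession.builder()
      .withExtensions { ext => ext.injectOptimizerRule(_ => RemoveEmptyUnionChildren) }
      .getOrCreate()
}
```

Keeping the first child even when it is empty is a deliberately conservative
choice: removing it would change the attribute IDs that parent operators refer
to, and handling that would need an extra Project with aliases.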

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
ReadSchema: struct<id:string,val:string>
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(3) Project [id#4, val#5]
      +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
         +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
ReadSchema: struct<id:string,val:string>

scala>
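
Instead of reading the explain output by eye, the pushed filters of each scan
can also be collected programmatically. The helper below is a minimal sketch
assuming Spark 2.4 internals (FileSourceScanExec and its metadata map); the
names PlanInspection and pushedFilters are hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.FileSourceScanExec

object PlanInspection {
  // Hypothetical helper: pairs each file scan's table name with the
  // "PushedFilters" string that explain() prints for that scan.
  def pushedFilters(df: DataFrame): Seq[(String, String)] =
    df.queryExecution.executedPlan.collect {
      case scan: FileSourceScanExec =>
        val table = scan.tableIdentifier.map(_.unquotedString).getOrElse("<unknown>")
        table -> scan.metadata.getOrElse("PushedFilters", "[]")
    }
}
```

Calling PlanInspection.pushedFilters on the DataFrame returned by spark.sql for
the query above should list one entry per FileScan node, which makes it easier
to compare the table2 scan with and without the custom rule.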

Thanks and regards,
William



On Sat, Jun 15, 2019 at 12:13 AM William Wong <william1...@gmail.com> wrote:

> Dear all,
>
> I created two tables.
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> res1: org.apache.spark.sql.DataFrame = []
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
> res2: org.apache.spark.sql.DataFrame = []
>
>
> Here is the plan for a join on the id column. It looks good to
> me, as the filter id = 'a' is pushed to both sides as expected.
>
> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
> AND t1.id ='a'").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
> :- *(2) Project [id#23, val#24]
> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
> :     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
> Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
> struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>    +- *(1) Project [id#68, val#69]
>       +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>          +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
> struct<id:string,val:string>
>
>
> We then created a view on table1 by unioning a few partitions, like this:
>
> scala> spark.sql("""
>      | CREATE VIEW partitioned_table_1 AS
>      | SELECT * FROM table1 WHERE id = 'a'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'b'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id = 'c'
>      | UNION ALL
>      | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>      | """.stripMargin)
> res7: org.apache.spark.sql.DataFrame = []
>
>
> In theory, selecting data via the view partitioned_table_1 should be
> the same as selecting it from table1 directly.
>
> This query also pushes the filter id IN ('a','b','c','d') to table2, as
> expected.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
> == Physical Plan ==
> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  :- *(2) Project [id#0, val#1]
> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  :- *(3) Project [id#0, val#1]
> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
> [a,b,c,d])], ReadSchema: struct<id:string,val:string>
> :  +- *(4) Project [id#0, val#1]
> :     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
> isnotnull(id#0))
> :        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>    +- *(5) Project [id#23, val#24]
>       +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
> isnotnull(id#23))
>          +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
> Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
> *ReadSchema: struct<id:string,val:string>
>
> scala>
>
>
> However, if we change the filter to id = 'a', something strange happens:
> the filter id = 'a' is no longer pushed to table2.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").explain
> == Physical Plan ==
> *(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
> :  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
> ReadSchema: struct<id:string,val:string>
> :  :- LocalTableScan <empty>, [id#0, val#1]
> :  :- LocalTableScan <empty>, [id#0, val#1]
> :  +- *(2) Project [id#0, val#1]
> :     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
> a))
> :        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
> EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>    +- *(3) Project [id#23, val#24]
>       +- *(3) Filter isnotnull(id#23)
>          +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
> struct<id:string,val:string>
>
>
> I would appreciate it if anyone has an idea about this. Many thanks.
>
> Best regards,
> William
>
