Dear all,

I created two tables.

scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
USING PARQUET");
19/06/14 23:49:10 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
USING PARQUET");
res2: org.apache.spark.sql.DataFrame = []


It is the plan of joining these two column via ID column. It looks good to
me as the filter 'id ='a'' is pushed to both tables as expected.

scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
AND t1.id ='a'").explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
:- *(2) Project [id#23, val#24]
:  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
:     +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(1) Project [id#68, val#69]
      +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
         +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
struct<id:string,val:string>


Somehow, we created a view on table1 by union a few partitions like this:

scala> spark.sql("""
     | CREATE VIEW partitioned_table_1 AS
     | SELECT * FROM table1 WHERE id = 'a'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'b'
     | UNION ALL
     | SELECT * FROM table1 WHERE id = 'c'
     | UNION ALL
     | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
     | """.stripMargin)
res7: org.apache.spark.sql.DataFrame = []


In theory, selecting data via this view 'partitioned_table_1' should be the
same as via the table 'table1'

This query also can push the filter 'id IN ('a','b','c','d') to table2 as
expected.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
== Physical Plan ==
*(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(2) Project [id#0, val#1]
:  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  :     +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  :- *(3) Project [id#0, val#1]
:  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  :     +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
[a,b,c,d])], ReadSchema: struct<id:string,val:string>
:  +- *(4) Project [id#0, val#1]
:     +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
isnotnull(id#0))
:        +- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
[a,b,c,d]), IsNotNull(id)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(5) Project [id#23, val#24]
      +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23
= a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
isnotnull(id#23))
         +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
*ReadSchema: struct<id:string,val:string>

scala>


However, if we change the filter to 'id ='a', something strange happened.
The filter 'id = 'a' cannot be pushed via table2...

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  :     +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
ReadSchema: struct<id:string,val:string>
:  :- LocalTableScan <empty>, [id#0, val#1]
:  :- LocalTableScan <empty>, [id#0, val#1]
:  +- *(2) Project [id#0, val#1]
:     +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
a))
:        +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
EqualTo(id,a)], ReadSchema: struct<id:string,val:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(3) Project [id#23, val#24]
      +- *(3) Filter isnotnull(id#23)
         +- *(3) FileScan parquet default.table2[id#23,val#24] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema:
struct<id:string,val:string>


Appreciate if anyone has an idea on it. Many thanks.

Best regards,
William

Reply via email to