Re: Filter cannot be pushed via a Join

2019-06-18 Thread William Wong
Hi Xiao,

I have just reported this in JIRA as SPARK-28103.

https://issues.apache.org/jira/browse/SPARK-28103

Thanks and Regards,
William

On Wed, 19 Jun 2019 at 1:35 AM, Xiao Li  wrote:

> Hi, William,
>
> Thanks for reporting it. Could you open a JIRA?
>
> Cheers,
>
> Xiao
>
> William Wong  wrote on Tue, Jun 18, 2019 at 8:57 AM:
>
>> BTW, I noticed a workaround: creating a custom rule to remove empty
>> LocalRelations from a Union. However, I am not 100% sure it is the right
>> approach.
>>
>> On Tue, Jun 18, 2019 at 11:53 PM William Wong 
>> wrote:
>>
>>> Dear all,
>>>
>>> I am not sure whether this is expected behavior or whether I should report
>>> it as a bug. Basically, the constraints of a union can become empty if any
>>> of its children is optimized into an empty LocalRelation. The side effect
>>> is that filters cannot be inferred correctly (by InferFiltersFromConstraints).
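As an aside, the behavior described above is what one would expect if a union's
constraint set is the intersection of its children's constraint sets: a single
child with no constraints, such as an empty LocalRelation, empties the whole set.
A tiny self-contained sketch of that intersection, in plain Scala (this is not
Spark code, and the names are made up):

object UnionConstraintIntersection {
  def main(args: Array[String]): Unit = {
    val filteredChild = Set("isnotnull(id#0)", "id#0 = a")  // child that kept its Filter
    val emptyChild    = Set.empty[String]                    // empty LocalRelation: no constraints
    val unionConstraints = Seq(filteredChild, emptyChild).reduce(_ intersect _)
    println(unionConstraints)                                 // Set() -- nothing left to infer from
  }
}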
>>>
>>> We may reproduce the issue with the following setup:
>>> 1) Prepare two tables:
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>>> USING PARQUET");
>>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>>> USING PARQUET");
>>>
>>> 2) Create a union view on table1.
>>> * spark.sql("""
>>>  | CREATE VIEW partitioned_table_1 AS
>>>  | SELECT * FROM table1 WHERE id = 'a'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id = 'b'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id = 'c'
>>>  | UNION ALL
>>>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>>  | """.stripMargin)
>>>
>>> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be
>>> inferred. We can see that the constraint set of the union (the left side of
>>> the join) is empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- LocalRelation , [id#0, val#1]
>>> :  :- LocalRelation , [id#0, val#1]
>>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>>> : +- Relation[id#0,val#1] parquet
>>> +- Filter isnotnull(id#4)
>>>+- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id =
>>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>>
>>> 4) Modify the query to avoid the empty local relations. The filter t2.id
>>> IN ('a','b','c','d') is then inferred properly, and the constraint set of
>>> the union is no longer empty.
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan
>>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>>> Join Inner, (id#0 = id#4)
>>> :- Union
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>>> :  :  +- Relation[id#0,val#1] parquet
>>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>>> isnotnull(id#0))
>>> : +- Relation[id#0,val#1] parquet
>>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>>+- Relation[id#4,val#5] parquet
>>>
>>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id IN
>>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
>>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>>
>>>
>>> Thanks and regards,
>>> William
>>>
>>>
>>> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
>>> wrote:
>>>
 Hi all,

 I would appreciate any expert help with this strange behavior.

 Interestingly, I implemented a custom rule that removes empty LocalRelation
 children under Union and ran the same query. The filter id = 'a' is then
 inferred for table2 and pushed through the join.

 scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2
 WHERE t1.id = t2.id AND t1.id = 'a'").explain
 == Physical Plan ==
 *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
 :- Union
 :  :- *(1) Project [id#0, val#1]
 :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
 :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
 true, Format: Parquet, Location:
 

Re: Filter cannot be pushed via a Join

2019-06-18 Thread Xiao Li
Hi, William,

Thanks for reporting it. Could you open a JIRA?

Cheers,

Xiao

William Wong  wrote on Tue, Jun 18, 2019 at 8:57 AM:

> BTW, I noticed a workaround: creating a custom rule to remove empty
> LocalRelations from a Union. However, I am not 100% sure it is the right
> approach.
>
> On Tue, Jun 18, 2019 at 11:53 PM William Wong 
> wrote:
>
>> Dear all,
>>
>> I am not sure whether this is expected behavior or whether I should report
>> it as a bug. Basically, the constraints of a union can become empty if any
>> of its children is optimized into an empty LocalRelation. The side effect
>> is that filters cannot be inferred correctly (by InferFiltersFromConstraints).
>>
>> We may reproduce the issue with the following setup:
>> 1) Prepare two tables:
>> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
>> USING PARQUET");
>> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
>> USING PARQUET");
>>
>> 2) Create a union view on table1.
>> * spark.sql("""
>>  | CREATE VIEW partitioned_table_1 AS
>>  | SELECT * FROM table1 WHERE id = 'a'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id = 'b'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id = 'c'
>>  | UNION ALL
>>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>>  | """.stripMargin)
>>
>> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be
>> inferred. We can see that the constraint set of the union (the left side of
>> the join) is empty.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
>> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter (isnotnull(id#0) && (id#0 = a))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- LocalRelation , [id#0, val#1]
>> :  :- LocalRelation , [id#0, val#1]
>> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
>> : +- Relation[id#0,val#1] parquet
>> +- Filter isnotnull(id#4)
>>+- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id =
>> 'a'").queryExecution.optimizedPlan.children(0).constraints
>> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>>
>> 4) Modify the query to avoid the empty local relations. The filter t2.id
>> IN ('a','b','c','d') is then inferred properly, and the constraint set of
>> the union is no longer empty.
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan
>> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Join Inner, (id#0 = id#4)
>> :- Union
>> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
>> :  :  +- Relation[id#0,val#1] parquet
>> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
>> isnotnull(id#0))
>> : +- Relation[id#0,val#1] parquet
>> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
>> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>>+- Relation[id#4,val#5] parquet
>>
>> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id IN
>> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
>> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
>> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
>> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>>
>>
>> Thanks and regards,
>> William
>>
>>
>> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
>> wrote:
>>
>>> Hi all,
>>>
>>> I would appreciate any expert help with this strange behavior.
>>>
>>> Interestingly, I implemented a custom rule that removes empty LocalRelation
>>> children under Union and ran the same query. The filter id = 'a' is then
>>> inferred for table2 and pushed through the join.
>>>
>>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>>> t1.id = t2.id AND t1.id = 'a'").explain
>>> == Physical Plan ==
>>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>>> :- Union
>>> :  :- *(1) Project [id#0, val#1]
>>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>>> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>>> true, Format: Parquet, Location:
>>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>>> ReadSchema: struct
>>> :  +- *(2) Project [id#0, val#1]
>>> : +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0
>>> = a))
>>> :+- *(2) FileScan parquet default.table1[id#0,val#1] Batched:

Re: Filter cannot be pushed via a Join

2019-06-18 Thread William Wong
BTW, I noticed a workaround: creating a custom rule to remove empty
LocalRelations from a Union. However, I am not 100% sure it is the right
approach.
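For concreteness, here is a minimal sketch of what such a rule could look like
against the Spark 2.4 Catalyst API. The object name is hypothetical, this is not
necessarily the exact rule used in this thread, and dropping children wholesale
is probably unsafe in general because a Union's output attributes are derived
from its first child:

import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule: drop Union children that the optimizer has already
// reduced to empty LocalRelations.
object RemoveEmptyUnionChildren extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case u @ Union(children) =>
      val nonEmpty = children.filterNot {
        case l: LocalRelation => l.data.isEmpty   // child already proven empty
        case _                => false
      }
      nonEmpty match {
        case Seq()       => LocalRelation(u.output)   // every child was empty
        case Seq(single) => single                    // a one-child union is just that child
        case remaining   => u.copy(children = remaining)
      }
  }
}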

On Tue, Jun 18, 2019 at 11:53 PM William Wong  wrote:

> Dear all,
>
> I am not sure whether this is expected behavior or whether I should report
> it as a bug. Basically, the constraints of a union can become empty if any
> of its children is optimized into an empty LocalRelation. The side effect
> is that filters cannot be inferred correctly (by InferFiltersFromConstraints).
>
> We may reproduce the issue with the following setup:
> 1) Prepare two tables:
> * spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> * spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
>
> 2) Create a union view on table1.
> * spark.sql("""
>  | CREATE VIEW partitioned_table_1 AS
>  | SELECT * FROM table1 WHERE id = 'a'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'b'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'c'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>  | """.stripMargin)
>
> 3) View the optimized plan of this SQL. The filter t2.id = 'a' cannot be
> inferred. We can see that the constraint set of the union (the left side of
> the join) is empty.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id = 'a'").queryExecution.optimizedPlan
> res39: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter (isnotnull(id#0) && (id#0 = a))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- LocalRelation , [id#0, val#1]
> :  :- LocalRelation , [id#0, val#1]
> :  +- Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 = a))
> : +- Relation[id#0,val#1] parquet
> +- Filter isnotnull(id#4)
>+- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id =
> 'a'").queryExecution.optimizedPlan.children(0).constraints
> res40: org.apache.spark.sql.catalyst.expressions.ExpressionSet = Set()
>
> 4) Modify the query to avoid the empty local relations. The filter t2.id
> IN ('a','b','c','d') is then inferred properly, and the constraint set of
> the union is no longer empty.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan
> res42: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
> Join Inner, (id#0 = id#4)
> :- Union
> :  :- Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  :- Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  :  +- Relation[id#0,val#1] parquet
> :  +- Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
> isnotnull(id#0))
> : +- Relation[id#0,val#1] parquet
> +- Filter ((id#4 IN (a,b,c,d) && ((isnotnull(id#4) && (((id#4 = a) ||
> (id#4 = b)) || (id#4 = c))) || NOT id#4 IN (a,b,c))) && isnotnull(id#4))
>+- Relation[id#4,val#5] parquet
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN
> ('a','b','c','d')").queryExecution.optimizedPlan.children(0).constraints
> res44: org.apache.spark.sql.catalyst.expressions.ExpressionSet =
> Set(isnotnull(id#0), id#0 IN (a,b,c,d), ((((id#0 = a) || (id#0 = b)) ||
> (id#0 = c)) || NOT id#0 IN (a,b,c)))
>
>
> Thanks and regards,
> William
>
>
> On Sat, Jun 15, 2019 at 1:13 AM William Wong 
> wrote:
>
>> Hi all,
>>
>> I would appreciate any expert help with this strange behavior.
>>
>> Interestingly, I implemented a custom rule that removes empty LocalRelation
>> children under Union and ran the same query. The filter id = 'a' is then
>> inferred for table2 and pushed through the join.
>>
>> scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
>> t1.id = t2.id AND t1.id = 'a'").explain
>> == Physical Plan ==
>> *(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
>> :- Union
>> :  :- *(1) Project [id#0, val#1]
>> :  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
>> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
>> ReadSchema: struct
>> :  +- *(2) Project [id#0, val#1]
>> : +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
>> a))
>> :+- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
>> true, Format: Parquet, Location:
>> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
>> PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
>> EqualTo(id,a)], ReadSchema: struct
>> +- 

Re: Filter cannot be pushed via a Join

2019-06-17 Thread Gourav Sengupta
Hi,
Can I ask which version of Spark you are using, and in what environment?

Regards,
Gourav

On Fri, Jun 14, 2019 at 5:14 PM William Wong  wrote:

> Dear all,
>
> I created two tables.
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> res1: org.apache.spark.sql.DataFrame = []
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
> res2: org.apache.spark.sql.DataFrame = []
>
>
> This is the plan for joining these two tables on the id column. It looks
> good to me, as the filter id = 'a' is pushed to both tables as expected.
>
> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
> AND t1.id ='a'").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
> :- *(2) Project [id#23, val#24]
> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
> : +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
> Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>+- *(1) Project [id#68, val#69]
>   +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>  +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
> struct
>
>
> We then created a view on table1 by unioning a few partitions, like this:
>
> scala> spark.sql("""
>  | CREATE VIEW partitioned_table_1 AS
>  | SELECT * FROM table1 WHERE id = 'a'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'b'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'c'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>  | """.stripMargin)
> res7: org.apache.spark.sql.DataFrame = []
>
>
> In theory, selecting data via this view 'partitioned_table_1' should be
> the same as selecting via the table 'table1' directly.
>
> This query can also push the filter id IN ('a','b','c','d') down to table2,
> as expected.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
> == Physical Plan ==
> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
> [a,b,c,d])], ReadSchema: struct
> :  :- *(2) Project [id#0, val#1]
> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  : +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
> [a,b,c,d])], ReadSchema: struct
> :  :- *(3) Project [id#0, val#1]
> :  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
> :  : +- *(3) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
> [a,b,c,d])], ReadSchema: struct
> :  +- *(4) Project [id#0, val#1]
> : +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
> isnotnull(id#0))
> :+- *(4) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
> [a,b,c,d]), IsNotNull(id)], ReadSchema: struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>+- *(5) Project [id#23, val#24]
>   +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) &&
> (((id#23 = a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
> isnotnull(id#23))
>  +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
> PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
> 

Re: Filter cannot be pushed via a Join

2019-06-14 Thread William Wong
Hi all,

I would appreciate any expert help with this strange behavior.

Interestingly, I implemented a custom rule that removes empty LocalRelation
children under Union and ran the same query. The filter id = 'a' is then
inferred for table2 and pushed through the join.
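(The thread does not show how the modified session was set up, so the wiring
below is only an assumption. Both registration paths exist in Spark 2.4; which
one is used matters, because extraOptimizations run in a final optimizer batch,
whereas extension-injected rules are, as far as I can tell, folded into the
operator-optimization batch alongside InferFiltersFromConstraints.)

// Assumes the RemoveEmptyUnionChildren sketch from earlier in the thread is in scope.
import org.apache.spark.sql.SparkSession

// Option A (hypothetical wiring): register the rule on the current shell session.
spark.experimental.extraOptimizations = Seq(RemoveEmptyUnionChildren)

// Option B (hypothetical wiring): build a session with the rule injected via
// extensions. In an existing spark-shell, getOrCreate() may simply return the
// already-active session, so this is easiest to try from a standalone application.
val spark2 = SparkSession.builder()
  .withExtensions(ext => ext.injectOptimizerRule(_ => RemoveEmptyUnionChildren))
  .getOrCreate()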

scala> spark2.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#4], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter (isnotnull(id#0) && (id#0 = a))
:  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],
ReadSchema: struct
:  +- *(2) Project [id#0, val#1]
: +- *(2) Filter ((isnotnull(id#0) && NOT id#0 IN (a,b,c)) && (id#0 =
a))
:+- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), Not(In(id, [a,b,c])),
EqualTo(id,a)], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(3) Project [id#4, val#5]
  +- *(3) Filter ((id#4 = a) && isnotnull(id#4))
 +- *(3) FileScan parquet default.table2[id#4,val#5] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [EqualTo(id,a), IsNotNull(id)],*
ReadSchema: struct

scala>

Thanks and regards,
William



On Sat, Jun 15, 2019 at 12:13 AM William Wong  wrote:

> Dear all,
>
> I created two tables.
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
> USING PARQUET");
> 19/06/14 23:49:10 WARN ObjectStore: Version information not found in
> metastore. hive.metastore.schema.verification is not enabled so recording
> the schema version 1.2.0
> 19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
> returning NoSuchObjectException
> res1: org.apache.spark.sql.DataFrame = []
>
> scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
> USING PARQUET");
> res2: org.apache.spark.sql.DataFrame = []
>
>
> This is the plan for joining these two tables on the id column. It looks
> good to me, as the filter id = 'a' is pushed to both tables as expected.
>
> scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
> AND t1.id ='a'").explain
> == Physical Plan ==
> *(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
> :- *(2) Project [id#23, val#24]
> :  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
> : +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
> Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
> struct
> +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
> true]))
>+- *(1) Project [id#68, val#69]
>   +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
>  +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2], 
> *PartitionFilters:
> [], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
> struct
>
>
> We then created a view on table1 by unioning a few partitions, like this:
>
> scala> spark.sql("""
>  | CREATE VIEW partitioned_table_1 AS
>  | SELECT * FROM table1 WHERE id = 'a'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'b'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id = 'c'
>  | UNION ALL
>  | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
>  | """.stripMargin)
> res7: org.apache.spark.sql.DataFrame = []
>
>
> In theory, selecting data via this view 'partitioned_table_1' should be
> the same as selecting via the table 'table1' directly.
>
> This query can also push the filter id IN ('a','b','c','d') down to table2,
> as expected.
>
> scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
> t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
> == Physical Plan ==
> *(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
> :- Union
> :  :- *(1) Project [id#0, val#1]
> :  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
> :  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
> PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
> [a,b,c,d])], ReadSchema: struct
> :  :- *(2) Project [id#0, val#1]
> :  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
> :  : +- *(2) FileScan parquet default.table1[id#0,val#1] Batched:
> true, Format: Parquet, Location:
> 

Filter cannot be pushed via a Join

2019-06-14 Thread William Wong
Dear all,

I created two tables.

scala> spark.sql("CREATE TABLE IF NOT EXISTS table1(id string, val string)
USING PARQUET");
19/06/14 23:49:10 WARN ObjectStore: Version information not found in
metastore. hive.metastore.schema.verification is not enabled so recording
the schema version 1.2.0
19/06/14 23:49:11 WARN ObjectStore: Failed to get database default,
returning NoSuchObjectException
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("CREATE TABLE IF NOT EXISTS table2(id string, val string)
USING PARQUET");
res2: org.apache.spark.sql.DataFrame = []


This is the plan for joining these two tables on the id column. It looks
good to me, as the filter id = 'a' is pushed to both tables as expected.

scala> spark.sql("SELECT * FROM table2 t1, table2 t2 WHERE t1.id = t2.id
AND t1.id ='a'").explain
== Physical Plan ==
*(2) BroadcastHashJoin [id#23], [id#68], Inner, BuildRight
:- *(2) Project [id#23, val#24]
:  +- *(2) Filter (isnotnull(id#23) && (id#23 = a))
: +- *(2) FileScan parquet default.table2[id#23,val#24] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [IsNotNull(id), EqualTo(id,a)],* ReadSchema:
struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(1) Project [id#68, val#69]
  +- *(1) Filter ((id#68 = a) && isnotnull(id#68))
 +- *(1) FileScan parquet default.table2[id#68,val#69] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
*PartitionFilters:
[], PushedFilters: [EqualTo(id,a), IsNotNull(id)],* ReadSchema:
struct


We then created a view on table1 by unioning a few partitions, like this:

scala> spark.sql("""
 | CREATE VIEW partitioned_table_1 AS
 | SELECT * FROM table1 WHERE id = 'a'
 | UNION ALL
 | SELECT * FROM table1 WHERE id = 'b'
 | UNION ALL
 | SELECT * FROM table1 WHERE id = 'c'
 | UNION ALL
 | SELECT * FROM table1 WHERE id NOT IN ('a','b','c')
 | """.stripMargin)
res7: org.apache.spark.sql.DataFrame = []


In theory, selecting data via this view 'partitioned_table_1' should be the
same as selecting via the table 'table1' directly.

This query can also push the filter id IN ('a','b','c','d') down to table2,
as expected.

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id IN ('a','b','c','d')").explain
== Physical Plan ==
*(6) BroadcastHashJoin [id#0], [id#23], Inner, BuildRight
:- Union
:  :- *(1) Project [id#0, val#1]
:  :  +- *(1) Filter ((isnotnull(id#0) && (id#0 = a)) && id#0 IN (a,b,c,d))
:  : +- *(1) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,a), In(id,
[a,b,c,d])], ReadSchema: struct
:  :- *(2) Project [id#0, val#1]
:  :  +- *(2) Filter ((isnotnull(id#0) && (id#0 = b)) && id#0 IN (a,b,c,d))
:  : +- *(2) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,b), In(id,
[a,b,c,d])], ReadSchema: struct
:  :- *(3) Project [id#0, val#1]
:  :  +- *(3) Filter ((isnotnull(id#0) && (id#0 = c)) && id#0 IN (a,b,c,d))
:  : +- *(3) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [IsNotNull(id), EqualTo(id,c), In(id,
[a,b,c,d])], ReadSchema: struct
:  +- *(4) Project [id#0, val#1]
: +- *(4) Filter ((NOT id#0 IN (a,b,c) && id#0 IN (a,b,c,d)) &&
isnotnull(id#0))
:+- *(4) FileScan parquet default.table1[id#0,val#1] Batched: true,
Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table1],
PartitionFilters: [], PushedFilters: [Not(In(id, [a,b,c])), In(id,
[a,b,c,d]), IsNotNull(id)], ReadSchema: struct
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string,
true]))
   +- *(5) Project [id#23, val#24]
  +- *(5) Filter ((id#23 IN (a,b,c,d) && ((isnotnull(id#23) && (((id#23
= a) || (id#23 = b)) || (id#23 = c))) || NOT id#23 IN (a,b,c))) &&
isnotnull(id#23))
 +- *(5) FileScan parquet default.table2[id#23,val#24] Batched:
true, Format: Parquet, Location:
InMemoryFileIndex[file:/Users/williamwong/spark-warehouse/table2],
PartitionFilters: [], *PushedFilters: [In(id, [a,b,c,d]),
Or(And(IsNotNull(id),Or(Or(EqualTo(id,a),EqualTo(id,b)),EqualTo(id,c))),Not(I...,
*ReadSchema: struct

scala>


However, if we change the filter to id = 'a', something strange happens:
the filter id = 'a' cannot be pushed down to table2...

scala> spark.sql("SELECT * FROM partitioned_table_1 t1, table2 t2 WHERE
t1.id = t2.id AND t1.id = 'a'").explain
== Physical Plan ==
*(4) BroadcastHashJoin [id#0], [id#23],