[ 
https://issues.apache.org/jira/browse/FLINK-34924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeyhun Karimov updated FLINK-34924:
-----------------------------------
    Description: 
Consider the following tables: 
{code:sql}
create table partitionedTable1 (
    a int,
    b int,
    c int)
partitioned by (a, b)
with ( ... )
{code}
 
{code:sql}
create table partitionedTable2 (
    c int,
    d int,
    e int)
partitioned by (d, e)
with ( ... )
{code}
 

And the following queries:
{code:sql}
select t1.b
from partitionedTable1 t1 inner join partitionedTable2 t2
on t1.a = t2.d
where t1.a > 1

-- or

select t1.b
from partitionedTable1 t1 inner join partitionedTable2 t2
on t1.a = t2.d and t1.b = t2.e
where t1.a > 1
{code}
 

For the queries above, the partition pushdown rules in Flink currently 
consider only the filter clause (t1.a > 1) and push the matching partitions 
to the source operator. 

However, we should also be able to push down partitions based on the join 
clause. Note that in the queries above the join fields are the same as the 
partition columns (or a prefix of them). So, we can fetch the existing 
partitions of each table, intersect them on the join-key values, and push 
the intersection to both source operators. 
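
The intersection step could be sketched roughly as follows. This is only an 
illustration in plain Java, not planner code: the class JoinPartitionPruning 
and its methods are hypothetical names, each partition is assumed to be a map 
from partition column to value (similar in shape to the partition specs used 
by Flink's SupportsPartitionPushDown ability), and values are compared as 
plain strings without any type normalization. It only applies to inner joins 
with equality conditions on partition columns.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Flink: prunes the partitions of two joined
// tables down to those whose join-key values occur on both sides.
final class JoinPartitionPruning {

    // Projects one partition spec (column -> value) onto the join columns.
    static List<String> keyOf(Map<String, String> partition, List<String> joinColumns) {
        List<String> key = new ArrayList<>();
        for (String column : joinColumns) {
            key.add(partition.get(column));
        }
        return key;
    }

    // Collects the distinct join-key tuples found among a table's partitions.
    static Set<List<String>> keysOf(
            List<Map<String, String>> partitions, List<String> joinColumns) {
        Set<List<String>> keys = new HashSet<>();
        for (Map<String, String> partition : partitions) {
            keys.add(keyOf(partition, joinColumns));
        }
        return keys;
    }

    // Intersects the join-key tuples of both sides of the join.
    static Set<List<String>> sharedKeys(
            List<Map<String, String>> left, List<String> leftColumns,
            List<Map<String, String>> right, List<String> rightColumns) {
        Set<List<String>> shared = keysOf(left, leftColumns);
        shared.retainAll(keysOf(right, rightColumns));
        return shared;
    }

    // Keeps only the partitions whose join-key tuple is in the shared set.
    static List<Map<String, String>> retain(
            List<Map<String, String>> partitions,
            List<String> joinColumns,
            Set<List<String>> sharedKeys) {
        List<Map<String, String>> remaining = new ArrayList<>();
        for (Map<String, String> partition : partitions) {
            if (sharedKeys.contains(keyOf(partition, joinColumns))) {
                remaining.add(partition);
            }
        }
        return remaining;
    }
}
```

For the first query, sharedKeys would be called with the join columns (a) and 
(d); for the second, with (a, b) and (d, e). The result of retain for each 
table is what would then be pushed to that table's source, in addition to 
whatever the filter clause already prunes.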

  was:
Consider the following tables: 
{code:java}
create table partitionedTable1 (
   a int, 
   b int, 
   c int)  
partitioned by (a, b) 
with ( ... )    {code}
 
{code:java}
create table partitionedTable2 (
    c int, 
    d int, 
    e int)  
 partitioned by (d, e) 
 with ( ... )  {code}
 

And the following query:
{code:java}
select t1.b 
from partitionedTable1 t1 inner join partitionedTable2 t2 
on t1.a = t2.d 
where t1.a > 1{code}
 

Currently, the partition pushdown only considers the filter clause (t1.a > 1) 
and pushes the related partitions to the source operator. 

However, we should be able to also pushdown partitions because of join clause. 
Note that partitioned columns are the same as join fields. So, we can fetch 
existing partitions from each table, intersect them, and push their 
intersection to their source operators. 


> Support partition pushdown for join queries
> -------------------------------------------
>
>                 Key: FLINK-34924
>                 URL: https://issues.apache.org/jira/browse/FLINK-34924
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.19.0
>            Reporter: Jeyhun Karimov
>            Priority: Major


