[ https://issues.apache.org/jira/browse/FLINK-26982?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17516221#comment-17516221 ]

zoucao edited comment on FLINK-26982 at 4/2/22 5:25 AM:
--------------------------------------------------------

I think we can optimize it in two ways:
1) Add a new method, `boolean enablePushDown()`, to the `SupportsProjectionPushDown`/`SupportsFilterPushDown`/`SupportsLimitPushDown`/`SupportsPartitionPushDown` interfaces, and introduce a new config option `source.enforce-reuse` whose default value is false. If users want to reuse the source RelNode, they can invoke the source table with the hint /*+ OPTIONS('source.enforce-reuse'='true') */. In that situation, `enablePushDown()` returns false to ensure the source table can be reused.
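A minimal sketch of idea 1), assuming a default method is added to the push-down ability interfaces (the class names and wiring below are illustrative, not the actual Flink planner code):

```java
// Sketch of idea 1): a default method on a push-down ability interface.
// Only the new method is shown; the real Flink interfaces also declare
// applyFilters(...) etc. ReusableSource is a hypothetical connector class.
interface SupportsFilterPushDown {
    /** The planner would skip push-down when this returns false. */
    default boolean enablePushDown() {
        return true;
    }
}

// A source whose user set 'source.enforce-reuse' = 'true' via hints.
class ReusableSource implements SupportsFilterPushDown {
    private final boolean enforceReuse;

    ReusableSource(boolean enforceReuse) {
        this.enforceReuse = enforceReuse;
    }

    @Override
    public boolean enablePushDown() {
        // Disabling push-down keeps identical scans structurally equal,
        // so the planner's existing reuse logic can merge them.
        return !enforceReuse;
    }
}
```

With this shape, existing connectors are unaffected (the default keeps push-down on), and only hinted tables opt out.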

2) Scan all the source RelNodes and count identical ones before optimization; if the count exceeds a threshold specified by users, do not push project/filter/limit/partition down, so the scans remain reusable.
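Idea 2) could be sketched as a pre-optimization pass that counts scans by digest and gates push-down on a threshold (the digest strings and the `PushDownDecider` helper are hypothetical, for illustration only):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of idea 2): before optimization, count how often each source scan
// (identified here by a digest string) appears in the plan. If a scan is
// shared more times than the user-configured threshold, skip push-down for
// it so the planner can still reuse the identical scans.
class PushDownDecider {

    /** Counts occurrences of each scan digest in the plan. */
    static Map<String, Integer> countScans(List<String> scanDigests) {
        Map<String, Integer> counts = new HashMap<>();
        for (String digest : scanDigests) {
            counts.merge(digest, 1, Integer::sum);
        }
        return counts;
    }

    /** Push down only when the scan is not shared beyond the threshold. */
    static boolean shouldPushDown(Map<String, Integer> counts, String digest, int threshold) {
        return counts.getOrDefault(digest, 0) <= threshold;
    }
}
```

For the SQL in the issue description, the `fs` scan appears four times, so with a threshold of, say, 3, push-down would be skipped for `fs` and all four scans would stay identical and reusable.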

cc [~jark], [~godfreyhe], What do you think about this?



>  strike a balance between reusing the same RelNode and 
> project/filter/limit/partition push down
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26982
>                 URL: https://issues.apache.org/jira/browse/FLINK-26982
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>            Reporter: zoucao
>            Priority: Major
>
> Now, Flink has effective reuse logic for identical RelNodes and subplans, 
> but it loses efficacy in some situations, such as 
> project/filter/limit/partition push-down: if any of these is applied, the new 
> source is not the same as the old one, so the new source can no longer be 
> reused. 
> For some complicated SQL, many views are created from the same source table, 
> and the scan RelNode cannot be reused. As a result, many identical 
> source-reading threads are created in one task, which can cause memory 
> problems and sometimes read amplification.
> Should we do something to enforce reuse of specific RelNodes chosen by users 
> themselves?
> The following SQL shows the situation proposed above.
> {code:java}
> create table fs (
>     a int,
>     b string,
>     c bigint
> ) PARTITIONED BY (c) WITH (
>     'connector' = 'filesystem',
>     'format' = 'csv',
>     'path' = 'file:///tmp/test'
> );
> select * from
>    (select * from fs limit 1)
> union all
>    (select * from fs where a = 2)
> union all
>    (select 1, b, c from fs)
> union all
>    (select 1, b, c from fs where c = 1)
> {code}
> == Optimized Execution Plan ==
> {code:java}
> Union(all=[true], union=[a, b, c])
> :- Union(all=[true], union=[a, b, c])
> :  :- Union(all=[true], union=[a, b, c])
> :  :  :- Limit(offset=[0], fetch=[1])
> :  :  :  +- Exchange(distribution=[single])
> :  :  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, limit=[1]]], fields=[a, b, c])
> :  :  +- Calc(select=[CAST(2 AS INTEGER) AS a, b, c], where=[(a = 2)])
> :  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, filter=[=(a, 2)]]], fields=[a, b, c])
> :  +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, c])
> :     +- TableSourceScan(table=[[default_catalog, default_database, fs, project=[b, c], metadata=[]]], fields=[b, c])
> +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, CAST(1 AS BIGINT) AS c])
>    +- TableSourceScan(table=[[default_catalog, default_database, fs, partitions=[{c=1}], project=[b], metadata=[]]], fields=[b])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
