zoucao created FLINK-26982:
------------------------------

             Summary:  strike a balance between reuse the same RelNode and 
project/filter/limit/partition push down
                 Key: FLINK-26982
                 URL: https://issues.apache.org/jira/browse/FLINK-26982
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
            Reporter: zoucao


Now, Flink has effective reuse logic to reuse the same RelNode and subplan, but 
it will lose efficacy in some situations, like project/filter/limit/partition 
push down, if one of them is enabled, the new source is not the same from old 
one, so the source can not be reused anymore. 
For some complicated SQL, many views will be created from the same table, and 
the scan RelNode can not be reused, such that many of the same threads about 
reading source data will be created in one task, which will cause the memory 
problem and sometimes will cause reading amplification.
Should we do something to enforce reusing decided by users themselves?
The following SQL shows the situation proposed above.

{code:java}
create table fs(
    a int,
    b string,
    c  bigint
) PARTITIONED by ( c )with (
    'connector' = 'filesystem',
    'format' = 'csv',
    'path' = 'file:///tmp/test'
);
select * from
   (select * from fs limit 1)
union all
   (select * from fs where a = 2)
union all
   (select 1, b, c from fs)
union all
   (select 1, b, c from fs where c = 1)
{code}
== Optimized Execution Plan ==
{code:java}
Union(all=[true], union=[a, b, c])
:- Union(all=[true], union=[a, b, c])
:  :- Union(all=[true], union=[a, b, c])
:  :  :- Limit(offset=[0], fetch=[1])
:  :  :  +- Exchange(distribution=[single])
:  :  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, 
limit=[1]]], fields=[a, b, c])
:  :  +- Calc(select=[CAST(2 AS INTEGER) AS a, b, c], where=[(a = 2)])
:  :     +- TableSourceScan(table=[[default_catalog, default_database, fs, 
filter=[=(a, 2)]]], fields=[a, b, c])
:  +- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, c])
:     +- TableSourceScan(table=[[default_catalog, default_database, fs, 
project=[b, c], metadata=[]]], fields=[b, c])
+- Calc(select=[CAST(1 AS INTEGER) AS EXPR$0, b, CAST(1 AS BIGINT) AS c])
   +- TableSourceScan(table=[[default_catalog, default_database, fs, 
partitions=[{c=1}], project=[b], metadata=[]]], fields=[b])
{code}





--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to