[ 
https://issues.apache.org/jira/browse/FLINK-32639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745327#comment-17745327
 ] 

luoyuxia commented on FLINK-32639:
----------------------------------

It's by design. If the limit were pushed down to the table source together
with the filter, we would get a wrong result.

If the limit is pushed down, we only get 10 rows from the TableSource. The
filter then runs on those 10 rows and may filter some of them out, so the
query can return fewer than 10 rows even though more matching rows exist.
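
To make the ordering problem concrete, here is a minimal sketch in plain Java
(no Flink APIs). It only models the semantics: the class name, the helper
methods and the sample data are hypothetical, and a null entry stands for a
row whose name is NULL. It is not how the planner is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names exist in Flink.
class LimitPushDownSketch {

    // Models a limit pushed into the source: the source stops after `limit`
    // rows, and the filter (name IS NULL) only sees those rows.
    static List<String> limitThenFilter(List<String> names, int limit) {
        return names.stream()
                .limit(limit)
                .filter(Objects::isNull)
                .collect(Collectors.toList());
    }

    // Models the plan in Case 3: the filter runs first, then the limit.
    static List<String> filterThenLimit(List<String> names, int limit) {
        return names.stream()
                .filter(Objects::isNull)
                .limit(limit)
                .collect(Collectors.toList());
    }

    // Assumed sample data: 10 rows with a name, followed by 10 rows with a NULL name.
    static List<String> sampleNames() {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            names.add("name-" + i);
        }
        for (int i = 0; i < 10; i++) {
            names.add(null);
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> names = sampleNames();
        System.out.println("limit then filter: " + limitThenFilter(names, 10).size());
        System.out.println("filter then limit: " + filterThenLimit(names, 10).size());
    }
}
```

With this sample data, limiting first yields 0 rows, while filtering first
yields the expected 10 rows.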

> Filter and Limit exist at the same time, limit cannot take effect
> -----------------------------------------------------------------
>
>                 Key: FLINK-32639
>                 URL: https://issues.apache.org/jira/browse/FLINK-32639
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.18.0
>            Reporter: Dove
>            Priority: Minor
>
> Define the environment in Flink batch mode.
> The source uses the filesystem connector (FileSystemTableSource implements
> SupportsLimitPushDown and SupportsFilterPushDown).
>  
> {code:java}
> EnvironmentSettings settings =
>         EnvironmentSettings.newInstance().inBatchMode().build();
> StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
> tabEnv.executeSql(
>         "CREATE TABLE source(uuid varchar, name varchar, age int, "
>                 + "ts timestamp, `partition` varchar) "
>                 + "WITH ('connector' = 'filesystem', "
>                 + "'path' = 'file:///tmp/file', 'format' = 'csv')");{code}
> Case 1: Filter
>  
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null").print();
> == Optimized Execution Plan ==
> Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
> +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition]){code}
>  
> Case 2: Limit 
> {code:java}
> tabEnv.executeSql("explain select * from source limit 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
>    +- Limit(offset=[0], fetch=[10], global=[false])
>       +- TableSourceScan(table=[[default_catalog, default_database, source, limit=[10]]], fields=[uuid, name, age, ts, partition]) {code}
> Case 3: Filter + Limit
> {code:java}
> tabEnv.executeSql("explain select * from source where name is null limit 10").print();
> == Optimized Execution Plan ==
> Limit(offset=[0], fetch=[10], global=[true])
> +- Exchange(distribution=[single])
>    +- Limit(offset=[0], fetch=[10], global=[false])
>       +- Calc(select=[uuid, null:VARCHAR(2147483647) AS name, age, ts, partition], where=[name IS NULL])
>          +- TableSourceScan(table=[[default_catalog, default_database, source, filter=[IS NULL(name)]]], fields=[uuid, name, age, ts, partition])
> {code}
> When a filter condition is present, the limit does not appear to be pushed
> down to the source.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
