[ 
https://issues.apache.org/jira/browse/FLINK-21675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303659#comment-17303659
 ] 

Yuval Itzchakov commented on FLINK-21675:
-----------------------------------------

I've implemented an initial solution in the form of 
PushFilterIntoTableSourceScanAcrossWatermarkRule 
(https://github.com/YuvalItzchakov/flink/blob/FLINK-16987/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanAcrossWatermarkRule.java)

I'm running into unexpected behavior in the following test case:

 
{code:java}
    @Test
    public void testFilterPushDownPartialPredicateMatch() {
        String ddl3 = "CREATE TABLE WithWatermark ("
                + "  name STRING,\n"
                + "  event_time TIMESTAMP(3),\n"
                + "  WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND"
                + ") WITH (\n"
                + " 'connector' = 'values',\n"
                + " 'bounded' = 'true',\n"
                + " 'filterable-fields' = 'name',\n"
                + " 'enable-watermark-push-down' = 'true',\n"
                + " 'disable-lookup' = 'true'"
                + ")";        
        streamUtil.tableEnv().executeSql(ddl3);
        streamUtil.verifyRelPlan(
                "SELECT * FROM WithWatermark WHERE LOWER(name) = 'foo' "
                        + "AND UPPER(name) = 'FOO' AND name IS NOT NULL");
    }

{code}
When I run a test in which the whole predicate is pushed into the source, the 
resulting optimized plan is as expected:
{code:java}
TableSourceScan(table=[[default_catalog, default_database, WithWatermark, filter=[equals(lower(name), 'foo')], watermark=[-($1, 5000:INTERVAL SECOND)]]], fields=[name, event_time]){code}
But when I run the test above, the optimizer seems to discard the partial 
push-down entirely. Instead of a plan where some predicates are pushed into the 
source and the rest remain in the Calc, it generates a plan where the Calc 
contains all of the filters and none are pushed.

Expected:
{code:java}
Calc(select=[name, event_time], where=[IS NOT NULL(name)])
   +- TableSourceScan(table=[[default_catalog, default_database, WithWatermark, filter=[equals(lower(name), 'foo')], watermark=[-($1, 5000:INTERVAL SECOND)]]], fields=[name, event_time]){code}
Actual:
{code:java}
Calc(select=[name, event_time], where=[AND(=(LOWER(name), _UTF-16LE'foo':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(UPPER(name), _UTF-16LE'FOO':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(name))])
   +- TableSourceScan(table=[[default_catalog, default_database, WithWatermark, watermark=[-($1, 5000:INTERVAL SECOND)]]], fields=[name, event_time]){code}
 

The watermark is being pushed, but not the predicates. I'd appreciate a pointer 
to the place in the optimizer that could be causing this.
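
To make the expectation concrete, here is a minimal standalone sketch of what I mean by a partial predicate match: each conjunct of the WHERE clause is offered to the source, the accepted ones move into the scan, and the rest stay in the Calc. This is plain Java, not planner code. The PartialPushDownSketch class, its Split type, and the rule deciding which conjuncts the source accepts are all made up for illustration and do not reproduce the behavior of the values connector.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Standalone model of partial filter push-down; none of these types are Flink classes. */
public class PartialPushDownSketch {

    /** What the source took, and what the Calc must still evaluate. */
    public static final class Split {
        public final List<String> accepted = new ArrayList<>();
        public final List<String> remaining = new ArrayList<>();
    }

    /**
     * Offers each conjunct of an AND-ed WHERE clause to the source.
     * Accepted conjuncts move into the scan; the rest stay in the Calc.
     */
    public static Split split(List<String> conjuncts, Predicate<String> sourceAccepts) {
        Split result = new Split();
        for (String conjunct : conjuncts) {
            if (sourceAccepts.test(conjunct)) {
                result.accepted.add(conjunct);
            } else {
                result.remaining.add(conjunct);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> where = Arrays.asList(
                "LOWER(name) = 'foo'", "UPPER(name) = 'FOO'", "name IS NOT NULL");
        // Assumption for illustration only: the source accepts just the LOWER(...) comparison.
        Split s = split(where, c -> c.startsWith("LOWER("));
        System.out.println("scan filter = " + s.accepted);
        System.out.println("calc where  = " + s.remaining);
    }
}
```

In the planner, the same split comes from the accepted and remaining filters that the source returns from SupportsFilterPushDown#applyFilters. The sketch only models that contract with strings.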

> Push filter into the scan when watermark assigner is the parent of the table 
> scan
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-21675
>                 URL: https://issues.apache.org/jira/browse/FLINK-21675
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: Shengkai Fang
>            Priority: Major
>
> When a watermark assigner is the parent of the table scan, it blocks the 
> filter push-down rule from being applied. We can add a rule just like 
> {{ProjectWatermarkAssignerTransposeRule}} or extend the rule like 
> {{PushWatermarkIntoTableSourceScanAcrossCalcRule}} and 
> {{PushWatermarkIntoTableSourceScanRule}}.


