[
https://issues.apache.org/jira/browse/FLINK-21675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17303659#comment-17303659
]
Yuval Itzchakov commented on FLINK-21675:
-----------------------------------------
I've implemented an initial solution in the form of
PushFilterIntoTableSourceScanAcrossWatermarkRule
(https://github.com/YuvalItzchakov/flink/blob/FLINK-16987/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanAcrossWatermarkRule.java).
I am seeing unexpected behavior in one of my test cases:
{code:java}
@Test
public void testFilterPushDownPartialPredicateMatch() {
String ddl3 = "CREATE TABLE WithWatermark ("
+ " name STRING,\n"
+ " event_time TIMESTAMP(3),\n"
+ " WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = 'true',\n"
+ " 'filterable-fields' = 'name',\n"
+ " 'enable-watermark-push-down' = 'true',\n"
+ " 'disable-lookup' = 'true'"
+ ")";
streamUtil.tableEnv().executeSql(ddl3);
streamUtil.verifyRelPlan(
"SELECT * FROM WithWatermark WHERE LOWER(name) = 'foo' AND "
+ "UPPER(name) = 'FOO' AND name IS NOT NULL");
}
{code}
When running a test with a full match on the predicate, the resulting optimized
query is as expected:
{code:java}
TableSourceScan(table=[[default_catalog, default_database, WithWatermark,
filter=[equals(lower(name), 'foo')], watermark=[-($1, 5000:INTERVAL SECOND)]]],
fields=[name, event_time]){code}
But when running the above test, the optimizer seems to discard the partial
push-down entirely (the case where some of the predicates are pushed to the
source and the rest remain in the Calc). It generates a plan where the Calc
contains all of the filters and none are pushed.
Expected:
{code:java}
Calc(select=[name, event_time], where=[IS NOT NULL(name)])
+- TableSourceScan(table=[[default_catalog, default_database, WithWatermark,
filter=[equals(lower(name), 'foo')], watermark=[-($1, 5000:INTERVAL SECOND)]]],
fields=[name, event_time]){code}
Actual:
{code:java}
Calc(select=[name, event_time], where=[AND(=(LOWER(name),
_UTF-16LE'foo':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), =(UPPER(name),
_UTF-16LE'FOO':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT
NULL(name))])
+- TableSourceScan(table=[[default_catalog, default_database, WithWatermark,
watermark=[-($1, 5000:INTERVAL SECOND)]]], fields=[name, event_time]){code}
The watermark is being pushed, but not the predicates. I would appreciate a
pointer to where in the optimizer this could be happening.
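To make the expected partial-match behavior concrete, here is a minimal plain-Java sketch of how a conjunctive predicate would be split into a pushed part and a remaining part. It is only an illustration and does not use the planner: the class `PartialFilterPushDownSketch`, the `SplitResult` type, the `split` method, the string encoding of predicates, and the capability check are all hypothetical names and assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical model of a partial filter push-down; not part of Flink. */
public class PartialFilterPushDownSketch {

    /** What the source accepted, and what the Calc above the scan must still evaluate. */
    public static final class SplitResult {
        public final List<String> accepted;
        public final List<String> remaining;

        SplitResult(List<String> accepted, List<String> remaining) {
            this.accepted = accepted;
            this.remaining = remaining;
        }
    }

    /**
     * Splits the conjuncts of an AND predicate. Conjuncts the source supports
     * are pushed into the scan; all others stay behind in the Calc.
     */
    public static SplitResult split(List<String> conjuncts, Predicate<String> sourceSupports) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String conjunct : conjuncts) {
            if (sourceSupports.test(conjunct)) {
                accepted.add(conjunct);
            } else {
                remaining.add(conjunct);
            }
        }
        return new SplitResult(accepted, remaining);
    }

    public static void main(String[] args) {
        // Conjuncts of the WHERE clause from the test above, encoded as strings.
        List<String> conjuncts = Arrays.asList(
                "LOWER(name) = 'foo'",
                "UPPER(name) = 'FOO'",
                "name IS NOT NULL");

        // Assumed capability check: this made-up source only accepts LOWER(...) comparisons.
        SplitResult result = split(conjuncts, c -> c.startsWith("LOWER("));

        System.out.println("pushed into scan: " + result.accepted);
        System.out.println("left in Calc:     " + result.remaining);
    }
}
```

With a partial match, both lists are non-empty, so I would expect the plan to keep a Calc holding only the remaining conjuncts on top of a scan that carries the accepted ones.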
> Push filter into the scan when watermark assigner is the parent of the table
> scan
> ---------------------------------------------------------------------------------
>
> Key: FLINK-21675
> URL: https://issues.apache.org/jira/browse/FLINK-21675
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: Shengkai Fang
> Priority: Major
>
> When a watermark assigner is the parent of the table scan, it blocks the rule
> that applies the filter push down. We can add a rule just like
> {{ProjectWatermarkAssignerTransposeRule}} or extend a rule like
> {{PushWatermarkIntoTableSourceScanAcrossCalcRule}} and
> {{PushWatermarkIntoTableSourceScanRule}}.