fsk119 commented on pull request #15307:
URL: https://github.com/apache/flink/pull/15307#issuecomment-812502909


   > @fsk119 After the change, my partial predicate match no longer works:
   > 
   > ```java
   >     @Test
   >     public void testPushdownAcrossWatermarkPartialPredicateMatch() {
   >         String ddl3 = "CREATE TABLE WithWatermark ("
   >                 + "  name STRING,\n"
   >                 + "  event_time TIMESTAMP(3),\n"
   >                 + "  WATERMARK FOR event_time as event_time - INTERVAL '5' SECOND"
   >                 + ") WITH (\n"
   >                 + " 'connector' = 'values',\n"
   >                 + " 'bounded' = 'true',\n"
   >                 + " 'filterable-fields' = 'name',\n"
   >                 + " 'enable-watermark-push-down' = 'false',\n"
   >                 + " 'disable-lookup' = 'true'"
   >                 + ")";
   > 
   >         util.tableEnv().executeSql(ddl3);
   >         util.verifyRelPlan(
   >                 "SELECT * FROM WithWatermark WHERE LOWER(name) = 'foo' AND name IS NOT NULL");
   >     }
   > ```
   > 
   > Expected:
   > 
   > ```
   > Calc(select=[name, event_time], where=[IS NOT NULL(name)])
   > +- WatermarkAssigner(rowtime=[event_time], watermark=[-(event_time, 5000:INTERVAL SECOND)])
   >    +- TableSourceScan(table=[[default_catalog, default_database, WithWatermark, filter=[equals(lower(name), 'foo')]]], fields=[name, event_time])
   > ```
   > 
   > Actual:
   > 
   > ```
   > FlinkLogicalCalc(select=[name, event_time])
   > +- FlinkLogicalCalc(select=[name, event_time], where=[AND(=(LOWER(name), _UTF-16LE'foo':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(name))])
   >    +- FlinkLogicalWatermarkAssigner(rowtime=[event_time], watermark=[-($1, 5000:INTERVAL SECOND)])
   >       +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, WithWatermark]], fields=[name, event_time])
   > ```
   > 
   > Partial match is not being pushed down.
   
   We shouldn't push down the filter if a watermark assigner exists. If we push
   down the filter without also pushing down the watermark assigner, the scan
   will only emit the records that meet the condition, so the watermark assigner
   above the scan would see only the filtered records.
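
One way to read this concern is that the watermark is derived from whatever rows the scan emits, so filtering inside the scan can change it. The sketch below is a toy model, not Flink code: `WatermarkFilterSketch`, `Event`, `scan`, `watermarkAfter` and `sampleRows` are hypothetical names, and the watermark is assumed to be the maximum `event_time` seen minus the 5 second delay from the DDL above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

public class WatermarkFilterSketch {

    /** Hypothetical row type mirroring the (name, event_time) schema. */
    static final class Event {
        final String name;
        final long eventTimeMs;

        Event(String name, long eventTimeMs) {
            this.name = name;
            this.eventTimeMs = eventTimeMs;
        }
    }

    /** Toy scan: emits only the rows accepted by the pushed-down filter. */
    static List<Event> scan(List<Event> rows, Predicate<Event> pushedFilter) {
        List<Event> emitted = new ArrayList<>();
        for (Event row : rows) {
            if (pushedFilter.test(row)) {
                emitted.add(row);
            }
        }
        return emitted;
    }

    /** Assumed watermark model: max event time seen so far minus a fixed delay. */
    static long watermarkAfter(List<Event> emitted, long delayMs) {
        long maxTs = Long.MIN_VALUE;
        for (Event e : emitted) {
            maxTs = Math.max(maxTs, e.eventTimeMs);
        }
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - delayMs;
    }

    /** Made-up sample data, for illustration only. */
    static List<Event> sampleRows() {
        List<Event> rows = new ArrayList<>();
        rows.add(new Event("foo", 1_000L));
        rows.add(new Event("bar", 9_000L));
        rows.add(new Event("FOO", 3_000L));
        return rows;
    }

    public static void main(String[] args) {
        long delayMs = 5_000L;

        // Filter applied above the watermark assigner: the assigner sees every row.
        long withoutPushDown = watermarkAfter(scan(sampleRows(), e -> true), delayMs);

        // Filter pushed into the scan: the assigner sees only matching rows.
        long withPushDown = watermarkAfter(
                scan(sampleRows(), e -> e.name.toLowerCase(Locale.ROOT).equals("foo")),
                delayMs);

        System.out.println("watermark without filter push-down: " + withoutPushDown); // 4000
        System.out.println("watermark with filter push-down:    " + withPushDown);    // -2000
    }
}
```

In this model the "bar" row never reaches the assigner once the filter is pushed down, so the watermark lags behind what the unfiltered stream would produce.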


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

