I had a problem with Interval Join after using Deduplicate. I'm using Flink version 1.15.
I want to use Flink's Interval Join to join two streams, and the first table needs to be deduplicated before the join. Here is my sample code:

```
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time
FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
  FROM source
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE A.id = B.id
  AND A.id >= B.`start`
  AND A.id <= B.`end`
  AND A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND
                       AND B.event_time + INTERVAL '10' SECOND;
```

The deduplication keeps only the first row, so I expected view A to produce insert-only rows. However, running the SQL above produces the following error:

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after using Deduplicate?