Hi! I'm running into a problem when doing an Interval Join after a Deduplicate. I'm using Flink 1.15.
I want to use Flink's Interval Join to join two streams, and the first table needs to be deduplicated. Here is my sample code:

```sql
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time
FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
  FROM source
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE A.id = B.id
  AND A.id >= B.`start`
  AND A.id <= B.`end`
  AND A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND
                       AND B.event_time + INTERVAL '10' SECOND;
```

For the deduplication I keep the first row, so view A should only produce insert rows. However, running the SQL above fails with the following error:

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after a Deduplicate?

> -----Original message-----
> From: LB <liebin...@whu.edu.cn>
> Sent: 2022-10-15 09:39:31 (Saturday)
> To: user-zh <user-zh@flink.apache.org>
> Cc:
> Subject: Flink 1.15: error when doing Interval Join after Deduplicate
>
> Sorry, the previous email was badly formatted; please use this one instead.
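The keep-first-row reasoning in the question can be checked against a simplified Python model of deduplication. This is a hypothetical sketch, not Flink's actual operator; `Row`, `dedup_first_row_by_event_time` and `dedup_first_row_by_arrival` are illustrative names. It shows that when "first" is defined by event time rather than by arrival order (processing time), a late row with an earlier timestamp can force the already-emitted row to be retracted and replaced. This is presumably why the planner treats the `Deduplicate(keep=[FirstRow], ..., order=[ROWTIME])` node as producing update changes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    key: tuple        # hypothetical stand-in for the PARTITION BY columns
    event_time: int   # event timestamp, e.g. epoch millis
    payload: str


def dedup_first_row_by_event_time(rows):
    """Keep the row with the smallest event_time per key, emitting a changelog.

    Returns (kind, row) tuples where kind is '+I', '-U' or '+U'.
    """
    first = {}
    changelog = []
    for row in rows:  # arrival order, possibly out of order in event time
        prev = first.get(row.key)
        if prev is None:
            first[row.key] = row
            changelog.append(("+I", row))
        elif row.event_time < prev.event_time:
            # A late row is "earlier" than the one already emitted:
            # the old result has to be retracted and replaced.
            first[row.key] = row
            changelog.append(("-U", prev))
            changelog.append(("+U", row))
        # otherwise it is a duplicate and is dropped
    return changelog


def dedup_first_row_by_arrival(rows):
    """Keep the first row to arrive per key; only ever emits inserts."""
    seen = set()
    changelog = []
    for row in rows:
        if row.key not in seen:
            seen.add(row.key)
            changelog.append(("+I", row))
    return changelog
```

With arrivals `Row((1, "a"), 2000, "x")` then `Row((1, "a"), 1000, "y")`, the event-time variant emits `+I x`, `-U x`, `+U y`, while the arrival-order variant emits only `+I x`.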