Hi,

First-row deduplication based on event time can produce update messages, so the query fails because interval join does not yet support update inputs. A feasible workaround for now is to do the first-row deduplication based on proctime instead, which will not emit update messages.
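As a sketch of that workaround, reusing the datagen source from the question below: add a processing-time attribute and order the ROW_NUMBER() by it (the `proc_time` computed column is an addition for illustration, not part of the original tables). Keeping the first row by proctime yields an append-only view, which an interval join can consume.

```sql
-- Same source as in the question, plus a processing-time column (assumption).
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),  -- processing-time attribute added for the dedup
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

-- First-row dedup ordered by proctime: emits inserts only, no updates.
CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
         ROW_NUMBER() OVER (PARTITION BY id, name, event_time
                            ORDER BY proc_time ASC) AS rn
  FROM source
)
WHERE rn = 1;
```

View A can then be used in the interval join from the question unchanged; whether the proctime-based first row matches the event-time-based first row depends on arrival order, so this is a trade-off, not an equivalent rewrite.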
Best,
Lincoln Lee

余列冰 <liebin...@whu.edu.cn> wrote on Sat, Oct 15, 2022, 09:46:
> Hi!
>
> I ran into a problem doing an Interval Join after Deduplicate. I am using Flink 1.15.
>
> I want to join two streams with Flink's Interval Join, and my first table needs deduplication. Here is my sample code.
> ```sql
> CREATE TEMPORARY TABLE `source` (
>   id INT,
>   name STRING,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TEMPORARY TABLE B (
>   id INT,
>   `start` INT,
>   `end` INT,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> CREATE TEMPORARY VIEW A AS
> SELECT id, name, event_time FROM (
>   SELECT id, name, event_time,
>          ROW_NUMBER() OVER (PARTITION BY id, name, event_time
>                             ORDER BY event_time ASC) AS rn
>   FROM source
> )
> WHERE rn = 1;
>
> SELECT *
> FROM A, B
> WHERE
>   A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND
>   A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND
>                        B.event_time + INTERVAL '10' SECOND;
> ```
>
> For deduplication I keep the first row, so view A should only produce insert rows, but running the SQL above raises the following error.
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[id, name, event_time],
> order=[ROWTIME])
> ```
>
> How can I do an Interval Join after Deduplicate?
>
> > -----Original Message-----
> > From: LB <liebin...@whu.edu.cn>
> > Sent: 2022-10-15 09:39:31 (Saturday)
> > To: user-zh <user-zh@flink.apache.org>
> > Cc:
> > Subject: Flink 1.15 Interval Join fails after Deduplicate
> >
> > Sorry, the previous email had formatting problems; please refer to this one instead.