Hi,
      First-row deduplication based on event time may produce update messages, which is why this fails: interval join does not yet support update inputs.
A feasible workaround for now is to do the first-row deduplication on proctime instead, which will not emit update messages.
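As a rough illustration of that workaround, the query from the question could be adjusted as below: a minimal, untested sketch that adds a processing-time attribute (here named `proc_time`, my choice of name) to the sender's `source` table and orders the deduplication by it, so the Deduplicate node stays append-only.

```sql
-- Sketch only: same `source` table as in the question, plus a
-- processing-time computed column.
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),          -- processing-time attribute
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

-- First-row dedup ordered by proctime: emits inserts only, so the
-- downstream interval join should accept it.
CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (
      PARTITION BY id, name, event_time
      ORDER BY proc_time ASC) AS rn
  FROM `source`
)
WHERE rn = 1;
```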

Best,
Lincoln Lee


余列冰 <liebin...@whu.edu.cn> wrote on Sat, Oct 15, 2022 at 09:46:

> Hi!
>
> I ran into a problem doing an Interval Join after Deduplicate. I am using Flink 1.15.
>
> I want to join two streams with Flink's Interval Join, and my first table needs to be deduplicated. Here is my sample code.
> ```sql
> CREATE TEMPORARY TABLE `source` (
>   id INT,
>   name STRING,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
>
> CREATE TEMPORARY TABLE B (
>   id INT,
>   `start` INT,
>   `end` INT,
>   event_time TIMESTAMP(3),
>   WATERMARK FOR event_time AS event_time
> ) WITH (
>   'connector' = 'datagen'
> );
>
> create TEMPORARY view A as
> select id, name, event_time from (
>   select id, name, event_time,
>   row_number() over(partition by id, name, event_time order by event_time
> asc) as rn
>   from source
> )
> where rn = 1;
>
> SELECT *
> FROM A, B
> WHERE
>         A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND
>         A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND
>         B.event_time + INTERVAL '10' SECOND;
> ```
>
> For deduplication I keep the first row, so view A should produce only insert rows, but running the SQL above raises the following error.
> ```
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin
> doesn't support consuming update and delete changes which is produced by
> node Deduplicate(keep=[FirstRow], key=[id, name, event_time],
> order=[ROWTIME])
> ```
>
> How can I do an Interval Join after Deduplicate?
>
>
> > -----Original Message-----
> > From: LB <liebin...@whu.edu.cn>
> > Sent: 2022-10-15 09:39:31 (Saturday)
> > To: user-zh <user-zh@flink.apache.org>
> > Cc:
> > Subject: Flink 1.15 Interval Join fails after Deduplicate
> >
> > Apologies, my previous email had formatting problems; please use this one.
>
