Hi!

I'm running into a problem doing an Interval Join after a Deduplicate. I'm using Flink 1.15.

I want to use Flink's Interval Join to join two streams, and my first table needs to be deduplicated. Below is my sample code.
```sql
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);


CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY event_time ASC) AS rn
  FROM `source`
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE A.id = B.id
  AND A.id >= B.`start` AND A.id <= B.`end`
  AND A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND
                       AND B.event_time + INTERVAL '10' SECOND;
```

For the deduplication I chose to keep the first row, so view A should produce only insert rows, but running the SQL above fails with the following error.
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
support consuming update and delete changes which is produced by node 
Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after a Deduplicate?
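For context, one commonly cited workaround is to deduplicate on processing time instead of event time: keep-first-row ordered by a proctime attribute produces an append-only stream, which the interval join can consume. This is only a sketch; the `proc_time` computed column is an assumption added to the source table for illustration, and the trade-off is that "first" is then defined by arrival order rather than event time.

```sql
-- Sketch: deduplicate on processing time instead of event time.
-- proc_time is a hypothetical computed column added for this example.
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

-- Keep-first-row ordered by proc_time emits insert-only changes.
CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (PARTITION BY id, name, event_time ORDER BY proc_time ASC) AS rn
  FROM `source`
)
WHERE rn = 1;
```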


> -----Original Message-----
> From: LB <liebin...@whu.edu.cn>
> Sent: 2022-10-15 09:39:31 (Saturday)
> To: user-zh <user-zh@flink.apache.org>
> Cc: 
> Subject: Flink 1.15 Interval Join after Deduplicate fails
> 
> Sorry, the previous email had formatting problems; please treat this one as authoritative.
