I ran into a problem with Interval Join after using Deduplicate. I'm using Flink 
version 1.15.

I want to use Flink's Interval Join to join two streams, and my first table 
needs to be de-duplicated before the join. Here is my sample code.

```
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);


CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY VIEW A AS
SELECT id, name, event_time FROM (
  SELECT id, name, event_time,
    ROW_NUMBER() OVER (
      PARTITION BY id, name, event_time
      ORDER BY event_time ASC
    ) AS rn
  FROM source
)
WHERE rn = 1;

SELECT *
FROM A, B
WHERE 
    A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
    A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
    B.event_time + INTERVAL '10' SECOND;
```

The de-duplication keeps only the first row, so I expected view A to produce 
insert-only rows, but running the SQL above produces the following error.

```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
support consuming update and delete changes which is produced by node 
Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I perform an Interval Join after using Deduplicate?
