Re: Flink 1.15: Interval Join fails after Deduplicate

2022-10-17 Posted by Zhiwen Sun
Instead of an interval join, use a regular stream join. It works as long as the time column is neither proctime nor eventtime (i.e., not a time attribute).
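
A minimal, untested sketch of that idea against the tables from the original mail (A2/B2 are just helper view names): casting the rowtime column to a plain TIMESTAMP materializes it, so it is no longer a time attribute and the planner falls back to a regular join, which accepts updating input.

```sql
-- Sketch: strip the time-attribute property by casting, then use a
-- regular join. Note: a regular join keeps both sides in state
-- indefinitely, so consider configuring table.exec.state.ttl.
create TEMPORARY view A2 as
select id, name, CAST(event_time AS TIMESTAMP(3)) as ts from A;

create TEMPORARY view B2 as
select id, `start`, `end`, CAST(event_time AS TIMESTAMP(3)) as ts from B;

SELECT *
FROM A2, B2
WHERE
A2.id = B2.id AND A2.id >= B2.`start` AND A2.id <= B2.`end` AND
A2.ts BETWEEN B2.ts - INTERVAL '10' SECOND AND B2.ts + INTERVAL '10' SECOND;
```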

Zhiwen Sun





Re: Flink 1.15: Interval Join fails after Deduplicate

2022-10-17 Posted by Lincoln Lee
Hi,
  First-row deduplication based on event time can produce update messages, so the query fails as long as the interval join does not yet support updating input.
A feasible approach for now is to do the first-row deduplication based on proctime instead (this will not emit update messages).
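
A minimal, untested sketch of that workaround on the tables from the original mail (the proc_time column name is just an example). With first-row dedup ordered by proctime, each key is emitted at most once, so view A stays insert-only and event_time remains a rowtime attribute usable by the interval join:

```sql
-- Add a processing-time column and deduplicate on it instead of rowtime.
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  proc_time AS PROCTIME(),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

create TEMPORARY view A as
select id, name, event_time from (
  select id, name, event_time,
  row_number() over(partition by id, name, event_time order by proc_time asc) as rn
  from source
)
where rn = 1;
```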

Best,
Lincoln Lee




Re: Flink 1.15: Interval Join fails after Deduplicate

2022-10-14 Posted by 余列冰
Hi!

I'm running into a problem doing an Interval Join after Deduplicate. The Flink version I'm using is 1.15.

I want to use Flink's Interval Join to correlate two streams, and my first table needs to be deduplicated. Here is my sample code.
```sql
CREATE TEMPORARY TABLE `source` (
  id INT,
  name STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);


CREATE TEMPORARY TABLE B (
  id INT,
  `start` INT,
  `end` INT,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time
) WITH (
  'connector' = 'datagen'
);

create TEMPORARY view A as
select id, name, event_time from (
  select id, name, event_time,
  row_number() over(partition by id, name, event_time order by event_time asc) as rn
  from source
)
where rn = 1;

SELECT *
FROM A, B
WHERE 
A.id = B.id AND A.id >= B.`start` AND A.id <= B.`end` AND 
A.event_time BETWEEN B.event_time - INTERVAL '10' SECOND AND 
B.event_time + INTERVAL '10' SECOND;
```

For deduplication I chose to keep the first row, so view A should produce only insert rows; however, running the SQL above raises the following error.
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: StreamPhysicalIntervalJoin doesn't 
support consuming update and delete changes which is produced by node 
Deduplicate(keep=[FirstRow], key=[id, name, event_time], order=[ROWTIME])
```

How can I do an Interval Join after using Deduplicate?

