Hi,
Flink version: 1.11.2
Problem: when joining two streams using last_value + interval join, the job fails with: Rowtime attributes must not be in
the input rows of a regular join. As a workaround you can cast the time
attributes of input tables to TIMESTAMP before.


Code:
-- stream 1
create table kafkaSource1 (
id int,
field_1 int,
field_2 varchar,
ts1 timestamp(3),
watermark for `ts1` 
) with (
'connector' = 'kafka'
)
-- stream 2
create table kafkaSource2 (
id int,
field_3, -- column type not given in the post
ts2 timestamp(3),
watermark for `ts2` 
) with (
'connector' = 'kafka'
)


-- create view
create view kafkaSource1_view as 
select 
field_1 as field_1,
last_value(field_2) as field_2,
last_value(ts1) as ts1
from kafkaSource1
group by field_1


-- query
insert into sinkTable 
select 
b.field_1,
a.field_3
from kafkaSource2 a join kafkaSource1_view b
on a.id = b.id
and a.ts2 >= b.ts1 - INTERVAL '1' HOUR and a.ts2 < b.ts1 + INTERVAL '2' DAY
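For reference, the workaround named in the error message could look roughly like the sketch below. This is not a verified fix. It assumes the time columns are ts2 and ts1 as declared in the DDL, and that the view also exposes an id column, which the view definition above does not show. Because the view is a non-windowed GROUP BY, its output is an updating table, so the planner most likely cannot treat this as an interval join. Casting ts2 to a plain TIMESTAMP(3) should then let the query run as a regular join, at the cost of join state that is no longer bounded by the time interval.

```sql
-- Sketch only: cast the rowtime attribute before the join, as the error message suggests.
-- Assumption: kafkaSource1_view exposes an id column (not shown in the original view).
insert into sinkTable
select
  b.field_1,
  a.field_3
from (
  -- cast(...) drops the rowtime property, leaving an ordinary timestamp column
  select id, field_3, cast(ts2 as timestamp(3)) as ts2
  from kafkaSource2
) a
join kafkaSource1_view b
on a.id = b.id
and a.ts2 >= b.ts1 - INTERVAL '1' HOUR and a.ts2 < b.ts1 + INTERVAL '2' DAY
```

Since this runs as a regular join over an updating input, sinkTable would need to accept updates, and configuring idle state retention may be needed to keep state size under control.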
