Hi xiao,
从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。 Best, Hailong Wang 在 2020-11-03 18:27:51,"xiao cai" <flin...@163.com> 写道: >Hi : >flink 版本 1.11.2 >问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in >the input rows of a regular join. As a workaround you can cast the time >attributes of input tables to TIMESTAMP before. > > >代码: >// stream 1 >create table kafkaSource1 ( >id int, >field_1 int, >field_2 varchar, >ts1 timestamp(3), >watermark for `ts1` >) with ( >connector = kafka >) >// stream 2 >create table kafkaSource2 ( >id int, >field_3 >ts2 timestamp(3), >watermark for `ts2` >) with ( >connector = kafka >) > > >//create view >create view kafkaSource1_view as >select >field_1 as field_1, >last_value(field_2) as field_2, >last_value(ts1) as ts1 >from kafkaSouce1 >group by field_1 > > >// query >insert into sinkTable >select >a.field_1, >b.field_3 >from kafkaSource2 a join kafkaSource1_view b >on a.id = b.id >and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY