Hi xiao,

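Judging from the error, this SQL matched `StreamExecJoinRule`, and a regular
join must not have rowtime attributes in its input rows.
The likely cause is that the rowtime of your kafkaSource1 table is passed
through last_value after the group by, so it is no longer a time attribute
type (`TimeIndicatorRelDataType`). When the rule then checks the join
condition it finds no windowBounds, so the query is planned as a regular join
rather than an interval join, and you get the error you are seeing now.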
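
If a regular join is acceptable for your use case, the workaround named in the
error message could look roughly like the sketch below. This is only a sketch
under assumptions: the view name `kafkaSource2_plain` is hypothetical, and I am
assuming that `ts2` is the only rowtime attribute still reaching the join.

```sql
-- Hypothetical view: expose ts2 as an ordinary TIMESTAMP(3) column
-- instead of a rowtime attribute, as the error message suggests.
create view kafkaSource2_plain as
select
  id,
  field_3,
  cast(ts2 as timestamp(3)) as ts2
from kafkaSource2;
```

You would then join kafkaSource2_plain (instead of kafkaSource2) against
kafkaSource1_view. Note that this still runs as a regular join, not an
interval join, so the join state is not cleaned up by the watermark.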


Best,
Hailong Wang

On 2020-11-03 18:27:51, "xiao cai" <flin...@163.com> wrote:
>Hi,
>Flink version: 1.11.2
>Problem: when joining two streams with last_value + interval join, I get the
>error: Rowtime attributes must not be in the input rows of a regular join.
>As a workaround you can cast the time attributes of input tables to
>TIMESTAMP before.
>
>
>Code:
>// stream 1
>create table kafkaSource1 (
>id int,
>field_1 int,
>field_2 varchar,
>ts1 timestamp(3),
>watermark for `ts1` 
>) with (
>connector = kafka
>)
>// stream 2
>create table kafkaSource2 (
>id int,
>field_3
>ts2 timestamp(3),
>watermark for `ts2` 
>) with (
>connector = kafka
>)
>
>
>//create view
>create view kafkaSource1_view as 
>select 
>field_1 as field_1,
>last_value(field_2) as field_2,
>last_value(ts1) as ts1
>from kafkaSource1
>group by field_1
>
>
>// query 
>insert into sinkTable 
>select 
>a.field_1,
>b.field_3
>from kafkaSource2 a join kafkaSource1_view b
>on a.id = b.id
>and a.ts >= b.ts - INTERVAL '1' HOUR and a.ts < b.ts + INTERVAL '2' DAY
