另一张表可以这么定义:
 String rTable = "CREATE TABLE r_table (  " +
                " r_a INT,  " +
                " r_b string,  " +
                " r_pt AS now(), " +
                 "WATERMARK FOR r_pt AS r_pt" +
                ") WITH (  " +
                " 'connector' = 'datagen',  " +
                " 'rows-per-second'='5',  " +
                " 'fields.r_a.min'='1',  " +
                " 'fields.r_a.max'='5',  " +
                " 'fields.r_b.length'='5'  " +
                ")";


Best,
Hailong




在 2020-11-25 19:05:04,"Asahi Lee" <978466...@qq.com> 写道:
>你好!
>&nbsp; &nbsp; &nbsp; 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理?
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:                                                                           
>                                             "user-zh"                         
>                                                           
><18868816...@163.com&gt;;
>发送时间:&nbsp;2020年11月25日(星期三) 晚上7:31
>收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
>主题:&nbsp;Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题
>
>
>
>Hi,
>&nbsp;&nbsp; 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。
>&nbsp;&nbsp; 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。
>&nbsp;&nbsp; Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。
>
>
>Best,
>Hailong
>
>在 2020-11-25 16:23:27,"Asahi Lee" <978466...@qq.com&gt; 写道:
>&gt;你好!&nbsp;&nbsp;&nbsp;&nbsp; 我需要将事件时间的流同处理时间的流做Interval 
>Join时提示错误,我是用的是flink 
>1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决?&nbsp;&nbsp;&nbsp;&nbsp; 
>我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - 
>INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join;
>&gt;
>&gt;import 
>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>&gt;import org.apache.flink.table.api.EnvironmentSettings;
>&gt;import org.apache.flink.table.api.Table;
>&gt;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>&gt;
>&gt;public class Test1 {
>&gt;
>&gt;&nbsp;&nbsp;&nbsp; public static void main(String[] args) {
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment 
>bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; EnvironmentSettings bsSettings 
>= 
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; StreamTableEnvironment 
>bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String lTable = "CREATE TABLE 
>l_table (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " l_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " l_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " l_rt AS localtimestamp,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " WATERMARK FOR l_rt AS l_rt&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'connector' = 'datagen',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'rows-per-second'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.l_a.min'='1',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.l_a.max'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.l_b.length'='5'&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ")";
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(lTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String rTable = "CREATE TABLE 
>r_table (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " r_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " r_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " r_pt AS proctime()&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'connector' = 'datagen',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'rows-per-second'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.r_a.min'='1',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.r_a.max'='5',&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'fields.r_b.length'='5'&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ")";
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql(rTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; String printTable = "CREATE 
>TABLE print (" +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; l_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; l_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; l_rt timestamp(3),&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; r_a INT,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; r_b string,&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> "&nbsp; r_pt timestamp(3)&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ") WITH (&nbsp; " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> " 'connector' = 'print' " +
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ") ";
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 
>bsTableEnv.executeSql(printTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行成功
>&gt;//&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = 
>bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = 
>r_table.r_a and l_table.l_rt = r_table.r_pt");
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; // 运行错误,提示Rowtime attributes 
>must not be in the input rows of a regular join. As a workaround you can cast 
>the time attributes of input tables to TIMESTAMP before.
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; Table joinTable = 
>bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = 
>r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND 
>r_table.r_pt + INTERVAL '5' SECOND");
>&gt;
>&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; bsTableEnv.executeSql("insert 
>into print select * from " + joinTable);
>&gt;
>&gt;&nbsp;&nbsp;&nbsp; }
>&gt;
>&gt;}

回复