另一张表可以这么定义: String rTable = "CREATE TABLE r_table ( " + " r_a INT, " + " r_b string, " + " r_pt AS now(), " + "WATERMARK FOR r_pt AS r_pt" + ") WITH ( " + " 'connector' = 'datagen', " + " 'rows-per-second'='5', " + " 'fields.r_a.min'='1', " + " 'fields.r_a.max'='5', " + " 'fields.r_b.length'='5' " + ")";
Best, Hailong 在 2020-11-25 19:05:04,"Asahi Lee" <978466...@qq.com> 写道: >你好! > 那两条拥有不同时间属性的流如何join呢?或者这样的需求如何处理? > > > > >------------------ 原始邮件 ------------------ >发件人: > "user-zh" > ><18868816...@163.com>; >发送时间: 2020年11月25日(星期三) 晚上7:31 >收件人: "user-zh"<user-zh@flink.apache.org>; > >主题: Re:flink 1.11.2 rowtime和proctime流 Interval Join 问题错误问题 > > > >Hi, > 因为你的 2 个流的时间属性不一样,所以就认为不是 interval join。 > 而在 match 到到了regular join 后,又因为 join 条件中有时间属性,故报了这个错。 > Interval join 需要 2 个流的时间属性一样,所以你需要对这 2 条流使用相同的时间属性。 > > >Best, >Hailong > >在 2020-11-25 16:23:27,"Asahi Lee" <978466...@qq.com> 写道: >>你好! 我需要将事件时间的流同处理时间的流做Interval >Join时提示错误,我是用的是flink >1.11.2版本,我的示例程序如下,请问为什么提示我是个常规join,而不是区间join呢?我该如何解决? >我的 l_table.l_rt = r_table.r_pt 可以运行成功,而l_table.l_rt BETWEEN r_table.r_pt - >INTERVAL '10' SECOND AND r_table.r_pt + INTERVAL '5' SECOND运行错误!package join; >> >>import >org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>import org.apache.flink.table.api.EnvironmentSettings; >>import org.apache.flink.table.api.Table; >>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> >>public class Test1 { >> >> public static void main(String[] args) { >> StreamExecutionEnvironment >bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); >> EnvironmentSettings bsSettings >= >EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >> StreamTableEnvironment >bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); >> >> String lTable = "CREATE TABLE >l_table ( " + >> > " l_a INT, " + >> > " l_b string, " + >> > " l_rt AS localtimestamp, " + >> > " WATERMARK FOR l_rt AS l_rt " + >> > ") WITH ( " + >> > " 'connector' = 'datagen', " + >> > " 'rows-per-second'='5', " + >> > " 'fields.l_a.min'='1', " + >> > " 'fields.l_a.max'='5', " + >> > " 'fields.l_b.length'='5' " + >> > ")"; >> bsTableEnv.executeSql(lTable); >> >> String rTable = "CREATE TABLE >r_table ( " + >> > " r_a INT, " + >> > " r_b string, " + >> > " r_pt AS proctime() " + >> > ") WITH ( " + >> > " 'connector' = 'datagen', " + >> > " 'rows-per-second'='5', " + >> > " 'fields.r_a.min'='1', " + >> > " 'fields.r_a.max'='5', " + >> > " 'fields.r_b.length'='5' " + >> > ")"; >> bsTableEnv.executeSql(rTable); >> >> String printTable = "CREATE >TABLE print (" + >> > " l_a INT, " + >> > " l_b string, " + >> > " l_rt timestamp(3), " + >> > " r_a INT, " + >> > " r_b string, " + >> > " r_pt timestamp(3) " + >> > ") WITH ( " + >> > " 'connector' = 'print' " + >> > ") "; >> >> >bsTableEnv.executeSql(printTable); >> >> // 运行成功 >>// Table joinTable = >bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = >r_table.r_a and l_table.l_rt = r_table.r_pt"); >> >> // 运行错误,提示Rowtime attributes >must not be in the input rows of a regular join. As a workaround you can cast >the time attributes of input tables to TIMESTAMP before. >> Table joinTable = >bsTableEnv.sqlQuery("select * from l_table join r_table on l_table.l_a = >r_table.r_a and l_table.l_rt BETWEEN r_table.r_pt - INTERVAL '10' SECOND AND >r_table.r_pt + INTERVAL '5' SECOND"); >> >> bsTableEnv.executeSql("insert >into print select * from " + joinTable); >> >> } >> >>}