Hi 猫猫: 在DDL上定义rowtime是刚刚支持的功能,文档正在编写中。[1] 你可以通过master的代码来试用,社区正在准备发布1.10,到时候会有release版本可用。
[2] 中有使用的完整例子,FYI。 [1] https://issues.apache.org/jira/browse/FLINK-14320 [2] https://github.com/apache/flink/blob/2ecf7cacbe742099d78c528de962fccf13c14629/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TimeAttributeITCase.scala Best, Jingsong Lee ------------------------------------------------------------------ From:猫猫 <16770...@qq.com> Send Time:2019年12月6日(星期五) 17:52 To:user-zh <user-zh@flink.apache.org> Subject:[flink-sql]使用tableEnv.sqlUpdate(ddl);方式创表,如何指定rowtime? 我使用tableEnv.sqlUpdate(ddl);方式创建表 但是我无法像通过流注册方式一样,添加rowtime。我尝试在定义字段上添加rowtime,但是创表语句会报错。 请问在flink中是否支持使用该种方式创建流表,并开窗? 我的例子是一个csv源,我还没有尝试kafka源,但是我看了文档,也没有找到相关描述。 sql创表语句如下: CREATE TABLE T_UserBehavior( userId BIGINT, itemId BIGINT, categoryId BIGINT, behavior VARCHAR, optime BIGINT ) WITH ( 'connector.type' = 'filesystem', -- required: specify to connector type 'connector.path' = 'file:///E:\MyGitProject\flink-study\Hot-Item\src\main\resources\UserBehavior-less.csv', -- required: path to a file or directory 'format.type' = 'csv', 'format.fields.0.name' = 'userId', -- required: define the schema either by using type information 'format.fields.0.type' = 'BIGINT', 'format.fields.1.name' = 'itemId', 'format.fields.1.type' = 'BIGINT', 'format.fields.2.name' = 'categoryId', 'format.fields.2.type' = 'BIGINT', 'format.fields.3.name' = 'behavior', 'format.fields.3.type' = 'VARCHAR', 'format.fields.4.name' = 'optime', 'format.fields.4.type' = 'BIGINT' );