Thanks a lot, I hope it will be fixed soon!
________________________________
From: Jark Wu <imj...@gmail.com>
Sent: March 3, 2020 11:25
To: Lu Weizheng <luweizhen...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Table API connect method timestamp watermark assignment problem

Hi Lu,

DDL and the Schema descriptor do not share the same code path. I guess the
Schema descriptor doesn't work because of FLINK-16160 [1].
We will fix that in the next minor release. Please use DDL to define the
watermark; that is also the suggested way to do it.
The current Schema descriptor will be refactored to share the same code path
as DDL in the near future.
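
For reference, a minimal sketch of the DDL approach (reusing the schema and
connector properties from your code below, with the column list trimmed for
brevity):

// Sketch only: the WATERMARK clause makes ts an event-time attribute,
// so window aggregates such as TUMBLE(ts, ...) are accepted.
tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
        "    user_id BIGINT,\n" +
        "    behavior STRING,\n" +
        "    ts TIMESTAMP(3),\n" +
        "    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
        ") WITH (\n" +
        "    'connector.type' = 'kafka',\n" +
        "    'connector.version' = 'universal',\n" +
        "    'connector.topic' = 'user_behavior',\n" +
        "    'connector.startup-mode' = 'latest-offset',\n" +
        "    'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
        "    'format.type' = 'json'\n" +
        ")");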

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160

On Tue, 3 Mar 2020 at 10:09, Lu Weizheng <luweizhen...@hotmail.com> wrote:
Hey guys,

I have been using the Flink Table API recently. I want to use event time with
a Kafka table connector, but I think Flink cannot recognize the event-time
timestamp field in my code. Here is my code:

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // connect to the external system via the connect() method
            .connect(
                new Kafka()
                .version("universal")     // required; valid values are "0.8", "0.9", "0.10", "0.11" or "universal"
                .topic("user_behavior")   // required; the topic name
                .startFromLatest()        // where to start reading on first consumption
                .property("zookeeper.connect", "localhost:2181")  // Kafka connection properties
                .property("bootstrap.servers", "localhost:9092")
            )
            // serialization format; can be JSON, Avro, etc.
            .withFormat(new Json())
            // schema of the data
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // name of the temporary table; it can be used in SQL statements later
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I use the rowtime() method when defining the
Schema. When I try to run it, I get the following error: Window aggregate can
only be defined over a time attribute column, but TIMESTAMP(3) encountered.
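
For what it's worth, printing the table schema (via TableEnvironment#from,
available in Flink 1.10) is a quick way to check this: a field registered as
an event-time attribute is printed as TIMESTAMP(3) *ROWTIME*, while a plain
TIMESTAMP(3) matches the error above.

// A proper rowtime attribute prints as "ts: TIMESTAMP(3) *ROWTIME*";
// plain "TIMESTAMP(3)" means the rowtime definition was not applied.
tEnv.from("user_behavior").printSchema();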

I tried another approach based on DDL, and it worked, so it is not a problem
with my Kafka source.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//              "    proctime AS PROCTIME(),   -- computed column that produces a processing-time column\n" +
                "    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- define a watermark on ts; ts becomes the event-time column\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and above\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- start reading from the latest offset\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
                "    'format.type' = 'json'  -- the source format is json\n" +
                ")");

I hope someone can give me some suggestions. Thanks.
