Hi Lu,

DDL and Schema descriptor do not share the same code path. I guess the
reason why Schema descriptor doesn't work is because of FLINK-16160.
We will fix that in the next minor release. Please use DDL to define
watermark which is also the suggested way to do that.
The current Schema descriptor will be refactored to share the same code
path of DDL in the near future.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160

On Tue, 3 Mar 2020 at 10:09, Lu Weizheng <luweizhen...@hotmail.com> wrote:

> Hey guys,
>
> I am using Flink Table API recently. I want to use EventTime and use a
> Kakfa Table Connector. I think in my code Flink cannot recognize event time
> timestamp field. Here is my code :
>
> public static void main(String[] args) throws Exception {
>
>         EnvironmentSettings fsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>         StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> fsSettings);
>
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         tEnv
>             // 使用connect函数连接外部系统
>             .connect(
>                 new Kafka()
>                 .version("universal")     // 必填,合法的参数有"0.8", "0.9",
> "0.10", "0.11"或"universal"
>                 .topic("user_behavior")   // 必填,Topic名
>                 .startFromLatest()        // 首次消费时数据读取的位置
>                 .property("zookeeper.connect", "localhost:2181")  //
> Kafka连接参数
>                 .property("bootstrap.servers", "localhost:9092")
>             )
>             // 序列化方式 可以是JSON、Avro等
>             .withFormat(new Json())
>             // 数据的Schema
>             .withSchema(
>                 new Schema()
>                     .field("user_id", DataTypes.BIGINT())
>                     .field("item_id", DataTypes.BIGINT())
>                     .field("category_id", DataTypes.BIGINT())
>                     .field("behavior", DataTypes.STRING())
>                     .field("ts", DataTypes.TIMESTAMP(3))
>                     .rowtime(new
> Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
>             )
>             // 临时表的表名,后续可以在SQL语句中使用这个表名
>             .createTemporaryTable("user_behavior");
>
>         Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
>                 "\tuser_id, \n" +
>                 "\tCOUNT(behavior) AS behavior_cnt, \n" +
>                 "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
>                 "FROM user_behavior\n" +
>                 "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
>         DataStream<Tuple2<Boolean, Row>> result =
> tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
>         result.print();
>
>         env.execute("table api");
>     }
>
> As shown in the code above, I use rowtime() method when I want to define a
> Schema. When I try to run, I get the following error: Window aggregate
> can only be defined over a time attribute column, but TIMESTAMP(3)
> encountered.
>
> ​I tried another method based on a DLL, and it worked. So it is not my
> Kafka source problem.
>
> tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
>                 "    user_id BIGINT,\n" +
>                 "    item_id BIGINT,\n" +
>                 "    category_id BIGINT,\n" +
>                 "    behavior STRING,\n" +
>                 "    ts TIMESTAMP(3),\n" +
> //                "    proctime as PROCTIME(),   -- 通过计算列产生一个处理时间列\n" +
>                 "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  --
> 在ts上定义watermark,ts成为事件时间列\n" +
>                 ") WITH (\n" +
>                 "    'connector.type' = 'kafka',  -- 使用 kafka connector\n"
> +
>                 "    'connector.version' = 'universal',  -- kafka
> 版本,universal 支持 0.11 以上的版本\n" +
>                 "    'connector.topic' = 'user_behavior',  -- kafka
> topic\n" +
>                 "    'connector.startup-mode' = 'latest-offset',  -- 从起始
> offset 开始读取\n" +
>                 "    'connector.properties.zookeeper.connect' =
> 'localhost:2181',  -- zookeeper 地址\n" +
>                 "    'connector.properties.bootstrap.servers' =
> 'localhost:9092',  -- kafka broker 地址\n" +
>                 "    'format.type' = 'json'  -- 数据源格式为 json\n" +
>                 ")");
>
> Hope anyone can give some suggestions. Thanks.
>

Reply via email to