Flink Table API Schema定义里面的 Types.SQL_TTIMESTAMP 类型用json表示的话一定要用 yyyy-MM-dd'T'HH:mm:ss.SSS'Z'表示吗? 示例程序如下: StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); TableConfig tableConfig = tEnv.getConfig();
tableConfig.setIdleStateRetentionTime(Time.minutes(10),Time.minutes(15)); tEnv.registerFunction("DateUtil",new DateUtil()); tEnv.connect( new Kafka() .version("universal") // "0.8", "0.9", "0.10", "0.11", and "universal" .topic("jsontest") .property("bootstrap.servers", "localhost:9092") .property("group.id","test") .startFromLatest() ) .withFormat( new Json() .failOnMissingField(false) .deriveSchema() ) .withSchema( new Schema() *.field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime() .timestampsFromField("eventtime") .watermarksPeriodicBounded(2000) )* .field("fruit", Types.STRING) .field("number", Types.INT) ) .inAppendMode() .registerTableSource("source"); Table sourceTbl = tEnv.scan("source"); sourceTbl.printSchema(); tEnv.toAppendStream(sourceTbl, Row.class).print(); env.execute(); 测试数据 {"eventtime": "2019-12-17T11:11:29.555Z", "fruit": "orange", "number": 45}2019-12-17T11:11:29.555Z", "fruit": "orange", "number": 45} 我想问的是关于 加粗 部分代码的两个问题: 这个示例中我想使用事件时间eventtime,所以 field("rowtime", Types.SQL_TIMESTAMP) 中就一定要使用 rowtime 名称吗?[1] 如果是处理时间processingtime 要如何表示?实际传入的json数据字段却是"eventtime",而且格式方式我试了用 long表示的 epochInMillis不行,改成 yyyy-MM-dd HH:mm:ss也不行,后来看了源码用了 yyyy-MM-dd'T'HH:mm:ss.SSS'Z' 才通过了。想问一下有没有办法指定dateFormat? [2]