> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗?
窗口里的时间用来做time attribute 列了吧,只能是TIMESTAMP(3), 其TIMESTAMP字段Flink是可以支持到TIMESTAMP(9)的 祝好 Leonard > 在 2020年7月27日,20:05,RS <tinyshr...@163.com> 写道: > > Hi, > 改了下sql,遇到一个新的问题: > Caused by: org.apache.flink.table.planner.codegen.CodeGenException: > Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, > `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, > `username` STRING, `update_time` TIMESTAMP(6)>'. > > > SELECT里面的时间是这样定义的:TUMBLE_START(update_time,INTERVAL '1' MINUTE) as > update_time) as payload > > > 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗? > > > Thanks > 在 2020-07-27 17:49:18,"Jark Wu" <imj...@gmail.com> 写道: >> Hi, >> >> 你需要在 DDL 和 query 上都补上 schema 和 payload: >> >> CREATE TABLE print_table \ >> (`schema` STRING, `payload` ROW<total_count BIGINT, username STRING, >> update_time TIMESTAMP(6)>) \ >> WITH (\ >> 'connector' = 'kafka', \ >> 'topic' = 'test_out', \ >> 'properties.bootstrap.servers' = '127.0.0.1:9092', \ >> 'sink.partitioner' = 'round-robin', \ >> 'format' = 'json') >> >> -- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload >> INSERT INTO output >> SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username, >> update_time) as payload >> FROM ... >> >> >> Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到 >> mysql 不是很方便么? >> >> Best, >> Jark >> >> >> On Mon, 27 Jul 2020 at 17:33, RS <tinyshr...@163.com> wrote: >> >>> hi, >>> kafka->Flink->kafka->mysql >>> Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。 >>> 使用kafka-connect是方便数据同时导出到其他存储 >>> >>> >>> >>> Flink定义输出表结构: >>> >>> CREATE TABLE print_table \ >>> >>> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \ >>> >>> WITH (\ >>> >>> 'connector' = 'kafka', \ >>> >>> 'topic' = 'test_out', \ >>> >>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \ >>> >>> 'sink.partitioner' = 'round-robin', \ >>> >>> 'format' = 'json') >>> >>> >>> >>> >>> 输出的数据格式示例: >>> >>> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"} >>> >>> >>> >>> >>> 但是kafka-connect-jdbc的json格式需要schema和payload,示例: >>> >>> { >>> >>> "schema": { >>> >>> "type": "struct", >>> >>> "fields": [ >>> >>> { >>> >>> "type": "int64", >>> >>> "optional": false, >>> >>> "field": "id" >>> >>> }, >>> >>> { >>> >>> "type": "string", >>> >>> "optional": true, >>> >>> "field": "name" >>> >>> } >>> >>> ], >>> >>> "optional": true, >>> >>> "name": "user" >>> >>> }, >>> >>> "payload": { >>> >>> "id": 1, >>> >>> "name": "admin" >>> >>> } >>> >>> } >>> >>> >>> >>> >>> 请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式? >>> >>> 当前Flink处理sql: >>> >>> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS >>> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as >>> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1' >>> MINUTE) >>> >>> >>>