> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗?

窗口里的时间用来做time attribute 列了吧,只能是TIMESTAMP(3), 
其TIMESTAMP字段Flink是可以支持到TIMESTAMP(9)的

祝好
Leonard 

> 在 2020年7月27日,20:05,RS <tinyshr...@163.com> 写道:
> 
> Hi,
> 改了下sql,遇到一个新的问题:
> Caused by: org.apache.flink.table.planner.codegen.CodeGenException: 
> Unsupported cast from 'ROW<`EXPR$0` BIGINT NOT NULL, `EXPR$1` STRING, 
> `EXPR$2` TIMESTAMP(3) *ROWTIME*> NOT NULL' to 'ROW<`total_count` BIGINT, 
> `username` STRING, `update_time` TIMESTAMP(6)>'.
> 
> 
> SELECT里面的时间是这样定义的:TUMBLE_START(update_time,INTERVAL '1' MINUTE) as 
> update_time) as payload
> 
> 
> 我把TIMESTAMP(6)修改为TIMESTAMP(3)之后,就没有报错了,所以Flink里面窗口的时间精度只是3位吗?
> 
> 
> Thanks
> 在 2020-07-27 17:49:18,"Jark Wu" <imj...@gmail.com> 写道:
>> Hi,
>> 
>> 你需要在 DDL 和 query 上都补上 schema 和 payload:
>> 
>> CREATE TABLE print_table \
>> (`schema` STRING, `payload` ROW<total_count BIGINT, username STRING,
>> update_time TIMESTAMP(6)>) \
>> WITH (\
>> 'connector' = 'kafka', \
>> 'topic' = 'test_out', \
>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
>> 'sink.partitioner' = 'round-robin', \
>> 'format' = 'json')
>> 
>> -- DML 上可以用常量写死 schema, 用 ROW 函数封装 payload
>> INSERT INTO output
>> SELECT '{ "type": "struct", ...}' as schema, ROW(totall_count, username,
>> update_time) as payload
>> FROM ...
>> 
>> 
>> Btw, 我想问一下,为什么一定要用 kafka-jdbc-connect 去同步到 mysql 呢?个人觉得直接用 Flink SQL 同步到
>> mysql 不是很方便么?
>> 
>> Best,
>> Jark
>> 
>> 
>> On Mon, 27 Jul 2020 at 17:33, RS <tinyshr...@163.com> wrote:
>> 
>>> hi,
>>> kafka->Flink->kafka->mysql
>>> Flink用sql处理之后数据写入kafka里面,格式为json,再用kafka-connect-jdbc将数据导出到mysql中。
>>> 使用kafka-connect是方便数据同时导出到其他存储
>>> 
>>> 
>>> 
>>> Flink定义输出表结构:
>>> 
>>> CREATE TABLE print_table \
>>> 
>>> (total_count BIGINT, username STRING, update_time TIMESTAMP(6)) \
>>> 
>>> WITH (\
>>> 
>>> 'connector' = 'kafka', \
>>> 
>>> 'topic' = 'test_out', \
>>> 
>>> 'properties.bootstrap.servers' = '127.0.0.1:9092', \
>>> 
>>> 'sink.partitioner' = 'round-robin', \
>>> 
>>> 'format' = 'json')
>>> 
>>> 
>>> 
>>> 
>>> 输出的数据格式示例:
>>> 
>>> {"total_count":12,"username":"admin","update_time":"2020-07-27 17:23:00"}
>>> 
>>> 
>>> 
>>> 
>>> 但是kafka-connect-jdbc的json格式需要schema和payload,示例:
>>> 
>>> {
>>> 
>>>  "schema": {
>>> 
>>>    "type": "struct",
>>> 
>>>    "fields": [
>>> 
>>>      {
>>> 
>>>        "type": "int64",
>>> 
>>>        "optional": false,
>>> 
>>>        "field": "id"
>>> 
>>>      },
>>> 
>>>      {
>>> 
>>>        "type": "string",
>>> 
>>>        "optional": true,
>>> 
>>>        "field": "name"
>>> 
>>>      }
>>> 
>>>    ],
>>> 
>>>    "optional": true,
>>> 
>>>    "name": "user"
>>> 
>>>  },
>>> 
>>>  "payload": {
>>> 
>>>    "id": 1,
>>> 
>>>    "name": "admin"
>>> 
>>>  }
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 请教下在Flink里面如何处理(补上schema和payload?),才能形成kafka connect匹配的json格式?
>>> 
>>> 当前Flink处理sql:
>>> 
>>> INSERT INTO test_out(total_count,username,update_time) SELECT count(1) AS
>>> total_count,username,TUMBLE_START(update_time,INTERVAL '1' MINUTE) as
>>> update_time FROM table1 GROUP BY username,TUMBLE(update_time,INTERVAL '1'
>>> MINUTE)
>>> 
>>> 
>>> 

回复