Statements:
CREATE TABLE A (
w_data  STRING,
w_table  STRING,
w_ts TIMESTAMP(3)
)

CREATE TABLE B (
w_ts TIMESTAMP(3),
city1_id  STRING,
cate3_id  STRING,
pay_order_id  STRING
)

insert into B
select w_ts,
       'test' as city1_id,
       ArrayIndexOf(w_data, 0) AS cate3_id,
       w_data as pay_order_id
from A

Sample data:
A
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}

B
{"w_ts":"2020-05-20T13:58:37.131Z","city1_id":"test","cate3_id":null,"pay_order_id":""}



guaishushu1...@163.com
 
From: Leonard Xu
Sent: 2020-05-20 16:03
To: user-zh
Subject: Re: Flink 1.10-SQL parsing complex JSON issue
Hi, guaishushu
Please paste the query (or an image-hosting link). JSON parsing support in flink-sql is fairly complete [1]. Could you also paste the JSON schema and the problematic data?
A unit test should be enough to reproduce the issue.
 
Best,
Leonard
 
[1] 
https://github.com/apache/flink/blob/master/flink-formats/flink-json/src/test/java/org/apache/flink/formats/json/JsonRowDeserializationSchemaTest.java
 
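
A rough sketch of such a unit test against the 1.10 JsonRowDeserializationSchema could look like the following (the nested row type mirrors the sample record above; the exact builder and type usage is an assumption to adapt):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;

public class WDataJsonTest {
    public static void main(String[] args) throws Exception {
        // Row type mirroring the sample record; w_ts is kept as STRING here to sidestep timestamp parsing
        TypeInformation<Row> rowType = Types.ROW_NAMED(
                new String[]{"w_es", "w_type", "w_isDdl", "w_data", "w_ts", "w_table"},
                Types.LONG, Types.STRING, Types.BOOLEAN,
                Types.OBJECT_ARRAY(Types.ROW_NAMED(
                        new String[]{"pay_info", "online_fee", "sign", "account_pay_fee"},
                        Types.STRING, Types.STRING, Types.STRING, Types.STRING)),
                Types.STRING, Types.STRING);

        JsonRowDeserializationSchema schema =
                new JsonRowDeserializationSchema.Builder(rowType).build();

        // Truncated copy of the sample record posted above
        String json = "{\"w_es\":1589870637000,\"w_type\":\"INSERT\",\"w_isDdl\":false,"
                + "\"w_data\":[{\"pay_info\":\"channelId=82\",\"online_fee\":\"89.0\","
                + "\"sign\":\"00\",\"account_pay_fee\":\"0.0\"}],"
                + "\"w_ts\":\"2020-05-20T13:58:37.131Z\",\"w_table\":\"cccc111\"}";

        Row row = schema.deserialize(json.getBytes(StandardCharsets.UTF_8));
        System.out.println(row); // w_data should come back as a Row[] rather than an empty string
    }
}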
 
> On May 20, 2020, at 15:51, guaishushu1...@163.com <mailto:guaishushu1...@163.com> wrote:
> 
> Kafka data is written back to Kafka; Flink 1.10 SQL parses a field inside complex JSON as STRING, which leads to data loss.
> 
> guaishushu1...@163.com <mailto:guaishushu1...@163.com>
