输入:
{
"uuid":"",
"body_data":
"[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
}
输出:
[
{
"uuid": "",
"body_data: null,
"body_data.fild1": "123”,
"body_data.fild2": "234"
},
{
"uuid": "",
"body_data": null,
"body_data.fild1": "abc",
Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
比如:
SourceT: (
uuid String,
body_data ARRAY>
)
SinkT (
result ARRAY>
)
Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as
body_data.fild1, body_data[1]. Field2 as
filed字段数量是固定的,但body_data数组包含的元素个数不固定,所以
Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid,
null,body_data[2]. field, body_data[2]. field2)] as result
这种写死body_data[X]的sql语句应该不work
在 20
>发件人: casel.chen
>发送时间: 2023-11-22 20:54
>收件人: user-zh@flink.apache.org
>主题: flink sql如何实现json字符数据解析?
>输入:
>
>{
>
> "uuid":"",
>
> "body_data":
> "[{\"fild1\":1"1231","fild2\":1"2341&