Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。
比如:

SourceT: (
        uuid String,
       body_data ARRAY<ROW<field1 String, field2 String>>
)

SinkT (
        result ARRAY<ROW< uuid  string, body_data  String, body_data.fild1  
String, body_data.fild2  String>>
)

Insert into SinkT (result)  select Array[ROW(uuid, null,body_data[1]. field1 as 
body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, 
null,body_data[2]. field, body_data[2]. field2)] as result

希望对你有帮助

> 2023年11月22日 20:54,casel.chen <casel_c...@126.com> 写道:
> 
> 输入:
> 
> {
> 
>  "uuid":"XXXX",
> 
>  "body_data": 
> "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]"
> 
> }
> 
> 
> 
> 
> 输出:
> 
> [
> 
>  {
> 
> "uuid": "XXXX",
> 
> "body_data: null,
> 
> "body_data.fild1": "123”,
> 
> "body_data.fild2": "234"
> 
>  },
> 
>  {
> 
> "uuid": "XXXX",
> 
> "body_data": null,
> 
> "body_data.fild1": "abc",
> 
> "body_data.fild2": "cdf"
> 
>  }
> 
> ]
> 
> 
> 
> 
> 当格式错误时
> 
> 
> 
> 
> 输入:
> 
> {
> 
> "uuid": "XXXX”,
> 
> "body_data": "abc"
> 
> }
> 
> 输出:
> 
> {
> 
> "uuid": "XXXX",
> 
> "body_data": "abc",
> 
> "body_data.fild1": null,
> 
> "body_data.fild2": null
> 
> }

回复