Flink SQL比较适合处理结构化的数据,不知道你的body_data中的filed数量是否是固定的。如果是固定的,那可以将源和目标的格式写成Table形式。 比如:
SourceT: ( uuid String, body_data ARRAY<ROW<field1 String, field2 String>> ) SinkT ( result ARRAY<ROW< uuid string, body_data String, body_data.fild1 String, body_data.fild2 String>> ) Insert into SinkT (result) select Array[ROW(uuid, null,body_data[1]. field1 as body_data.fild1, body_data[1]. Field2 as body_data.fild2), ROW(uuid, null,body_data[2]. field, body_data[2]. field2)] as result 希望对你有帮助 > 2023年11月22日 20:54,casel.chen <casel_c...@126.com> 写道: > > 输入: > > { > > "uuid":"XXXX", > > "body_data": > "[{\"fild1\":1"1231","fild2\":1"2341"},{"fild1\":"abc\","fild2\":"cdf\"}]" > > } > > > > > 输出: > > [ > > { > > "uuid": "XXXX", > > "body_data: null, > > "body_data.fild1": "123”, > > "body_data.fild2": "234" > > }, > > { > > "uuid": "XXXX", > > "body_data": null, > > "body_data.fild1": "abc", > > "body_data.fild2": "cdf" > > } > > ] > > > > > 当格式错误时 > > > > > 输入: > > { > > "uuid": "XXXX”, > > "body_data": "abc" > > } > > 输出: > > { > > "uuid": "XXXX", > > "body_data": "abc", > > "body_data.fild1": null, > > "body_data.fild2": null > > }