补充:
最终查询为
SELECT
t.*
FROM
kafka_source,
LATERAL TABLE( fromJson(data) ) as t
--
Sent from: http://apache-flink.147419.n8.nabble.com/
如果不等待最新版本的话也可以这样
将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect
if (Objects.nonNull(str)) {
if (isArray) {
JsonNode node = objectMapper.readTree(str);
if (node.isArray()) {
Iterator nodeIterator =
Hi
那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level
在导入Kafka,之后再FlinkSQL 处理。
可爱的木兰
发件人: Benchao Li
发送时间: 2020年7月14日 11:00
收件人: user-zh
主题: Re: Flink SQL处理Array型的JSON
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].
[1]
Hello,Leonard Xu
我这边JSON 不是
{
"id": 2,
"heap": [
{
"foo": 14,
"bar": "foo"
},
{
"foo": 16,
"bar": "bar"
}
],
}
而是直接一个Array
[
{
"foo": 14,
"bar": "foo"
},