Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
补充: 最终查询为 SELECT t.* FROM kafka_source, LATERAL TABLE( fromJson(data) ) as t -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
如果不等待最新版本的话也可以这样 将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect if (Objects.nonNull(str)) { if (isArray) { JsonNode node = objectMapper.readTree(str); if (node.isArray()) { Iterator nodeIterator =

回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi 那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 在导入Kafka,之后再FlinkSQL 处理。 可爱的木兰 发件人: Benchao Li 发送时间: 2020年7月14日 11:00 收件人: user-zh 主题: Re: Flink SQL处理Array型的JSON 我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了, 我建了一个issue[1]. [1]

回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hello,Leonard Xu 我这边JSON 不是 { "id": 2, "heap": [ { "foo": 14, "bar": "foo" }, { "foo": 16, "bar": "bar" } ], } 而是直接一个Array [ { "foo": 14, "bar": "foo" },