Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
补充:
最终查询为

SELECT
 t.*
FROM
  kafka_source,
  LATERAL TABLE( fromJson(data) ) as t



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: Flink SQL处理Array型的JSON

2020-07-21 文章 wxpcc
如果不等待最新版本的话也可以这样

将 纯数组的数据作为字符串 从source消费,增加自定义的json解析函数,判断 isArray 之后 遍历进行 collect

if (Objects.nonNull(str)) {
if (isArray) {
JsonNode node = objectMapper.readTree(str);
if (node.isArray()) {
Iterator nodeIterator = node.elements();
while (nodeIterator.hasNext()) {
   
collect(deserializationSchema.deserialize(nodeIterator.next().toString().getBytes()));
}
}
} else {
   
collect(deserializationSchema.deserialize(str.getBytes()));
}
}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

那我觉得目前最佳实践就是,我用DataStream的API先把数据清洗成 json object in top level 
在导入Kafka,之后再FlinkSQL 处理。

可爱的木兰


发件人: Benchao Li 
发送时间: 2020年7月14日 11:00
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

--

Best,
Benchao Li


Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Benchao Li
我感觉这是一个合理的需求,因为1.11之后我们支持了format返回多条数据,我们可以支持这种形式的数据了,
我建了一个issue[1].

[1] https://issues.apache.org/jira/browse/FLINK-18590

Leonard Xu  于2020年7月14日周二 上午10:42写道:

> Hello,可爱的木兰
>
> 可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]
>
> SELECT users, tag
> FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> <
> https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
> >
>
> > 在 2020年7月14日,10:34,hua mulan  写道:
> >
> > 可爱的木兰
>
>

-- 

Best,
Benchao Li


回复: Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hello,Leonard Xu

我这边JSON 不是

{
"id": 2,
"heap": [
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
}
],
}

而是直接一个Array

[
{
"foo": 14,
"bar": "foo"

},
{
"foo": 16,
"bar": "bar"
    }
    ]

我发现DDL没法声明,SQL层面我不知道怎么做了。

可爱的木兰


发件人: Leonard Xu 
发送时间: 2020年7月14日 10:42
收件人: user-zh 
主题: Re: Flink SQL处理Array型的JSON

Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 
<https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html>

> 在 2020年7月14日,10:34,hua mulan  写道:
>
> 可爱的木兰



Re: Flink SQL处理Array型的JSON

2020-07-13 文章 Leonard Xu
Hello,可爱的木兰

可以不用改json的,可以用 UNNEST 把数组拆成多行,也可以写UDTF自己解析对应字段,参考[1]

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/zh/dev/table/sql/queries.html
 


> 在 2020年7月14日,10:34,hua mulan  写道:
> 
> 可爱的木兰



Flink SQL处理Array型的JSON

2020-07-13 文章 hua mulan
Hi

Kafka中的JSON结构是个Array例子如下。
[
 { "id": 1},
 { "id": 2}
]
读出来变成表的两行。Flink SQL层面最佳实践是什么?
如果没有办法是不是只能改JSON结构了。


可爱的木兰