#x27; = 'localhost:9092',
>'connector.properties.group.id' = 'testGroup',
>'format.type' = '?'
>
>接下来直接查询每个字段的值:
>Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY,
>UP_AMOUNT, CURRENCY, EXCHANGE_RATE from My
nLee [mailto:17610775...@163.com]
> 发送时间: 2021年7月9日 11:34
> 收件人: user-zh@flink.apache.org
> 主题: 回复: 如何从复杂的kafka消息体定义 table
>
> Hi
>
>
> 事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了.
>
>
> Best
> JasonLee
>
>
> 在2021年07月9日 10:06,Chenzhiyuan(HR) 写道:
&g
列转行是在json的 DDL 里面可以写,还是在获取kafka数据后java代码里再转换一次。
-邮件原件-
发件人: JasonLee [mailto:17610775...@163.com]
发送时间: 2021年7月9日 11:34
收件人: user-zh@flink.apache.org
主题: 回复: 如何从复杂的kafka消息体定义 table
Hi
事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了.
Best
JasonLee
在2021年07月9日 10:06,Chenzhiyuan
vers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
接下来直接查询每个字段的值:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT,
CURRENCY, EXCHANGE_RATE from MyUserTable ");
请教下这个该如何
;format.type' = '?'
接下来直接查询每个字段的值:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT,
CURRENCY, EXCHANGE_RATE from MyUserTable ");
请教下这个该如何定义DDL.
发件人: 17610775726 [mailto:17610775...@163.com]
发送时间: 2021年7月9日 9:26
收件人: Chenzhiyuan(HR)
主题: 回