列转行是在json的 DDL 里面可以写,还是在获取kafka数据后java代码里再转换一次。

-----邮件原件-----
发件人: JasonLee [mailto:17610775...@163.com] 
发送时间: 2021年7月9日 11:34
收件人: user-zh@flink.apache.org
主题: 回复: 如何从复杂的kafka消息体定义 table

Hi


事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了.


Best
JasonLee


在2021年07月9日 10:06,Chenzhiyuan(HR)<zhiyuan.c...@huawei.com> 写道:
消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181', 
'connector.properties.bootstrap.servers' = 'localhost:9092', 
'connector.properties.group.id' = 'testGroup', 'format.type' = '?'

接下来直接查询每个字段的值:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

请教下这个该如何定义DDL.



发件人: 17610775726 [mailto:17610775...@163.com]
发送时间: 2021年7月9日 9:26
收件人: Chenzhiyuan(HR) <zhiyuan.c...@huawei.com>
主题: 回复:如何从复杂的kafka消息体定义 table

hi

用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
---- 回复的原邮件 ----
发件人

Chenzhiyuan(HR)<zhiyuan.c...@huawei.com><mailto:zhiyuan.c...@huawei.com>

发送日期

2021年07月09日 08:59

收件人

user-zh@flink.apache.org<user-zh@flink.apache.org><mailto:user-zh@flink.apache.org>

主题

如何从复杂的kafka消息体定义 table

大家好:
我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
如果json, avro不能满足的话,是不是得自己自定义一个。
自定义的话不知道如何写,请各位帮忙指教下。

定义的表如下:
CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181', 
'connector.properties.bootstrap.servers' = 'localhost:9092', 
'connector.properties.group.id' = 'testGroup', 'format.type' = '?'
)


Kafka的消息体如下, 好像不符合avro之类的标准格式:

{
"beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.000000",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.000000",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.000000000000000000000000000000",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.000000",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}

回复