列转行是在json的 DDL 里面可以写,还是在获取kafka数据后java代码里再转换一次。
-----邮件原件----- 发件人: JasonLee [mailto:17610775...@163.com] 发送时间: 2021年7月9日 11:34 收件人: user-zh@flink.apache.org 主题: 回复: 如何从复杂的kafka消息体定义 table Hi 事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了. Best JasonLee 在2021年07月9日 10:06,Chenzhiyuan(HR)<zhiyuan.c...@huawei.com> 写道: 消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: CREATE TABLE MyUserTable( APPLY_PERSON_ID VARCHAR, UPDATE_SALARY DECIMAL, UP_AMOUNT DECIMAL, CURRENCY VARCHAR, EXCHANGE_RATE DECIMAL ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'format.type' = '?' 接下来直接查询每个字段的值: Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, CURRENCY, EXCHANGE_RATE from MyUserTable "); 请教下这个该如何定义DDL. 发件人: 17610775726 [mailto:17610775...@163.com] 发送时间: 2021年7月9日 9:26 收件人: Chenzhiyuan(HR) <zhiyuan.c...@huawei.com> 主题: 回复:如何从复杂的kafka消息体定义 table hi 用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA Best JasonLee ---- 回复的原邮件 ---- 发件人 Chenzhiyuan(HR)<zhiyuan.c...@huawei.com><mailto:zhiyuan.c...@huawei.com> 发送日期 2021年07月09日 08:59 收件人 user-zh@flink.apache.org<user-zh@flink.apache.org><mailto:user-zh@flink.apache.org> 主题 如何从复杂的kafka消息体定义 table 大家好: 我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type. 如果json, avro不能满足的话,是不是得自己自定义一个。 自定义的话不知道如何写,请各位帮忙指教下。 定义的表如下: CREATE TABLE MyUserTable( uuid VARCHAR, orgId VARCHAR ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'format.type' = '?' ) Kafka的消息体如下, 好像不符合avro之类的标准格式: { "beforeData": [], "byteSize": 272, "columnNumber": 32, "data": [{ "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" }, { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" }, { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.000000000000000000000000000000", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.000000", "type": "DOUBLE" }, { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }], "dataCount": 0, "dataMetaData": { "connector": "mysql", "pos": 1000368076, "row": 0, "ts_ms": 1625565737000, "snapshot": "false", "db": "testdb", "table": "flow_person_t" }, "key": "APPLY_PERSON_ID", "memorySize": 1120, "operation": "insert", "rowIndex": -1, "timestamp": "1970-01-01 00:00:00" }