大家好:
我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
如果json, avro不能满足的话,是不是得自己自定义一个。
自定义的话不知道如何写,请各位帮忙指教下。

   定义的表如下:
   CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


Kafka的消息体如下, 好像不符合avro之类的标准格式:

{
   "beforeData": [],
    "byteSize": 272,
    "columnNumber": 32,
    "data": [{
        "byteSize": 8,
        "columnName": "APPLY_PERSON_ID",
        "rawData": 10017,
        "type": "LONG"
    }, {
        "byteSize": 12,
        "columnName": "UPDATE_SALARY",
        "rawData": "11000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 11,
        "columnName": "UP_AMOUNT",
        "rawData": "1000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 3,
        "columnName": "CURRENCY",
        "rawData": "CNY",
        "type": "STRING"
    }, {
        "byteSize": 32,
        "columnName": "EXCHANGE_RATE",
        "rawData": "1.000000000000000000000000000000",
        "type": "DOUBLE"
    },  {
        "byteSize": 11,
        "columnName": "DEDUCTED_ACCOUNT",
        "rawData": "1000.000000",
        "type": "DOUBLE"
    }, {
        "byteSize": 1,
        "columnName": "ENTER_AT_PROCESS",
        "rawData": "Y",
        "type": "STRING"
    }],
    "dataCount": 0,
    "dataMetaData": {
        "connector": "mysql",
        "pos": 1000368076,
        "row": 0,
        "ts_ms": 1625565737000,
        "snapshot": "false",
        "db": "testdb",
        "table": "flow_person_t"
    },
    "key": "APPLY_PERSON_ID",
    "memorySize": 1120,
    "operation": "insert",
    "rowIndex": -1,
    "timestamp": "1970-01-01 00:00:00"
}

回复