Just implement your own DeserializationFormatFactory.



For reference, see the official CanalJsonFormatFactory or DebeziumJsonFormatFactory.
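Most of the work in such a format happens in the `deserialize(byte[])` method of the `DeserializationSchema<RowData>` that the factory's `DecodingFormat` creates. In the message quoted below, each column is an entry of the `data` array (`columnName`, `rawData`, `type`) rather than a top-level field. So that method mainly has to look up each table column by name and convert `rawData` according to `type`. The sketch below covers only that mapping step. It is plain Java with no Flink or JSON dependency, and it assumes the JSON has already been parsed (for example with Jackson). The class `ColumnArrayMapper`, its `ColumnEntry` holder, and the LONG/DOUBLE/STRING conversions are hypothetical:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnArrayMapper {

    /** Hypothetical holder for one entry of the message's "data" array. */
    static final class ColumnEntry {
        final String columnName;
        final Object rawData;
        final String type;

        ColumnEntry(String columnName, Object rawData, String type) {
            this.columnName = columnName;
            this.rawData = rawData;
            this.type = type;
        }
    }

    /** Converts one rawData value according to the message's "type" tag (assumed set of tags). */
    static Object convert(Object rawData, String type) {
        if (rawData == null) {
            return null;
        }
        // rawData may arrive as a JSON number (10017) or a JSON string ("11000.000000").
        String text = String.valueOf(rawData);
        switch (type) {
            case "LONG":
                return Long.parseLong(text);
            case "DOUBLE":
                return Double.parseDouble(text);
            case "STRING":
                return text;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    /**
     * Builds one row in table-schema order. Each table column is looked up by exact
     * (case-sensitive) name in the "data" array; columns missing from the message become null.
     */
    static Object[] toRow(List<ColumnEntry> data, List<String> fieldNames) {
        Map<String, ColumnEntry> byName = new HashMap<>();
        for (ColumnEntry entry : data) {
            byName.put(entry.columnName, entry);
        }
        Object[] row = new Object[fieldNames.size()];
        for (int i = 0; i < fieldNames.size(); i++) {
            ColumnEntry entry = byName.get(fieldNames.get(i));
            row[i] = entry == null ? null : convert(entry.rawData, entry.type);
        }
        return row;
    }

    public static void main(String[] args) {
        // A few entries from the sample message, as a JSON library might hand them over.
        List<ColumnEntry> data = Arrays.asList(
                new ColumnEntry("APPLY_PERSON_ID", 10017, "LONG"),
                new ColumnEntry("UPDATE_SALARY", "11000.000000", "DOUBLE"),
                new ColumnEntry("CURRENCY", "CNY", "STRING"));
        Object[] row = toRow(data, Arrays.asList("APPLY_PERSON_ID", "CURRENCY", "UPDATE_SALARY", "ORG_ID"));
        System.out.println(Arrays.toString(row)); // prints [10017, CNY, 11000.0, null]
    }
}
```

In a real format, `toRow` would be called from `deserialize`, and the values would be wrapped in a `GenericRowData` using Flink's internal data types (for example `StringData.fromString` for strings). Also note that a `DeserializationFormatFactory` is discovered through the new-style options (`'connector' = 'kafka'`, `'format' = '<your factoryIdentifier>'`) once it is listed in `META-INF/services/org.apache.flink.table.factories.Factory`. The legacy `'connector.type'` / `'format.type'` keys used in the DDL below go through the older factory stack.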



On 2021-07-09 08:59:36, "Chenzhiyuan(HR)" <zhiyuan.c...@huawei.com> wrote:
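>Hi all,
>I have defined a table that reads data from Kafka, but I don't know how to parse the messages or which format.type to use.
>If json and avro can't handle it, do I have to write a custom format?
>I don't know how to write one, so any pointers would be appreciated.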
>
>   The table is defined as follows:
>   CREATE TABLE MyUserTable(
>uuid VARCHAR,
>orgId VARCHAR
>) with (
>'connector.type' = 'kafka',
>'connector.version' = '0.11',
>'connector.topic' = 'topic_name',
>'connector.properties.zookeeper.connect' = 'localhost:2181',
>'connector.properties.bootstrap.servers' = 'localhost:9092',
>'connector.properties.group.id' = 'testGroup',
>'format.type' = '?'
>)
>
>
>The Kafka message body looks like this; it doesn't seem to follow a standard format such as avro:
>
>{
>   "beforeData": [],
>    "byteSize": 272,
>    "columnNumber": 32,
>    "data": [{
>        "byteSize": 8,
>        "columnName": "APPLY_PERSON_ID",
>        "rawData": 10017,
>        "type": "LONG"
>    }, {
>        "byteSize": 12,
>        "columnName": "UPDATE_SALARY",
>        "rawData": "11000.000000",
>        "type": "DOUBLE"
>    }, {
>        "byteSize": 11,
>        "columnName": "UP_AMOUNT",
>        "rawData": "1000.000000",
>        "type": "DOUBLE"
>    }, {
>        "byteSize": 3,
>        "columnName": "CURRENCY",
>        "rawData": "CNY",
>        "type": "STRING"
>    }, {
>        "byteSize": 32,
>        "columnName": "EXCHANGE_RATE",
>        "rawData": "1.000000000000000000000000000000",
>        "type": "DOUBLE"
>    },  {
>        "byteSize": 11,
>        "columnName": "DEDUCTED_ACCOUNT",
>        "rawData": "1000.000000",
>        "type": "DOUBLE"
>    }, {
>        "byteSize": 1,
>        "columnName": "ENTER_AT_PROCESS",
>        "rawData": "Y",
>        "type": "STRING"
>    }],
>    "dataCount": 0,
>    "dataMetaData": {
>        "connector": "mysql",
>        "pos": 1000368076,
>        "row": 0,
>        "ts_ms": 1625565737000,
>        "snapshot": "false",
>        "db": "testdb",
>        "table": "flow_person_t"
>    },
>    "key": "APPLY_PERSON_ID",
>    "memorySize": 1120,
>    "operation": "insert",
>    "rowIndex": -1,
>    "timestamp": "1970-01-01 00:00:00"
>}
>
