自己实现一个 DeserializationFormatFactory 就行。
可以参考官方的 CanalJsonFormatFactory 或者 DebeziumJsonFormatFactory。

在 2021-07-09 08:59:36,"Chenzhiyuan(HR)" <zhiyuan.c...@huawei.com> 写道:
> 大家好:
> 我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
> 如果json, avro不能满足的话,是不是得自己自定义一个。
> 自定义的话不知道如何写,请各位帮忙指教下。
>
> 定义的表如下:
> CREATE TABLE MyUserTable(
>     uuid VARCHAR,
>     orgId VARCHAR
> ) with (
>     'connector.type' = 'kafka',
>     'connector.version' = '0.11',
>     'connector.topic' = 'topic_name',
>     'connector.properties.zookeeper.connect' = 'localhost:2181',
>     'connector.properties.bootstrap.servers' = 'localhost:9092',
>     'connector.properties.group.id' = 'testGroup',
>     'format.type' = '?'
> )
>
>
> Kafka的消息体如下, 好像不符合avro之类的标准格式:
>
> {
>     "beforeData": [],
>     "byteSize": 272,
>     "columnNumber": 32,
>     "data": [{
>         "byteSize": 8,
>         "columnName": "APPLY_PERSON_ID",
>         "rawData": 10017,
>         "type": "LONG"
>     }, {
>         "byteSize": 12,
>         "columnName": "UPDATE_SALARY",
>         "rawData": "11000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 11,
>         "columnName": "UP_AMOUNT",
>         "rawData": "1000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 3,
>         "columnName": "CURRENCY",
>         "rawData": "CNY",
>         "type": "STRING"
>     }, {
>         "byteSize": 32,
>         "columnName": "EXCHANGE_RATE",
>         "rawData": "1.000000000000000000000000000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 11,
>         "columnName": "DEDUCTED_ACCOUNT",
>         "rawData": "1000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 1,
>         "columnName": "ENTER_AT_PROCESS",
>         "rawData": "Y",
>         "type": "STRING"
>     }],
>     "dataCount": 0,
>     "dataMetaData": {
>         "connector": "mysql",
>         "pos": 1000368076,
>         "row": 0,
>         "ts_ms": 1625565737000,
>         "snapshot": "false",
>         "db": "testdb",
>         "table": "flow_person_t"
>     },
>     "key": "APPLY_PERSON_ID",
>     "memorySize": 1120,
>     "operation": "insert",
>     "rowIndex": -1,
>     "timestamp": "1970-01-01 00:00:00"
> }
>