Hi,

This is mentioned in the docs [1], but unfortunately it is not well
documented for 1.10. In short, you have to provide a custom implementation of
a `DeserializationSchemaFactory`. Please look at the built-in factories for
examples of how it can be done.

In newer versions it's both easier and better documented. For example, in
1.13 please take a look at `DeserializationFormatFactory` and [2].
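
Whichever factory interface you use, the schema it creates still has to turn
the `data` array of your message into one value per column. Below is a rough
sketch of only that step in plain Java, without Flink or a JSON library.
`ColumnFlattener` and `flatten` are hypothetical names, not Flink API, and the
sketch assumes the message body has already been parsed into a list of maps
(for example with a JSON library of your choice).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: flattens the "data" array of the
// sample message into columnName -> rawData pairs.
public class ColumnFlattener {

    // Each entry is assumed to carry "columnName" and "rawData" keys, as in
    // the sample message. Entries without a column name are skipped.
    public static Map<String, Object> flatten(List<Map<String, Object>> data) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (Map<String, Object> column : data) {
            Object name = column.get("columnName");
            if (name != null) {
                row.put(name.toString(), column.get("rawData"));
            }
        }
        return row;
    }

    public static void main(String[] args) {
        // Two entries copied from the sample message.
        Map<String, Object> id = new LinkedHashMap<>();
        id.put("columnName", "APPLY_PERSON_ID");
        id.put("rawData", 10017);
        id.put("type", "LONG");

        Map<String, Object> currency = new LinkedHashMap<>();
        currency.put("columnName", "CURRENCY");
        currency.put("rawData", "CNY");
        currency.put("type", "STRING");

        List<Map<String, Object>> data = new ArrayList<>();
        data.add(id);
        data.add(currency);

        // Prints the flattened column map.
        System.out.println(flatten(data));
    }
}
```

A custom `DeserializationSchema` could call something like this from its
`deserialize` method and then copy the values into the row or Pojo fields,
converting `rawData` according to the `type` entry where needed.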

Best,
Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sourceSinks.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sourcessinks/#factories

On Thu, 8 Jul 2021 at 14:21, Chenzhiyuan(HR) <zhiyuan.c...@huawei.com>
wrote:

> I create table as below, and the data is from kafka.
>
> I want to deserialize the json message to Pojo object.
>
> But the message format is not avro or simple json.
>
> *So I need to know how to register a customized serializer and use it for
> the 'format.type' property.*
>
> By the way, my flink version is 1.10.0.
>
> CREATE TABLE MyUserTable(
>   uuid VARCHAR,
>   orgId VARCHAR
> ) with (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.11',
>   'connector.topic' = 'topic_name',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'format.type' = 'cutormizeSerializer'
> )
>
> The kafka message body sample, each columnName is the key for Pojo object,
> and rawData is value:
>
> {
>     "beforeData": [],
>     "byteSize": 272,
>     "columnNumber": 32,
>     "data": [{
>         "byteSize": 8,
>         "columnName": "APPLY_PERSON_ID",
>         "rawData": 10017,
>         "type": "LONG"
>     }, {
>         "byteSize": 12,
>         "columnName": "UPDATE_SALARY",
>         "rawData": "11000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 11,
>         "columnName": "UP_AMOUNT",
>         "rawData": "1000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 3,
>         "columnName": "CURRENCY",
>         "rawData": "CNY",
>         "type": "STRING"
>     }, {
>         "byteSize": 32,
>         "columnName": "EXCHANGE_RATE",
>         "rawData": "1.000000000000000000000000000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 11,
>         "columnName": "DEDUCTED_ACCOUNT",
>         "rawData": "1000.000000",
>         "type": "DOUBLE"
>     }, {
>         "byteSize": 1,
>         "columnName": "ENTER_AT_PROCESS",
>         "rawData": "Y",
>         "type": "STRING"
>     }],
>     "dataCount": 0,
>     "dataMetaData": {
>         "connector": "mysql",
>         "pos": 1000368076,
>         "row": 0,
>         "ts_ms": 1625565737000,
>         "snapshot": "false",
>         "db": "testdb",
>         "table": "flow_person_t"
>     },
>     "key": "APPLY_PERSON_ID",
>     "memorySize": 1120,
>     "operation": "insert",
>     "rowIndex": -1,
>     "timestamp": "1970-01-01 00:00:00"
> }
>
> The Pojo object as below:
>
> import lombok.Data;
>
> @Data
> public class HrSalaryPersonVO {
>     private String uuid;
>     private String orgId;
>     private String unitId;
>     private String effectiveDate;
>
>     private int adjustPersonCount;
>
>     private Double adjustAmount;
>
>     private Double beforeSalaryAmount;
>     private Double adjustRate;
>
>     private String data0prateType;
>
>     private String status;
> }
