[ 
https://issues.apache.org/jira/browse/FLINK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17273347#comment-17273347
 ] 

Nicholas Jiang commented on FLINK-21172:
----------------------------------------

I agree with the points made by [~jiabao.sun] and [~jark]. [~jark], could you 
please assign this ticket to me?

> canal-json format include es field
> ----------------------------------
>
>                 Key: FLINK-21172
>                 URL: https://issues.apache.org/jira/browse/FLINK-21172
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: jiabao sun
>            Priority: Minor
>
> The Canal flat-message JSON format has an 'es' field, extracted from the 
> MySQL binlog, that records when the row data actually changed in MySQL. It 
> naturally expresses the event time, but it is ignored during deserialization.
> {code:json}
> {
>   "data": [
>     {
>       "id": "111",
>       "name": "scooter",
>       "description": "Big 2-wheel scooter",
>       "weight": "5.18"
>     }
>   ],
>   "database": "inventory",
>   "es": 1589373560000,
>   "id": 9,
>   "isDdl": false,
>   "mysqlType": {
>     "id": "INTEGER",
>     "name": "VARCHAR(255)",
>     "description": "VARCHAR(512)",
>     "weight": "FLOAT"
>   },
>   "old": [
>     {
>       "weight": "5.15"
>     }
>   ],
>   "pkNames": [
>     "id"
>   ],
>   "sql": "",
>   "sqlType": {
>     "id": 4,
>     "name": 12,
>     "description": 12,
>     "weight": 7
>   },
>   "table": "products",
>   "ts": 1589373560798,
>   "type": "UPDATE"
> }
> {code}
> org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> {code:java}
>     private static RowType createJsonRowType(DataType databaseSchema) {
>         // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
>         return (RowType)
>                 DataTypes.ROW(
>                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("type", DataTypes.STRING()),
>                                 DataTypes.FIELD("database", DataTypes.STRING()),
>                                 DataTypes.FIELD("table", DataTypes.STRING()))
>                         .getLogicalType();
>     }
> {code}
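
For illustration only: a minimal, standalone sketch of why 'es' is a natural
event-time candidate. It assumes that both 'es' and 'ts' are epoch
milliseconds, as the sample values in the message above suggest. The class and
method names are hypothetical and are not part of Flink or Canal.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Flink or Canal.
final class CanalEventTime {

    // Interprets a Canal "es" or "ts" value as epoch milliseconds (assumption).
    static Instant toInstant(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    // Gap between the change time ("es") and the "ts" value, in milliseconds.
    static long lagMillis(long es, long ts) {
        return Duration.between(toInstant(es), toInstant(ts)).toMillis();
    }

    public static void main(String[] args) {
        long es = 1589373560000L; // "es" from the sample message
        long ts = 1589373560798L; // "ts" from the sample message
        System.out.println(toInstant(es));     // 2020-05-13T12:39:20Z
        System.out.println(lagMillis(es, ts)); // 798
    }
}
```

With the sample message, 'es' maps to 2020-05-13T12:39:20Z and 'ts' is 798 ms
later. In Flink itself, exposing the field would presumably mean adding it to
the row type built in createJsonRowType; the field's type and how it is
surfaced to SQL are left to the actual patch.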



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
