[ 
https://issues.apache.org/jira/browse/FLINK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiabao sun updated FLINK-21172:
-------------------------------
    Description: 
The Canal flat-message JSON format has an 'es' field, extracted from the MySQL 
binlog, that records when the row change actually happened in MySQL. It 
naturally expresses the event time but was ignored during deserialization.

org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
{code:java}
    private static RowType createJsonRowType(DataType databaseSchema) {
        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
        return (RowType)
                DataTypes.ROW(
                                DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
                                DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
                                DataTypes.FIELD("type", DataTypes.STRING()),
                                DataTypes.FIELD("database", DataTypes.STRING()),
                                DataTypes.FIELD("table", DataTypes.STRING()))
                        .getLogicalType();
    }
{code}
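
To illustrate what the 'es' value could provide as event time, here is a minimal, standalone sketch that does not use Flink at all. It assumes 'es' is a bare integer holding epoch milliseconds, as it appears in typical Canal flat messages. The class CanalEventTime, its methods, and the sample message are hypothetical and for illustration only. The regex lookup is a shortcut; a real change to CanalJsonDeserializationSchema would read the field through the JSON row type instead.

```java
import java.time.Instant;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink): reads "es" from a Canal flat message. */
final class CanalEventTime {

    // Assumption: "es" is an unquoted integer in epoch milliseconds.
    // Limitation: a regex cannot tell the top-level "es" from a column named "es"
    // inside "data" or "old"; it simply takes the first match.
    private static final Pattern ES_FIELD = Pattern.compile("\"es\"\\s*:\\s*(\\d+)");

    /** Returns the "es" value in milliseconds, or empty if the field is absent. */
    static OptionalLong extractEsMillis(String canalJson) {
        Matcher m = ES_FIELD.matcher(canalJson);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    /** Converts the millisecond value into an Instant usable as event time. */
    static Instant toEventTime(long esMillis) {
        return Instant.ofEpochMilli(esMillis);
    }

    public static void main(String[] args) {
        // Made-up sample message, shaped like a Canal flat message.
        String message =
                "{\"data\":[{\"id\":\"1\"}],\"database\":\"inventory\","
                        + "\"es\":1589373560000,\"old\":null,\"table\":\"products\","
                        + "\"ts\":1589373560798,\"type\":\"INSERT\"}";

        extractEsMillis(message)
                .ifPresent(es -> System.out.println("event time: " + toEventTime(es)));
    }
}
```

Returning an OptionalLong keeps a missing 'es' distinct from a zero timestamp, so a caller can decide whether to fall back to another time source.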



  was:
The Canal flat-message JSON format has an 'es' field, extracted from the MySQL 
binlog, that records when the row change actually happened in MySQL. It 
naturally expresses the event time but is ignored during deserialization.

org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
{code:java}
    private static RowType createJsonRowType(DataType databaseSchema) {
        // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
        return (RowType)
                DataTypes.ROW(
                                DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
                                DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
                                DataTypes.FIELD("type", DataTypes.STRING()),
                                DataTypes.FIELD("database", DataTypes.STRING()),
                                DataTypes.FIELD("table", DataTypes.STRING()))
                        .getLogicalType();
    }
{code}




> canal-json format include es field
> ----------------------------------
>
>                 Key: FLINK-21172
>                 URL: https://issues.apache.org/jira/browse/FLINK-21172
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: jiabao sun
>            Priority: Minor
>
> The Canal flat-message JSON format has an 'es' field, extracted from the MySQL 
> binlog, that records when the row change actually happened in MySQL. It 
> naturally expresses the event time but was ignored during deserialization.
> org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> {code:java}
>     private static RowType createJsonRowType(DataType databaseSchema) {
>         // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
>         return (RowType)
>                 DataTypes.ROW(
>                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("type", DataTypes.STRING()),
>                                 DataTypes.FIELD("database", DataTypes.STRING()),
>                                 DataTypes.FIELD("table", DataTypes.STRING()))
>                         .getLogicalType();
>     }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
