Hongshun Wang created FLINK-35715:
-------------------------------------

             Summary: Mysql Source support schema cache to deserialize record
                 Key: FLINK-35715
                 URL: https://issues.apache.org/jira/browse/FLINK-35715
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.1
            Reporter: Hongshun Wang
             Fix For: cdc-3.2.0

Currently, DebeziumEventDeserializationSchema deserializes each record with a schema inferred from that record itself:

{code:java}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
    DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
    return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
{code}
There are some issues:
 # Inferring the schema and creating a converter for every arriving record incurs extra cost.
 # A schema inferred from a single record may not reflect the real table schema. For instance, a MySQL timestamp column with precision 6 may hold a value whose fractional-second part happens to be zero; inference then reports a precision of 0, as the snippet and the demonstration below show.

{code:java}
protected DataType inferString(Object value, Schema schema) {
    if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
        int nano =
                Optional.ofNullable((String) value)
                        .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                        .map(Instant::getNano)
                        .orElse(0);

        int precision;
        if (nano == 0) {
            precision = 0;
        } else if (nano % 1000 > 0) {
            precision = 9;
        } else if (nano % 1000_000 > 0) {
            precision = 6;
        } else if (nano % 1000_000_000 > 0) {
            precision = 3;
        } else {
            precision = 0;
        }
        return DataTypes.TIMESTAMP_LTZ(precision);
    }
    return DataTypes.STRING();
}
{code}
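For example, when the fractional-second part of the value is zero, the nano check above yields 0 and the inferred precision collapses to 0 even though the column is defined with precision 6. A minimal, self-contained sketch (plain java.time stands in for Debezium's ZonedTimestamp.FORMATTER here):
{code:java}
import java.time.Instant;

public class PrecisionInferenceDemo {
    public static void main(String[] args) {
        // A value read from a MySQL TIMESTAMP(6) column whose microsecond part
        // happens to be zero at this instant.
        int nano = Instant.parse("2024-07-01T12:00:00Z").getNano();

        // nano == 0, so inferString() above takes the first branch and returns
        // TIMESTAMP_LTZ(0), although the real column precision is 6.
        System.out.println("nano = " + nano); // prints: nano = 0
    }
}
{code}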
However, timestamps with different precisions use different binary layouts in BinaryRecordData. Writing a value with precision 0 and then reading it back with precision 6 will cause an exception to be thrown:

{code:java}
// org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
@Override
public TimestampData getTimestamp(int pos, int precision) {
    assertIndexIsValid(pos);

    if (TimestampData.isCompact(precision)) {
        return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
    }

    int fieldOffset = getFieldOffset(pos);
    final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
    return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
{code}
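To see why the two read paths disagree: in this binary layout, isCompact(precision) holds for precision <= 3, and the non-compact path splits the same 8-byte slot into a variable-part offset (upper 32 bits) and a nano-of-millisecond (lower 32 bits), which is how BinarySegmentUtils.readTimestampData decodes it. A small illustration of the bit layout (plain Java, no Flink dependency):
{code:java}
public class LayoutMismatchDemo {
    public static void main(String[] args) {
        // The fixed 8-byte slot as written by the compact path: raw epoch millis.
        long slot = 1_720_000_000_000L;

        // Compact read (precision <= 3): interpret the slot as millis. Correct.
        long millis = slot;

        // Non-compact read (precision > 3): the same bits are split into
        // (variable-part offset, nanoOfMillisecond), and the offset is then
        // dereferenced, pointing at arbitrary bytes of the row.
        int subOffset = (int) (slot >>> 32); // upper 32 bits -> bogus offset
        int nanoOfMilli = (int) slot;        // lower 32 bits -> bogus nanos
        System.out.printf("compact: %d ms; non-compact: offset=%d, nanoOfMilli=%d%n",
                millis, subOffset, nanoOfMilli);
    }
}
{code}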
Thus, I think we should cache the table schema in the source and only update it when a SchemaChangeRecord arrives. This way, the schema used by SourceRecordEventDeserializer always stays in sync with the database.
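A rough shape of what I have in mind, with plain Java stand-ins for the CDC types (the class and method names here are illustrative, not the actual flink-cdc API):
{code:java}
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch only: a per-table schema cache kept inside the source. */
class TableSchemaCache {
    // Latest known schema per table, e.g. the ordered column types.
    private final Map<String, List<String>> schemaByTable = new ConcurrentHashMap<>();

    /** Populated from the initial table schema and updated only by schema change records. */
    void applySchemaChange(String tableId, List<String> columnTypes) {
        schemaByTable.put(tableId, columnTypes);
    }

    /** Deserialization consults the cache instead of inferring from each record. */
    List<String> schemaOf(String tableId) {
        List<String> cached = schemaByTable.get(tableId);
        if (cached == null) {
            throw new IllegalStateException("No cached schema for table " + tableId);
        }
        return cached;
    }
}
{code}
This removes the per-record inference cost and keeps the deserializer's view of the schema consistent with the database.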


