Hongshun Wang created FLINK-35715:
-------------------------------------

             Summary: Mysql Source support schema cache to deserialize record
                 Key: FLINK-35715
                 URL: https://issues.apache.org/jira/browse/FLINK-35715
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.1.1
            Reporter: Hongshun Wang
             Fix For: cdc-3.2.0
Currently, DebeziumEventDeserializationSchema deserializes each record with a schema inferred from that record:
{code:java}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
    DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
    return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
{code}
This approach has some issues:
# Inferring the schema and creating a converter for every incoming record incurs additional cost.
# A schema inferred from a single record may not reflect the actual table schema. For instance, a MySQL timestamp column with precision 6 may hold a value whose nanosecond part happens to be 0; such a value is inferred to have precision 0.
{code:java}
protected DataType inferString(Object value, Schema schema) {
    if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
        int nano =
                Optional.ofNullable((String) value)
                        .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                        .map(Instant::getNano)
                        .orElse(0);

        int precision;
        if (nano == 0) {
            precision = 0;
        } else if (nano % 1000 > 0) {
            precision = 9;
        } else if (nano % 1000_000 > 0) {
            precision = 6;
        } else if (nano % 1000_000_000 > 0) {
            precision = 3;
        } else {
            precision = 0;
        }
        return DataTypes.TIMESTAMP_LTZ(precision);
    }
    return DataTypes.STRING();
}
{code}
However, timestamps with different precisions have different binary layouts in BinaryRecordData. Writing a timestamp with precision 0 and then reading it with precision 6 will throw an exception.
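To make the pitfall concrete, here is a minimal standalone sketch of the nanosecond-to-precision mapping quoted above (the class and method names are illustrative, not the actual CDC code): a value from a TIMESTAMP(6) column that happens to carry no sub-second part is inferred as precision 0, while a value with microseconds is inferred as precision 6.
{code:java}
import java.time.Instant;

public class PrecisionInference {

    // Mirrors the nano -> precision branches of inferString above.
    static int inferPrecision(String ts) {
        int nano = Instant.parse(ts).getNano();
        if (nano == 0) {
            return 0;
        } else if (nano % 1000 > 0) {
            return 9;
        } else if (nano % 1_000_000 > 0) {
            return 6;
        } else if (nano % 1_000_000_000 > 0) {
            return 3;
        }
        return 0;
    }

    public static void main(String[] args) {
        // Same TIMESTAMP(6) column, two rows, two different inferred precisions:
        System.out.println(inferPrecision("2024-01-01T10:00:00Z"));        // 0
        System.out.println(inferPrecision("2024-01-01T10:00:00.123456Z")); // 6
    }
}
{code}
So two rows of the same column can yield different inferred schemas, and the binary layout chosen for one does not match the other.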
{code:java}
// org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
@Override
public TimestampData getTimestamp(int pos, int precision) {
    assertIndexIsValid(pos);

    if (TimestampData.isCompact(precision)) {
        return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
    }

    int fieldOffset = getFieldOffset(pos);
    final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
    return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
{code}
Therefore, I think we should cache the table schema in the source and update it only when a schema change record arrives. That way, the schema used by SourceRecordEventDeserializer always matches the database schema.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
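The caching proposal could be sketched roughly as follows. This is only an illustration of the idea, not the actual Flink CDC API: the class and method names are hypothetical, and plain Strings stand in for the real TableId and Schema types.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the deserializer consults a cached per-table schema
// instead of inferring one from every record.
public class SchemaCache {

    private final Map<String, String> schemaByTable = new ConcurrentHashMap<>();

    // Fast path: look up the cached schema for a data change record.
    public String schemaFor(String tableId) {
        return schemaByTable.get(tableId);
    }

    // Slow path: invoked only when a schema change record arrives,
    // so the cache always tracks the database schema.
    public void onSchemaChange(String tableId, String newSchema) {
        schemaByTable.put(tableId, newSchema);
    }
}
{code}
With such a cache, converter creation happens once per schema version rather than once per record, and the precision seen by the deserializer is the declared column precision rather than one inferred from a single value.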