Hongshun Wang created FLINK-35715:
-------------------------------------
Summary: Mysql Source support schema cache to deserialize record
Key: FLINK-35715
URL: https://issues.apache.org/jira/browse/FLINK-35715
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.1.1
Reporter: Hongshun Wang
Fix For: cdc-3.2.0
Currently, DebeziumEventDeserializationSchema deserializes each record with a schema inferred from the record itself:
{code:java}
private RecordData extractDataRecord(Struct value, Schema valueSchema) throws Exception {
    DataType dataType = schemaDataTypeInference.infer(value, valueSchema);
    return (RecordData) getOrCreateConverter(dataType).convert(value, valueSchema);
}
{code}
There are some issues:
# Inferring a type and creating a converter for every arriving record incurs avoidable per-record cost.
# A schema inferred from a single record may not reflect the real table schema. For instance, a MySQL TIMESTAMP(6) column can hold a value whose fractional-second part is zero; inferred from that value alone, the type appears to have precision 0.
{code:java}
protected DataType inferString(Object value, Schema schema) {
    if (ZonedTimestamp.SCHEMA_NAME.equals(schema.name())) {
        int nano =
                Optional.ofNullable((String) value)
                        .map(s -> ZonedTimestamp.FORMATTER.parse(s, Instant::from))
                        .map(Instant::getNano)
                        .orElse(0);
        int precision;
        if (nano == 0) {
            precision = 0;
        } else if (nano % 1000 > 0) {
            precision = 9;
        } else if (nano % 1000_000 > 0) {
            precision = 6;
        } else if (nano % 1000_000_000 > 0) {
            precision = 3;
        } else {
            precision = 0;
        }
        return DataTypes.TIMESTAMP_LTZ(precision);
    }
    return DataTypes.STRING();
}
{code}
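For illustration, here is a minimal, self-contained sketch that walks the branch logic above for two values from the same TIMESTAMP(6) column (Instant.parse stands in for ZonedTimestamp.FORMATTER; the sample values are assumed inputs, not taken from a real binlog):
{code:java}
import java.time.Instant;

public class PrecisionInferenceDemo {

    static int inferPrecision(String zonedTimestamp) {
        // Instant.parse stands in for ZonedTimestamp.FORMATTER.parse(s, Instant::from).
        int nano = Instant.parse(zonedTimestamp).getNano();
        // Same branch order as inferString above; the % 1000_000_000 branch is
        // folded into the final else, since nano is always < 1_000_000_000.
        if (nano == 0) {
            return 0;
        } else if (nano % 1000 > 0) {
            return 9;
        } else if (nano % 1000_000 > 0) {
            return 6;
        } else {
            return 3;
        }
    }

    public static void main(String[] args) {
        // Non-zero fraction from a TIMESTAMP(6) column: inferred precision 6.
        System.out.println(inferPrecision("2024-07-01T12:00:00.123456Z")); // 6
        // Same column, but the fraction happens to be zero: inferred precision 0.
        System.out.println(inferPrecision("2024-07-01T12:00:00Z")); // 0
    }
}
{code}
Both values come from the same column, yet they yield two different inferred types, and therefore two different converters and binary layouts.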
However, timestamps of different precisions use different binary layouts in BinaryRecordData (a compact 8-byte layout when the precision is at most 3, a variable-length layout otherwise). Writing a value with precision 0 and then reading it back with precision 6 therefore throws an exception:
{code:java}
// org.apache.flink.cdc.common.data.binary.BinaryRecordData#getTimestamp
@Override
public TimestampData getTimestamp(int pos, int precision) {
    assertIndexIsValid(pos);
    if (TimestampData.isCompact(precision)) {
        return TimestampData.fromMillis(segments[0].getLong(getFieldOffset(pos)));
    }
    int fieldOffset = getFieldOffset(pos);
    final long offsetAndNanoOfMilli = segments[0].getLong(fieldOffset);
    return BinarySegmentUtils.readTimestampData(segments, offset, offsetAndNanoOfMilli);
}
{code}
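To make the failure mode concrete, here is a minimal sketch (plain Java, not Flink code) of the layout mismatch: in the compact form the 8-byte fixed slot holds epoch milliseconds directly, while the non-compact form packs (offsetToVariablePart << 32) | nanoOfMillisecond into the same slot. A non-compact read of a compact slot reinterprets millisecond bits as an offset:
{code:java}
public class TimestampLayoutDemo {
    public static void main(String[] args) {
        // Written at precision 0: the 8-byte slot holds epoch millis directly.
        long epochMillis = 1_719_830_000_123L;

        // Read back at precision 6: the same bits are split as (offset, nanoOfMilli),
        // mirroring how readTimestampData interprets the non-compact long.
        int bogusOffset = (int) (epochMillis >> 32); // treated as a byte offset
        int bogusNanoOfMilli = (int) epochMillis;    // treated as nano-of-millisecond

        // The "offset" points nowhere meaningful in the row's variable-length
        // section, so the follow-up segment read fails or returns corrupt data.
        System.out.println("bogus offset = " + bogusOffset);
        System.out.println("bogus nanoOfMilli = " + bogusNanoOfMilli);
    }
}
{code}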
Thus, I think we should cache the table schema in the source and update it only when a SchemaChangeRecord arrives. That way, the schema used by SourceRecordEventDeserializer always matches the database schema.
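A rough sketch of that direction, with every name hypothetical (the real change would live in the Flink CDC deserializer, not in these placeholder types):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class SchemaCacheSketch {

    // Stand-in for the real table schema (table id, column types, precisions...).
    record TableSchema(String tableId, int timestampPrecision) {}

    private final Map<String, TableSchema> schemaCache = new ConcurrentHashMap<>();
    private final Map<String, Function<Object, String>> converterCache = new ConcurrentHashMap<>();

    // Only schema change records mutate the cache, keeping it in lockstep
    // with the database schema.
    public void onSchemaChange(TableSchema newSchema) {
        schemaCache.put(newSchema.tableId(), newSchema);
        converterCache.put(newSchema.tableId(), buildConverter(newSchema));
    }

    // Data records reuse the cached converter: no per-record inference.
    public String deserialize(String tableId, Object rawValue) {
        TableSchema schema = schemaCache.get(tableId);
        if (schema == null) {
            throw new IllegalStateException("No cached schema for table " + tableId);
        }
        return converterCache.get(tableId).apply(rawValue);
    }

    // Built once per schema version; the declared precision, not the observed
    // value, decides how timestamps are converted and laid out.
    private Function<Object, String> buildConverter(TableSchema schema) {
        return raw -> raw + " @ TIMESTAMP(" + schema.timestampPrecision() + ")";
    }
}
{code}
The converter is rebuilt only on schema change, so per-record work reduces to a map lookup, and a TIMESTAMP(6) column is always read at precision 6 regardless of the fractional part of any individual value.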