WangRuoQi created FLINK-20763: --------------------------------- Summary: canal format parse update record with null value get wrong result Key: FLINK-20763 URL: https://issues.apache.org/jira/browse/FLINK-20763 Project: Flink Issue Type: Bug Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) Affects Versions: 1.11.2 Reporter: WangRuoQi Attachments: canal_format.patch
When I use the canal format to consume a MySQL binlog with a query like this:
{code:sql}
select ymd, count(order_no), count(*) from order_table where status >= 3 group by ymd;
{code}
I get results like this:
{code}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..
{code}
I am sure that every record with status >= 3 has a valid order_no, yet count(order_no) and count(*) differ. While debugging, I found the following:
{code:sql}
insert into order_table(ymd, order_no, status) values (20201212, null, 1);
-- +I(20201212,null,1)
update order_table set order_no = 123, status = 3 where id = 1;
-- -U(20201212,123,1)
-- +U(20201212,123,3)
{code}
The -U row carries order_no = 123 instead of the original null, so the canal format produces a wrong UPDATE_BEFORE row when parsing update records. The relevant source code logic is:
{code:java}
} else if (OP_UPDATE.equals(type)) {
    // "data" field is an array of row, contains new rows
    ArrayData data = row.getArray(0);
    // "old" field is an array of row, contains old values
    ArrayData old = row.getArray(1);
    for (int i = 0; i < data.size(); i++) {
        // the underlying JSON deserialization schema always produce GenericRowData.
        GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
        GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
        for (int f = 0; f < fieldCount; f++) {
            if (before.isNullAt(f)) {
                // not null fields in "old" (before) means the fields are changed
                // null/empty fields in "old" (before) means the fields are not changed
                // so we just copy the not changed fields into before
                before.setField(f, after.getField(f));
            }
        }
        before.setRowKind(RowKind.UPDATE_BEFORE);
        after.setRowKind(RowKind.UPDATE_AFTER);
        out.collect(before);
        out.collect(after);
    }
}
{code}
When a field in "old" has a null value, it is overwritten by the value from the new record, because the null check cannot distinguish a column whose old value was null from a column that was not changed at all. The retraction then carries a value that was never emitted, which leads the aggregation to a wrong result.

I tried to fix this bug with the following logic: for each field, use the old value when the "old" row contains that field, whether the value is null or not; otherwise, use the new value by default.

I hope this bug will be fixed in a future version.
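To illustrate the difference between the current rule and the proposed one, here is a minimal, Flink-free sketch. It is not the attached patch and not Flink's API: rows are plain Object[] instead of GenericRowData, and the canal "old" object is assumed to be available as a Map whose key set records which columns were present in the JSON. The class and method names (CanalUpdateBeforeSketch, beforeByNullCheck, beforeByKeyPresence) are hypothetical. The key point is that the proposed rule needs key-presence information, which a GenericRowData produced by JSON deserialization no longer carries.

{code:java}
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch: rebuild the UPDATE_BEFORE row from a canal "old" object.
 * Rows are Object[]; "old" is a Map whose keys are the columns present in the JSON.
 */
public class CanalUpdateBeforeSketch {

    /** Mirrors the current logic: a null in "old" is treated as "column unchanged". */
    static Object[] beforeByNullCheck(String[] fieldNames, Object[] after, Map<String, Object> old) {
        Object[] before = new Object[fieldNames.length];
        for (int f = 0; f < fieldNames.length; f++) {
            // An absent key and an explicit JSON null both come back as null here.
            Object oldValue = old.get(fieldNames[f]);
            before[f] = (oldValue == null) ? after[f] : oldValue;
        }
        return before;
    }

    /** Proposed logic: use the "old" value whenever the key is present, even if it is null. */
    static Object[] beforeByKeyPresence(String[] fieldNames, Object[] after, Map<String, Object> old) {
        Object[] before = new Object[fieldNames.length];
        for (int f = 0; f < fieldNames.length; f++) {
            // Only columns missing from "old" are unchanged, so only those are copied from "after".
            before[f] = old.containsKey(fieldNames[f]) ? old.get(fieldNames[f]) : after[f];
        }
        return before;
    }

    public static void main(String[] args) {
        String[] fields = {"ymd", "order_no", "status"};
        Object[] after = {20201212, 123, 3};
        // Assumed canal "old" for: update order_table set order_no = 123, status = 3 where id = 1;
        Map<String, Object> old = new LinkedHashMap<>();
        old.put("order_no", null);
        old.put("status", 1);

        System.out.println(Arrays.toString(beforeByNullCheck(fields, after, old)));   // [20201212, 123, 1]
        System.out.println(Arrays.toString(beforeByKeyPresence(fields, after, old))); // [20201212, null, 1]
    }
}
{code}

With the key-presence rule, the -U row retracts (20201212,null,1), which matches the row that was originally inserted.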
[^canal_format.patch]