WangRuoQi created FLINK-20763:
---------------------------------

             Summary: canal format parse update record with null value get wrong result
                 Key: FLINK-20763
                 URL: https://issues.apache.org/jira/browse/FLINK-20763
             Project: Flink
          Issue Type: Bug
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.11.2
            Reporter: WangRuoQi
         Attachments: canal_format.patch

When I use the canal format to consume a MySQL binlog and run a query like this:
{code:sql}
select ymd,count(order_no),count(*) from order_table where status>=3 group by ymd;{code}
I get results like this:
{code:java}
(20201212,10,10)
..
(20201212,20,24)
..
(20201212,100,130)
..{code}
I am sure that when status>=3, every record has a valid order_no, yet I got results where count(order_no) and count(*) differ.

I found the following while debugging:
{code:sql}
insert into order_table(ymd,order_no,status) values(20201212,null,1);
-- +I(20201212,null,1)
update order_table set order_no=123,status=3 where id=1;
-- -U(20201212,123,1)   <- wrong: the before row should be -U(20201212,null,1)
-- +U(20201212,123,3){code}
So I noticed that the canal format has a bug when parsing update records.

 

The source code logic is:
{code:java}
} else if (OP_UPDATE.equals(type)) {
   // "data" field is an array of row, contains new rows
   ArrayData data = row.getArray(0);
   // "old" field is an array of row, contains old values
   ArrayData old = row.getArray(1);
   for (int i = 0; i < data.size(); i++) {
      // the underlying JSON deserialization schema always produce GenericRowData.
      GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
      GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
      for (int f = 0; f < fieldCount; f++) {
         if (before.isNullAt(f)) {
            // not null fields in "old" (before) means the fields are changed
            // null/empty fields in "old" (before) means the fields are not changed
            // so we just copy the not changed fields into before
            before.setField(f, after.getField(f));
         }
      }
      before.setRowKind(RowKind.UPDATE_BEFORE);
      after.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(before);
      out.collect(after);
   }
}
{code}
When a field in the "old" row has a null value, it is overwritten by the value from the new record, so the emitted UPDATE_BEFORE row no longer matches the row that was previously inserted. That leads the aggregation to a wrong result.
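
The root cause, as far as I can tell, is that canal only lists changed columns in "old"; a column that appears there with an explicit null really did change (its old value was null), but after JSON deserialization into GenericRowData that case is indistinguishable from the column simply being absent. A small illustration of the ambiguity (my own snippet, not the canal code itself):
{code:java}
import org.apache.flink.table.data.GenericRowData;

public class OldFieldAmbiguity {
    public static void main(String[] args) {
        // "old" was {"status":"1"}: order_no (index 1) is absent -> unchanged
        GenericRowData oldAbsent = new GenericRowData(3);
        oldAbsent.setField(2, 1);

        // "old" was {"order_no":null,"status":"1"}: order_no changed, old value was null
        GenericRowData oldExplicitNull = new GenericRowData(3);
        oldExplicitNull.setField(1, null);
        oldExplicitNull.setField(2, 1);

        // both report true, so isNullAt() cannot tell "unchanged" apart from "was null"
        System.out.println(oldAbsent.isNullAt(1));       // true
        System.out.println(oldExplicitNull.isNullAt(1)); // true
    }
}
{code}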

 

I tried to fix this bug with the following logic:

For each field, use the old value when the "old" row contains that field, whether its value is null or not; otherwise fall back to the new value.
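
A minimal sketch of that idea (not the attached patch itself; oldContainsField is a hypothetical helper that would report whether the raw "old" JSON object for row i actually lists field f):
{code:java}
} else if (OP_UPDATE.equals(type)) {
   ArrayData data = row.getArray(0);
   ArrayData old = row.getArray(1);
   for (int i = 0; i < data.size(); i++) {
      GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
      GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
      for (int f = 0; f < fieldCount; f++) {
         // oldContainsField is hypothetical: true when the raw "old" JSON object
         // for row i actually lists field f (even with an explicit null value)
         if (!oldContainsField(i, f)) {
            // field not listed in "old" -> unchanged, copy the value from "after"
            before.setField(f, after.getField(f));
         }
         // field listed in "old" -> keep the old value, even when it is null,
         // so the before row (20201212,null,1) from the example stays intact
      }
      before.setRowKind(RowKind.UPDATE_BEFORE);
      after.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(before);
      out.collect(after);
   }
}
{code}
The decisive signal is whether a field is present in "old" at all, not whether its value is null.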

I hope this bug will be fixed in a future version.

[^canal_format.patch]

 


