[ https://issues.apache.org/jira/browse/FLINK-22281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17321850#comment-17321850 ]
Jark Wu commented on FLINK-22281:
---------------------------------

If you don't have the full historical data, only the incremental binlog data, then aggregating on it will produce wrong results, because the input changelog is incomplete (it contains updates to rows that were never inserted).

You can try to turn on {{table.exec.source.cdc-events-duplicate=true}} to convert the incomplete changelog into a normalized changelog, e.g. an update to a non-existing row will be converted into an insert.

See https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/canal.html#duplicate-change-events

> flink sql consumer kakfa canal-json message then sum(amount)
> --------------------------------------------------------------
>
>                 Key: FLINK-22281
>                 URL: https://issues.apache.org/jira/browse/FLINK-22281
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.0
>         Environment: flink 1.12 local
>            Reporter: xx chai
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> I use Flink SQL to consume Kafka canal-json messages. The SQL is:
> CREATE TABLE kafka_mall_order_info (
>   id int,
>   amount double,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_yx-dc-3-102_3306',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'kafka_to_hive',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'canal-json');
> create table t2 (amount double) with ('connector' = 'print');
>
>
> insert into t2 select sum(amount) from kafka_mall_order_info;
> but the result is not what I expected.
> The result is shown in the attached image.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
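
As a rough illustration of the reasoning in the comment above, the following Python sketch models a retract-style sum(amount) over a changelog that starts in the middle of the binlog, with and without a normalization step. The event tuples, the function names `sum_amount` and `normalize`, and the amounts 100 and 150 are made up for illustration. The normalization is a simplified model of the behavior the comment describes (an update to an unseen row becomes an insert), not Flink's actual operator.

```python
from typing import Dict, List, Tuple

# (kind, id, amount). Kinds follow Flink's changelog notation:
# +I insert, -U update-before, +U update-after, -D delete.
Event = Tuple[str, int, float]


def sum_amount(events: List[Event]) -> float:
    """Retract-style SUM: +I/+U add the amount, -U/-D subtract it."""
    total = 0.0
    for kind, _id, amount in events:
        total += amount if kind in ("+I", "+U") else -amount
    return total


def normalize(events: List[Event]) -> List[Event]:
    """Simplified model of changelog normalization keyed by primary key.

    Assumption: we keep the last seen amount per id and only emit
    retractions for rows we have actually seen.
    """
    last: Dict[int, float] = {}
    out: List[Event] = []
    for kind, key, amount in events:
        if kind in ("+I", "+U"):
            if key in last:
                # Known row: retract the stored value, then emit the new one.
                out.append(("-U", key, last[key]))
                out.append(("+U", key, amount))
            else:
                # Unknown row: the update is converted into an insert.
                out.append(("+I", key, amount))
            last[key] = amount
        elif key in last:
            # Retraction for a known row: remove it using the stored value.
            out.append(("-D", key, last.pop(key)))
        # Otherwise: retraction for a row we never saw, so drop it.
    return out


# Only the incremental binlog is available: the original insert of
# id=1 (amount 100) happened before the consumer started reading.
incremental: List[Event] = [("-U", 1, 100.0), ("+U", 1, 150.0)]

raw_total = sum_amount(incremental)                    # 50.0
normalized_total = sum_amount(normalize(incremental))  # 150.0
```

In this model the raw changelog yields 50 because the update-before row retracts 100 that was never added, while the normalized changelog yields 150, the row's current amount. Note that the normalization is keyed by id, which corresponds to the PRIMARY KEY declared on the source table in the quoted DDL.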