[ https://issues.apache.org/jira/browse/FLINK-27663?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhiwen Sun closed FLINK-27663.
------------------------------
    Resolution: Not A Problem

> upsert-kafka can't process delete message from upsert-kafka sink
> ----------------------------------------------------------------
>
>                 Key: FLINK-27663
>                 URL: https://issues.apache.org/jira/browse/FLINK-27663
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.15.0, 1.13.6, 1.14.4
>            Reporter: Zhiwen Sun
>            Priority: Major
>
> upsert-kafka writes DELETE data as Kafka messages with null values (a null
> value marks a tombstone for the key).
> But when upsert-kafka is used as a source table to consume Kafka messages
> written by an upsert-kafka sink, the DELETE messages are ignored.
>
> Related SQL:
>
> {code:java}
> create table order_system_log(
>   id bigint,
>   PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
>  'connector' = 'upsert-kafka',
>  'topic' = 'test_use',
>  'properties.bootstrap.servers' = 'your broker',
>  'properties.group.id' = 'your group id',
>  'value.json.fail-on-missing-field' = 'false',
>  'value.json.ignore-parse-errors' = 'true',
>  'key.json.fail-on-missing-field' = 'false',
>  'key.json.ignore-parse-errors' = 'true',
>  'key.format' = 'json',
>  'value.format' = 'json'
> );
> select
>  *
> from
>  order_system_log
> ;
> {code}
>
> The problem may be caused by DeserializationSchema#deserialize:
> this method does not collect a record when the subclass's deserialize returns null.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)