[
https://issues.apache.org/jira/browse/FLINK-36547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895231#comment-17895231
]
Leonard Xu commented on FLINK-36547:
------------------------------------
Thanks [~liyubin117] for the ticket. Whether or not to split update changelog
entries is an interesting topic in Flink SQL. I have met similar use cases
before, but this is a huge breaking change for the SQL component, and we need
more discussion on the dev mailing list as well. CC [~lincoln.86xy] [~jark]
> Add option to retain `RowKind` semantics for cdc formats
> --------------------------------------------------------
>
> Key: FLINK-36547
> URL: https://issues.apache.org/jira/browse/FLINK-36547
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 2.0.0
> Reporter: Yubin Li
> Assignee: Yubin Li
> Priority: Major
> Attachments: image-2024-10-16-11-01-54-790.png,
> image-2024-10-16-11-02-34-406.png
>
>
> As the official docs say, `RowKind` semantics are changed on encoding:
> -U -> -D, +U -> +I
> {code:java}
> Flink also supports to encode the INSERT/UPDATE/DELETE messages in Flink SQL
> as Debezium JSON or Avro messages, and emit to external systems like Kafka.
> However, currently Flink can’t combine UPDATE_BEFORE and UPDATE_AFTER into a
> single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and
> UPDATE_AFTER as DELETE and INSERT Debezium messages. {code}
> In fact, many scenarios need the `RowKind` semantics to stay consistent,
> such as those that require different processing of -U/-D and +U/+I. We have
> taken advantage of the difference between UPDATE_BEFORE and UPDATE_AFTER to
> implement the feature, and it runs well in our business workloads.
> {*}implementation details{*}: When serializing, -U and +U are both
> represented by `u`; the former has a non-empty `before` field and the latter
> has a non-empty `after` field. When deserializing data of type `u`, if
> `before` is not empty, it is parsed as -U; if `after` is not empty, it is
> parsed as +U.
> {code:java}
> create table datagen1 (id int, name string) with
> ('connector'='datagen','number-of-rows'='5', 'fields.id.min'='1',
> 'fields.id.max'='2');
> -- add 'debezium-json.retain.rowkind' = 'true'
> create table t2 (id int, name string, num bigint) WITH (
> 'topic' = 't2',
> 'connector' = 'kafka',
> 'properties.bootstrap.servers' = 'xx',
> 'properties.group.id' = 'test',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'debezium-json',
> 'key.format' = 'json',
> 'key.fields' = 'id',
> 'debezium-json.timestamp-format.standard' = 'ISO-8601',
> 'debezium-json.schema-include' = 'false',
> 'debezium-json.retain.rowkind' = 'true'
> );
> insert into t2 select id, max(name) as name, count(1) as num from datagen1
> group by id;
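> -- assumed sink definition using the 'print' connector
> create table print1 (id int, name string, num bigint) with ('connector'='print');
> insert into print1 select * from t2;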
> {code}
> output result:
> !image-2024-10-16-11-02-34-406.png|width=660,height=153!
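The `u` encoding described under "implementation details" can be sketched
roughly as follows. This is only an illustration of the rule as stated in the
issue, not the actual patch: `RetainRowKindSketch`, `Envelope`, `encode` and
`decode` are hypothetical names, the local `RowKind` enum is a stand-in for
Flink's `RowKind` so the sketch compiles without Flink on the classpath, and
row payloads are simplified to plain strings.

```java
import java.util.Objects;

public class RetainRowKindSketch {

    /** Local stand-in for Flink's RowKind (assumption: same four kinds). */
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** Simplified Debezium-style envelope; a null field means "absent". */
    static final class Envelope {
        final String before;
        final String after;
        final String op; // "c" (create), "u" (update) or "d" (delete)

        Envelope(String before, String after, String op) {
            this.before = before;
            this.after = after;
            this.op = op;
        }
    }

    /** Serialization: -U and +U both use op "u" but fill different fields. */
    static Envelope encode(RowKind kind, String row) {
        Objects.requireNonNull(row, "row");
        switch (kind) {
            case INSERT:
                return new Envelope(null, row, "c");
            case UPDATE_BEFORE:
                return new Envelope(row, null, "u");
            case UPDATE_AFTER:
                return new Envelope(null, row, "u");
            case DELETE:
                return new Envelope(row, null, "d");
            default:
                throw new IllegalArgumentException("Unknown kind: " + kind);
        }
    }

    /** Deserialization: for op "u", `before` is checked first, then `after`. */
    static RowKind decode(Envelope e) {
        switch (e.op) {
            case "c":
                return RowKind.INSERT;
            case "d":
                return RowKind.DELETE;
            case "u":
                if (e.before != null) {
                    return RowKind.UPDATE_BEFORE;
                }
                if (e.after != null) {
                    return RowKind.UPDATE_AFTER;
                }
                throw new IllegalArgumentException("'u' message has neither before nor after");
            default:
                throw new IllegalArgumentException("Unsupported op: " + e.op);
        }
    }

    public static void main(String[] args) {
        // Round-trip every kind to show that it survives encode/decode.
        for (RowKind kind : RowKind.values()) {
            Envelope e = encode(kind, "{\"id\":1}");
            System.out.println(kind + " -> op=" + e.op + " -> " + decode(e));
        }
    }
}
```

Note that a `u` message carrying both `before` and `after` is not covered by
the rule as stated; this sketch checks `before` first, so such a message would
be read as -U only.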
--
This message was sent by Atlassian Jira
(v8.20.10#820010)