Alexander Smirnov created FLINK-31729:
-----------------------------------------
Summary: Unexpected UPDATE_BEFORE output record in LEFT OUTER JOIN
Key: FLINK-31729
URL: https://issues.apache.org/jira/browse/FLINK-31729
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.17.0
Reporter: Alexander Smirnov
Fix For: 1.18.0
Attachments: image-2023-04-05-00-08-32-984.png
Currently, in a streaming LEFT/RIGHT/FULL OUTER JOIN, Flink SQL does not emit
UPDATE_BEFORE/UPDATE_AFTER records. For simplicity, it explicitly changes the
RowKind of output records to INSERT/DELETE instead. However, this does not
always hold: in some cases UPDATE_BEFORE rows are still emitted. More
confusingly, such an UPDATE_BEFORE record is followed by an INSERT record
rather than an UPDATE_AFTER record. This can cause bugs when downstream
operators handle UPDATE records differently from INSERT/DELETE records (for
example, an operator may assume that every UPDATE_BEFORE record is eventually
followed by an UPDATE_AFTER record).
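The following is a minimal Java sketch of why an UPDATE_BEFORE followed by an INSERT can trip up a downstream operator. It shows a hypothetical downstream check that counts UPDATE_BEFORE records never completed by an UPDATE_AFTER. `RowKind` here is a local stand-in that mirrors the four kinds of Flink's `org.apache.flink.types.RowKind`. `ChangelogPairingCheck` and `unmatchedUpdateBefore` are illustrative names, not Flink APIs. The sketch also assumes that all records share a single key, as in the reproduction steps.

```java
import java.util.List;

public class ChangelogPairingCheck {

    // Local stand-in mirroring the four kinds of Flink's org.apache.flink.types.RowKind,
    // redefined here so the sketch compiles without Flink on the classpath.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /**
     * Hypothetical downstream check (not a Flink API): counts UPDATE_BEFORE records
     * that are never followed by an UPDATE_AFTER. Assumes all records share one key.
     */
    static int unmatchedUpdateBefore(List<RowKind> kinds) {
        int pending = 0;
        for (RowKind kind : kinds) {
            if (kind == RowKind.UPDATE_BEFORE) {
                pending++;                 // an update has started
            } else if (kind == RowKind.UPDATE_AFTER && pending > 0) {
                pending--;                 // the pending update is completed
            }
            // INSERT and DELETE never complete a pending update
        }
        return pending;
    }

    public static void main(String[] args) {
        // Well-formed update: UPDATE_BEFORE followed by UPDATE_AFTER.
        List<RowKind> paired = List.of(RowKind.UPDATE_BEFORE, RowKind.UPDATE_AFTER);
        // Shape described in this issue: UPDATE_BEFORE followed by INSERT.
        List<RowKind> described = List.of(RowKind.UPDATE_BEFORE, RowKind.INSERT);

        System.out.println(unmatchedUpdateBefore(paired));    // prints 0
        System.out.println(unmatchedUpdateBefore(described)); // prints 1
    }
}
```

An operator built on this assumption would keep waiting, possibly holding state, for an UPDATE_AFTER that never arrives in the second sequence.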
How to reproduce:
Suppose we have tables "source1" and "source2":
CREATE TABLE source1(
  id INT PRIMARY KEY NOT ENFORCED,
  c3 BIGINT
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json'
);
CREATE TABLE source2(
  id INT PRIMARY KEY NOT ENFORCED,
  c3 BIGINT
) WITH (
  'connector' = 'kafka',
  ...
  'format' = 'debezium-json'
);
And we execute the following query:
SELECT t1.id, t1.c3, t2.id, t2.c3
FROM source1 t1 LEFT JOIN source2 t2 ON t1.id = t2.id
Then we insert records one by one:
source1: {"before":null,"after":{"id":2,"c3":7121},"op":"c"}
source2: {"before":null,"after":{"id":2,"c3":364},"op":"c"}
source1: {"before":{"id":2,"c3":7121},"after":{"id":2,"c3":7222},"op":"u"}
source2: {"before":{"id":2,"c3":364},"after":{"id":2,"c3":564},"op":"u"}
The result is shown in the following screenshot:
!image-2023-04-05-00-08-32-984.png!
Note that once https://issues.apache.org/jira/browse/FLINK-17337 is
implemented (support for emitting UPDATE_BEFORE/UPDATE_AFTER records in joins
other than inner join), the described error will no longer be relevant.