Hi 大家好,

近期在处理 LEFT JOIN 语句时,发现了一个奇怪的现象:假设有如下 SQL 语句:

CREATE TABLE A (
    key INT
) WITH (
    'connector' = 'kafka',
);

CREATE TABLE B (
  key INT
) WITH (
    'connector' = 'kafka',
);

CREATE TABLE Sink (
    id INTEGER,
    upsert_value BIGINT,
    primary key (`id`) NOT ENFORCED
) WITH (
    'connector.type' = 'elasticsearch',
    'update-mode' = 'upsert',
-- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式
);

INSERT INTO Sink
SELECT A.key, SUM(B.key)
FROM A LEFT JOIN B ON A.key = B.key
GROUP BY A.key;


用于 LEFT JOIN 的左表叫做 A,右表叫做 B,那么:

*场景 1. *如果左表 A 来了一条数据 key=100,在右表 B 中首次没有 JOIN 成功(此时 B 还没有 key=100
的数据),则会向下游 ES Sink 输出 Upsert 消息(true, 100, null)。如果过段时间之后,B 有了 key=100
的数据,此时 Flink 会发出 DELETE 消息(false, 100, null),随后再发送一条 UPSERT 消息(例如 true,
100, 100)更新下游结果。此后无论如何,再也不会输出 DELETE 消息了。

*场景 2. * 如果左表 A 来了一条数据 key=100,在右表 B 中首次 JOIN 成功(即 B 已经有 key=100 的数据)
,则不会输出 DELETE 消息,而是直接输出 Upsert 消息(true, 100, 100),此后无论如何,再也不会输出 DELETE 消息。


*问题:*

请问场景 1 中的 LEFT JOIN 输出 Delete 消息是否有必要呢?我理解直接对于场景 1,直接发出 Upsert 消息也可以,Delete
看似用途不大。而且,Delete 消息会造成对应 doc id 中的一些字段被清除(如果之前该 doc 保存有其他 Flink
表中未定义的字段的话),造成字段的意外丢失。

阅读了 GroupAggFunction 的代码,看到有如下的逻辑,请问这个设计是否可以阐述一下是为了避免什么情况呢?非常感谢 :)

[image: image.png]

回复