Hi 大家好, 近期在处理 LEFT JOIN 语句时,发现了一个奇怪的现象:假设有如下 SQL 语句:
CREATE TABLE A ( key INT ) WITH ( 'connector' = 'kafka', ); CREATE TABLE B ( key INT ) WITH ( 'connector' = 'kafka', ); CREATE TABLE Sink ( id INTEGER, upsert_value BIGINT, primary key (`id`) NOT ENFORCED ) WITH ( 'connector.type' = 'elasticsearch', 'update-mode' = 'upsert', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式 ); INSERT INTO Sink SELECT A.key, SUM(B.key) FROM A LEFT JOIN B ON A.key = B.key GROUP BY A.key; 用于 LEFT JOIN 的左表叫做 A,右表叫做 B,那么: *场景 1. *如果左表 A 来了一条数据 key=100,在右表 B 中首次没有 JOIN 成功(此时 B 还没有 key=100 的数据),则会向下游 ES Sink 输出 Upsert 消息(true, 100, null)。如果过段时间之后,B 有了 key=100 的数据,此时 Flink 会发出 DELETE 消息(false, 100, null),随后再发送一条 UPSERT 消息(例如 true, 100, 100)更新下游结果。此后无论如何,再也不会输出 DELETE 消息了。 *场景 2. * 如果左表 A 来了一条数据 key=100,在右表 B 中首次 JOIN 成功(即 B 已经有 key=100 的数据) ,则不会输出 DELETE 消息,而是直接输出 Upsert 消息(true, 100, 100),此后无论如何,再也不会输出 DELETE 消息。 *问题:* 请问场景 1 中的 LEFT JOIN 输出 Delete 消息是否有必要呢?我理解直接对于场景 1,直接发出 Upsert 消息也可以,Delete 看似用途不大。而且,Delete 消息会造成对应 doc id 中的一些字段被清除(如果之前该 doc 保存有其他 Flink 表中未定义的字段的话),造成字段的意外丢失。 阅读了 GroupAggFunction 的代码,看到有如下的逻辑,请问这个设计是否可以阐述一下是为了避免什么情况呢?非常感谢 :) [image: image.png]