Hi Andrey,

Regarding the streaming join operator: it handles RowKind differently depending on the type of join. For inner joins, it forwards the input RowKind directly. For other join types (e.g. left, right, or full outer joins), it simplifies processing by always emitting INSERT and DELETE messages. This behavior is documented in FLINK-17337.
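If you want to verify what the planner actually produces, one quick way is to print the RowKind of every record coming out of the join. A minimal sketch (it assumes the `tEnv` and `joinedTable` variables from your step 4; those names are placeholders for whatever you use):

    // Print the RowKind (+I, -U, +U, -D) of every row the join emits,
    // so the actual changelog behavior can be observed directly.
    tEnv.toChangelogStream(joinedTable)
        .map(row -> row.getKind().shortString() + " " + row)
        .print();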
Judging from the query in step 4 of your mail, this looks like an inner join scenario. If you were actually using a left, right, or full outer join, the observed behavior would be expected due to the simplified handling of RowKind. Could you confirm which type of join you are working with? (I have also put a rough sketch of how I read steps 3-5 at the very end of this mail, below your message.)

Best,
Xu Shuai

> On 29 Mar 2025, at 18:19, Andrey <fsstar....@gmail.com> wrote:
>
> Dear Flink Community,
>
> I'm starting my Flink journey, and I have stumbled upon a behavior of Dynamic
> Tables / Changelog Streams which I cannot quite understand. So far I haven't
> found an answer across the mailing list, articles and conference talk
> recordings. Could you please help me understand the situation?
>
> TLDR:
> A many-to-one JOIN converts `+U` events to `-D`/`+I` pairs. Is there a way to
> keep `+U` flowing through the system?
>
> More detailed:
>
> My use case seems to be quite standard: maintaining an asynchronous,
> incrementally updated, denormalized materialized view of my PostgreSQL
> database.
>
> In simple terms, I have a bunch of tables that I need to join together in
> Flink and send the result to multiple destinations, including ElasticSearch.
>
> In the setup I'm working with, Debezium and Kafka are already present, so it
> seemed natural to me to subscribe to the Kafka topics in Flink instead of
> reading from PostgreSQL directly.
>
> The end-to-end setup looks like this:
> PostgreSQL -> Debezium -> Kafka -> Flink -> ElasticSearch
>
> In the Flink application I did the following:
> 1. Create a `KafkaSource` for each table to subscribe to the Debezium topics,
>    deserializing from AWS Glue Avro schemas to the generated POJOs.
> 2. Manually map each POJO stream to a `DataStream<Row>` with upsert semantics.
> 3. Define two tables A and B using `.fromChangelogStream(source, schema,
>    ChangelogMode.upsert())`, where the schema includes a primary key.
> 4. Use `SELECT A.id as a_id, B.id as b_id, A.code as a_code, B.code as b_code
>    FROM A JOIN B ON A.b_id = B.id` to join the two tables together.
> 5. Convert back to a stream with `.toChangelogStream(joinedTable, schema,
>    ChangelogMode.upsert())`, where `schema` includes `a_id` as the primary key.
> 6. Sink that stream to ElasticSearch.
>
> In step 2, I generate a stream of `+U` row kinds. However, the JOIN in step 4
> converts them into `-D`/`+I` pairs even when the source only contains `+U`
> events changing just the `A.code` field. The primary key information seems to
> have been lost.
>
> This leads to a stream of delete/insert events going to Elastic, which isn't
> optimal.
>
> I understand that in the general case JOINs are many-to-many, but in my case
> I'm joining many-to-one, so I expect `A.id` to be preserved as the primary key
> of the join result.
>
> I can work around this limitation by buffering before the final sink to
> aggregate `-D`/`+I` pairs back into `+U`. However, I wonder if there is a way
> to preserve the `+U` through the join, as that would reduce the number of
> events going through the system.
>
> Thank you in advance for taking the time to read and answer this question, I
> really appreciate the help.
>
> Kind regards,
> Andrey Starostin
>
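P.S. For reference, this is roughly how I read steps 3-5 of your setup. The variable names (`tEnv`, `upsertStreamA`, `upsertStreamB`) and the column types are my guesses, so adjust them to your actual code:

    // Step 3 (as I understand it): define tables A and B from the upsert
    // changelog streams, each with a declared primary key.
    Schema schemaA = Schema.newBuilder()
            .column("id", "BIGINT NOT NULL")
            .column("b_id", "BIGINT")
            .column("code", "STRING")
            .primaryKey("id")
            .build();
    tEnv.createTemporaryView("A",
            tEnv.fromChangelogStream(upsertStreamA, schemaA, ChangelogMode.upsert()));
    // ... same pattern for table B, with "id" as its primary key ...

    // Step 4: the query from your mail (a plain JOIN, i.e. an inner join).
    Table joined = tEnv.sqlQuery(
            "SELECT A.id AS a_id, B.id AS b_id, A.code AS a_code, B.code AS b_code "
                    + "FROM A JOIN B ON A.b_id = B.id");

    // Step 5: convert back to an upsert changelog stream keyed by a_id.
    Schema outSchema = Schema.newBuilder()
            .column("a_id", "BIGINT NOT NULL")
            .column("b_id", "BIGINT")
            .column("a_code", "STRING")
            .column("b_code", "STRING")
            .primaryKey("a_id")
            .build();
    DataStream<Row> result =
            tEnv.toChangelogStream(joined, outSchema, ChangelogMode.upsert());

If your actual step 4 differs from this plain JOIN (for example, a LEFT JOIN), that would already explain the `-D`/`+I` pairs you are seeing.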