Hi casel,

Thank you for initiating the discussion about RowKind metadata. I believe that in certain scenarios it is necessary to expose RowKind. We have similar situations internally:
In simple terms, we need to be able to control the RowKind behavior on both the Source and the Sink side:

- When reading data from the Source, filter out unnecessary delete records.
- When writing data to the Sink, filter out unnecessary delete records.

First of all, I don't think it is appropriate to expose RowKind at the Flink SQL framework level, as it may lead to ambiguity in certain semantic scenarios. Therefore, I am more inclined to handle this information at the format layer. Here is my preliminary proposal: introduce a proxy format through which we can freely control which RowKinds are needed and which are not, and which can also expose RowKind itself.

1. For your scenario one, a proxy format allows us to do this:

```
CREATE TABLE kafka_source (
  f1 VARCHAR,
  f2 VARCHAR
) with (
  'connector' = 'kafka',
  'format' = 'proxy',
  'proxy.format' = 'canal-json',
  'proxy.filter' = 'DELETE,UPDATE_BEFORE'
);

CREATE TABLE kafka_sink (
  f1 VARCHAR,
  f2 VARCHAR
) with (
  'connector' = 'kafka',
  'format' = 'json'
);

INSERT INTO kafka_sink SELECT * FROM kafka_source;
```

By using the proxy, we can filter out the unwanted RowKind types and turn the original retract stream into an append-only stream.

2. For scenario two, we can use the proxy format on the sink side:

```
CREATE TABLE kafka_source (
  f1 VARCHAR,
  f2 VARCHAR
) with (
  'connector' = 'kafka',
  'format' = 'canal-json'
);

CREATE TABLE kafka_sink (
  f1 VARCHAR,
  f2 VARCHAR
) with (
  'connector' = 'kafka',
  'format' = 'proxy',
  'proxy.format' = 'json',
  'proxy.filter' = 'DELETE,UPDATE_BEFORE'
);

INSERT INTO kafka_sink SELECT * FROM kafka_source;
```

Through the proxy format, the originally append-only sink is enabled to consume a retract stream.
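To make the idea a bit more concrete, here is a rough sketch of what the read side of such a proxy format could look like. It is only an illustration: the class name RowKindFilteringDeserializationSchema and the 'proxy.filter' option are hypothetical parts of this proposal, while DeserializationSchema, RowData, RowKind, and Collector are existing Flink APIs. The wrapper delegates to the configured inner format (e.g. canal-json) and drops any row whose RowKind is listed in the filter:

```java
// Sketch only: a wrapper DeserializationSchema<RowData> that filters the RowKinds
// emitted by an inner format. The class name and the 'proxy.filter' option are
// hypothetical; the Flink interfaces used here are real.
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

public class RowKindFilteringDeserializationSchema implements DeserializationSchema<RowData> {

    private final DeserializationSchema<RowData> inner; // e.g. the canal-json schema
    private final Set<RowKind> kindsToDrop;             // parsed from 'proxy.filter'

    public RowKindFilteringDeserializationSchema(
            DeserializationSchema<RowData> inner, Set<RowKind> kindsToDrop) {
        this.inner = inner;
        this.kindsToDrop = kindsToDrop;
    }

    /** Parses a value like "DELETE,UPDATE_BEFORE" into a set of RowKinds. */
    public static Set<RowKind> parseFilter(String option) {
        Set<RowKind> kinds = EnumSet.noneOf(RowKind.class);
        for (String name : option.split(",")) {
            kinds.add(RowKind.valueOf(name.trim()));
        }
        return kinds;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        // CDC formats may emit several rows per message, so this sketch only
        // supports the Collector-based variant below.
        throw new UnsupportedOperationException("Use deserialize(byte[], Collector)");
    }

    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        // Let the wrapped format produce its rows, then drop the unwanted kinds.
        inner.deserialize(message, new Collector<RowData>() {
            @Override
            public void collect(RowData row) {
                if (!kindsToDrop.contains(row.getRowKind())) {
                    out.collect(row);
                }
            }

            @Override
            public void close() {
                // Do not close the downstream collector here; its lifecycle is
                // owned by the surrounding source operator.
            }
        });
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return inner.getProducedType();
    }
}
```

In a full implementation, the proxy format's DecodingFormat would build the inner format from the 'proxy.format' option and wrap its runtime schema as above, so connectors themselves would not need any changes; the sink side would mirror this with a wrapper around the inner SerializationSchema.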
This is my preliminary idea. There are probably many more details to consider, but the overall concept is to use a proxy format to implement the logic we want without affecting the original formats.

Best,
Feng

On Wed, Jul 19, 2023 at 10:06 PM casel.chen <casel_c...@126.com> wrote:

> CDC formats like debezium-json and canal-json support reading ROWKIND
> metadata.
>
> 1. Our first scenario is syncing data of operational tables into our
> streaming warehouse. All operational data in MySQL should NOT be physically
> deleted, so we use an "is_deleted" column to do logical deletes, and there
> should NOT be any delete operations on our streaming warehouse. But as data
> grows quickly, we need to delete old data (e.g. older than half a year)
> from the operational tables to keep the table size manageable and keep
> query performance from degrading. These records, deleted for maintenance
> purposes, should not be synced into our streaming warehouse, so we have to
> filter them out in our Flink SQL jobs. But currently it is not convenient
> to do ROWKIND filtering. That is why I am asking Flink to support reading
> ROWKIND metadata via a ROW_KIND() function. Then we could use the following
> Flink SQL to do the filtering. For example:
>
> create table customer_source (
>   id BIGINT PRIMARY KEY NOT ENFORCED,
>   name STRING,
>   region STRING
> ) with (
>   'connector' = 'kafka',
>   'format' = 'canal-json',
>   ...
> );
>
> create table customer_sink (
>   id BIGINT PRIMARY KEY NOT ENFORCED,
>   name STRING,
>   region STRING
> ) with (
>   'connector' = 'paimon',
>   ...
> );
>
> INSERT INTO customer_sink SELECT * FROM customer_source
> WHERE ROW_KIND() <> '-D';
>
> 2. Our second scenario is that we need to sink aggregation results into an
> MQ which does NOT support retract data. Although Flink provides an
> upsert-kafka connector, unfortunately our sink system is NOT Kafka, so we
> would have to write a customized connector like upsert-kafka again. If
> Flink SQL supported filtering data by ROWKIND, we wouldn't need to write
> any more upsert-xxx connectors. For example:
>
> create table customer_source (
>   id BIGINT PRIMARY KEY NOT ENFORCED,
>   name STRING,
>   region STRING
> ) with (
>   'connector' = 'kafka',
>   'format' = 'canal-json',
>   ...
> );
>
> create table customer_agg_sink (
>   region STRING,
>   cust_count BIGINT
> ) with (
>   'connector' = 'MQ',
>   'format' = 'json',
>   ...
> );
>
> INSERT INTO customer_agg_sink
> SELECT * FROM (
>   SELECT region, count(1) AS cust_count FROM customer_source GROUP BY region
> ) t
> WHERE ROW_KIND() <> '-U' AND ROW_KIND() <> '-D';
>
> What do you think? Looking forward to your feedback, thanks!