This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 2acf69337 [FLINK-39230] Transform should convert partially filtered 
UPDATE events to INSERT / DELETE (#4319)
2acf69337 is described below

commit 2acf69337783702c448c9855924bc20aa652044f
Author: yuxiqian <[email protected]>
AuthorDate: Wed Mar 18 13:51:19 2026 +0800

    [FLINK-39230] Transform should convert partially filtered UPDATE events to 
INSERT / DELETE (#4319)
---
 docs/content.zh/docs/core-concept/transform.md     |  13 +-
 docs/content/docs/core-concept/transform.md        |  14 +-
 .../flink/FlinkPipelineTransformITCase.java        | 132 +++++++++++++-
 .../cdc/composer/specs/TransformSpecsITCase.java   |   7 -
 .../src/test/resources/specs/basic.yaml            |   5 +-
 .../src/test/resources/specs/nested.yaml           |   5 +-
 .../operators/transform/PostTransformOperator.java |  67 +++++--
 .../transform/PostTransformOperatorTest.java       | 198 +++++++++++++++++++++
 .../transform/UnifiedTransformOperatorTest.java    |   4 +-
 9 files changed, 398 insertions(+), 47 deletions(-)

diff --git a/docs/content.zh/docs/core-concept/transform.md 
b/docs/content.zh/docs/core-concept/transform.md
index a9bae80cd..99b87ae73 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -338,7 +338,7 @@ transform:
 小技巧:table-options 的格式是 `key1=value1,key2=value2`。
 
 ## 分类映射
-多个转换规则可以定义为分类映射。
+在一张表同时被多个转换规则命中时,
 只有第一个匹配的转换规则将应用。
 举个例子,我们可以定义一个转换规则如下:
 
@@ -346,14 +346,13 @@ transform:
 transform:
   - source-table: mydb.web_order
     projection: id, order_id
-    filter: UPPER(province) = 'SHANGHAI'
-    description: classification mapping example
-  - source-table: mydb.web_order
-    projection: order_id as id, id as order_id
-    filter: UPPER(province) = 'BEIJING'
-    description: classification mapping example
+    filter: id > 1001
+  - source-table: mydb.\.*
+    projection: \*, 'fallback' AS FALLBACK
 ```
 
+这里,即使 `mydb.web_order` 表同样可以被第二条规则匹配,但因为排序靠前的第一条规则已经匹配,因此不会落入后续的 Transform 规则中。
+
 ## 用户自定义函数
 用户自定义函数(UDF)可以在转换规则中使用。
 
diff --git a/docs/content/docs/core-concept/transform.md 
b/docs/content/docs/core-concept/transform.md
index aed162567..1617bc8df 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -341,22 +341,20 @@ transform:
 Tips: The format of table-options is `key1=value1,key2=value2`.
 
 ## Classification mapping
-Multiple transform rules can be defined to classify input data rows and apply 
different processing.
-Only the first matched transform rule will apply.
+If a table hits multiple transform rules, only the first matched transform rule 
will apply.
 For example, we may define a transform rule as follows:
 
 ```yaml
 transform:
   - source-table: mydb.web_order
     projection: id, order_id
-    filter: UPPER(province) = 'SHANGHAI'
-    description: classification mapping example
-  - source-table: mydb.web_order
-    projection: order_id as id, id as order_id
-    filter: UPPER(province) = 'BEIJING'
-    description: classification mapping example
+    filter: id > 1001
+  - source-table: mydb.\.*
+    projection: \*, 'fallback' AS FALLBACK
 ```
 
+Here, though `mydb.web_order` also matches the second rule (`mydb.\.*`), it will 
not fall through to the next rule, as it has already been handled by the first rule.
+
 ## User-defined Functions
 
 User-defined functions (UDFs) can be used in transform rules.
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 2842729cd..4c4067e29 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -58,7 +58,6 @@ import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
-import org.apache.calcite.sql.validate.SqlValidatorException;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.ThrowableAssert;
 import org.codehaus.commons.compiler.CompileException;
@@ -241,6 +240,136 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
     }
 
+    @ParameterizedTest(name = "API version: {0}")
+    @EnumSource(ValuesDataSink.SinkApi.class)
+    void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi) 
throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable1 = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        Schema table1Schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .physicalColumn("age", DataTypes.INT())
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        table1Schema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        List<Event> events = new ArrayList<>();
+        events.add(new CreateTableEvent(myTable1, table1Schema));
+        // Case 1: before=Y, after=Y -> UPDATE
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {1, 
BinaryStringData.fromString("Alice"), 30})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {1, 
BinaryStringData.fromString("Alice"), 30}),
+                        generator.generate(
+                                new Object[] {1, 
BinaryStringData.fromString("Alice"), 40})));
+        // Case 2: before=Y, after=N -> DELETE
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 30})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 30}),
+                        generator.generate(
+                                new Object[] {2, 
BinaryStringData.fromString("Bob"), 20})));
+        // Case 3: before=N, after=Y -> INSERT
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {3, 
BinaryStringData.fromString("Carol"), 20})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {3, 
BinaryStringData.fromString("Carol"), 20}),
+                        generator.generate(
+                                new Object[] {3, 
BinaryStringData.fromString("Carol"), 35})));
+        // Case 4: before=N, after=N -> drop
+        events.add(
+                DataChangeEvent.insertEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {4, 
BinaryStringData.fromString("Dave"), 10})));
+        events.add(
+                DataChangeEvent.updateEvent(
+                        myTable1,
+                        generator.generate(
+                                new Object[] {4, 
BinaryStringData.fromString("Dave"), 10}),
+                        generator.generate(
+                                new Object[] {4, 
BinaryStringData.fromString("Dave"), 15})));
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        
"default_namespace.default_schema.\\.*",
+                                        null,
+                                        "age > 25",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactlyInAnyOrder(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
+                        // INSERT id=1 (age=30 passes)
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 30], op=INSERT, meta=()}",
+                        // UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1, 
Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}",
+                        // INSERT id=2 (age=30 passes)
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 30], op=INSERT, meta=()}",
+                        // UPDATE id=2 (30->20): before=Y, after=N -> DELETE
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 30], after=[], op=DELETE, meta=()}",
+                        // UPDATE id=3 (20->35): before=N, after=Y -> INSERT
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Carol, 35], op=INSERT, meta=()}");
+        // INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails),
+        // UPDATE id=4 (10->15, both fail) are all filtered out.
+    }
+
     /**
      * This tests if transform rule could be used to classify source records 
based on filtering
      * rules.
@@ -2507,7 +2636,6 @@ class FlinkPipelineTransformITCase {
                                 + "to schema\n"
                                 + "\t(Unknown).")
                 .rootCause()
-                .isExactlyInstanceOf(SqlValidatorException.class)
                 .hasMessage("Column 'id1' not found in any table");
 
         // Unexpected column in filter rule
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
index f97af9e38..21e7ddc5d 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
@@ -501,13 +501,6 @@ class TransformSpecsITCase {
         }
     }
 
-    enum SpecContext {
-        PROJECTION,
-        EXPECT,
-        EXPECT_ERROR,
-        NULL
-    }
-
     private static final String[] EXPECTED_SPECS = {
         "specs/arithmetic.yaml",
         "specs/basic.yaml",
diff --git a/flink-cdc-composer/src/test/resources/specs/basic.yaml 
b/flink-cdc-composer/src/test/resources/specs/basic.yaml
index 1dcf180c3..b9b173329 100644
--- a/flink-cdc-composer/src/test/resources/specs/basic.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/basic.yaml
@@ -108,13 +108,14 @@
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is 
Lie], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], 
after=[], op=DELETE, meta=()}
 - do: Filter by Expression
   projection: id_, string_
   filter: id_ + 1 <= 1
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie], 
after=[-1, 天地玄黄宇宙洪荒], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, 天地玄黄宇宙洪荒], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, 天地玄黄宇宙洪荒], after=[], 
op=DELETE, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null], 
op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[], 
op=DELETE, meta=()}
@@ -126,6 +127,7 @@
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is 
Lie, 18], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], 
after=[], op=DELETE, meta=()}
 - do: Filter by Calculation Column (With NULL)
   ignore: FLINK-38905
   projection: id_, string_, CHAR_LENGTH(string_) AS strlen_
@@ -134,6 +136,7 @@
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is 
Lie, 18], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18], 
after=[], op=DELETE, meta=()}
 - do: Invalid Projection Expr
   projection: id_, a_column_that_is_nowhere_to_be_found
   primary-key: id_
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml 
b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index a600b469b..3ac67facd 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -176,7 +176,7 @@
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`array_string_` ARRAY<STRING>}, primaryKeys=id_, options=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three, 
five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, [二, san, 五, qi, 
十一]], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]], 
after=[], op=DELETE, meta=()}
 - do: Map Subscripting
   projection: |-
@@ -248,6 +248,7 @@
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_, 
options=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 -> 
two, 3 -> three}], op=INSERT, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[1, {1 -> one, 2 -> two, 3 -> 
three}], after=[], op=DELETE, meta=()}
 - do: Record Subscripting With Index
   projection: |-
     id_
@@ -282,7 +283,7 @@
   primary-key: id_
   expect: |-
     CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT 
NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>}, 
primaryKeys=id_, options=()}
-    DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice, 
length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}], 
op=UPDATE, meta=()}
+    DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, {name: STRING 
-> Derrida, length: INT -> 7}], op=INSERT, meta=()}
     DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida, 
length: INT -> 7}], after=[], op=DELETE, meta=()}
 - do: Variant Object Subscripting With String Key
   projection: |-
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 8683a07b8..0a8b63703 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -289,16 +289,19 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
                 getProjectionProcessor(tableId, effectiveTransformer);
         TransformFilterProcessor filterProcessor =
                 getFilterProcessor(tableId, effectiveTransformer);
-        RecordData beforeRow = null;
-        RecordData afterRow = null;
-        boolean filterPassed = true;
+
+        BinaryRecordData beforeRow = null;
+        BinaryRecordData afterRow = null;
+        boolean beforeFilterPassed = false;
+        boolean afterFilterPassed = false;
+
         if (event.before() != null) {
             context.opType = beforeOp;
             Tuple2<BinaryRecordData, Boolean> result =
                     transformRecord(
                             event.before(), info, projectionProcessor, 
filterProcessor, context);
             beforeRow = result.f0;
-            filterPassed = result.f1;
+            beforeFilterPassed = result.f1;
         }
         if (event.after() != null) {
             context.opType = afterOp;
@@ -306,23 +309,51 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
                     transformRecord(
                             event.after(), info, projectionProcessor, 
filterProcessor, context);
             afterRow = result.f0;
-            filterPassed = result.f1;
+            afterFilterPassed = result.f1;
         }
-        if (filterPassed) {
-            DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, 
beforeRow, afterRow);
-            if (effectiveTransformer.getPostTransformConverter().isPresent()) {
-                return effectiveTransformer
-                        .getPostTransformConverter()
-                        .get()
-                        .convert(finalEvent)
-                        .map(Event.class::cast);
-            } else {
-                return Optional.of(finalEvent);
-            }
+        // For UPDATE events, before and after filter results may differ, 
requiring op type
+        // conversion:
+        //   before=Y, after=Y -> UPDATE;  before=Y, after=N -> DELETE;
+        //   before=N, after=Y -> INSERT;  before=N, after=N -> drop.
+        DataChangeEvent finalEvent;
+        switch (event.op()) {
+            case INSERT:
+            case REPLACE:
+                if (!afterFilterPassed) {
+                    return Optional.empty();
+                }
+                finalEvent = DataChangeEvent.projectRecords(event, beforeRow, 
afterRow);
+                break;
+            case DELETE:
+                if (!beforeFilterPassed) {
+                    return Optional.empty();
+                }
+                finalEvent = DataChangeEvent.projectRecords(event, beforeRow, 
afterRow);
+                break;
+            case UPDATE:
+                if (beforeFilterPassed && afterFilterPassed) {
+                    finalEvent = DataChangeEvent.projectRecords(event, 
beforeRow, afterRow);
+                } else if (beforeFilterPassed) {
+                    finalEvent = DataChangeEvent.deleteEvent(tableId, 
beforeRow, event.meta());
+                } else if (afterFilterPassed) {
+                    finalEvent = DataChangeEvent.insertEvent(tableId, 
afterRow, event.meta());
+                } else {
+                    return Optional.empty();
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported operation type: " + event.op());
         }
 
-        // Events with no matching filters satisfied won't be emitted to 
downstream.
-        return Optional.empty();
+        if (effectiveTransformer.getPostTransformConverter().isPresent()) {
+            return effectiveTransformer
+                    .getPostTransformConverter()
+                    .get()
+                    .convert(finalEvent)
+                    .map(Event.class::cast);
+        }
+        return Optional.of(finalEvent);
     }
 
     /**
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index ddd5aeb76..57563b280 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -784,6 +784,204 @@ class PostTransformOperatorTest {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
+    @Test
+    void testUpdateEventFilterOpTypeConversion() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COLUMN_SQUARE_TABLE.identifier(),
+                                "col1, col2, col2 * col2 as square_col2",
+                                "col2 < 3 OR col2 > 5")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        transformFunctionEventEventOperatorTestHarness.open();
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
COLUMN_SQUARE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
COLUMN_SQUARE_SCHEMA)));
+
+        // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE
+        DataChangeEvent updateBothPass =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {1, 1, 
null}),
+                        recordDataGenerator.generate(new Object[] {1, 6, 
null}));
+        transform.processElement(new StreamRecord<>(updateBothPass));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.updateEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        recordDataGenerator.generate(new 
Object[] {1, 1, 1}),
+                                        recordDataGenerator.generate(new 
Object[] {1, 6, 36}))));
+
+        // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only)
+        DataChangeEvent updateBeforeOnly =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {2, 1, 
null}),
+                        recordDataGenerator.generate(new Object[] {2, 4, 
null}));
+        transform.processElement(new StreamRecord<>(updateBeforeOnly));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.deleteEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        recordDataGenerator.generate(new 
Object[] {2, 1, 1}))));
+
+        // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only)
+        DataChangeEvent updateAfterOnly =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {3, 4, 
null}),
+                        recordDataGenerator.generate(new Object[] {3, 6, 
null}));
+        transform.processElement(new StreamRecord<>(updateAfterOnly));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.insertEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        recordDataGenerator.generate(new 
Object[] {3, 6, 36}))));
+
+        // Case 4: before=N(col2=4), after=N(col2=5) -> drop
+        DataChangeEvent updateNonePass =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {4, 4, 
null}),
+                        recordDataGenerator.generate(new Object[] {4, 5, 
null}));
+        transform.processElement(new StreamRecord<>(updateNonePass));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isNull();
+
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
+
+    @Test
+    void testUpdateEventFilterOpTypeConversionWithDataEventType() throws 
Exception {
+        // This test validates the behavior of __data_event_type__ metadata 
column
+        // when UPDATE events are converted to synthetic INSERT/DELETE events.
+        // Note: The projected __data_event_type__ reflects the original op 
type (-U/+U)
+        // rather than the converted op type (-D/+I).
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COLUMN_SQUARE_TABLE.identifier(),
+                                "col1, col2, col2 * col2 as square_col2, 
__data_event_type__ as event_type",
+                                "col2 < 3 OR col2 > 5")
+                        .build();
+        RegularEventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        RegularEventOperatorTestHarness.with(transform, 1);
+        transformFunctionEventEventOperatorTestHarness.open();
+
+        Schema expectedSchema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.INT().notNull())
+                        .physicalColumn("col2", DataTypes.INT())
+                        .physicalColumn("square_col2", DataTypes.INT())
+                        .physicalColumn("event_type", 
DataTypes.STRING().notNull())
+                        .primaryKey("col1")
+                        .build();
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
COLUMN_SQUARE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+        BinaryRecordDataGenerator expectedRecordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
expectedSchema.toRowDataType()));
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
expectedSchema)));
+
+        // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE
+        // __data_event_type__ for before is -U, for after is +U
+        DataChangeEvent updateBothPass =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {1, 1, 
null}),
+                        recordDataGenerator.generate(new Object[] {1, 6, 
null}));
+        transform.processElement(new StreamRecord<>(updateBothPass));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.updateEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        expectedRecordDataGenerator.generate(
+                                                new Object[] {1, 1, 1, new 
BinaryStringData("-U")}),
+                                        expectedRecordDataGenerator.generate(
+                                                new Object[] {
+                                                    1, 6, 36, new 
BinaryStringData("+U")
+                                                }))));
+
+        // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only)
+        // __data_event_type__ is -U (original before type, not -D)
+        DataChangeEvent updateBeforeOnly =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {2, 1, 
null}),
+                        recordDataGenerator.generate(new Object[] {2, 4, 
null}));
+        transform.processElement(new StreamRecord<>(updateBeforeOnly));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.deleteEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        expectedRecordDataGenerator.generate(
+                                                new Object[] {
+                                                    2, 1, 1, new 
BinaryStringData("-U")
+                                                }))));
+
+        // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only)
+        // __data_event_type__ is +U (original after type, not +I)
+        DataChangeEvent updateAfterOnly =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {3, 4, 
null}),
+                        recordDataGenerator.generate(new Object[] {3, 6, 
null}));
+        transform.processElement(new StreamRecord<>(updateAfterOnly));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                DataChangeEvent.insertEvent(
+                                        COLUMN_SQUARE_TABLE,
+                                        expectedRecordDataGenerator.generate(
+                                                new Object[] {
+                                                    3, 6, 36, new 
BinaryStringData("+U")
+                                                }))));
+
+        // Case 4: before=N(col2=4), after=N(col2=5) -> drop
+        DataChangeEvent updateNonePass =
+                DataChangeEvent.updateEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {4, 4, 
null}),
+                        recordDataGenerator.generate(new Object[] {4, 5, 
null}));
+        transform.processElement(new StreamRecord<>(updateNonePass));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isNull();
+
+        transformFunctionEventEventOperatorTestHarness.close();
+    }
+
     @Test
     void testTimestampTransform() throws Exception {
         PostTransformOperator transform =
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index f55a0da8e..b00b1a29e 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -487,7 +487,7 @@ class UnifiedTransformOperatorTest {
                 .updatePreTransformed(
                         new Object[] {"id004", 18, "NeoReference004", 2018},
                         new Object[] {"id004", 10, "NeoReference004", 2018})
-                .updatePostTransformed()
+                .deletePostTransformed("id004", "ID004", 19, "neoreference004")
                 .deleteSource("id001", "Alice", 17, "Reference001", 2021)
                 .deletePreTransformed("id001", 17, "Reference001", 2021)
                 .deletePostTransformed("id001", "ID001", 18, "reference001")
@@ -1032,7 +1032,7 @@ class UnifiedTransformOperatorTest {
                         new Object[] {"1004", "Colin", 10, "NeoReference004", 
2018})
                 .updatePreTransformed(
                         new Object[] {"1004", 19, 2018}, new Object[] {"1004", 
10, 2018})
-                .updatePostTransformed()
+                .deletePostTransformed("1004", 20, 1023L, "19")
                 .deleteSource("1001", "Alice", 17, "Reference001", 2021)
                 .deletePreTransformed("1001", 17, 2021)
                 .deletePostTransformed("1001", 18, 1018L, "17")


Reply via email to