This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 2acf69337 [FLINK-39230] Transform should convert partially filtered
UPDATE events to INSERT / DELETE (#4319)
2acf69337 is described below
commit 2acf69337783702c448c9855924bc20aa652044f
Author: yuxiqian <[email protected]>
AuthorDate: Wed Mar 18 13:51:19 2026 +0800
[FLINK-39230] Transform should convert partially filtered UPDATE events to
INSERT / DELETE (#4319)
---
docs/content.zh/docs/core-concept/transform.md | 13 +-
docs/content/docs/core-concept/transform.md | 14 +-
.../flink/FlinkPipelineTransformITCase.java | 132 +++++++++++++-
.../cdc/composer/specs/TransformSpecsITCase.java | 7 -
.../src/test/resources/specs/basic.yaml | 5 +-
.../src/test/resources/specs/nested.yaml | 5 +-
.../operators/transform/PostTransformOperator.java | 67 +++++--
.../transform/PostTransformOperatorTest.java | 198 +++++++++++++++++++++
.../transform/UnifiedTransformOperatorTest.java | 4 +-
9 files changed, 398 insertions(+), 47 deletions(-)
diff --git a/docs/content.zh/docs/core-concept/transform.md
b/docs/content.zh/docs/core-concept/transform.md
index a9bae80cd..99b87ae73 100644
--- a/docs/content.zh/docs/core-concept/transform.md
+++ b/docs/content.zh/docs/core-concept/transform.md
@@ -338,7 +338,7 @@ transform:
小技巧:table-options 的格式是 `key1=value1,key2=value2`。
## 分类映射
-多个转换规则可以定义为分类映射。
+在一张表同时被多个转换规则命中时,
只有第一个匹配的转换规则将应用。
举个例子,我们可以定义一个转换规则如下:
@@ -346,14 +346,13 @@ transform:
transform:
- source-table: mydb.web_order
projection: id, order_id
- filter: UPPER(province) = 'SHANGHAI'
- description: classification mapping example
- - source-table: mydb.web_order
- projection: order_id as id, id as order_id
- filter: UPPER(province) = 'BEIJING'
- description: classification mapping example
+ filter: id > 1001
+ - source-table: mydb.\.*
+ projection: \*, 'fallback' AS FALLBACK
```
+这里,虽然 `mydb.web_order` 表同样可以被第二条规则匹配,但由于排序靠前的第一条规则已经匹配,它不会再落入后续的 Transform 规则中。
+
## 用户自定义函数
用户自定义函数(UDF)可以在转换规则中使用。
diff --git a/docs/content/docs/core-concept/transform.md
b/docs/content/docs/core-concept/transform.md
index aed162567..1617bc8df 100644
--- a/docs/content/docs/core-concept/transform.md
+++ b/docs/content/docs/core-concept/transform.md
@@ -341,22 +341,20 @@ transform:
Tips: The format of table-options is `key1=value1,key2=value2`.
## Classification mapping
-Multiple transform rules can be defined to classify input data rows and apply
different processing.
-Only the first matched transform rule will apply.
+If a table matches multiple transform rules, only the first matched transform rule
will apply.
For example, we may define a transform rule as follows:
```yaml
transform:
- source-table: mydb.web_order
projection: id, order_id
- filter: UPPER(province) = 'SHANGHAI'
- description: classification mapping example
- - source-table: mydb.web_order
- projection: order_id as id, id as order_id
- filter: UPPER(province) = 'BEIJING'
- description: classification mapping example
+ filter: id > 1001
+ - source-table: mydb.\.*
+ projection: \*, 'fallback' AS FALLBACK
```
+Here, although `mydb.web_order` also matches the second rule (`mydb.\.*`), it will
not fall through to the next rule, because it has already been handled by the first rule.
+
## User-defined Functions
User-defined functions (UDFs) can be used in transform rules.
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 2842729cd..4c4067e29 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -58,7 +58,6 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
-import org.apache.calcite.sql.validate.SqlValidatorException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableAssert;
import org.codehaus.commons.compiler.CompileException;
@@ -241,6 +240,136 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
}
+ @ParameterizedTest(name = "API version: {0}")
+ @EnumSource(ValuesDataSink.SinkApi.class)
+ void testFilterUpdateOpTypeConversion(ValuesDataSink.SinkApi sinkApi)
throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+ TableId myTable1 = TableId.tableId("default_namespace",
"default_schema", "mytable1");
+ Schema table1Schema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT())
+ .physicalColumn("name", DataTypes.STRING())
+ .physicalColumn("age", DataTypes.INT())
+ .primaryKey("id")
+ .build();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+ table1Schema.getColumnDataTypes().toArray(new
DataType[0]));
+
+ List<Event> events = new ArrayList<>();
+ events.add(new CreateTableEvent(myTable1, table1Schema));
+ // Case 1: before=Y, after=Y -> UPDATE
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {1,
BinaryStringData.fromString("Alice"), 30})));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {1,
BinaryStringData.fromString("Alice"), 30}),
+ generator.generate(
+ new Object[] {1,
BinaryStringData.fromString("Alice"), 40})));
+ // Case 2: before=Y, after=N -> DELETE
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 30})));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 30}),
+ generator.generate(
+ new Object[] {2,
BinaryStringData.fromString("Bob"), 20})));
+ // Case 3: before=N, after=Y -> INSERT
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {3,
BinaryStringData.fromString("Carol"), 20})));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {3,
BinaryStringData.fromString("Carol"), 20}),
+ generator.generate(
+ new Object[] {3,
BinaryStringData.fromString("Carol"), 35})));
+ // Case 4: before=N, after=N -> drop
+ events.add(
+ DataChangeEvent.insertEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {4,
BinaryStringData.fromString("Dave"), 10})));
+ events.add(
+ DataChangeEvent.updateEvent(
+ myTable1,
+ generator.generate(
+ new Object[] {4,
BinaryStringData.fromString("Dave"), 10}),
+ generator.generate(
+ new Object[] {4,
BinaryStringData.fromString("Dave"), 15})));
+
+
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+
"default_namespace.default_schema.\\.*",
+ null,
+ "age > 25",
+ null,
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ assertThat(outputEvents)
+ .containsExactlyInAnyOrder(
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id,
options=()}",
+ // INSERT id=1 (age=30 passes)
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 30], op=INSERT, meta=()}",
+ // UPDATE id=1 (30->40): before=Y, after=Y -> UPDATE
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[1,
Alice, 30], after=[1, Alice, 40], op=UPDATE, meta=()}",
+ // INSERT id=2 (age=30 passes)
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 30], op=INSERT, meta=()}",
+ // UPDATE id=2 (30->20): before=Y, after=N -> DELETE
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 30], after=[], op=DELETE, meta=()}",
+ // UPDATE id=3 (20->35): before=N, after=Y -> INSERT
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[3, Carol, 35], op=INSERT, meta=()}");
+ // INSERT id=3 (age=20 fails), INSERT id=4 (age=10 fails),
+ // UPDATE id=4 (10->15, both fail) are all filtered out.
+ }
+
/**
* This tests if transform rule could be used to classify source records
based on filtering
* rules.
@@ -2507,7 +2636,6 @@ class FlinkPipelineTransformITCase {
+ "to schema\n"
+ "\t(Unknown).")
.rootCause()
- .isExactlyInstanceOf(SqlValidatorException.class)
.hasMessage("Column 'id1' not found in any table");
// Unexpected column in filter rule
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
index f97af9e38..21e7ddc5d 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/specs/TransformSpecsITCase.java
@@ -501,13 +501,6 @@ class TransformSpecsITCase {
}
}
- enum SpecContext {
- PROJECTION,
- EXPECT,
- EXPECT_ERROR,
- NULL
- }
-
private static final String[] EXPECTED_SPECS = {
"specs/arithmetic.yaml",
"specs/basic.yaml",
diff --git a/flink-cdc-composer/src/test/resources/specs/basic.yaml
b/flink-cdc-composer/src/test/resources/specs/basic.yaml
index 1dcf180c3..b9b173329 100644
--- a/flink-cdc-composer/src/test/resources/specs/basic.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/basic.yaml
@@ -108,13 +108,14 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is
Lie], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie],
after=[], op=DELETE, meta=()}
- do: Filter by Expression
projection: id_, string_
filter: id_ + 1 <= 1
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`string_` STRING}, primaryKeys=id_, options=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie],
after=[-1, 天地玄黄宇宙洪荒], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, 天地玄黄宇宙洪荒],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, 天地玄黄宇宙洪荒], after=[],
op=DELETE, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[0, null],
op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[0, null], after=[],
op=DELETE, meta=()}
@@ -126,6 +127,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is
Lie, 18], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18],
after=[], op=DELETE, meta=()}
- do: Filter by Calculation Column (With NULL)
ignore: FLINK-38905
projection: id_, string_, CHAR_LENGTH(string_) AS strlen_
@@ -134,6 +136,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`string_` STRING,`strlen_` INT}, primaryKeys=id_, options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, From A to Z is
Lie, 18], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, From A to Z is Lie, 18],
after=[], op=DELETE, meta=()}
- do: Invalid Projection Expr
projection: id_, a_column_that_is_nowhere_to_be_found
primary-key: id_
diff --git a/flink-cdc-composer/src/test/resources/specs/nested.yaml
b/flink-cdc-composer/src/test/resources/specs/nested.yaml
index a600b469b..3ac67facd 100644
--- a/flink-cdc-composer/src/test/resources/specs/nested.yaml
+++ b/flink-cdc-composer/src/test/resources/specs/nested.yaml
@@ -176,7 +176,7 @@
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`array_string_` ARRAY<STRING>}, primaryKeys=id_, options=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[1, [one, one, two, three,
five]], after=[-1, [二, san, 五, qi, 十一]], op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, [二, san, 五, qi,
十一]], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, [二, san, 五, qi, 十一]],
after=[], op=DELETE, meta=()}
- do: Map Subscripting
projection: |-
@@ -248,6 +248,7 @@
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`map_int_string_` MAP<INT, STRING>}, primaryKeys=id_,
options=()}
DataChangeEvent{tableId=foo.bar.baz, before=[], after=[1, {1 -> one, 2 ->
two, 3 -> three}], op=INSERT, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[1, {1 -> one, 2 -> two, 3 ->
three}], after=[], op=DELETE, meta=()}
- do: Record Subscripting With Index
projection: |-
id_
@@ -282,7 +283,7 @@
primary-key: id_
expect: |-
CreateTableEvent{tableId=foo.bar.baz, schema=columns={`id_` BIGINT NOT
NULL 'Identifier',`complex_row_` ROW<`name` STRING, `length` INT>},
primaryKeys=id_, options=()}
- DataChangeEvent{tableId=foo.bar.baz, before=[1, {name: STRING -> Alice,
length: INT -> 5}], after=[-1, {name: STRING -> Derrida, length: INT -> 7}],
op=UPDATE, meta=()}
+ DataChangeEvent{tableId=foo.bar.baz, before=[], after=[-1, {name: STRING
-> Derrida, length: INT -> 7}], op=INSERT, meta=()}
DataChangeEvent{tableId=foo.bar.baz, before=[-1, {name: STRING -> Derrida,
length: INT -> 7}], after=[], op=DELETE, meta=()}
- do: Variant Object Subscripting With String Key
projection: |-
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 8683a07b8..0a8b63703 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -289,16 +289,19 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
getProjectionProcessor(tableId, effectiveTransformer);
TransformFilterProcessor filterProcessor =
getFilterProcessor(tableId, effectiveTransformer);
- RecordData beforeRow = null;
- RecordData afterRow = null;
- boolean filterPassed = true;
+
+ BinaryRecordData beforeRow = null;
+ BinaryRecordData afterRow = null;
+ boolean beforeFilterPassed = false;
+ boolean afterFilterPassed = false;
+
if (event.before() != null) {
context.opType = beforeOp;
Tuple2<BinaryRecordData, Boolean> result =
transformRecord(
event.before(), info, projectionProcessor,
filterProcessor, context);
beforeRow = result.f0;
- filterPassed = result.f1;
+ beforeFilterPassed = result.f1;
}
if (event.after() != null) {
context.opType = afterOp;
@@ -306,23 +309,51 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
transformRecord(
event.after(), info, projectionProcessor,
filterProcessor, context);
afterRow = result.f0;
- filterPassed = result.f1;
+ afterFilterPassed = result.f1;
}
- if (filterPassed) {
- DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event,
beforeRow, afterRow);
- if (effectiveTransformer.getPostTransformConverter().isPresent()) {
- return effectiveTransformer
- .getPostTransformConverter()
- .get()
- .convert(finalEvent)
- .map(Event.class::cast);
- } else {
- return Optional.of(finalEvent);
- }
+ // For UPDATE events, before and after filter results may differ,
requiring op type
+ // conversion:
+ // before=Y, after=Y -> UPDATE; before=Y, after=N -> DELETE;
+ // before=N, after=Y -> INSERT; before=N, after=N -> drop.
+ DataChangeEvent finalEvent;
+ switch (event.op()) {
+ case INSERT:
+ case REPLACE:
+ if (!afterFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow,
afterRow);
+ break;
+ case DELETE:
+ if (!beforeFilterPassed) {
+ return Optional.empty();
+ }
+ finalEvent = DataChangeEvent.projectRecords(event, beforeRow,
afterRow);
+ break;
+ case UPDATE:
+ if (beforeFilterPassed && afterFilterPassed) {
+ finalEvent = DataChangeEvent.projectRecords(event,
beforeRow, afterRow);
+ } else if (beforeFilterPassed) {
+ finalEvent = DataChangeEvent.deleteEvent(tableId,
beforeRow, event.meta());
+ } else if (afterFilterPassed) {
+ finalEvent = DataChangeEvent.insertEvent(tableId,
afterRow, event.meta());
+ } else {
+ return Optional.empty();
+ }
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported operation type: " + event.op());
}
- // Events with no matching filters satisfied won't be emitted to
downstream.
- return Optional.empty();
+ if (effectiveTransformer.getPostTransformConverter().isPresent()) {
+ return effectiveTransformer
+ .getPostTransformConverter()
+ .get()
+ .convert(finalEvent)
+ .map(Event.class::cast);
+ }
+ return Optional.of(finalEvent);
}
/**
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index ddd5aeb76..57563b280 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -784,6 +784,204 @@ class PostTransformOperatorTest {
transformFunctionEventEventOperatorTestHarness.close();
}
+ @Test
+ void testUpdateEventFilterOpTypeConversion() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ COLUMN_SQUARE_TABLE.identifier(),
+ "col1, col2, col2 * col2 as square_col2",
+ "col2 < 3 OR col2 > 5")
+ .build();
+ RegularEventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ transformFunctionEventEventOperatorTestHarness.open();
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
COLUMN_SQUARE_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
COLUMN_SQUARE_SCHEMA)));
+
+ // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE
+ DataChangeEvent updateBothPass =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {1, 1,
null}),
+ recordDataGenerator.generate(new Object[] {1, 6,
null}));
+ transform.processElement(new StreamRecord<>(updateBothPass));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new
Object[] {1, 1, 1}),
+ recordDataGenerator.generate(new
Object[] {1, 6, 36}))));
+
+ // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only)
+ DataChangeEvent updateBeforeOnly =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {2, 1,
null}),
+ recordDataGenerator.generate(new Object[] {2, 4,
null}));
+ transform.processElement(new StreamRecord<>(updateBeforeOnly));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.deleteEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new
Object[] {2, 1, 1}))));
+
+ // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only)
+ DataChangeEvent updateAfterOnly =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {3, 4,
null}),
+ recordDataGenerator.generate(new Object[] {3, 6,
null}));
+ transform.processElement(new StreamRecord<>(updateAfterOnly));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new
Object[] {3, 6, 36}))));
+
+ // Case 4: before=N(col2=4), after=N(col2=5) -> drop
+ DataChangeEvent updateNonePass =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {4, 4,
null}),
+ recordDataGenerator.generate(new Object[] {4, 5,
null}));
+ transform.processElement(new StreamRecord<>(updateNonePass));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isNull();
+
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
+ @Test
+ void testUpdateEventFilterOpTypeConversionWithDataEventType() throws
Exception {
+ // This test validates the behavior of __data_event_type__ metadata
column
+ // when UPDATE events are converted to synthetic INSERT/DELETE events.
+ // Note: The projected __data_event_type__ reflects the original op
type (-U/+U)
+ // rather than the converted op type (-D/+I).
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ COLUMN_SQUARE_TABLE.identifier(),
+ "col1, col2, col2 * col2 as square_col2,
__data_event_type__ as event_type",
+ "col2 < 3 OR col2 > 5")
+ .build();
+ RegularEventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ transformFunctionEventEventOperatorTestHarness.open();
+
+ Schema expectedSchema =
+ Schema.newBuilder()
+ .physicalColumn("col1", DataTypes.INT().notNull())
+ .physicalColumn("col2", DataTypes.INT())
+ .physicalColumn("square_col2", DataTypes.INT())
+ .physicalColumn("event_type",
DataTypes.STRING().notNull())
+ .primaryKey("col1")
+ .build();
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
COLUMN_SQUARE_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+ BinaryRecordDataGenerator expectedRecordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
expectedSchema.toRowDataType()));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(COLUMN_SQUARE_TABLE,
expectedSchema)));
+
+ // Case 1: before=Y(col2=1), after=Y(col2=6) -> UPDATE
+ // __data_event_type__ for before is -U, for after is +U
+ DataChangeEvent updateBothPass =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {1, 1,
null}),
+ recordDataGenerator.generate(new Object[] {1, 6,
null}));
+ transform.processElement(new StreamRecord<>(updateBothPass));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ expectedRecordDataGenerator.generate(
+ new Object[] {1, 1, 1, new
BinaryStringData("-U")}),
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ 1, 6, 36, new
BinaryStringData("+U")
+ }))));
+
+ // Case 2: before=Y(col2=1), after=N(col2=4) -> DELETE(before only)
+ // __data_event_type__ is -U (original before type, not -D)
+ DataChangeEvent updateBeforeOnly =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {2, 1,
null}),
+ recordDataGenerator.generate(new Object[] {2, 4,
null}));
+ transform.processElement(new StreamRecord<>(updateBeforeOnly));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.deleteEvent(
+ COLUMN_SQUARE_TABLE,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ 2, 1, 1, new
BinaryStringData("-U")
+ }))));
+
+ // Case 3: before=N(col2=4), after=Y(col2=6) -> INSERT(after only)
+ // __data_event_type__ is +U (original after type, not +I)
+ DataChangeEvent updateAfterOnly =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {3, 4,
null}),
+ recordDataGenerator.generate(new Object[] {3, 6,
null}));
+ transform.processElement(new StreamRecord<>(updateAfterOnly));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ DataChangeEvent.insertEvent(
+ COLUMN_SQUARE_TABLE,
+ expectedRecordDataGenerator.generate(
+ new Object[] {
+ 3, 6, 36, new
BinaryStringData("+U")
+ }))));
+
+ // Case 4: before=N(col2=4), after=N(col2=5) -> drop
+ DataChangeEvent updateNonePass =
+ DataChangeEvent.updateEvent(
+ COLUMN_SQUARE_TABLE,
+ recordDataGenerator.generate(new Object[] {4, 4,
null}),
+ recordDataGenerator.generate(new Object[] {4, 5,
null}));
+ transform.processElement(new StreamRecord<>(updateNonePass));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isNull();
+
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
+
@Test
void testTimestampTransform() throws Exception {
PostTransformOperator transform =
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index f55a0da8e..b00b1a29e 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -487,7 +487,7 @@ class UnifiedTransformOperatorTest {
.updatePreTransformed(
new Object[] {"id004", 18, "NeoReference004", 2018},
new Object[] {"id004", 10, "NeoReference004", 2018})
- .updatePostTransformed()
+ .deletePostTransformed("id004", "ID004", 19, "neoreference004")
.deleteSource("id001", "Alice", 17, "Reference001", 2021)
.deletePreTransformed("id001", 17, "Reference001", 2021)
.deletePostTransformed("id001", "ID001", 18, "reference001")
@@ -1032,7 +1032,7 @@ class UnifiedTransformOperatorTest {
new Object[] {"1004", "Colin", 10, "NeoReference004",
2018})
.updatePreTransformed(
new Object[] {"1004", 19, 2018}, new Object[] {"1004",
10, 2018})
- .updatePostTransformed()
+ .deletePostTransformed("1004", 20, 1023L, "19")
.deleteSource("1001", "Alice", 17, "Reference001", 2021)
.deletePreTransformed("1001", 17, 2021)
.deletePostTransformed("1001", 18, 1018L, "17")