This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new f5204243e [FLINK-38806] Fix post-transform converters ineffective
without projection or filter rules (#4191)
f5204243e is described below
commit f5204243e58008a450a3ed41e18d4a872e559b32
Author: yuxiqian <[email protected]>
AuthorDate: Tue Dec 16 14:58:03 2025 +0800
[FLINK-38806] Fix post-transform converters ineffective without projection
or filter rules (#4191)
---
.../cdc/composer/definition/TransformDef.java | 10 -------
.../flink/translator/TransformTranslator.java | 20 ++++++-------
.../flink/FlinkPipelineTransformITCase.java | 34 +++++++++++++++++++---
3 files changed, 39 insertions(+), 25 deletions(-)
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
index c081178b8..43cac6095 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
@@ -17,8 +17,6 @@
package org.apache.flink.cdc.composer.definition;
-import org.apache.flink.cdc.common.utils.StringUtils;
-
import java.util.Objects;
/**
@@ -78,18 +76,10 @@ public class TransformDef {
return projection;
}
- public boolean isValidProjection() {
- return !StringUtils.isNullOrWhitespaceOnly(projection);
- }
-
public String getFilter() {
return filter;
}
- public boolean isValidFilter() {
- return !StringUtils.isNullOrWhitespaceOnly(filter);
- }
-
public String getDescription() {
return description;
}
diff --git
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index fb8da7cea..4cc7a0b24 100644
---
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -105,17 +105,15 @@ public class TransformTranslator {
PostTransformOperatorBuilder postTransformFunctionBuilder =
PostTransformOperator.newBuilder();
for (TransformDef transform : transforms) {
- if (transform.isValidProjection() || transform.isValidFilter()) {
- postTransformFunctionBuilder.addTransform(
- transform.getSourceTable(),
- transform.getProjection(),
- transform.getFilter(),
- transform.getPrimaryKeys(),
- transform.getPartitionKeys(),
- transform.getTableOptions(),
- transform.getPostTransformConverter(),
- supportedMetadataColumns);
- }
+ postTransformFunctionBuilder.addTransform(
+ transform.getSourceTable(),
+ transform.getProjection(),
+ transform.getFilter(),
+ transform.getPrimaryKeys(),
+ transform.getPartitionKeys(),
+ transform.getTableOptions(),
+ transform.getPostTransformConverter(),
+ supportedMetadataColumns);
}
postTransformFunctionBuilder.addTimezone(timezone);
postTransformFunctionBuilder.addUdfFunctions(
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 47a7ead5b..10ea31e53 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -577,11 +577,11 @@ class FlinkPipelineTransformITCase {
"A Transform Block without projection or
filter",
null)),
Arrays.asList(
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name,
partitionKeys=id, options=({bucket=17, replication_num=1})}",
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT},
primaryKeys=id;name, partitionKeys=id, options=({bucket=17,
replication_num=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description`
STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17,
replication_num=1})}",
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age`
TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id,
options=({bucket=17, replication_num=1})}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -2032,7 +2032,7 @@ class FlinkPipelineTransformITCase {
assertThat(outputEvents)
.containsExactly(
// Initial stage
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name,
partitionKeys=id;name, options=()}",
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING NOT NULL,`age` INT}, primaryKeys=name,
partitionKeys=id;name, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 21], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[3, Cecily, 23], op=INSERT, meta=()}",
@@ -2048,7 +2048,7 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
// Alter column type stage
-
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1,
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE},
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1,
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE},
oldTypeMapping={gender=TINYINT, name=STRING NOT NULL, age=INT}}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1,
before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE,
meta=()}",
@@ -2997,6 +2997,32 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.my_table, before=[],
after=[2, null, null, null, null, null, null, null, null, null, null],
op=INSERT, meta=()}");
}
+ @ParameterizedTest
+ @EnumSource
+ void testPostTransformConvertersWoProjection(ValuesDataSink.SinkApi
sinkApi) throws Exception {
+ runGenericTransformTest(
+ sinkApi,
+ Collections.singletonList(
+ new TransformDef(
+ "default_namespace.default_schema.\\.*",
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ "SOFT_DELETE")),
+ Arrays.asList(
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id,
options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age`
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, Derrida, 25, student], op=INSERT, meta=()}"));
+ }
+
private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
Schema schema =