This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new f5204243e [FLINK-38806] Fix post-transform converters ineffective without projection or filter rules (#4191)
f5204243e is described below

commit f5204243e58008a450a3ed41e18d4a872e559b32
Author: yuxiqian <[email protected]>
AuthorDate: Tue Dec 16 14:58:03 2025 +0800

    [FLINK-38806] Fix post-transform converters ineffective without projection or filter rules (#4191)
---
 .../cdc/composer/definition/TransformDef.java      | 10 -------
 .../flink/translator/TransformTranslator.java      | 20 ++++++-------
 .../flink/FlinkPipelineTransformITCase.java        | 34 +++++++++++++++++++---
 3 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
index c081178b8..43cac6095 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/TransformDef.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.cdc.composer.definition;
 
-import org.apache.flink.cdc.common.utils.StringUtils;
-
 import java.util.Objects;
 
 /**
@@ -78,18 +76,10 @@ public class TransformDef {
         return projection;
     }
 
-    public boolean isValidProjection() {
-        return !StringUtils.isNullOrWhitespaceOnly(projection);
-    }
-
     public String getFilter() {
         return filter;
     }
 
-    public boolean isValidFilter() {
-        return !StringUtils.isNullOrWhitespaceOnly(filter);
-    }
-
     public String getDescription() {
         return description;
     }
diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
index fb8da7cea..4cc7a0b24 100644
--- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
+++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
@@ -105,17 +105,15 @@ public class TransformTranslator {
         PostTransformOperatorBuilder postTransformFunctionBuilder =
                 PostTransformOperator.newBuilder();
         for (TransformDef transform : transforms) {
-            if (transform.isValidProjection() || transform.isValidFilter()) {
-                postTransformFunctionBuilder.addTransform(
-                        transform.getSourceTable(),
-                        transform.getProjection(),
-                        transform.getFilter(),
-                        transform.getPrimaryKeys(),
-                        transform.getPartitionKeys(),
-                        transform.getTableOptions(),
-                        transform.getPostTransformConverter(),
-                        supportedMetadataColumns);
-            }
+            postTransformFunctionBuilder.addTransform(
+                    transform.getSourceTable(),
+                    transform.getProjection(),
+                    transform.getFilter(),
+                    transform.getPrimaryKeys(),
+                    transform.getPartitionKeys(),
+                    transform.getTableOptions(),
+                    transform.getPostTransformConverter(),
+                    supportedMetadataColumns);
         }
         postTransformFunctionBuilder.addTimezone(timezone);
         postTransformFunctionBuilder.addUdfFunctions(
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 47a7ead5b..10ea31e53 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -577,11 +577,11 @@ class FlinkPipelineTransformITCase {
                                 "A Transform Block without projection or 
filter",
                                 null)),
                 Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id;name, 
partitionKeys=id, options=({bucket=17, replication_num=1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING NOT NULL,`age` INT}, 
primaryKeys=id;name, partitionKeys=id, options=({bucket=17, 
replication_num=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` 
STRING}, primaryKeys=id;name, partitionKeys=id, options=({bucket=17, 
replication_num=1})}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`age` 
TINYINT,`description` STRING}, primaryKeys=id;name, partitionKeys=id, 
options=({bucket=17, replication_num=1})}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student], after=[], op=DELETE, meta=()}"));
@@ -2032,7 +2032,7 @@ class FlinkPipelineTransformITCase {
         assertThat(outputEvents)
                 .containsExactly(
                         // Initial stage
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=name, 
partitionKeys=id;name, options=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING NOT NULL,`age` INT}, primaryKeys=name, 
partitionKeys=id;name, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 21], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Barcarolle, 22], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, Cecily, 23], op=INSERT, meta=()}",
@@ -2048,7 +2048,7 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
 
                         // Alter column type stage
-                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        
"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, 
typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, 
oldTypeMapping={gender=TINYINT, name=STRING NOT NULL, age=INT}}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, 
before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, 
meta=()}",
@@ -2997,6 +2997,32 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.my_table, before=[], 
after=[2, null, null, null, null, null, null, null, null, null, null], 
op=INSERT, meta=()}");
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testPostTransformConvertersWoProjection(ValuesDataSink.SinkApi 
sinkApi) throws Exception {
+        runGenericTransformTest(
+                sinkApi,
+                Collections.singletonList(
+                        new TransformDef(
+                                "default_namespace.default_schema.\\.*",
+                                null,
+                                null,
+                                null,
+                                null,
+                                null,
+                                null,
+                                "SOFT_DELETE")),
+                Arrays.asList(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT}, primaryKeys=id, 
options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student], op=INSERT, meta=()}"));
+    }
+
     private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
         Schema schema =

Reply via email to