This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e2188387 [FLINK-39232] Loosen transformed schema merging validation 
check (#4315)
5e2188387 is described below

commit 5e218838751cada0b684d84884a3cb5fc87e2afe
Author: yuxiqian <[email protected]>
AuthorDate: Mon Mar 16 21:10:51 2026 +0800

    [FLINK-39232] Loosen transformed schema merging validation check (#4315)
---
 .../flink/cdc/common/utils/SchemaMergingUtils.java | 134 -------
 .../flink/FlinkPipelineBatchComposerITCase.java    |  67 ----
 .../flink/FlinkPipelineComposerITCase.java         |  72 ----
 .../flink/FlinkPipelineComposerLenientITCase.java  |  70 ----
 .../flink/FlinkPipelineTransformITCase.java        | 284 ++-------------
 .../tests/SchemaEvolvingTransformE2eITCase.java    |   7 +-
 .../cdc/pipeline/tests/TransformE2eITCase.java     |  14 +-
 .../operators/transform/PostTransformOperator.java | 157 ++++----
 .../transform/PostTransformOperatorTest.java       | 396 ++-------------------
 pom.xml                                            |   1 -
 10 files changed, 141 insertions(+), 1061 deletions(-)

diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index 0d95f050b..50870edb5 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -63,7 +63,6 @@ import org.apache.flink.cdc.common.types.VariantType;
 import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.cdc.common.types.variant.Variant;
 
-import 
org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
 import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
 import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding;
@@ -320,139 +319,6 @@ public class SchemaMergingUtils {
         return coercedRow;
     }
 
-    /**
-     * Try to merge given {@link Schema}s and ensure they're identical. The 
only difference allowed
-     * is nullability, string and varchar precision, default value, and 
comments.
-     */
-    public static Schema strictlyMergeSchemas(List<Schema> schemas) {
-        Preconditions.checkArgument(
-                !schemas.isEmpty(), "Trying to merge transformed schemas %s, 
but got empty list");
-        if (schemas.size() == 1) {
-            return schemas.get(0);
-        }
-
-        List<List<String>> primaryKeys =
-                schemas.stream()
-                        .map(Schema::primaryKeys)
-                        .filter(p -> !p.isEmpty())
-                        .distinct()
-                        .collect(Collectors.toList());
-        List<List<String>> partitionKeys =
-                schemas.stream()
-                        .map(Schema::partitionKeys)
-                        .filter(p -> !p.isEmpty())
-                        .distinct()
-                        .collect(Collectors.toList());
-        List<Map<String, String>> options =
-                schemas.stream()
-                        .map(Schema::options)
-                        .filter(p -> !p.isEmpty())
-                        .distinct()
-                        .collect(Collectors.toList());
-        List<List<String>> columnNames =
-                schemas.stream()
-                        .map(Schema::getColumnNames)
-                        .distinct()
-                        .collect(Collectors.toList());
-
-        Preconditions.checkArgument(
-                primaryKeys.size() <= 1,
-                "Trying to merge transformed schemas %s, but got more than one 
primary key configurations: %s",
-                schemas,
-                primaryKeys);
-        Preconditions.checkArgument(
-                partitionKeys.size() <= 1,
-                "Trying to merge transformed schemas %s, but got more than one 
partition key configurations: %s",
-                schemas,
-                partitionKeys);
-        Preconditions.checkArgument(
-                options.size() <= 1,
-                "Trying to merge transformed schemas %s, but got more than one 
option configurations: %s",
-                schemas,
-                options);
-        Preconditions.checkArgument(
-                columnNames.size() == 1,
-                "Trying to merge transformed schemas %s, but got more than one 
column name views: %s",
-                schemas,
-                columnNames);
-
-        int arity = columnNames.get(0).size();
-
-        ArrayListMultimap<Integer, DataType> toBeMergedColumnTypes =
-                ArrayListMultimap.create(arity, 1);
-        for (Schema schema : schemas) {
-            List<DataType> columnTypes = schema.getColumnDataTypes();
-            for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) {
-                toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex));
-            }
-        }
-
-        List<String> mergedColumnNames = columnNames.iterator().next();
-        List<DataType> mergedColumnTypes = new ArrayList<>(arity);
-        for (int i = 0; i < arity; i++) {
-            
mergedColumnTypes.add(strictlyMergeDataTypes(toBeMergedColumnTypes.get(i)));
-        }
-
-        List<Column> mergedColumns = new ArrayList<>();
-        for (int i = 0; i < mergedColumnNames.size(); i++) {
-            mergedColumns.add(
-                    Column.physicalColumn(mergedColumnNames.get(i), 
mergedColumnTypes.get(i)));
-        }
-
-        return Schema.newBuilder()
-                .primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() : 
primaryKeys.get(0))
-                .partitionKey(
-                        partitionKeys.isEmpty() ? Collections.emptyList() : 
partitionKeys.get(0))
-                .options(options.isEmpty() ? Collections.emptyMap() : 
options.get(0))
-                .setColumns(mergedColumns)
-                .build();
-    }
-
-    private static DataType strictlyMergeDataTypes(List<DataType> dataTypes) {
-        Preconditions.checkArgument(
-                !dataTypes.isEmpty(),
-                "Trying to merge transformed data types %s, but got empty 
list");
-
-        List<DataType> simpleMergeTypes =
-                dataTypes.stream().distinct().collect(Collectors.toList());
-        if (simpleMergeTypes.size() == 1) {
-            return simpleMergeTypes.get(0);
-        }
-
-        List<DataTypeRoot> typeRoots =
-                dataTypes.stream()
-                        .map(DataType::getTypeRoot)
-                        .distinct()
-                        .collect(Collectors.toList());
-        Preconditions.checkArgument(
-                typeRoots.size() == 1,
-                "Trying to merge types %s, but got more than one type root: 
%s",
-                dataTypes,
-                typeRoots);
-
-        // Decay types to the most
-        DataType type = dataTypes.get(0);
-
-        if (type.is(DataTypeRoot.CHAR)) {
-            return DataTypes.CHAR(CharType.MAX_LENGTH);
-        } else if (type.is(DataTypeRoot.VARCHAR)) {
-            return DataTypes.STRING();
-        } else if (type.is(DataTypeRoot.BINARY)) {
-            return DataTypes.BINARY(BinaryType.MAX_LENGTH);
-        } else if (type.is(DataTypeRoot.VARBINARY)) {
-            return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH);
-        } else if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
-            return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
-        } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
-            return DataTypes.TIMESTAMP_TZ(ZonedTimestampType.MAX_PRECISION);
-        } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
-            return 
DataTypes.TIMESTAMP_LTZ(LocalZonedTimestampType.MAX_PRECISION);
-        } else {
-            throw new IllegalArgumentException(
-                    "Unable to merge data types with different precision: " + 
dataTypes);
-        }
-    }
-
     @VisibleForTesting
     static boolean isDataTypeCompatible(@Nullable DataType currentType, 
DataType upcomingType) {
         // If two types are identical, they're compatible of course.
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
index 0074039c9..5e521c3df 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
@@ -613,73 +613,6 @@ public class FlinkPipelineBatchComposerITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}");
     }
 
-    @ParameterizedTest
-    @EnumSource
-    void testTransformTwiceInBatchMode(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
-        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
-        // Setup value source
-        Configuration sourceConfig = new Configuration();
-        sourceConfig.set(
-                ValuesDataSourceOptions.EVENT_SET_ID,
-                ValuesDataSourceHelper.EventSetId.TRANSFORM_BATCH_TABLE);
-        SourceDef sourceDef =
-                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
-
-        // Setup value sink
-        Configuration sinkConfig = new Configuration();
-        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
-        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
-        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
-
-        // Setup transform
-        TransformDef transformDef1 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'1') as col12",
-                        "col1 = '1' OR col1 = '999'",
-                        "col1",
-                        "col12",
-                        "key1=value1",
-                        "",
-                        null);
-        TransformDef transformDef2 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'2') as col12",
-                        "col1 = '2'",
-                        null,
-                        null,
-                        null,
-                        "",
-                        null);
-        // Setup pipeline
-        Configuration pipelineConfig = new Configuration();
-        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-        pipelineConfig.set(
-                PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE, 
RuntimeExecutionMode.BATCH);
-        PipelineDef pipelineDef =
-                new PipelineDef(
-                        sourceDef,
-                        sinkDef,
-                        Collections.emptyList(),
-                        new ArrayList<>(Arrays.asList(transformDef1, 
transformDef2)),
-                        Collections.emptyList(),
-                        pipelineConfig);
-
-        // Execute the pipeline
-        PipelineExecution execution = composer.compose(pipelineDef);
-        execution.execute();
-
-        // Check the order and content of all received events
-        String[] outputEvents = outCaptor.toString().trim().split("\n");
-        assertThat(outputEvents)
-                .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}");
-    }
-
     @ParameterizedTest
     @EnumSource
     void testOneToOneRoutingInBatchMode(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index fd2368b6c..d01d987d4 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -423,78 +423,6 @@ class FlinkPipelineComposerITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 
20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}");
     }
 
-    @ParameterizedTest
-    @EnumSource
-    void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
-        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
-        // Setup value source
-        Configuration sourceConfig = new Configuration();
-        sourceConfig.set(
-                ValuesDataSourceOptions.EVENT_SET_ID,
-                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
-        SourceDef sourceDef =
-                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
-
-        // Setup value sink
-        Configuration sinkConfig = new Configuration();
-        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
-        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
-        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
-
-        // Setup transform
-        TransformDef transformDef1 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'1') as col12",
-                        "col1 = '1' OR col1 = '999'",
-                        "col1",
-                        "col12",
-                        "key1=value1",
-                        "",
-                        null);
-        TransformDef transformDef2 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'2') as col12",
-                        "col1 = '2'",
-                        null,
-                        null,
-                        null,
-                        "",
-                        null);
-        // Setup pipeline
-        Configuration pipelineConfig = new Configuration();
-        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-        pipelineConfig.set(
-                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, 
SchemaChangeBehavior.EVOLVE);
-        PipelineDef pipelineDef =
-                new PipelineDef(
-                        sourceDef,
-                        sinkDef,
-                        Collections.emptyList(),
-                        new ArrayList<>(Arrays.asList(transformDef1, 
transformDef2)),
-                        Collections.emptyList(),
-                        pipelineConfig);
-
-        // Execute the pipeline
-        PipelineExecution execution = composer.compose(pipelineDef);
-        execution.execute();
-
-        // Check the order and content of all received events
-        String[] outputEvents = outCaptor.toString().trim().split("\n");
-        assertThat(outputEvents)
-                .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
-                        
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER, 
existedColumnName=col2}]}",
-                        
"RenameColumnEvent{tableId=default_namespace.default_schema.table1, 
nameMapping={col2=newCol2, col3=newCol3}}",
-                        
"DropColumnEvent{tableId=default_namespace.default_schema.table1, 
droppedColumnNames=[newCol2]}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1, 
11], after=[], op=DELETE, meta=({op_ts=4})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, , 
22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
-    }
-
     @ParameterizedTest
     @EnumSource
     void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index ede3c4b17..7348749b7 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -465,76 +465,6 @@ class FlinkPipelineComposerLenientITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 
null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE, 
meta=({op_ts=5})}");
     }
 
-    @ParameterizedTest
-    @EnumSource
-    void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
-        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
-        // Setup value source
-        Configuration sourceConfig = new Configuration();
-        sourceConfig.set(
-                ValuesDataSourceOptions.EVENT_SET_ID,
-                ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
-        SourceDef sourceDef =
-                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
-
-        // Setup value sink
-        Configuration sinkConfig = new Configuration();
-        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
-        sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
-        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
-
-        // Setup transform
-        TransformDef transformDef1 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'1') as col12",
-                        "col1 = '1' OR col1 = '999'",
-                        "col1",
-                        "col12",
-                        "key1=value1",
-                        "",
-                        null);
-        TransformDef transformDef2 =
-                new TransformDef(
-                        "default_namespace.default_schema.table1",
-                        "*,concat(col1,'2') as col12",
-                        "col1 = '2'",
-                        null,
-                        null,
-                        null,
-                        "",
-                        null);
-        // Setup pipeline
-        Configuration pipelineConfig = new Configuration();
-        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
-        PipelineDef pipelineDef =
-                new PipelineDef(
-                        sourceDef,
-                        sinkDef,
-                        Collections.emptyList(),
-                        new ArrayList<>(Arrays.asList(transformDef1, 
transformDef2)),
-                        Collections.emptyList(),
-                        pipelineConfig);
-
-        // Execute the pipeline
-        PipelineExecution execution = composer.compose(pipelineDef);
-        execution.execute();
-
-        // Check the order and content of all received events
-        String[] outputEvents = 
outCaptor.toString().trim().split(LINE_SEPARATOR);
-        assertThat(outputEvents)
-                .containsExactly(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.table1, 
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING}, 
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[], 
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
-                        
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST, 
existedColumnName=null}]}",
-                        
"AddColumnEvent{tableId=default_namespace.default_schema.table1, 
addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST, 
existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING, 
position=LAST, existedColumnName=null}]}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 
null, 11, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, 
null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE, 
meta=({op_ts=5})}");
-    }
-
     @Test
     void testOneToOneRouting() throws Exception {
         FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e4da2fca3..2842729cd 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -253,20 +253,11 @@ class FlinkPipelineTransformITCase {
                 Arrays.asList(
                         new TransformDef(
                                 "default_namespace.default_schema.\\.*",
-                                "*, 'YOUNG' AS category",
-                                "age < 20",
+                                "*, CASE WHEN age < 20 THEN 'YOUNG' WHEN age 
>= 20 THEN 'OLD' END AS category",
                                 null,
                                 null,
                                 null,
                                 null,
-                                null),
-                        new TransformDef(
-                                "default_namespace.default_schema.\\.*",
-                                "*, 'OLD' AS category",
-                                "age >= 20",
-                                null,
-                                null,
-                                null,
                                 null,
                                 null)),
                 Arrays.asList(
@@ -288,17 +279,8 @@ class FlinkPipelineTransformITCase {
                 Arrays.asList(
                         new TransformDef(
                                 "default_namespace.default_schema.\\.*",
-                                "id,age,'Juvenile' AS roleName",
-                                "age < 18",
-                                null,
-                                null,
-                                null,
+                                "id,age, CASE WHEN age < 18 THEN 'Juvenile' 
WHEN age >= 18 THEN name END AS roleName",
                                 null,
-                                null),
-                        new TransformDef(
-                                "default_namespace.default_schema.\\.*",
-                                "id,age,name AS roleName",
-                                "age >= 18",
                                 null,
                                 null,
                                 null,
@@ -317,42 +299,7 @@ class FlinkPipelineTransformITCase {
 
     @ParameterizedTest
     @EnumSource
-    void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
-        runGenericTransformTest(
-                sinkApi,
-                Arrays.asList(
-                        new TransformDef(
-                                "default_namespace.default_schema.mytable2",
-                                "*,'Juvenile' AS roleName",
-                                "age < 18",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null),
-                        new TransformDef(
-                                "default_namespace.default_schema.mytable2",
-                                "id,name,age,description,name AS roleName",
-                                "age >= 18",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null)),
-                Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
-    }
-
-    @ParameterizedTest
-    @EnumSource
-    void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) 
throws Exception {
+    void testMissingProjection(ValuesDataSink.SinkApi sinkApi) throws 
Exception {
         runGenericTransformTest(
                 sinkApi,
                 Arrays.asList(
@@ -364,170 +311,14 @@ class FlinkPipelineTransformITCase {
                                 null,
                                 null,
                                 null,
-                                null),
-                        new TransformDef(
-                                "default_namespace.default_schema.mytable2",
-                                "id,UPPER(name) AS name,age,description",
-                                "age >= 18",
-                                null,
-                                null,
-                                null,
-                                null,
-                                null)),
-                Arrays.asList(
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
-    }
-
-    @ParameterizedTest
-    @EnumSource
-    @Disabled("to be fixed in FLINK-37132")
-    void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
-            ValuesDataSink.SinkApi sinkApi) {
-        TransformDef nullProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        null,
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
-        assertThatThrownBy(
-                        () ->
-                                runGenericTransformTest(
-                                        sinkApi,
-                                        Arrays.asList(
-                                                nullProjection,
-                                                new TransformDef(
-                                                        
"default_namespace.default_schema.mytable2",
-                                                        // reference part 
column
-                                                        "id,UPPER(name) AS 
name",
-                                                        "age >= 18",
-                                                        null,
-                                                        null,
-                                                        null,
-                                                        null,
-                                                        null)),
-                                        Collections.emptyList()))
-                .rootCause()
-                .isExactlyInstanceOf(IllegalStateException.class)
-                .hasMessage(
-                        "Unable to merge schema columns={`id` BIGINT,`name` 
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
-                                + "and columns={`id` BIGINT,`name` STRING}, 
primaryKeys=id, options=() with different column counts.");
-    }
-
-    @ParameterizedTest
-    @EnumSource
-    @Disabled("to be fixed in FLINK-37132")
-    void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
-            ValuesDataSink.SinkApi sinkApi) {
-        TransformDef emptyProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        "",
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
-        assertThatThrownBy(
-                        () ->
-                                runGenericTransformTest(
-                                        sinkApi,
-                                        Arrays.asList(
-                                                emptyProjection,
-                                                new TransformDef(
-                                                        
"default_namespace.default_schema.mytable2",
-                                                        // reference part 
column
-                                                        "id,UPPER(name) AS 
name",
-                                                        "age >= 18",
-                                                        null,
-                                                        null,
-                                                        null,
-                                                        null,
-                                                        null)),
-                                        Collections.emptyList()))
-                .rootCause()
-                .isExactlyInstanceOf(IllegalStateException.class)
-                .hasMessage(
-                        "Unable to merge schema columns={`id` BIGINT,`name` 
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
-                                + "and columns={`id` BIGINT,`name` STRING}, 
primaryKeys=id, options=() with different column counts.");
-    }
-
-    @ParameterizedTest
-    @EnumSource
-    void 
testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi 
sinkApi)
-            throws Exception {
-        TransformDef nullProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        null,
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
-        TransformDef emptyProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        "",
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
-        TransformDef asteriskProjection =
-                new TransformDef(
-                        "default_namespace.default_schema.mytable2",
-                        "*",
-                        "age < 18",
-                        null,
-                        null,
-                        null,
-                        null,
-                        null);
-
-        runGenericTransformTest(
-                sinkApi,
-                Arrays.asList(
-                        // Setting projection as null, '', or * should be 
equivalent
-                        nullProjection,
-                        emptyProjection,
-                        asteriskProjection,
-                        new TransformDef(
-                                "default_namespace.default_schema.mytable2",
-                                // reference all column
-                                "id,UPPER(name) AS name,age,description",
-                                "age >= 18",
-                                null,
-                                null,
-                                null,
-                                null,
                                 null)),
                 Arrays.asList(
                         
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description` 
STRING}, primaryKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student], op=INSERT, meta=()}"));
     }
 
     /** This tests if transform generates metadata info correctly. */
@@ -1237,47 +1028,49 @@ class FlinkPipelineTransformITCase {
 
     @ParameterizedTest
     @EnumSource
-    void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi 
apiVersion) {
-        Assertions.assertThatThrownBy(
-                        () ->
-                                runGenericTransformTest(
-                                        apiVersion,
-                                        Arrays.asList(
-                                                new TransformDef(
-                                                        "\\.*.\\.*.mytable1",
-                                                        "*, 'rule_1_matched' 
AS rule_1_matched",
-                                                        "id > 0",
-                                                        null,
-                                                        "id",
-                                                        null,
-                                                        null,
-                                                        null),
-                                                new TransformDef(
-                                                        "\\.*.\\.*.\\.*",
-                                                        "*, 'rule_fallback' AS 
rule_fallback",
-                                                        null,
-                                                        null,
-                                                        "id",
-                                                        null,
-                                                        null,
-                                                        null)),
-                                        Collections.emptyList()))
-                .rootCause()
-                .isExactlyInstanceOf(IllegalArgumentException.class)
-                .hasMessage(
-                        "Trying to merge transformed schemas [columns={`id` 
INT,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id, 
partitionKeys=id, options=(), columns={`id` INT,`name` STRING,`age` 
INT,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()], but 
got more than one column name views: [[id, name, age, rule_1_matched], [id, 
name, age, rule_fallback]]");
+    void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) 
throws Exception {
+        runGenericTransformTest(
+                apiVersion,
+                Arrays.asList(
+                        new TransformDef(
+                                "\\.*.\\.*.mytable1",
+                                "*, 'rule_1_matched' AS rule_1_matched",
+                                null,
+                                null,
+                                "id",
+                                null,
+                                null,
+                                null),
+                        new TransformDef(
+                                "\\.*.\\.*.\\.*",
+                                "*, 'rule_fallback' AS rule_fallback",
+                                null,
+                                null,
+                                "id",
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` 
STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, 
meta=()}",
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, 
partitionKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[3, Carol, 15, student, rule_fallback], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], 
after=[4, Derrida, 25, student, rule_fallback], op=INSERT, meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 
Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}"));
     }
 
     @ParameterizedTest
     @EnumSource
-    void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion) 
throws Exception {
+    void testTransformFilterWithFallbackRules(ValuesDataSink.SinkApi 
apiVersion) throws Exception {
         runGenericTransformTest(
                 apiVersion,
                 Arrays.asList(
                         new TransformDef(
                                 "\\.*.\\.*.mytable1",
                                 "*, 'rule_1_matched' AS rule_1_matched",
-                                null,
+                                "id > 1",
                                 null,
                                 "id",
                                 null,
@@ -1294,7 +1087,6 @@ class FlinkPipelineTransformITCase {
                                 null)),
                 Arrays.asList(
                         
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched` 
STRING}, primaryKeys=id, partitionKeys=id, options=()}",
-                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}",
                         
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 
Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE, 
meta=()}",
                         
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2, 
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` 
TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id, 
partitionKeys=id, options=()}",
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 5410eb903..766621971 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -270,11 +270,7 @@ class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                                 + (triggerError ? "  error.on.schema.change: 
true\n" : "\n")
                                 + "transform:\n"
                                 + "  - source-table: %s.\\.*\n"
-                                + "    projection: CAST(id AS VARCHAR) || ' -> 
' || name AS uid, *, id * id AS id_square, 'age < 20' as tag\n"
-                                + "    filter: age < 20\n"
-                                + "  - source-table: %s.\\.*\n"
-                                + "    projection: CAST(id AS VARCHAR) || ' -> 
' || name AS uid, *, 0 - id * id AS id_square, 'age >= 20' as tag\n"
-                                + "    filter: age >= 20\n"
+                                + "    projection: CAST(id AS VARCHAR) || ' -> 
' || name AS uid, *, CASE WHEN age < 20 THEN id * id WHEN age >= 20 THEN 0 - id 
* id END AS id_square, CASE WHEN age < 20 THEN 'age < 20' WHEN age >= 20 THEN 
'age >= 20' END as tag\n"
                                 + (mergeTable
                                         ? String.format(
                                                 "route:\n"
@@ -292,7 +288,6 @@ class SchemaEvolvingTransformE2eITCase extends 
PipelineTestEnvironment {
                         dbName,
                         mergeTable ? "(members|new_members)" : "members",
                         dbName,
-                        dbName,
                         behavior,
                         parallelism);
         submitPipelineJob(pipelineJob);
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 925bc6059..6b2e7b6dd 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -188,11 +188,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
                                 + "\n"
                                 + "transform:\n"
                                 + "  - source-table: %s.\\.*\n"
-                                + "    projection: ID, VERSION, 'Type-A' AS 
CATEGORY\n"
-                                + "    filter: ID > 1008\n"
-                                + "  - source-table: %s.\\.*\n"
-                                + "    projection: ID, VERSION, 'Type-B' AS 
CATEGORY\n"
-                                + "    filter: ID <= 1008\n"
+                                + "    projection: ID, VERSION, CASE WHEN ID > 
1008 THEN 'Type-A' WHEN ID <= 1008 THEN 'Type-B' END AS CATEGORY\n"
                                 + "\n"
                                 + "pipeline:\n"
                                 + "  execution.runtime-mode: %s\n"
@@ -203,7 +199,6 @@ class TransformE2eITCase extends PipelineTestEnvironment {
                         startupMode,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName(),
                         runtimeMode,
                         parallelism);
         submitPipelineJob(pipelineJob);
@@ -612,11 +607,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
                                 + "  type: values\n"
                                 + "transform:\n"
                                 + "  - source-table: %s.TABLEALPHA\n"
-                                + "    projection: ID, VERSION, PRICEALPHA, 
AGEALPHA, 'Juvenile' AS ROLENAME\n"
-                                + "    filter: AGEALPHA < 18\n"
-                                + "  - source-table: %s.TABLEALPHA\n"
-                                + "    projection: ID, VERSION, PRICEALPHA, 
AGEALPHA, NAMEALPHA AS ROLENAME\n"
-                                + "    filter: AGEALPHA >= 18\n"
+                                + "    projection: ID, VERSION, PRICEALPHA, 
AGEALPHA, CASE WHEN AGEALPHA < 18 THEN 'Juvenile' WHEN AGEALPHA >= 18 THEN 
NAMEALPHA END AS ROLENAME\n"
                                 + "pipeline:\n"
                                 + "  execution.runtime-mode: %s\n"
                                 + "  parallelism: %d",
@@ -626,7 +617,6 @@ class TransformE2eITCase extends PipelineTestEnvironment {
                         startupMode,
                         transformTestDatabase.getDatabaseName(),
                         transformTestDatabase.getDatabaseName(),
-                        transformTestDatabase.getDatabaseName(),
                         runtimeMode,
                         parallelism);
         submitPipelineJob(pipelineJob);
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 36e348e09..8683a07b8 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
-import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
 import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
 import 
org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters;
@@ -43,6 +42,9 @@ import 
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
 import 
org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
 import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
 
@@ -56,7 +58,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 
@@ -88,6 +89,8 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
             projectionProcessors;
     private transient Table<TableId, PostTransformer, 
TransformFilterProcessor> filterProcessors;
 
+    private transient LoadingCache<TableId, Optional<PostTransformer>> 
transformersCache;
+
     public static PostTransformOperatorBuilder newBuilder() {
         return new PostTransformOperatorBuilder();
     }
@@ -116,6 +119,16 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
         initializeUdf();
 
         this.transformers = createTransformers();
+        this.transformersCache =
+                CacheBuilder.newBuilder()
+                        .maximumSize(1024)
+                        .build(
+                                new CacheLoader<>() {
+                                    @Override
+                                    public Optional<PostTransformer> 
load(TableId tableId) {
+                                        return 
getEffectiveTransformer(tableId);
+                                    }
+                                });
     }
 
     @Override
@@ -162,26 +175,26 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
 
         ChangeEvent changeEvent = (ChangeEvent) event;
         TableId tableId = changeEvent.tableId();
-        List<PostTransformer> transformers = getEffectiveTransformers(tableId);
+        Optional<PostTransformer> transformer = 
transformersCache.getUnchecked(tableId);
 
         // Short-circuit if there's no effective transformers.
-        if (transformers.isEmpty()) {
+        if (transformer.isEmpty()) {
             output.collect(element);
             return;
         }
 
         if (event instanceof CreateTableEvent) {
-            processCreateTableEvent((CreateTableEvent) event, transformers)
+            processCreateTableEvent((CreateTableEvent) event, 
transformer.get())
                     .map(StreamRecord::new)
                     .ifPresent(output::collect);
             invalidateCache(tableId);
         } else if (event instanceof SchemaChangeEvent) {
-            processSchemaChangeEvent((SchemaChangeEvent) event, transformers)
+            processSchemaChangeEvent((SchemaChangeEvent) event, 
transformer.get())
                     .map(StreamRecord::new)
                     .ifPresent(output::collect);
             invalidateCache(tableId);
         } else if (event instanceof DataChangeEvent) {
-            processDataChangeEvent((DataChangeEvent) event, transformers)
+            processDataChangeEvent((DataChangeEvent) event, transformer.get())
                     .map(StreamRecord::new)
                     .ifPresent(output::collect);
         } else {
@@ -197,18 +210,12 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
      * Apply effective transform rules to {@link CreateTableEvent}s based on 
effective transformers.
      */
     private Optional<Event> processCreateTableEvent(
-            CreateTableEvent event, List<PostTransformer> 
effectiveTransformers) {
+            CreateTableEvent event, PostTransformer effectiveTransformer) {
         TableId tableId = event.tableId();
         Schema preSchema = event.getSchema();
 
-        // Apply transform rules and verify we can get a deterministic post 
schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(preSchema, trans))
-                        .collect(Collectors.toList());
-
         Schema postSchema =
-                
SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(preSchema, 
effectiveTransformer));
 
         // Update transform info map
         postTransformInfoMap.put(
@@ -216,11 +223,10 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
 
         // Update "if-table-has-been–wildcard–matched" map
         boolean wildcardMatched =
-                effectiveTransformers.stream()
-                        .map(PostTransformer::getProjection)
-                        .flatMap(this::optionalToStream)
-                        .map(TransformProjection::getProjection)
-                        .anyMatch(TransformParser::hasAsterisk);
+                effectiveTransformer.getProjection().isPresent()
+                        && TransformParser.hasAsterisk(
+                                
effectiveTransformer.getProjection().get().getProjection());
+
         hasAsteriskMap.put(tableId, wildcardMatched);
         projectedColumnsMap.put(
                 tableId,
@@ -236,7 +242,7 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
      * transformers and existing {@link PostTransformChangeInfo}.
      */
     private Optional<Event> processSchemaChangeEvent(
-            SchemaChangeEvent event, List<PostTransformer> 
effectiveTransformers) {
+            SchemaChangeEvent event, PostTransformer effectiveTransformer) {
         TableId tableId = event.tableId();
         PostTransformChangeInfo info = 
checkNotNull(postTransformInfoMap.get(tableId));
 
@@ -244,14 +250,8 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
         Schema prevPreSchema = info.getPreTransformedSchema();
         Schema nextPreSchema = 
SchemaUtils.applySchemaChangeEvent(prevPreSchema, event);
 
-        // Apply transform rules and verify we can get a deterministic post 
schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(nextPreSchema, trans))
-                        .collect(Collectors.toList());
-
         Schema nextPostSchema =
-                
SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(nextPreSchema, 
effectiveTransformer));
 
         // Update transform info map
         postTransformInfoMap.put(
@@ -274,7 +274,7 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
 
     /** Apply projection rules to given {@link DataChangeEvent}. */
     private Optional<Event> processDataChangeEvent(
-            DataChangeEvent event, List<PostTransformer> 
effectiveTransformers) {
+            DataChangeEvent event, PostTransformer effectiveTransformer) {
         TableId tableId = event.tableId();
         PostTransformChangeInfo info = 
checkNotNull(postTransformInfoMap.get(tableId));
 
@@ -285,50 +285,39 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
 
         String beforeOp = event.opTypeString(false);
         String afterOp = event.opTypeString(true);
-
-        for (PostTransformer transformer : effectiveTransformers) {
-            TransformProjectionProcessor projectionProcessor =
-                    getProjectionProcessor(tableId, transformer);
-            TransformFilterProcessor filterProcessor = 
getFilterProcessor(tableId, transformer);
-
-            RecordData beforeRow = null;
-            RecordData afterRow = null;
-            boolean filterPassed = true;
-
-            if (event.before() != null) {
-                context.opType = beforeOp;
-                Tuple2<BinaryRecordData, Boolean> result =
-                        transformRecord(
-                                event.before(),
-                                info,
-                                projectionProcessor,
-                                filterProcessor,
-                                context);
-                beforeRow = result.f0;
-                filterPassed = result.f1;
-            }
-
-            if (event.after() != null) {
-                context.opType = afterOp;
-                Tuple2<BinaryRecordData, Boolean> result =
-                        transformRecord(
-                                event.after(), info, projectionProcessor, 
filterProcessor, context);
-                afterRow = result.f0;
-                filterPassed = result.f1;
-            }
-
-            if (filterPassed) {
-                DataChangeEvent finalEvent =
-                        DataChangeEvent.projectRecords(event, beforeRow, 
afterRow);
-                if (transformer.getPostTransformConverter().isPresent()) {
-                    return transformer
-                            .getPostTransformConverter()
-                            .get()
-                            .convert(finalEvent)
-                            .map(Event.class::cast);
-                } else {
-                    return Optional.of(finalEvent);
-                }
+        TransformProjectionProcessor projectionProcessor =
+                getProjectionProcessor(tableId, effectiveTransformer);
+        TransformFilterProcessor filterProcessor =
+                getFilterProcessor(tableId, effectiveTransformer);
+        RecordData beforeRow = null;
+        RecordData afterRow = null;
+        boolean filterPassed = true;
+        if (event.before() != null) {
+            context.opType = beforeOp;
+            Tuple2<BinaryRecordData, Boolean> result =
+                    transformRecord(
+                            event.before(), info, projectionProcessor, 
filterProcessor, context);
+            beforeRow = result.f0;
+            filterPassed = result.f1;
+        }
+        if (event.after() != null) {
+            context.opType = afterOp;
+            Tuple2<BinaryRecordData, Boolean> result =
+                    transformRecord(
+                            event.after(), info, projectionProcessor, 
filterProcessor, context);
+            afterRow = result.f0;
+            filterPassed = result.f1;
+        }
+        if (filterPassed) {
+            DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event, 
beforeRow, afterRow);
+            if (effectiveTransformer.getPostTransformConverter().isPresent()) {
+                return effectiveTransformer
+                        .getPostTransformConverter()
+                        .get()
+                        .convert(finalEvent)
+                        .map(Event.class::cast);
+            } else {
+                return Optional.of(finalEvent);
             }
         }
 
@@ -397,22 +386,14 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
     // Convenience methods for coping with transient fields.
     // -------------------
 
-    /** Obtain effective transformers based on given {@link TableId}. */
-    private List<PostTransformer> getEffectiveTransformers(TableId tableId) {
-        List<PostTransformer> effectiveTransformers = new ArrayList<>();
+    /** Obtain effective transformer based on given {@link TableId}. */
+    private Optional<PostTransformer> getEffectiveTransformer(TableId tableId) 
{
         for (PostTransformer transformer : transformers) {
             if (transformer.getSelectors().isMatch(tableId)) {
-                effectiveTransformers.add(transformer);
-
-                // Transform module works with "First-match" rule. If we have 
met an uncondition
-                // transform rule (without any filtering expression), then any 
following transform
-                // rule will not be effective.
-                if (!transformer.getFilter().isPresent()) {
-                    break;
-                }
+                return Optional.of(transformer);
             }
         }
-        return effectiveTransformers;
+        return Optional.empty();
     }
 
     /**
@@ -546,10 +527,4 @@ public class PostTransformOperator extends 
AbstractStreamOperatorAdapter<Event>
         udfDescriptors.clear();
         udfFunctionInstances.clear();
     }
-
-    /** Backport of {@code Optional#stream} before Java 11. */
-    @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
-    private <T> Stream<T> optionalToStream(Optional<T> optional) {
-        return optional.map(Stream::of).orElseGet(Stream::empty);
-    }
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 22e0cfbd3..ddd5aeb76 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -468,112 +468,6 @@ class PostTransformOperatorTest {
         transformFunctionEventEventOperatorTestHarness.close();
     }
 
-    @Test
-    void testDataChangeEventTransformTwice() throws Exception {
-        PostTransformOperator transform =
-                PostTransformOperator.newBuilder()
-                        .addTransform(
-                                CUSTOMERS_TABLEID.identifier(),
-                                "*, concat(col1, '1') col12",
-                                "col1 = '1'")
-                        .addTransform(
-                                CUSTOMERS_TABLEID.identifier(),
-                                "*, concat(col1, '2') col12",
-                                "col1 = '2'")
-                        .build();
-        RegularEventOperatorTestHarness<PostTransformOperator, Event>
-                transformFunctionEventEventOperatorTestHarness =
-                        RegularEventOperatorTestHarness.with(transform, 1);
-        // Initialization
-        transformFunctionEventEventOperatorTestHarness.open();
-        // Create table
-        CreateTableEvent createTableEvent =
-                new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
-        BinaryRecordDataGenerator recordDataGenerator =
-                new BinaryRecordDataGenerator(((RowType) 
CUSTOMERS_SCHEMA.toRowDataType()));
-        // Insert
-        DataChangeEvent insertEvent =
-                DataChangeEvent.insertEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"), new 
BinaryStringData("2"), null
-                                }));
-        DataChangeEvent insertEventExpect =
-                DataChangeEvent.insertEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"),
-                                    new BinaryStringData("2"),
-                                    new BinaryStringData("11")
-                                }));
-        // Insert
-        DataChangeEvent insertEvent2 =
-                DataChangeEvent.insertEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("2"), new 
BinaryStringData("2"), null
-                                }));
-        DataChangeEvent insertEvent2Expect =
-                DataChangeEvent.insertEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("2"),
-                                    new BinaryStringData("2"),
-                                    new BinaryStringData("22")
-                                }));
-        // Update
-        DataChangeEvent updateEvent =
-                DataChangeEvent.updateEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"), new 
BinaryStringData("2"), null
-                                }),
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"), new 
BinaryStringData("3"), null
-                                }));
-        DataChangeEvent updateEventExpect =
-                DataChangeEvent.updateEvent(
-                        CUSTOMERS_TABLEID,
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"),
-                                    new BinaryStringData("2"),
-                                    new BinaryStringData("11")
-                                }),
-                        recordDataGenerator.generate(
-                                new Object[] {
-                                    new BinaryStringData("1"),
-                                    new BinaryStringData("3"),
-                                    new BinaryStringData("11")
-                                }));
-
-        transform.processElement(new StreamRecord<>(createTableEvent));
-        Assertions.assertThat(
-                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
-                .isEqualTo(
-                        new StreamRecord<>(
-                                new CreateTableEvent(CUSTOMERS_TABLEID, 
CUSTOMERS_SCHEMA)));
-        transform.processElement(new StreamRecord<>(insertEvent));
-        Assertions.assertThat(
-                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
-                .isEqualTo(new StreamRecord<>(insertEventExpect));
-        transform.processElement(new StreamRecord<>(insertEvent2));
-        Assertions.assertThat(
-                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
-                .isEqualTo(new StreamRecord<>(insertEvent2Expect));
-        transform.processElement(new StreamRecord<>(updateEvent));
-        Assertions.assertThat(
-                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
-                .isEqualTo(new StreamRecord<>(updateEventExpect));
-        transformFunctionEventEventOperatorTestHarness.close();
-    }
-
     @Test
     void testDataChangeEventTransformProjectionDataTypeConvert() throws 
Exception {
         PostTransformOperator transform =
@@ -1259,21 +1153,12 @@ class PostTransformOperatorTest {
                         .addTransform(
                                 TIMESTAMPDIFF_TABLEID.identifier(),
                                 "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, 
CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
-                                        + " TIMESTAMPDIFF(HOUR, 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
-                                        + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, 
CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff,"
-                                        + " TIMESTAMPDIFF(MONTH, 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as month_diff,"
-                                        + " TIMESTAMPDIFF(YEAR, 
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as year_diff",
-                                "col1='1'")
-                        .addTransform(
-                                TIMESTAMPDIFF_TABLEID.identifier(),
-                                "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP, 
CAST(NOW() AS TIMESTAMP)) as second_diff,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
-                                        + " TIMESTAMPDIFF(HOUR, 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
-                                        + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, 
CAST(NOW() AS TIMESTAMP)) as day_diff,"
-                                        + " TIMESTAMPDIFF(MONTH, 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as month_diff,"
-                                        + " TIMESTAMPDIFF(YEAR, 
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as year_diff",
-                                "col1='2'")
+                                        + " CASE WHEN col1='1' THEN 
TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) 
WHEN col1='2' THEN TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(NOW() AS 
TIMESTAMP)) END as minute_diff,"
+                                        + " CASE WHEN col1='1' THEN 
TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN 
col1='2' THEN TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END 
as hour_diff,"
+                                        + " CASE WHEN col1='1' THEN 
TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN 
col1='2' THEN TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END 
as day_diff,"
+                                        + " CASE WHEN col1='1' THEN 
TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN 
col1='2' THEN TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) 
END as month_diff,"
+                                        + " CASE WHEN col1='1' THEN 
TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN 
col1='2' THEN TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END 
as year_diff",
+                                null)
                         .addTimezone("Asia/Shanghai")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1336,51 +1221,11 @@ class PostTransformOperatorTest {
                         .addTransform(
                                 TIMESTAMPDIFF_DATA_TABLEID.identifier(),
                                 "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(SECOND, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(SECOND, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(SECOND, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(SECOND, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='SECOND'")
-                        .addTransform(
-                                TIMESTAMPDIFF_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(MINUTE, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='MINUTE'")
-                        .addTransform(
-                                TIMESTAMPDIFF_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(HOUR, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(HOUR, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(HOUR, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(HOUR, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='HOUR'")
-                        .addTransform(
-                                TIMESTAMPDIFF_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(DAY, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(DAY, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(DAY, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(DAY, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='DAY'")
-                        .addTransform(
-                                TIMESTAMPDIFF_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(MONTH, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(MONTH, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(MONTH, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(MONTH, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='MONTH'")
-                        .addTransform(
-                                TIMESTAMPDIFF_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit,"
-                                        + " TIMESTAMPDIFF(YEAR, 
start_timestamp, end_timestamp) as timestamp_timestamp,"
-                                        + " TIMESTAMPDIFF(YEAR, 
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
-                                        + " TIMESTAMPDIFF(YEAR, 
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
-                                        + " TIMESTAMPDIFF(YEAR, 
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
-                                "time_interval_unit='YEAR'")
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp, 
end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, 
start_timestamp, end_timestamp)  WHEN time_interval_unit='HOUR' THEN 
TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp) WHEN 
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp, 
end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, 
start_timestamp, end_times [...]
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp, 
end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, 
start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN 
TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp_ltz) WHEN 
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp, 
end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, 
start_times [...]
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz, 
end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, 
start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='HOUR' THEN 
TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp) WHEN 
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz, 
end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH, 
start_times [...]
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz, 
end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE, 
start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN 
TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp_ltz) WHEN 
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz, 
end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MO [...]
+                                null)
                         .addTimezone("UTC")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1676,22 +1521,14 @@ class PostTransformOperatorTest {
                 PostTransformOperator.newBuilder()
                         .addTransform(
                                 TIMESTAMPADD_TABLEID.identifier(),
-                                "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 
1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(HOUR, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(DAY, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(MONTH, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(YEAR, 1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
-                                "col1='1'")
-                        .addTransform(
-                                TIMESTAMPADD_TABLEID.identifier(),
-                                "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(MINUTE, 
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(HOUR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(DAY, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(MONTH, 
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
-                                        + " DATE_FORMAT(TIMESTAMPADD(YEAR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
-                                "col1='2'")
+                                "col1, "
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(SECOND, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(SECOND, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as second_add,"
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(MINUTE, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MINUTE, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as minute_add,"
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(HOUR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(HOUR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as hour_add,"
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(DAY, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(DAY, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as day_add,"
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(MONTH, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MONTH, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as month_add,"
+                                        + "CASE WHEN col1='1' THEN 
DATE_FORMAT(TIMESTAMPADD(YEAR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')), 
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(YEAR, -1, 
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as year_add",
+                                null)
                         .addTimezone("UTC")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1771,39 +1608,9 @@ class PostTransformOperatorTest {
                         .addTransform(
                                 TIMESTAMPADD_DATA_TABLEID.identifier(),
                                 "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(SECOND, 
interval_value, time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(SECOND, 
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='SECOND'")
-                        .addTransform(
-                                TIMESTAMPADD_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(MINUTE, 
interval_value, time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(MINUTE, 
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='MINUTE'")
-                        .addTransform(
-                                TIMESTAMPADD_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(HOUR, interval_value, 
time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(HOUR, interval_value, 
time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='HOUR'")
-                        .addTransform(
-                                TIMESTAMPADD_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(DAY, interval_value, 
time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(DAY, interval_value, 
time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='DAY'")
-                        .addTransform(
-                                TIMESTAMPADD_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(MONTH, 
interval_value, time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(MONTH, 
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='MONTH'")
-                        .addTransform(
-                                TIMESTAMPADD_DATA_TABLEID.identifier(),
-                                "col1, time_interval_unit, interval_value,"
-                                        + " TIMESTAMPADD(YEAR, interval_value, 
time_point_timestamp) as time_point_timestamp,"
-                                        + " TIMESTAMPADD(YEAR, interval_value, 
time_point_timestamp_ltz) as time_point_timestamp_ltz",
-                                "time_interval_unit='YEAR'")
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value, 
time_point_timestamp) WHEN time_interval_unit='MINUTE' THEN 
TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp) WHEN 
time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value, 
time_point_timestamp) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY, 
interval_value, time_point_timestamp) WHEN time_interval_unit='MONTH' THEN 
TIMESTAMPADD(MONTH, interval [...]
+                                        + " CASE WHEN 
time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value, 
time_point_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN 
TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp_ltz) WHEN 
time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value, 
time_point_timestamp_ltz) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY, 
interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN 
TIMESTAMPADD [...]
+                                null)
                         .addTimezone("UTC")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -2135,153 +1942,18 @@ class PostTransformOperatorTest {
                         .addTransform(
                                 CAST_TABLEID.identifier(),
                                 "col1"
-                                        + ",cast(col1 as int) as castInt"
-                                        + ",cast(col1 as boolean) as 
castBoolean"
-                                        + ",cast(col1 as tinyint) as 
castTinyint"
-                                        + ",cast(col1 as smallint) as 
castSmallint"
-                                        + ",cast(col1 as bigint) as castBigint"
-                                        + ",cast(col1 as float) as castFloat"
-                                        + ",cast(col1 as double) as castDouble"
-                                        + ",cast(col1 as char) as castChar"
-                                        + ",cast(col1 as varchar) as 
castVarchar"
-                                        + ",cast(col1 as DECIMAL(4,2)) as 
castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '1'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",cast(castInt as int) as castInt"
-                                        + ",cast(castInt as boolean) as 
castBoolean"
-                                        + ",cast(castInt as tinyint) as 
castTinyint"
-                                        + ",cast(castInt as smallint) as 
castSmallint"
-                                        + ",cast(castInt as bigint) as 
castBigint"
-                                        + ",cast(castInt as float) as 
castFloat"
-                                        + ",cast(castInt as double) as 
castDouble"
-                                        + ",cast(castInt as char) as castChar"
-                                        + ",cast(castInt as varchar) as 
castVarchar"
-                                        + ",cast(castInt as DECIMAL(4,2)) as 
castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '2'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",cast(castBoolean as int) as 
castInt"
-                                        + ",cast(castBoolean as boolean) as 
castBoolean"
-                                        + ",cast(castBoolean as tinyint) as 
castTinyint"
-                                        + ",cast(castBoolean as smallint) as 
castSmallint"
-                                        + ",cast(castBoolean as bigint) as 
castBigint"
-                                        + ",castFloat"
-                                        + ",castDouble"
-                                        + ",cast(castBoolean as char) as 
castChar"
-                                        + ",cast(castBoolean as varchar) as 
castVarchar"
-                                        + ",castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '3'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",cast(castTinyint as int) as 
castInt"
-                                        + ",cast(castTinyint as boolean) as 
castBoolean"
-                                        + ",cast(castTinyint as tinyint) as 
castTinyint"
-                                        + ",cast(castTinyint as smallint) as 
castSmallint"
-                                        + ",cast(castTinyint as bigint) as 
castBigint"
-                                        + ",cast(castTinyint as float) as 
castFloat"
-                                        + ",cast(castTinyint as double) as 
castDouble"
-                                        + ",cast(castTinyint as char) as 
castChar"
-                                        + ",cast(castTinyint as varchar) as 
castVarchar"
-                                        + ",cast(castTinyint as DECIMAL(4,2)) 
as castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '4'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",cast(castSmallint as int) as 
castInt"
-                                        + ",cast(castSmallint as boolean) as 
castBoolean"
-                                        + ",cast(castSmallint as tinyint) as 
castTinyint"
-                                        + ",cast(castSmallint as smallint) as 
castSmallint"
-                                        + ",cast(castSmallint as bigint) as 
castBigint"
-                                        + ",cast(castSmallint as float) as 
castFloat"
-                                        + ",cast(castSmallint as double) as 
castDouble"
-                                        + ",cast(castSmallint as char) as 
castChar"
-                                        + ",cast(castSmallint as varchar) as 
castVarchar"
-                                        + ",cast(castSmallint as DECIMAL(4,2)) 
as castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '5'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",cast(castBigint as int) as castInt"
-                                        + ",cast(castBigint as boolean) as 
castBoolean"
-                                        + ",cast(castBigint as tinyint) as 
castTinyint"
-                                        + ",cast(castBigint as smallint) as 
castSmallint"
-                                        + ",cast(castBigint as bigint) as 
castBigint"
-                                        + ",cast(castBigint as float) as 
castFloat"
-                                        + ",cast(castBigint as double) as 
castDouble"
-                                        + ",cast(castBigint as char) as 
castChar"
-                                        + ",cast(castBigint as varchar) as 
castVarchar"
-                                        + ",cast(castBigint as DECIMAL(4,2)) 
as castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '6'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",castInt"
-                                        + ",cast(castFloat as boolean) as 
castBoolean"
-                                        + ",castTinyint"
-                                        + ",castSmallint"
-                                        + ",castBigint"
-                                        + ",cast(castFloat as float) as 
castFloat"
-                                        + ",cast(castFloat as double) as 
castDouble"
-                                        + ",cast(castFloat as char) as 
castChar"
-                                        + ",cast(castFloat as varchar) as 
castVarchar"
-                                        + ",cast(castFloat as DECIMAL(4,2)) as 
castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '7'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",castInt"
-                                        + ",cast(castDouble as boolean) as 
castBoolean"
-                                        + ",castTinyint"
-                                        + ",castSmallint"
-                                        + ",castBigint"
-                                        + ",cast(castDouble as float) as 
castFloat"
-                                        + ",cast(castDouble as double) as 
castDouble"
-                                        + ",cast(castDouble as char) as 
castChar"
-                                        + ",cast(castDouble as varchar) as 
castVarchar"
-                                        + ",cast(castDouble as DECIMAL(4,2)) 
as castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '8'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",castInt"
-                                        + ",cast(castDecimal as boolean) as 
castBoolean"
-                                        + ",castTinyint"
-                                        + ",castSmallint"
-                                        + ",castBigint"
-                                        + ",cast(castDecimal as float) as 
castFloat"
-                                        + ",cast(castDecimal as double) as 
castDouble"
-                                        + ",cast(castDecimal as char) as 
castChar"
-                                        + ",cast(castDecimal as varchar) as 
castVarchar"
-                                        + ",cast(castDecimal as DECIMAL(4,2)) 
as castDecimal"
-                                        + ", castTimestamp",
-                                "col1 = '9'")
-                        .addTransform(
-                                CAST_TABLEID.identifier(),
-                                "col1"
-                                        + ",castInt"
-                                        + ",castBoolean"
-                                        + ",castTinyint"
-                                        + ",castSmallint"
-                                        + ",castBigint"
-                                        + ",castFloat"
-                                        + ",castDouble"
-                                        + ",castChar"
-                                        + ",cast(castTimestamp as varchar) as 
castVarchar"
-                                        + ",castDecimal"
-                                        + ",cast('1970-01-01T00:00:01.234' as 
TIMESTAMP(3)) as castTimestamp",
-                                "col1 = '10'")
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as int) WHEN col1 = '2' THEN cast(castInt as int) WHEN col1 = '3' 
THEN cast(castBoolean as int) WHEN col1 = '4' THEN cast(castTinyint as int) 
WHEN col1 = '5' THEN cast(castSmallint as int) WHEN col1 = '6' THEN 
cast(castBigint as int) WHEN col1 = '7' THEN castInt WHEN col1 = '8' THEN 
castInt WHEN col1 = '9' THEN castInt WHEN col1 = '10' THEN castInt END as 
castInt"
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as boolean) WHEN col1 = '2' THEN cast(castInt as boolean) WHEN col1 = 
'3' THEN cast(castBoolean as boolean) WHEN col1 = '4' THEN cast(castTinyint as 
boolean) WHEN col1 = '5' THEN cast(castSmallint as boolean) WHEN col1 = '6' 
THEN cast(castBigint as boolean) WHEN col1 = '7' THEN cast(castFloat as 
boolean) WHEN col1 = '8' THEN cast(castDouble as boolean) WHEN col1 = '9' THEN 
cast(castDecimal as boolean) WHEN co [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as tinyint) WHEN col1 = '2' THEN cast(castInt as tinyint) WHEN col1 = 
'3' THEN cast(castBoolean as tinyint) WHEN col1 = '4' THEN cast(castTinyint as 
tinyint) WHEN col1 = '5' THEN cast(castSmallint as tinyint) WHEN col1 = '6' 
THEN cast(castBigint as tinyint) WHEN col1 = '7' THEN castTinyint WHEN col1 = 
'8' THEN castTinyint WHEN col1 = '9' THEN castTinyint WHEN col1 = '10' THEN 
castTinyint END as castTinyint"
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as smallint) WHEN col1 = '2' THEN cast(castInt as smallint) WHEN col1 
= '3' THEN cast(castBoolean as smallint) WHEN col1 = '4' THEN cast(castTinyint 
as smallint) WHEN col1 = '5' THEN cast(castSmallint as smallint) WHEN col1 = 
'6' THEN cast(castBigint as smallint) WHEN col1 = '7' THEN castSmallint WHEN 
col1 = '8' THEN castSmallint WHEN col1 = '9' THEN castSmallint WHEN col1 = '10' 
THEN castSmallint END as cast [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as bigint) WHEN col1 = '2' THEN cast(castInt as bigint) WHEN col1 = 
'3' THEN cast(castBoolean as bigint) WHEN col1 = '4' THEN cast(castTinyint as 
bigint) WHEN col1 = '5' THEN cast(castSmallint as bigint) WHEN col1 = '6' THEN 
cast(castBigint as bigint) WHEN col1 = '7' THEN castBigint WHEN col1 = '8' THEN 
castBigint WHEN col1 = '9' THEN castBigint WHEN col1 = '10' THEN castBigint END 
as castBigint"
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as float) WHEN col1 = '2' THEN cast(castInt as float) WHEN col1 = '3' 
THEN castFloat WHEN col1 = '4' THEN cast(castTinyint as float) WHEN col1 = '5' 
THEN cast(castSmallint as float) WHEN col1 = '6' THEN cast(castBigint as float) 
WHEN col1 = '7' THEN cast(castFloat as float) WHEN col1 = '8' THEN 
cast(castDouble as float) WHEN col1 = '9' THEN cast(castDecimal as float) WHEN 
col1 = '10' THEN castFloat END as castFloat"
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as double) WHEN col1 = '2' THEN cast(castInt as double) WHEN col1 = 
'3' THEN castDouble WHEN col1 = '4' THEN cast(castTinyint as double) WHEN col1 
= '5' THEN cast(castSmallint as double) WHEN col1 = '6' THEN cast(castBigint as 
double) WHEN col1 = '7' THEN cast(castFloat as double) WHEN col1 = '8' THEN 
cast(castDouble as double) WHEN col1 = '9' THEN cast(castDecimal as double) 
WHEN col1 = '10' THEN castDouble  [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as char) WHEN col1 = '2' THEN cast(castInt as char) WHEN col1 = '3' 
THEN cast(castBoolean as char) WHEN col1 = '4' THEN cast(castTinyint as char) 
WHEN col1 = '5' THEN cast(castSmallint as char) WHEN col1 = '6' THEN 
cast(castBigint as char) WHEN col1 = '7' THEN cast(castFloat as char) WHEN col1 
= '8' THEN cast(castDouble as char) WHEN col1 = '9' THEN cast(castDecimal as 
char) WHEN col1 = '10' THEN castChar END [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as varchar) WHEN col1 = '2' THEN cast(castInt as varchar) WHEN col1 = 
'3' THEN cast(castBoolean as varchar) WHEN col1 = '4' THEN cast(castTinyint as 
varchar) WHEN col1 = '5' THEN cast(castSmallint as varchar) WHEN col1 = '6' 
THEN cast(castBigint as varchar) WHEN col1 = '7' THEN cast(castFloat as 
varchar) WHEN col1 = '8' THEN cast(castDouble as varchar) WHEN col1 = '9' THEN 
cast(castDecimal as varchar) WHEN co [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
cast(col1 as DECIMAL(4,2)) WHEN col1 = '2' THEN cast(castInt as DECIMAL(4,2)) 
WHEN col1 = '3' THEN castDecimal WHEN col1 = '4' THEN cast(castTinyint as 
DECIMAL(4,2)) WHEN col1 = '5' THEN cast(castSmallint as DECIMAL(4,2)) WHEN col1 
= '6' THEN cast(castBigint as DECIMAL(4,2)) WHEN col1 = '7' THEN cast(castFloat 
as DECIMAL(4,2)) WHEN col1 = '8' THEN cast(castDouble as DECIMAL(4,2)) WHEN 
col1 = '9' THEN cast(castDecimal a [...]
+                                        + ",CASE WHEN col1 = '1' THEN 
castTimestamp WHEN col1 = '2' THEN castTimestamp WHEN col1 = '3' THEN 
castTimestamp WHEN col1 = '4' THEN castTimestamp WHEN col1 = '5' THEN 
castTimestamp WHEN col1 = '6' THEN castTimestamp WHEN col1 = '7' THEN 
castTimestamp WHEN col1 = '8' THEN castTimestamp WHEN col1 = '9' THEN 
castTimestamp WHEN col1 = '10' THEN cast('1970-01-01T00:00:01.234' as 
TIMESTAMP(3)) END as castTimestamp",
+                                null)
                         .addTimezone("UTC")
                         .build();
         RegularEventOperatorTestHarness<PostTransformOperator, Event>
diff --git a/pom.xml b/pom.xml
index 53220c9ef..fb431f0af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -366,7 +366,6 @@ limitations under the License.
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
             <version>1.15</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 

Reply via email to