This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db16ba30 [FLINK-36093][transform] Fix preTransformoperator wrongly filters columns belong to different transforms
5db16ba30 is described below

commit 5db16ba30ce40d52cb5d76fcdce18a10da1c4cad
Author: MOBIN <18814118...@163.com>
AuthorDate: Thu Nov 14 11:41:56 2024 +0800

    [FLINK-36093][transform] Fix preTransformoperator wrongly filters columns belong to different transforms
    
    This closes #3572
---
 .../flink/FlinkPipelineTransformITCase.java        |  99 +++++++++++++++
 .../cdc/pipeline/tests/TransformE2eITCase.java     |  76 ++++++++++++
 .../operators/transform/PreTransformOperator.java  | 136 ++++++++++++---------
 .../operators/transform/PreTransformProcessor.java |  69 ++---------
 .../transform/PreTransformOperatorTest.java        | 121 ++++++++++++++++++
 5 files changed, 387 insertions(+), 114 deletions(-)

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index b1a1c6a19..7b32a0af5 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -255,6 +255,105 @@ class FlinkPipelineTransformITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}"));
     }
 
+    @ParameterizedTest
+    @EnumSource
+    void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        runGenericTransformTest(
+                sinkApi,
+                Arrays.asList(
+                        new TransformDef(
+                                "default_namespace.default_schema.\\.*",
+                                "id,age,'Juvenile' AS roleName",
+                                "age < 18",
+                                null,
+                                null,
+                                null,
+                                null),
+                        new TransformDef(
+                                "default_namespace.default_schema.\\.*",
+                                "id,age,name AS roleName",
+                                "age >= 18",
+                                null,
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 18, Alice], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 20, Bob], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, 15, Juvenile], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, 25, Derrida], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 25, Derrida], after=[], op=DELETE, meta=()}"));
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        runGenericTransformTest(
+                sinkApi,
+                Arrays.asList(
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                "*,'Juvenile' AS roleName",
+                                "age < 18",
+                                null,
+                                null,
+                                null,
+                                null),
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                "id,name,age,description,name AS roleName",
+                                "age >= 18",
+                                null,
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        runGenericTransformTest(
+                sinkApi,
+                Arrays.asList(
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                null,
+                                "age < 18",
+                                null,
+                                null,
+                                null,
+                                null),
+                        new TransformDef(
+                                "default_namespace.default_schema.mytable2",
+                                "id,UPPER(name) AS name,age,description",
+                                "age >= 18",
+                                null,
+                                null,
+                                null,
+                                null)),
+                Arrays.asList(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
+    }
+
     /** This tests if transform generates metadata info correctly. */
     @ParameterizedTest
     @EnumSource
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 36e7d9a86..ea3e2e017 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -597,6 +597,82 @@ public class TransformE2eITCase extends PipelineTestEnvironment {
                 "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, 3011, 11], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    public void testMultipleTransformWithDiffRefColumn() throws Exception {
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.TABLEALPHA\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.TABLEALPHA\n"
+                                + "    projection: ID, VERSION, PRICEALPHA, 
AGEALPHA, 'Juvenile' AS ROLENAME\n"
+                                + "    filter: AGEALPHA < 18\n"
+                                + "  - source-table: %s.TABLEALPHA\n"
+                                + "    projection: ID, VERSION, PRICEALPHA, 
AGEALPHA, NAMEALPHA AS ROLENAME\n"
+                                + "    filter: AGEALPHA >= 18\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformTestDatabase.getDatabaseName(),
+                        transformTestDatabase.getDatabaseName(),
+                        transformTestDatabase.getDatabaseName(),
+                        parallelism);
+        Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+        submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        waitUntilSpecificEvent(
+                String.format(
+                        "CreateTableEvent{tableId=%s.TABLEALPHA, 
schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` 
INT,`AGEALPHA` INT,`ROLENAME` STRING}, primaryKeys=ID, options=()}",
+                        transformTestDatabase.getDatabaseName()),
+                60000L);
+
+        validateEvents(
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[1008, 8, 199, 17, Juvenile], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[1009, 8.1, 0, 18, Bob], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[1010, 10, 99, 19, Carol], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[1011, 11, 59, 20, Dave], op=INSERT, meta=()}");
+
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        transformTestDatabase.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
+            stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 25, 
'IINA');");
+            stat.execute("DELETE FROM TABLEALPHA WHERE id=1011;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateEvents(
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 
18, Bob], after=[1009, 100, 0, 18, Bob], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], 
after=[3007, 7, 79, 25, IINA], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1011, 11, 59, 
20, Dave], after=[], op=DELETE, meta=()}");
+    }
+
     @Test
     public void testTransformWithCast() throws Exception {
         String pipelineJob =
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 7b43166ac..b0fc8218d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -48,6 +49,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -289,6 +291,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                     SchemaUtils.applySchemaChangeEvent(
                             tableChangeInfo.getPreTransformedSchema(), schemaChangeEvent.get());
         }
+        cachePreTransformProcessor(tableId, originalSchema);
         preTransformChangeInfoMap.put(
                 tableId, PreTransformChangeInfo.of(tableId, originalSchema, preTransformedSchema));
         return schemaChangeEvent;
@@ -319,8 +322,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 
     private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
         TableId tableId = createTableEvent.tableId();
-        PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
-        cacheTransformRuleInfo(createTableEvent);
         for (Tuple2<Selectors, SchemaMetadataTransform> transform : schemaMetadataTransformers) {
             Selectors selectors = transform.f0;
             if (selectors.isMatch(tableId)) {
@@ -332,27 +333,74 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
             }
         }
 
+        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        if (preTransformProcessorMap.containsKey(tableId)) {
+            return preTransformProcessorMap
+                    .get(tableId)
+                    .preTransformCreateTableEvent(createTableEvent);
+        }
+        return createTableEvent;
+    }
+
+    private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) {
+        LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
+        boolean hasMatchTransform = false;
         for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-            if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
+            if (!transform.getSelectors().isMatch(tableId)) {
+                continue;
+            }
+            if (!transform.getProjection().isPresent()) {
+                processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+                hasMatchTransform = true;
+            } else {
                 TransformProjection transformProjection = transform.getProjection().get();
-                TransformFilter transformFilter = transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
-                    if (!preTransformProcessorMap.containsKey(tableId)) {
-                        preTransformProcessorMap.put(
-                                tableId,
-                                new PreTransformProcessor(
-                                        tableChangeInfo, transformProjection, transformFilter));
-                    }
-                    PreTransformProcessor preTransformProcessor =
-                            preTransformProcessorMap.get(tableId);
-                    // TODO: Currently this code wrongly filters out rows that weren't referenced in
-                    // the first matching transform rule but in the following transform rules.
-                    return preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
+                    processProjectionTransform(
+                            tableId, tableSchema, referencedColumnsSet, transform);
+                    hasMatchTransform = true;
                 }
             }
         }
-        return createTableEvent;
+        if (!hasMatchTransform) {
+            processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null);
+        }
+    }
+
+    public void processProjectionTransform(
+            TableId tableId,
+            Schema tableSchema,
+            LinkedHashSet<Column> referencedColumnsSet,
+            @Nullable PreTransformer transform) {
+        // If this TableId isn't presented in any transform block, it should behave like a "*"
+        // projection and should be regarded as asterisk-ful.
+        if (transform == null) {
+            referencedColumnsSet.addAll(tableSchema.getColumns());
+            hasAsteriskMap.put(tableId, true);
+        } else {
+            TransformProjection transformProjection = transform.getProjection().get();
+            boolean hasAsterisk = TransformParser.hasAsterisk(transformProjection.getProjection());
+            if (hasAsterisk) {
+                referencedColumnsSet.addAll(tableSchema.getColumns());
+                hasAsteriskMap.put(tableId, true);
+            } else {
+                TransformFilter transformFilter = transform.getFilter().orElse(null);
+                List<Column> referencedColumns =
+                        TransformParser.generateReferencedColumns(
+                                transformProjection.getProjection(),
+                                transformFilter != null ? transformFilter.getExpression() : null,
+                                tableSchema.getColumns());
+                // update referenced columns of other projections of the same tableId, if any
+                referencedColumnsSet.addAll(referencedColumns);
+                hasAsteriskMap.putIfAbsent(tableId, false);
+            }
+        }
+
+        PreTransformChangeInfo tableChangeInfo =
+                PreTransformChangeInfo.of(
+                        tableId,
+                        tableSchema,
+                        tableSchema.copy(new ArrayList<>(referencedColumnsSet)));
+        preTransformProcessorMap.put(tableId, new PreTransformProcessor(tableChangeInfo));
     }
 
     private Schema transformSchemaMetaData(
@@ -376,46 +424,22 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
         return schemaBuilder.build();
     }
 
-    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent)
-            throws Exception {
-        TableId tableId = dataChangeEvent.tableId();
-        for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-
-            if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
-                TransformProjection transformProjection = 
transform.getProjection().get();
-                TransformFilter transformFilter = 
transform.getFilter().orElse(null);
-                if (transformProjection.isValid()) {
-                    return processProjection(transformProjection, 
transformFilter, dataChangeEvent);
-                }
+    private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) {
+        if (!transforms.isEmpty()) {
+            PreTransformProcessor preTransformProcessor =
+                    preTransformProcessorMap.get(dataChangeEvent.tableId());
+            BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
+            BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
+            if (before != null) {
+                BinaryRecordData projectedBefore =
+                        preTransformProcessor.processFillDataField(before);
+                dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
             }
-        }
-        return dataChangeEvent;
-    }
-
-    private DataChangeEvent processProjection(
-            TransformProjection transformProjection,
-            @Nullable TransformFilter transformFilter,
-            DataChangeEvent dataChangeEvent) {
-        TableId tableId = dataChangeEvent.tableId();
-        PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
-        if (!preTransformProcessorMap.containsKey(tableId)
-                || !preTransformProcessorMap.get(tableId).hasTableChangeInfo()) {
-            preTransformProcessorMap.put(
-                    tableId,
-                    new PreTransformProcessor(
-                            tableChangeInfo, transformProjection, transformFilter));
-        }
-        PreTransformProcessor preTransformProcessor = preTransformProcessorMap.get(tableId);
-        BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before();
-        BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after();
-        if (before != null) {
-            BinaryRecordData projectedBefore = preTransformProcessor.processFillDataField(before);
-            dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore);
-        }
-        if (after != null) {
-            BinaryRecordData projectedAfter = preTransformProcessor.processFillDataField(after);
-            dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
+            if (after != null) {
+                BinaryRecordData projectedAfter = preTransformProcessor.processFillDataField(after);
+                dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter);
+            }
+            return dataChangeEvent;
         }
         return dataChangeEvent;
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
index 1ede3ce77..d4f6fec7d 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java
@@ -22,11 +22,8 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.runtime.parser.TransformParser;
 import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.List;
 
@@ -43,23 +40,9 @@ import java.util.List;
  */
 public class PreTransformProcessor {
     private PreTransformChangeInfo tableChangeInfo;
-    private TransformProjection transformProjection;
-    private @Nullable TransformFilter transformFilter;
-    private List<Boolean> cachedProjectionColumnsState;
 
-    public PreTransformProcessor(
-            PreTransformChangeInfo tableChangeInfo,
-            TransformProjection transformProjection,
-            @Nullable TransformFilter transformFilter) {
+    public PreTransformProcessor(PreTransformChangeInfo tableChangeInfo) {
         this.tableChangeInfo = tableChangeInfo;
-        this.transformProjection = transformProjection;
-        this.transformFilter = transformFilter;
-        this.cachedProjectionColumnsState =
-                cacheIsProjectionColumnMap(tableChangeInfo, transformProjection);
-    }
-
-    public boolean hasTableChangeInfo() {
-        return this.tableChangeInfo != null;
     }
 
     /**
@@ -69,32 +52,24 @@ public class PreTransformProcessor {
      * will be sent to downstream, and (D, E) column along with corresponding data will be trimmed.
      */
     public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTableEvent) {
-        List<Column> preTransformColumns =
-                TransformParser.generateReferencedColumns(
-                        transformProjection.getProjection(),
-                        transformFilter != null ? transformFilter.getExpression() : null,
-                        createTableEvent.getSchema().getColumns());
-        Schema schema = createTableEvent.getSchema().copy(preTransformColumns);
+        Schema schema =
+                createTableEvent
+                        .getSchema()
+                        .copy(tableChangeInfo.getPreTransformedSchema().getColumns());
         return new CreateTableEvent(createTableEvent.tableId(), schema);
     }
 
     public BinaryRecordData processFillDataField(BinaryRecordData data) {
         List<Object> valueList = new ArrayList<>();
         List<Column> columns = 
tableChangeInfo.getPreTransformedSchema().getColumns();
-
         for (int i = 0; i < columns.size(); i++) {
-            if (cachedProjectionColumnsState.get(i)) {
-                valueList.add(null);
-            } else {
-                valueList.add(
-                        getValueFromBinaryRecordData(
-                                columns.get(i).getName(),
-                                data,
-                                tableChangeInfo.getSourceSchema().getColumns(),
-                                tableChangeInfo.getSourceFieldGetters()));
-            }
+            valueList.add(
+                    getValueFromBinaryRecordData(
+                            columns.get(i).getName(),
+                            data,
+                            tableChangeInfo.getSourceSchema().getColumns(),
+                            tableChangeInfo.getSourceFieldGetters()));
         }
-
         return tableChangeInfo
                 .getPreTransformedRecordDataGenerator()
                 .generate(valueList.toArray(new Object[0]));
@@ -113,26 +88,4 @@ public class PreTransformProcessor {
         }
         return null;
     }
-
-    private List<Boolean> cacheIsProjectionColumnMap(
-            PreTransformChangeInfo tableChangeInfo, TransformProjection transformProjection) {
-        List<Boolean> cachedMap = new ArrayList<>();
-        if (!hasTableChangeInfo()) {
-            return cachedMap;
-        }
-
-        for (Column column : tableChangeInfo.getPreTransformedSchema().getColumns()) {
-            boolean isProjectionColumn = false;
-            for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) {
-                if (column.getName().equals(projectionColumn.getColumnName())
-                        && projectionColumn.isValidTransformedProjectionColumn()) {
-                    isProjectionColumn = true;
-                    break;
-                }
-            }
-            cachedMap.add(isProjectionColumn);
-        }
-
-        return cachedMap;
-    }
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
index cc257401e..0e3553e8c 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
@@ -164,6 +164,23 @@ public class PreTransformOperatorTest {
                     .primaryKey("sid")
                     .build();
 
+    private static final Schema MULTITRANSFORM_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("age", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("sex", DataTypes.STRING())
+                    .primaryKey("id")
+                    .build();
+
+    private static final Schema EXPECTED_MULTITRANSFORM_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("age", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .primaryKey("id")
+                    .build();
+
     @Test
     void testEventTransform() throws Exception {
         PreTransformOperator transform =
@@ -526,4 +543,108 @@ public class PreTransformOperatorTest {
                         new StreamRecord<>(
                                 new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA)));
     }
+
+    @Test
+    void testMultiTransformWithDiffRefColumns() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, 'Juvenile' as roleName",
+                                "age < 18",
+                                "id",
+                                null,
+                                null)
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, name as roleName",
+                                "age >= 18",
+                                "id",
+                                null,
+                                null)
+                        .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA);
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(
+                                        CUSTOMERS_TABLEID, EXPECTED_MULTITRANSFORM_SCHEMA)));
+    }
+
+    @Test
+    void testMultiTransformWithAsterisk() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "*, 'Juvenile' as roleName",
+                                "age < 18",
+                                "id",
+                                null,
+                                null)
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, age, name, sex, 'Juvenile' as roleName",
+                                "age >= 18",
+                                "id",
+                                null,
+                                null)
+                        .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA);
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA)));
+    }
+
+    @Test
+    void testMultiTransformMissingProjection() throws Exception {
+        PreTransformOperator transform =
+                PreTransformOperator.newBuilder()
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(), null, "age < 
18", "id", null, null)
+                        .addTransform(
+                                CUSTOMERS_TABLEID.identifier(),
+                                "id, age, UPPER(name) as name, sex",
+                                "age >= 18",
+                                "id",
+                                null,
+                                null)
+                        .build();
+        EventOperatorTestHarness<PreTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA);
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA)));
+    }
 }

