This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 84ef9d5da [FLINK-35981][cdc-runtime] Transform supports referencing one column more than once
84ef9d5da is described below

commit 84ef9d5daa78412b3f2f4dc087e8ea2e4be93b7c
Author: MOBIN <18814118...@163.com>
AuthorDate: Sat Aug 10 00:01:36 2024 +0800

    [FLINK-35981][cdc-runtime] Transform supports referencing one column more than once
    
    This closes #3515.
---
 .../transform/ProjectionColumnProcessor.java       |  8 ++-
 .../transform/TransformFilterProcessor.java        |  9 ++-
 .../transform/PostTransformOperatorTest.java       | 71 ++++++++++++++++++++++
 3 files changed, 81 insertions(+), 7 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 3dde9b20c..a27af2370 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 
 /**
@@ -98,7 +99,9 @@ public class ProjectionColumnProcessor {
 
         // 1 - Add referenced columns
         RecordData.FieldGetter[] fieldGetters = 
tableInfo.getPreTransformedFieldGetters();
-        for (String originalColumnName : 
projectionColumn.getOriginalColumnNames()) {
+        LinkedHashSet<String> originalColumnNames =
+                new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
+        for (String originalColumnName : originalColumnNames) {
             switch (originalColumnName) {
                 case TransformParser.DEFAULT_NAMESPACE_NAME:
                     params.add(tableInfo.getNamespace());
@@ -142,7 +145,8 @@ public class ProjectionColumnProcessor {
         List<Class<?>> paramTypes = new ArrayList<>();
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
         String scriptExpression = projectionColumn.getScriptExpression();
-        List<String> originalColumnNames = 
projectionColumn.getOriginalColumnNames();
+        LinkedHashSet<String> originalColumnNames =
+                new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
         for (String originalColumnName : originalColumnNames) {
             for (Column column : columns) {
                 if (column.getName().equals(originalColumnName)) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 84d483035..d1f67818b 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.stream.Stream;
 
@@ -91,14 +92,12 @@ public class TransformFilterProcessor {
         List<Class<?>> argTypes = new ArrayList<>();
         String scriptExpression = transformFilter.getScriptExpression();
         List<Column> columns = 
tableInfo.getPreTransformedSchema().getColumns();
-        List<String> columnNames = transformFilter.getColumnNames();
+        LinkedHashSet<String> columnNames = new 
LinkedHashSet<>(transformFilter.getColumnNames());
         for (String columnName : columnNames) {
             for (Column column : columns) {
                 if (column.getName().equals(columnName)) {
-                    if (!argNames.contains(columnName)) {
-                        argNames.add(columnName);
-                        
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
-                    }
+                    argNames.add(columnName);
+                    
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
                     break;
                 }
             }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 34b710374..067842c31 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -226,6 +226,16 @@ public class PostTransformOperatorTest {
                     .options(ImmutableMap.of("key1", "value1", "key2", 
"value2"))
                     .build();
 
+    private static final TableId COLUMN_SQUARE_TABLE =
+            TableId.tableId("my_company", "my_branch", "column_square");
+    private static final Schema COLUMN_SQUARE_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.INT())
+                    .physicalColumn("col2", DataTypes.INT())
+                    .physicalColumn("square_col2", DataTypes.INT())
+                    .primaryKey("col1")
+                    .build();
+
     @Test
     void testDataChangeEventTransform() throws Exception {
         PostTransformOperator transform =
@@ -560,6 +570,67 @@ public class PostTransformOperatorTest {
                 .isEqualTo(new StreamRecord<>(insertEventExpect));
     }
 
+    @Test
+    void testDataChangeEventTransformWithDuplicateColumns() throws Exception {
+        PostTransformOperator transform =
+                PostTransformOperator.newBuilder()
+                        .addTransform(
+                                COLUMN_SQUARE_TABLE.identifier(),
+                                "col1, col2, col2 * col2 as square_col2",
+                                "col2 < 3 OR col2 > 5")
+                        .build();
+        EventOperatorTestHarness<PostTransformOperator, Event>
+                transformFunctionEventEventOperatorTestHarness =
+                        new EventOperatorTestHarness<>(transform, 1);
+        // Initialization
+        transformFunctionEventEventOperatorTestHarness.open();
+        // Create table
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
COLUMN_SQUARE_SCHEMA);
+        BinaryRecordDataGenerator recordDataGenerator =
+                new BinaryRecordDataGenerator(((RowType) 
COLUMN_SQUARE_SCHEMA.toRowDataType()));
+        // Insert
+        DataChangeEvent insertEvent =
+                DataChangeEvent.insertEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {1, 1, 
null}));
+        DataChangeEvent insertEventExpect =
+                DataChangeEvent.insertEvent(
+                        COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new 
Object[] {1, 1, 1}));
+
+        DataChangeEvent insertEvent2 =
+                DataChangeEvent.insertEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {6, 6, 
null}));
+        DataChangeEvent insertEventExpect2 =
+                DataChangeEvent.insertEvent(
+                        COLUMN_SQUARE_TABLE, recordDataGenerator.generate(new 
Object[] {6, 6, 36}));
+
+        DataChangeEvent insertEvent3 =
+                DataChangeEvent.insertEvent(
+                        COLUMN_SQUARE_TABLE,
+                        recordDataGenerator.generate(new Object[] {4, 4, 
null}));
+
+        transform.processElement(new StreamRecord<>(createTableEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(
+                        new StreamRecord<>(
+                                new CreateTableEvent(COLUMN_SQUARE_TABLE, 
COLUMN_SQUARE_SCHEMA)));
+        transform.processElement(new StreamRecord<>(insertEvent));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect));
+        transform.processElement(new StreamRecord<>(insertEvent2));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isEqualTo(new StreamRecord<>(insertEventExpect2));
+        transform.processElement(new StreamRecord<>(insertEvent3));
+        Assertions.assertThat(
+                        
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+                .isNull();
+    }
+
     @Test
     void testTimestampTransform() throws Exception {
         PostTransformOperator transform =

Reply via email to