This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new f309d311f [FLINK-37429][transform] Map each column name to a new name in generated expression
f309d311f is described below
commit f309d311f8f79e0108a0666bf8cae7bd609cd2ef
Author: Shawn Huang <[email protected]>
AuthorDate: Tue Mar 11 12:09:37 2025 +0800
[FLINK-37429][transform] Map each column name to a new name in generated expression
This closes #3939
Co-authored-by: Leonard Xu <[email protected]>
---
.../flink/FlinkPipelineTransformITCase.java | 63 +++++++
.../values/source/TimestampTypeMetadataColumn.java | 48 ++++++
.../connectors/values/source/ValuesDataSource.java | 4 +-
.../values/source/ValuesDataSourceHelper.java | 131 +++++++++++++-
.../operators/transform/PostTransformOperator.java | 2 +-
.../operators/transform/ProjectionColumn.java | 41 +++--
.../transform/ProjectionColumnProcessor.java | 13 +-
.../transform/TransformExpressionCompiler.java | 5 +-
.../transform/TransformExpressionKey.java | 23 ++-
.../operators/transform/TransformFilter.java | 22 ++-
.../transform/TransformFilterProcessor.java | 36 ++--
.../flink/cdc/runtime/parser/JaninoCompiler.java | 53 ++++--
.../flink/cdc/runtime/parser/TransformParser.java | 40 ++++-
.../transform/PostTransformOperatorTest.java | 110 ++++++++++++
.../transform/PreTransformOperatorTest.java | 47 +++++
.../TransformOperatorWithSchemaEvolveTest.java | 129 ++++++++++++++
.../transform/UnifiedTransformOperatorTest.java | 34 ++++
.../cdc/runtime/parser/TransformParserTest.java | 191 +++++++++++++++++----
18 files changed, 880 insertions(+), 112 deletions(-)
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 4426374b6..e678b3737 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -956,6 +956,69 @@ class FlinkPipelineTransformITCase {
Arrays.stream(outputEvents).forEach(this::extractDataLines);
}
+ @ParameterizedTest
+ @EnumSource
+ public void testTransformWithColumnNameMap(ValuesDataSink.SinkApi sinkApi)
throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.COMPLEX_COLUMN_NAME_TABLE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
+
+ // Setup transform
+ TransformDef transformDef =
+ new TransformDef(
+ "default_namespace.default_schema.table1",
+ "*, `timestamp-type`",
+ "`foo-bar` > 0",
+ null,
+ null,
+ null,
+ null,
+ null);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(
+ PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ new ArrayList<>(Arrays.asList(transformDef)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+ assertThat(outputEvents)
+ .containsExactly(
+
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`class` STRING NOT NULL,`foo-bar` INT,`bar-foo`
INT,`timestamp-type` STRING NOT NULL}, primaryKeys=class, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[class1, 1, 10, type1], op=INSERT, meta=({timestamp-type=type1})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[class2, 2, 100, type2], op=INSERT, meta=({timestamp-type=type2})}",
+
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`import-package` STRING,
position=AFTER, existedColumnName=bar-foo}]}",
+
"RenameColumnEvent{tableId=default_namespace.default_schema.table1,
nameMapping={bar-foo=bar-baz}}",
+
"DropColumnEvent{tableId=default_namespace.default_schema.table1,
droppedColumnNames=[bar-baz]}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1,
before=[class1, 1, , type1], after=[], op=DELETE,
meta=({timestamp-type=type1})}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.table1,
before=[class2, 2, , type2], after=[new-class2, 20, new-package2, type2],
op=UPDATE, meta=({timestamp-type=type2})}");
+ }
+
void runGenericTransformTest(
ValuesDataSink.SinkApi sinkApi,
List<TransformDef> transformDefs,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
new file mode 100644
index 000000000..c7b086d11
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/TimestampTypeMetadataColumn.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.values.source;
+
+import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+
+import java.util.Map;
+
+/** A {@link SupportedMetadataColumn} for timestamp-type. */
+public class TimestampTypeMetadataColumn implements SupportedMetadataColumn {
+
+ @Override
+ public String getName() {
+ return "timestamp-type";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.STRING();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return String.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ return metadata.getOrDefault(getName(), null);
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
index 60ed244a5..5ba1e336d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSource.java
@@ -83,7 +83,9 @@ public class ValuesDataSource implements DataSource {
@Override
public SupportedMetadataColumn[] supportedMetadataColumns() {
- return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
+ return new SupportedMetadataColumn[] {
+ new OpTsMetadataColumn(), new TimestampTypeMetadataColumn()
+ };
}
/**
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
index af9c1890b..75cba1ad3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/source/ValuesDataSourceHelper.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import java.util.ArrayList;
@@ -54,7 +55,8 @@ public class ValuesDataSourceHelper {
SINGLE_SPLIT_MULTI_TABLES,
MULTI_SPLITS_SINGLE_TABLE,
CUSTOM_SOURCE_EVENTS,
- TRANSFORM_TABLE
+ TRANSFORM_TABLE,
+ COMPLEX_COLUMN_NAME_TABLE
}
public static final TableId TABLE_1 =
@@ -120,6 +122,11 @@ public class ValuesDataSourceHelper {
sourceEvents = transformTable();
break;
}
+ case COMPLEX_COLUMN_NAME_TABLE:
+ {
+ sourceEvents = complexColumnNameTable();
+ break;
+ }
default:
throw new IllegalArgumentException(eventType + " is not
supported");
}
@@ -644,4 +651,126 @@ public class ValuesDataSourceHelper {
eventOfSplits.add(split1);
return eventOfSplits;
}
+
+ public static List<List<Event>> complexColumnNameTable() {
+ List<List<Event>> eventOfSplits = new ArrayList<>();
+ List<Event> split1 = new ArrayList<>();
+
+ // create table
+ Schema schema =
+ Schema.newBuilder()
+ .physicalColumn("class", DataTypes.STRING())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("bar-foo", DataTypes.INT())
+ .primaryKey("class")
+ .build();
+ CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_1,
schema);
+ split1.add(createTableEvent);
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator((RowType)
schema.toRowDataType());
+ // insert
+ DataChangeEvent insertEvent1 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("class0"), 0,
0,
+ }),
+ new HashMap<String, String>() {
+ {
+ put("timestamp-type", "type0");
+ }
+ });
+ split1.add(insertEvent1);
+ DataChangeEvent insertEvent2 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("class1"), 1,
10,
+ }),
+ new HashMap<String, String>() {
+ {
+ put("timestamp-type", "type1");
+ }
+ });
+ split1.add(insertEvent2);
+ DataChangeEvent insertEvent3 =
+ DataChangeEvent.insertEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[]
{BinaryStringData.fromString("class2"), 2, 100}),
+ new HashMap<String, String>() {
+ {
+ put("timestamp-type", "type2");
+ }
+ });
+ split1.add(insertEvent3);
+
+ // add column
+ AddColumnEvent.ColumnWithPosition columnWithPosition =
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("import-package",
DataTypes.STRING()));
+ AddColumnEvent addColumnEvent =
+ new AddColumnEvent(TABLE_1,
Collections.singletonList(columnWithPosition));
+ split1.add(addColumnEvent);
+ schema = SchemaUtils.applySchemaChangeEvent(schema, addColumnEvent);
+
+ // rename column
+ Map<String, String> nameMapping = new HashMap<>();
+ nameMapping.put("bar-foo", "bar-baz");
+ RenameColumnEvent renameColumnEvent = new RenameColumnEvent(TABLE_1,
nameMapping);
+ split1.add(renameColumnEvent);
+ schema = SchemaUtils.applySchemaChangeEvent(schema, renameColumnEvent);
+
+ // drop column
+ DropColumnEvent dropColumnEvent =
+ new DropColumnEvent(TABLE_1,
Collections.singletonList("bar-baz"));
+ split1.add(dropColumnEvent);
+ schema = SchemaUtils.applySchemaChangeEvent(schema, dropColumnEvent);
+
+ generator = new BinaryRecordDataGenerator((RowType)
schema.toRowDataType());
+
+ // delete
+ split1.add(
+ DataChangeEvent.deleteEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("class1"),
+ 1,
+ BinaryStringData.fromString(""),
+ }),
+ new HashMap<String, String>() {
+ {
+ put("timestamp-type", "type1");
+ }
+ }));
+
+ // update
+ split1.add(
+ DataChangeEvent.updateEvent(
+ TABLE_1,
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("class2"),
+ 2,
+ BinaryStringData.fromString("")
+ }),
+ generator.generate(
+ new Object[] {
+ BinaryStringData.fromString("new-class2"),
+ 20,
+
BinaryStringData.fromString("new-package2"),
+ }),
+ new HashMap<String, String>() {
+ {
+ put("timestamp-type", "type2");
+ }
+ }));
+
+ eventOfSplits.add(split1);
+ return eventOfSplits;
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index c7c69fa6c..a12e97f93 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -409,7 +409,7 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
Optional<TransformFilter> transformFilterOptional =
transform.getFilter();
if (transformFilterOptional.isPresent()
- && transformFilterOptional.get().isVaild()) {
+ && transformFilterOptional.get().isValid()) {
TransformFilter transformFilter =
transformFilterOptional.get();
if (!transformFilterProcessorMap.containsKey(
Tuple2.of(tableId, transformFilter))) {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
index 58f95392b..bd2e19695 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumn.java
@@ -24,7 +24,9 @@ import org.apache.flink.cdc.common.utils.StringUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* The ProjectionColumn applies to describe the information of the
transformation column. If it only
@@ -48,17 +50,19 @@ public class ProjectionColumn implements Serializable {
private final String expression;
private final String scriptExpression;
private final List<String> originalColumnNames;
- private TransformExpressionKey transformExpressionKey;
+ private final Map<String, String> columnNameMap;
public ProjectionColumn(
Column column,
String expression,
String scriptExpression,
- List<String> originalColumnNames) {
+ List<String> originalColumnNames,
+ Map<String, String> columnNameMap) {
this.column = column;
this.expression = expression;
this.scriptExpression = scriptExpression;
this.originalColumnNames = originalColumnNames;
+ this.columnNameMap = columnNameMap;
}
public ProjectionColumn copy() {
@@ -66,7 +70,8 @@ public class ProjectionColumn implements Serializable {
column.copy(column.getName()),
expression,
scriptExpression,
- new ArrayList<>(originalColumnNames));
+ new ArrayList<>(originalColumnNames),
+ new HashMap<>(columnNameMap));
}
public Column getColumn() {
@@ -89,8 +94,8 @@ public class ProjectionColumn implements Serializable {
return originalColumnNames;
}
- public void setTransformExpressionKey(TransformExpressionKey
transformExpressionKey) {
- this.transformExpressionKey = transformExpressionKey;
+ public Map<String, String> getColumnNameMap() {
+ return columnNameMap;
}
public boolean isValidTransformedProjectionColumn() {
@@ -102,9 +107,11 @@ public class ProjectionColumn implements Serializable {
* Just like column {@code id} in {@code id, name AS new_name, age + 1 AS
new_age}. <br>
* Comments and default expressions will be intact.
*/
- public static ProjectionColumn ofForwarded(Column column) {
+ public static ProjectionColumn ofForwarded(Column column, String
mappedColumnName) {
String name = column.getName();
- return new ProjectionColumn(column, name, name,
Collections.singletonList(name));
+ Map<String, String> columnNameMap = Collections.singletonMap(name,
mappedColumnName);
+ return new ProjectionColumn(
+ column, name, mappedColumnName,
Collections.singletonList(name), columnNameMap);
}
/**
@@ -112,13 +119,17 @@ public class ProjectionColumn implements Serializable {
* Just like column {@code new_name} in {@code id, name AS new_name, age +
1 AS new_age}. <br>
* Comments and default expressions will be intact.
*/
- public static ProjectionColumn ofAliased(Column column, String newName) {
+ public static ProjectionColumn ofAliased(
+ Column column, String newName, String mappedColumnName) {
String originalName = column.getName();
+ Map<String, String> columnNameMap =
+ Collections.singletonMap(originalName, mappedColumnName);
return new ProjectionColumn(
column.copy(newName),
originalName,
- originalName,
- Collections.singletonList(originalName));
+ mappedColumnName,
+ Collections.singletonList(originalName),
+ columnNameMap);
}
/**
@@ -131,12 +142,14 @@ public class ProjectionColumn implements Serializable {
DataType dataType,
String expression,
String scriptExpression,
- List<String> originalColumnNames) {
+ List<String> originalColumnNames,
+ Map<String, String> columnNameMap) {
return new ProjectionColumn(
Column.physicalColumn(columnName, dataType),
expression,
scriptExpression,
- originalColumnNames);
+ originalColumnNames,
+ columnNameMap);
}
@Override
@@ -152,8 +165,8 @@ public class ProjectionColumn implements Serializable {
+ '\''
+ ", originalColumnNames="
+ originalColumnNames
- + ", transformExpressionKey="
- + transformExpressionKey
+ + ", columnNameMap="
+ + columnNameMap
+ '}';
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
index 83de37926..903a2b9b9 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
@@ -99,10 +99,11 @@ public class ProjectionColumnProcessor {
return expressionEvaluator.evaluate(generateParams(record,
epochTime, opType, meta));
} catch (InvocationTargetException e) {
LOG.error(
- "Table:{} column:{} projection:{} execute failed. {}",
+ "Table:{} column:{} projection:{} column name map:{}
execute failed. {}",
tableInfo.getName(),
projectionColumn.getColumnName(),
projectionColumn.getScriptExpression(),
+ projectionColumn.getColumnNameMap(),
e);
throw new RuntimeException(e);
}
@@ -176,12 +177,13 @@ public class ProjectionColumnProcessor {
List<Class<?>> paramTypes = new ArrayList<>();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
String scriptExpression = projectionColumn.getScriptExpression();
+ Map<String, String> columnNameMap =
projectionColumn.getColumnNameMap();
LinkedHashSet<String> originalColumnNames =
new LinkedHashSet<>(projectionColumn.getOriginalColumnNames());
for (String originalColumnName : originalColumnNames) {
for (Column column : columns) {
if (column.getName().equals(originalColumnName)) {
- argumentNames.add(originalColumnName);
+ argumentNames.add(columnNameMap.get(originalColumnName));
paramTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
break;
}
@@ -192,7 +194,7 @@ public class ProjectionColumnProcessor {
.findFirst()
.ifPresent(
col -> {
- argumentNames.add(col.f0);
+ argumentNames.add(columnNameMap.get(col.f0));
paramTypes.add(col.f2);
});
Stream.of(supportedMetadataColumns)
@@ -200,7 +202,7 @@ public class ProjectionColumnProcessor {
.findFirst()
.ifPresent(
col -> {
- argumentNames.add(col.getName());
+
argumentNames.add(columnNameMap.get(col.getName()));
paramTypes.add(col.getJavaClass());
});
}
@@ -214,6 +216,7 @@ public class ProjectionColumnProcessor {
JaninoCompiler.loadSystemFunction(scriptExpression),
argumentNames,
paramTypes,
-
DataTypeConverter.convertOriginalClass(projectionColumn.getDataType()));
+
DataTypeConverter.convertOriginalClass(projectionColumn.getDataType()),
+ columnNameMap);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
index 326ae7ba7..06749a29f 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java
@@ -74,8 +74,9 @@ public class TransformExpressionCompiler {
expressionEvaluator.cook(key.getExpression());
} catch (CompileException e) {
throw new InvalidProgramException(
- "Expression cannot be compiled. This is a
bug. Please file an issue.\nExpression: "
- + key.getExpression(),
+ String.format(
+ "Expression cannot be compiled.
This is a bug. Please file an issue.\nExpression: %s\nColumn name map: %s",
+ key.getExpression(),
key.getColumnNameMap()),
e);
}
return expressionEvaluator;
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
index 0cbf1f827..927ab2711 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionKey.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.runtime.operators.transform;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -31,6 +32,8 @@ import java.util.Objects;
* <li>argumentNames: a list for the argument names in expression.
* <li>argumentClasses: a list for the argument classes in expression.
* <li>returnClass: a class for the return class in expression
+ * <li>columnNameMap: a map whose key is the original column name and value
is the mapped column
+ * name
* </ul>
*/
public class TransformExpressionKey implements Serializable {
@@ -39,16 +42,19 @@ public class TransformExpressionKey implements Serializable {
private final List<String> argumentNames;
private final List<Class<?>> argumentClasses;
private final Class<?> returnClass;
+ private final Map<String, String> columnNameMap;
private TransformExpressionKey(
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
- Class<?> returnClass) {
+ Class<?> returnClass,
+ Map<String, String> columnNameMap) {
this.expression = expression;
this.argumentNames = argumentNames;
this.argumentClasses = argumentClasses;
this.returnClass = returnClass;
+ this.columnNameMap = columnNameMap;
}
public String getExpression() {
@@ -67,12 +73,18 @@ public class TransformExpressionKey implements Serializable {
return returnClass;
}
+ public Map<String, String> getColumnNameMap() {
+ return columnNameMap;
+ }
+
public static TransformExpressionKey of(
String expression,
List<String> argumentNames,
List<Class<?>> argumentClasses,
- Class<?> returnClass) {
- return new TransformExpressionKey(expression, argumentNames,
argumentClasses, returnClass);
+ Class<?> returnClass,
+ Map<String, String> columnNameMap) {
+ return new TransformExpressionKey(
+ expression, argumentNames, argumentClasses, returnClass,
columnNameMap);
}
@Override
@@ -87,11 +99,12 @@ public class TransformExpressionKey implements Serializable {
return expression.equals(that.expression)
&& argumentNames.equals(that.argumentNames)
&& argumentClasses.equals(that.argumentClasses)
- && returnClass.equals(that.returnClass);
+ && returnClass.equals(that.returnClass)
+ && columnNameMap.equals(that.columnNameMap);
}
@Override
public int hashCode() {
- return Objects.hash(expression, argumentNames, argumentClasses,
returnClass);
+ return Objects.hash(expression, argumentNames, argumentClasses,
returnClass, columnNameMap);
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
index 4f1aee57e..f12880d41 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java
@@ -22,6 +22,7 @@ import org.apache.flink.cdc.runtime.parser.TransformParser;
import java.io.Serializable;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -41,11 +42,17 @@ public class TransformFilter implements Serializable {
private final String expression;
private final String scriptExpression;
private final List<String> columnNames;
+ private final Map<String, String> columnNameMap;
- public TransformFilter(String expression, String scriptExpression,
List<String> columnNames) {
+ public TransformFilter(
+ String expression,
+ String scriptExpression,
+ List<String> columnNames,
+ Map<String, String> columnNameMap) {
this.expression = expression;
this.scriptExpression = scriptExpression;
this.columnNames = columnNames;
+ this.columnNameMap = columnNameMap;
}
public String getExpression() {
@@ -60,19 +67,26 @@ public class TransformFilter implements Serializable {
return columnNames;
}
+ public Map<String, String> getColumnNameMap() {
+ return columnNameMap;
+ }
+
public static Optional<TransformFilter> of(
String filterExpression, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
if (StringUtils.isNullOrWhitespaceOnly(filterExpression)) {
return Optional.empty();
}
List<String> columnNames =
TransformParser.parseFilterColumnNameList(filterExpression);
+ Map<String, String> columnNameMap =
TransformParser.generateColumnNameMap(columnNames);
String scriptExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
- filterExpression, udfDescriptors);
- return Optional.of(new TransformFilter(filterExpression,
scriptExpression, columnNames));
+ filterExpression, udfDescriptors, columnNameMap);
+ return Optional.of(
+ new TransformFilter(
+ filterExpression, scriptExpression, columnNames,
columnNameMap));
}
- public boolean isVaild() {
+ public boolean isValid() {
return !columnNames.isEmpty();
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
index 220f61109..d84209e78 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
@@ -96,45 +96,46 @@ public class TransformFilterProcessor {
expressionEvaluator.evaluate(generateParams(record,
epochTime, opType, meta));
} catch (InvocationTargetException e) {
LOG.error(
- "Table:{} filter:{} execute failed. {}",
+ "Table:{} filter:{} column name map:{} execute failed. {}",
tableInfo.getName(),
transformFilter.getExpression(),
+ transformFilter.getColumnNameMap(),
e);
throw new RuntimeException(e);
}
}
- private Tuple2<List<String>, List<Class<?>>> generateArguments() {
+ private Tuple2<List<String>, List<Class<?>>> generateArguments(boolean
mapColumnNames) {
List<String> argNames = new ArrayList<>();
List<Class<?>> argTypes = new ArrayList<>();
- String scriptExpression = transformFilter.getScriptExpression();
+ String expression = transformFilter.getExpression();
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
+ Map<String, String> columnNameMap = transformFilter.getColumnNameMap();
LinkedHashSet<String> columnNames = new
LinkedHashSet<>(transformFilter.getColumnNames());
for (String columnName : columnNames) {
for (Column column : columns) {
if (column.getName().equals(columnName)) {
- argNames.add(columnName);
+ argNames.add(mapColumnNames ?
columnNameMap.get(columnName) : columnName);
argTypes.add(DataTypeConverter.convertOriginalClass(column.getType()));
break;
}
}
}
- METADATA_COLUMNS.stream()
- .forEach(
- col -> {
- if (scriptExpression.contains(col.f0) &&
!argNames.contains(col.f0)) {
- argNames.add(col.f0);
- argTypes.add(col.f2);
- }
- });
+ METADATA_COLUMNS.forEach(
+ col -> {
+ if (expression.contains(col.f0) &&
!argNames.contains(col.f0)) {
+ argNames.add(mapColumnNames ?
columnNameMap.get(col.f0) : col.f0);
+ argTypes.add(col.f2);
+ }
+ });
supportedMetadataColumns
.keySet()
.forEach(
colName -> {
- if (scriptExpression.contains(colName) &&
!argNames.contains(colName)) {
- argNames.add(colName);
+ if (expression.contains(colName) &&
!argNames.contains(colName)) {
+ argNames.add(mapColumnNames ?
columnNameMap.get(colName) : colName);
argTypes.add(supportedMetadataColumns.get(colName).getJavaClass());
}
});
@@ -147,7 +148,7 @@ public class TransformFilterProcessor {
List<Column> columns =
tableInfo.getPreTransformedSchema().getColumns();
// 1 - Add referenced columns
- Tuple2<List<String>, List<Class<?>>> args = generateArguments();
+ Tuple2<List<String>, List<Class<?>>> args = generateArguments(false);
RecordData.FieldGetter[] fieldGetters =
tableInfo.getPreTransformedFieldGetters();
for (String columnName : args.f0) {
switch (columnName) {
@@ -191,7 +192,7 @@ public class TransformFilterProcessor {
}
private TransformExpressionKey generateTransformExpressionKey() {
- Tuple2<List<String>, List<Class<?>>> args = generateArguments();
+ Tuple2<List<String>, List<Class<?>>> args = generateArguments(true);
args.f0.add(JaninoCompiler.DEFAULT_TIME_ZONE);
args.f1.add(String.class);
@@ -202,6 +203,7 @@ public class TransformFilterProcessor {
JaninoCompiler.loadSystemFunction(transformFilter.getScriptExpression()),
args.f0,
args.f1,
- Boolean.class);
+ Boolean.class,
+ transformFilter.getColumnNameMap());
}
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index d9f644b0b..c5bcb3139 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -42,6 +42,7 @@ import org.codehaus.janino.Java;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -105,8 +106,11 @@ public class JaninoCompiler {
}
public static String translateSqlNodeToJaninoExpression(
- SqlNode transform, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
- Java.Rvalue rvalue = translateSqlNodeToJaninoRvalue(transform,
udfDescriptors);
+ SqlNode transform,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
+ Java.Rvalue rvalue =
+ translateSqlNodeToJaninoRvalue(transform, udfDescriptors,
columnNameMap);
if (rvalue != null) {
return rvalue.toString();
}
@@ -114,20 +118,23 @@ public class JaninoCompiler {
}
public static Java.Rvalue translateSqlNodeToJaninoRvalue(
- SqlNode transform, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
+ SqlNode transform,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
if (transform instanceof SqlIdentifier) {
- return translateSqlIdentifier((SqlIdentifier) transform);
+ return translateSqlIdentifier((SqlIdentifier) transform,
columnNameMap);
} else if (transform instanceof SqlBasicCall) {
- return translateSqlBasicCall((SqlBasicCall) transform,
udfDescriptors);
+ return translateSqlBasicCall((SqlBasicCall) transform,
udfDescriptors, columnNameMap);
} else if (transform instanceof SqlCase) {
- return translateSqlCase((SqlCase) transform, udfDescriptors);
+ return translateSqlCase((SqlCase) transform, udfDescriptors,
columnNameMap);
} else if (transform instanceof SqlLiteral) {
return translateSqlSqlLiteral((SqlLiteral) transform);
}
return null;
}
- private static Java.Rvalue translateSqlIdentifier(SqlIdentifier
sqlIdentifier) {
+ private static Java.Rvalue translateSqlIdentifier(
+ SqlIdentifier sqlIdentifier, Map<String, String> columnNameMap) {
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size()
- 1);
if
(TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(columnName.toUpperCase())) {
return generateTimezoneFreeTemporalFunctionOperation(columnName);
@@ -139,7 +146,9 @@ public class JaninoCompiler {
columnName.toUpperCase())) {
return
generateTimezoneRequiredTemporalConversionFunctionOperation(columnName);
} else {
- return new Java.AmbiguousName(Location.NOWHERE, new String[]
{columnName});
+ return new Java.AmbiguousName(
+ Location.NOWHERE,
+ new String[] {columnNameMap.getOrDefault(columnName,
columnName)});
}
}
@@ -166,11 +175,13 @@ public class JaninoCompiler {
}
private static Java.Rvalue translateSqlBasicCall(
- SqlBasicCall sqlBasicCall, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
+ SqlBasicCall sqlBasicCall,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
List<SqlNode> operandList = sqlBasicCall.getOperandList();
List<Java.Rvalue> atoms = new ArrayList<>();
for (SqlNode sqlNode : operandList) {
- translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors);
+ translateSqlNodeToAtoms(sqlNode, atoms, udfDescriptors,
columnNameMap);
}
if (TIMEZONE_FREE_TEMPORAL_FUNCTIONS.contains(
sqlBasicCall.getOperator().getName().toUpperCase())) {
@@ -188,19 +199,22 @@ public class JaninoCompiler {
}
private static Java.Rvalue translateSqlCase(
- SqlCase sqlCase, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
+ SqlCase sqlCase,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
SqlNodeList whenOperands = sqlCase.getWhenOperands();
SqlNodeList thenOperands = sqlCase.getThenOperands();
SqlNode elseOperand = sqlCase.getElseOperand();
List<Java.Rvalue> whenAtoms = new ArrayList<>();
for (SqlNode sqlNode : whenOperands) {
- translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors);
+ translateSqlNodeToAtoms(sqlNode, whenAtoms, udfDescriptors,
columnNameMap);
}
List<Java.Rvalue> thenAtoms = new ArrayList<>();
for (SqlNode sqlNode : thenOperands) {
- translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors);
+ translateSqlNodeToAtoms(sqlNode, thenAtoms, udfDescriptors,
columnNameMap);
}
- Java.Rvalue elseAtoms = translateSqlNodeToJaninoRvalue(elseOperand,
udfDescriptors);
+ Java.Rvalue elseAtoms =
+ translateSqlNodeToJaninoRvalue(elseOperand, udfDescriptors,
columnNameMap);
Java.Rvalue sqlCaseRvalueTemp = elseAtoms;
for (int i = whenAtoms.size() - 1; i >= 0; i--) {
sqlCaseRvalueTemp =
@@ -216,19 +230,20 @@ public class JaninoCompiler {
private static void translateSqlNodeToAtoms(
SqlNode sqlNode,
List<Java.Rvalue> atoms,
- List<UserDefinedFunctionDescriptor> udfDescriptors) {
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
if (sqlNode instanceof SqlIdentifier) {
- atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode));
+ atoms.add(translateSqlIdentifier((SqlIdentifier) sqlNode,
columnNameMap));
} else if (sqlNode instanceof SqlLiteral) {
atoms.add(translateSqlSqlLiteral((SqlLiteral) sqlNode));
} else if (sqlNode instanceof SqlBasicCall) {
- atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode,
udfDescriptors));
+ atoms.add(translateSqlBasicCall((SqlBasicCall) sqlNode,
udfDescriptors, columnNameMap));
} else if (sqlNode instanceof SqlNodeList) {
for (SqlNode node : (SqlNodeList) sqlNode) {
- translateSqlNodeToAtoms(node, atoms, udfDescriptors);
+ translateSqlNodeToAtoms(node, atoms, udfDescriptors,
columnNameMap);
}
} else if (sqlNode instanceof SqlCase) {
- atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors));
+ atoms.add(translateSqlCase((SqlCase) sqlNode, udfDescriptors,
columnNameMap));
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ee29bbf2c..1fc3101ae 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -95,6 +95,8 @@ public class TransformParser {
private static final Logger LOG =
LoggerFactory.getLogger(TransformParser.class);
private static final String DEFAULT_SCHEMA = "default_schema";
private static final String DEFAULT_TABLE = "TB";
+ private static final String MAPPED_COLUMN_NAME_PREFIX = "$";
+ private static final String MAPPED_SINGLE_COLUMN_NAME =
MAPPED_COLUMN_NAME_PREFIX + "0";
private static SqlParser getCalciteParser(String sql) {
return SqlParser.create(
@@ -334,6 +336,8 @@ public class TransformParser {
columnName,
supportedMetadataColumns);
} else {
+ List<String> originalColumnNames =
parseColumnNameList(exprNode);
+ Map<String, String> columnNameMap =
generateColumnNameMap(originalColumnNames);
projectionColumn =
ProjectionColumn.ofCalculated(
columnName,
@@ -341,8 +345,9 @@ public class TransformParser {
relDataTypeMap.get(columnName)),
exprNode.toString(),
JaninoCompiler.translateSqlNodeToJaninoExpression(
- exprNode, udfDescriptors),
- parseColumnNameList(exprNode));
+ exprNode, udfDescriptors,
columnNameMap),
+ originalColumnNames,
+ columnNameMap);
}
}
// ... or an existing column's name identifier.
@@ -384,6 +389,8 @@ public class TransformParser {
String identifier,
String projectedColumnName,
SupportedMetadataColumn[] supportedMetadataColumns) {
+ Map<String, String> columnNameMap =
+ Collections.singletonMap(identifier,
MAPPED_SINGLE_COLUMN_NAME);
if (isMetadataColumn(identifier, supportedMetadataColumns)) {
// For a metadata column, we simply generate a projection column
with the same
return ProjectionColumn.ofCalculated(
@@ -393,8 +400,9 @@ public class TransformParser {
relDataTypeMap.get(projectedColumnName))
.notNull(),
identifier,
- identifier,
- Collections.singletonList(identifier));
+ columnNameMap.get(identifier),
+ Collections.singletonList(identifier),
+ columnNameMap);
}
Preconditions.checkArgument(
@@ -404,14 +412,17 @@ public class TransformParser {
Column column = originalColumnMap.get(identifier);
if (Objects.equals(identifier, projectedColumnName)) {
- return ProjectionColumn.ofForwarded(column);
+ return ProjectionColumn.ofForwarded(column,
MAPPED_SINGLE_COLUMN_NAME);
} else {
- return ProjectionColumn.ofAliased(column, projectedColumnName);
+ return ProjectionColumn.ofAliased(
+ column, projectedColumnName, MAPPED_SINGLE_COLUMN_NAME);
}
}
public static String translateFilterExpressionToJaninoExpression(
- String filterExpression, List<UserDefinedFunctionDescriptor>
udfDescriptors) {
+ String filterExpression,
+ List<UserDefinedFunctionDescriptor> udfDescriptors,
+ Map<String, String> columnNameMap) {
if (isNullOrWhitespaceOnly(filterExpression)) {
return "";
}
@@ -420,7 +431,8 @@ public class TransformParser {
return "";
}
SqlNode where = sqlSelect.getWhere();
- return JaninoCompiler.translateSqlNodeToJaninoExpression(where,
udfDescriptors);
+ return JaninoCompiler.translateSqlNodeToJaninoExpression(
+ where, udfDescriptors, columnNameMap);
}
public static List<String> parseComputedColumnNames(
@@ -642,4 +654,16 @@ public class TransformParser {
return false;
}
}
+
+ public static Map<String, String> generateColumnNameMap(List<String>
originalColumnNames) {
+ int i = 0;
+ Map<String, String> columnNameMap = new HashMap<>();
+ for (String columnName : originalColumnNames) {
+ if (!columnNameMap.containsKey(columnName)) {
+ columnNameMap.put(columnName, MAPPED_COLUMN_NAME_PREFIX + i);
+ i++;
+ }
+ }
+ return columnNameMap;
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 07ae5369b..cf84a8214 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -362,6 +362,20 @@ public class PostTransformOperatorTest {
.primaryKey("id")
.build();
+ private static final TableId COL_NAME_MAPPING_TABLEID =
+ TableId.tableId("my_company", "my_branch",
"col_name_mapping_table");
+ private static final Schema COL_NAME_MAPPING_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("bar-foo", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .physicalColumn("f0", DataTypes.INT())
+ .physicalColumn("f1", DataTypes.INT())
+ .physicalColumn("f2", DataTypes.INT())
+ .build();
+
@Test
void testDataChangeEventTransform() throws Exception {
PostTransformOperator transform =
@@ -3249,4 +3263,100 @@ public class PostTransformOperatorTest {
.isEqualTo(new StreamRecord<>(updateEventExpect));
transformFunctionEventEventOperatorTestHarness.close();
}
+
+ @Test
+ void testColumnNameMapping() throws Exception {
+ PostTransformOperator transform =
+ PostTransformOperator.newBuilder()
+ .addTransform(
+ COL_NAME_MAPPING_TABLEID.identifier(),
+ "*, class, foo-bar AS f0, bar-foo AS f1,
`foo-bar`-`bar-foo` AS f2",
+ "`foo-bar`-`bar-foo` <> 0")
+ .build();
+ RegularEventOperatorTestHarness<PostTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(COL_NAME_MAPPING_TABLEID,
COL_NAME_MAPPING_SCHEMA);
+ BinaryRecordDataGenerator recordDataGenerator =
+ new BinaryRecordDataGenerator(((RowType)
COL_NAME_MAPPING_SCHEMA.toRowDataType()));
+ // Insert
+ DataChangeEvent insertEvent =
+ DataChangeEvent.insertEvent(
+ COL_NAME_MAPPING_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ 1,
+ 2,
+ 3,
+ 4,
+ BinaryStringData.fromString("class0"),
+ null,
+ null,
+ null
+ }));
+ DataChangeEvent insertEventExpect =
+ DataChangeEvent.insertEvent(
+ COL_NAME_MAPPING_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ 1, 2, 3, 4,
BinaryStringData.fromString("class0"), -1, 1, -1
+ }));
+ // Update
+ DataChangeEvent updateEvent =
+ DataChangeEvent.updateEvent(
+ COL_NAME_MAPPING_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ 1,
+ 2,
+ 3,
+ 4,
+ BinaryStringData.fromString("class0"),
+ null,
+ null,
+ null
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ 2,
+ 4,
+ 6,
+ 8,
+ BinaryStringData.fromString("class1"),
+ null,
+ null,
+ null
+ }));
+ DataChangeEvent updateEventExpect =
+ DataChangeEvent.updateEvent(
+ COL_NAME_MAPPING_TABLEID,
+ recordDataGenerator.generate(
+ new Object[] {
+ 1, 2, 3, 4,
BinaryStringData.fromString("class0"), -1, 1, -1
+ }),
+ recordDataGenerator.generate(
+ new Object[] {
+ 2, 4, 6, 8,
BinaryStringData.fromString("class1"), -2, 2, -2
+ }));
+
+ transform.processElement(new StreamRecord<>(createTableEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ COL_NAME_MAPPING_TABLEID,
COL_NAME_MAPPING_SCHEMA)));
+ transform.processElement(new StreamRecord<>(insertEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(insertEventExpect));
+ transform.processElement(new StreamRecord<>(updateEvent));
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(new StreamRecord<>(updateEventExpect));
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
index 441d6f86c..763dee7c4 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java
@@ -182,6 +182,24 @@ public class PreTransformOperatorTest {
.primaryKey("id")
.build();
+ private static final Schema COL_NAME_MAPPING_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("bar-foo", DataTypes.INT())
+ .physicalColumn("class", DataTypes.INT())
+ .build();
+
+ private static final Schema EXPECTED_COL_NAME_MAPPING_SCHEMA =
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("bar-foo", DataTypes.INT())
+ .physicalColumn("class", DataTypes.INT())
+ .build();
+
@Test
void testEventTransform() throws Exception {
PreTransformOperator transform =
@@ -681,4 +699,33 @@ public class PreTransformOperatorTest {
new CreateTableEvent(CUSTOMERS_TABLEID,
MULTITRANSFORM_SCHEMA)));
transformFunctionEventEventOperatorTestHarness.close();
}
+
+ @Test
+ void testColumnNameMapping() throws Exception {
+ PreTransformOperator transform =
+ PreTransformOperator.newBuilder()
+ .addTransform(
+ CUSTOMERS_TABLEID.identifier(),
+ "foo, `foo-bar`, foo-bar AS f0, `bar-foo` AS
f1, class",
+ " `foo-bar` > 1 and foo-bar > 1 and class > 1")
+ .build();
+
+ RegularEventOperatorTestHarness<PreTransformOperator, Event>
+ transformFunctionEventEventOperatorTestHarness =
+ RegularEventOperatorTestHarness.with(transform, 1);
+ // Initialization
+ transformFunctionEventEventOperatorTestHarness.open();
+ // Create table
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(CUSTOMERS_TABLEID,
COL_NAME_MAPPING_SCHEMA);
+ transform.processElement(new StreamRecord<>(createTableEvent));
+
+ Assertions.assertThat(
+
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
+ .isEqualTo(
+ new StreamRecord<>(
+ new CreateTableEvent(
+ CUSTOMERS_TABLEID,
EXPECTED_COL_NAME_MAPPING_SCHEMA)));
+ transformFunctionEventEventOperatorTestHarness.close();
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
index 50521e2a5..3c0af3d39 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/TransformOperatorWithSchemaEvolveTest.java
@@ -21,6 +21,7 @@ import
org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
@@ -29,7 +30,10 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.types.RowType;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
import
org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
@@ -72,6 +76,10 @@ public class TransformOperatorWithSchemaEvolveTest {
private PreTransformOperator preTransformOperator;
private PostTransformOperator postTransformOperator;
+ private BinaryRecordDataGenerator sourceRecordGenerator;
+ private BinaryRecordDataGenerator preTransformedRecordGenerator;
+ private BinaryRecordDataGenerator postTransformedRecordGenerator;
+
private RegularEventOperatorTestHarness<PreTransformOperator, Event>
preTransformOperatorHarness;
private RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -101,6 +109,16 @@ public class TransformOperatorWithSchemaEvolveTest {
public TransformWithSchemaEvolveTestCase
evolveFromSource(SchemaChangeEvent event) {
sourceEvents.add(event);
+ sourceSchema = SchemaUtils.applySchemaChangeEvent(sourceSchema,
event);
+ sourceRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
sourceSchema.toRowDataType());
+ return this;
+ }
+
+ public TransformWithSchemaEvolveTestCase insertSource(Object...
record) {
+ sourceEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId,
sourceRecordGenerator.generate(stringify(record))));
return this;
}
@@ -111,6 +129,16 @@ public class TransformOperatorWithSchemaEvolveTest {
public TransformWithSchemaEvolveTestCase
expectInPreTransformed(SchemaChangeEvent event) {
preTransformedEvents.add(event);
+ preTransformedSchema =
SchemaUtils.applySchemaChangeEvent(preTransformedSchema, event);
+ preTransformedRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
preTransformedSchema.toRowDataType());
+ return this;
+ }
+
+ public TransformWithSchemaEvolveTestCase
expectInPreTransformed(Object... record) {
+ preTransformedEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId,
preTransformedRecordGenerator.generate(stringify(record))));
return this;
}
@@ -121,6 +149,17 @@ public class TransformOperatorWithSchemaEvolveTest {
public TransformWithSchemaEvolveTestCase
expectInPostTransformed(SchemaChangeEvent event) {
postTransformedEvents.add(event);
+ postTransformedSchema =
+ SchemaUtils.applySchemaChangeEvent(postTransformedSchema,
event);
+ postTransformedRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
postTransformedSchema.toRowDataType());
+ return this;
+ }
+
+ public TransformWithSchemaEvolveTestCase
expectInPostTransformed(Object... event) {
+ postTransformedEvents.add(
+ DataChangeEvent.insertEvent(
+ tableId,
postTransformedRecordGenerator.generate(stringify(event))));
return this;
}
@@ -139,6 +178,13 @@ public class TransformOperatorWithSchemaEvolveTest {
this.preTransformedSchema = preTransformedSchema;
this.postTransformedSchema = postTransformedSchema;
+ this.sourceRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
sourceSchema.toRowDataType());
+ this.preTransformedRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
preTransformedSchema.toRowDataType());
+ this.postTransformedRecordGenerator =
+ new BinaryRecordDataGenerator((RowType)
postTransformedSchema.toRowDataType());
+
this.sourceEvents = new ArrayList<>();
this.preTransformedEvents = new ArrayList<>();
this.postTransformedEvents = new ArrayList<>();
@@ -793,4 +839,87 @@ public class TransformOperatorWithSchemaEvolveTest {
"name"))))
.runTests("inserting columns at last");
}
+
+ /** This case tests column name map when schema evolution happens. */
+ @Test
+ public void testSchemaChangeWithColumnNameMap() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch",
"data_changes");
+ TransformWithSchemaEvolveTestCase.of(
+ tableId,
+ "*, foo-bar as computed",
+ "class <> 'class0'",
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .physicalColumn("computed", DataTypes.INT())
+ .build())
+ .initializeHarness()
+ .runTests("initializing table")
+ .insertSource(0, 0, 0, "class0")
+ .expectInPreTransformed(0, 0, 0, "class0")
+ .expectNothingInPostTransformed()
+ .insertSource(1, 2, 3, "class1")
+ .expectInPreTransformed(1, 2, 3, "class1")
+ .expectInPostTransformed(1, 2, 3, "class1", -1)
+ .evolveFromSource(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+
Column.physicalColumn("bar-foo", DataTypes.INT()),
+
AddColumnEvent.ColumnPosition.FIRST,
+ null))))
+ .expectInPreTransformed(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+
Column.physicalColumn("bar-foo", DataTypes.INT()),
+
AddColumnEvent.ColumnPosition.BEFORE,
+ "foo"))))
+ .expectInPostTransformed(
+ new AddColumnEvent(
+ tableId,
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+
Column.physicalColumn("bar-foo", DataTypes.INT()),
+
AddColumnEvent.ColumnPosition.BEFORE,
+ "foo"))))
+ .insertSource(10, 2, 4, 6, "class2")
+ .expectInPreTransformed(10, 2, 4, 6, "class2")
+ .expectInPostTransformed(10, 2, 4, 6, "class2", -2)
+ .insertSource(20, 2, 4, 6, "class0")
+ .expectInPreTransformed(20, 2, 4, 6, "class0")
+ .expectNothingInPostTransformed()
+ .evolveFromSource(
+ new RenameColumnEvent(
+ tableId, Collections.singletonMap("bar-foo",
"package")))
+ .expectInPreTransformed(
+ new RenameColumnEvent(
+ tableId, Collections.singletonMap("bar-foo",
"package")))
+ .expectInPostTransformed(
+ new RenameColumnEvent(
+ tableId, Collections.singletonMap("bar-foo",
"package")))
+ .insertSource(30, 3, 6, 9, "class3")
+ .expectInPreTransformed(30, 3, 6, 9, "class3")
+ .expectInPostTransformed(30, 3, 6, 9, "class3", -3)
+ .insertSource(40, 3, 6, 9, "class0")
+ .expectInPreTransformed(40, 3, 6, 9, "class0")
+ .expectNothingInPostTransformed()
+ .runTests("schema evolution with mapped column names");
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
index 59abfa06a..04d64b6c1 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UnifiedTransformOperatorTest.java
@@ -1164,4 +1164,38 @@ public class UnifiedTransformOperatorTest {
.runTests()
.destroyHarness();
}
+
+ @Test
+ public void testTransformWithColumnNameMap() throws Exception {
+ TableId tableId = TableId.tableId("my_company", "my_branch",
"column_name_map");
+ UnifiedTransformTestCase.of(
+ tableId,
+ "foo-bar AS f0, `foo-bar`, foo-bar-`foo-bar` AS f1,
class",
+ "foo-bar <> 0",
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("bar-foo", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("foo", DataTypes.INT())
+ .physicalColumn("bar", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .build(),
+ Schema.newBuilder()
+ .physicalColumn("f0", DataTypes.INT())
+ .physicalColumn("foo-bar", DataTypes.INT())
+ .physicalColumn("f1", DataTypes.INT())
+ .physicalColumn("class", DataTypes.STRING())
+ .build())
+ .initializeHarness()
+ .insertSource(1, 2, 3, 4, "class")
+ .insertPreTransformed(1, 2, 3, "class")
+ .insertPostTransformed(-1, 3, -4, "class")
+ .runTests()
+ .destroyHarness();
+ }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 9f6e0ecdf..47b99e92d 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
+import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
import
org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor;
@@ -422,14 +423,18 @@ public class TransformParserTest {
Assertions.assertThatThrownBy(
() -> {
TransformParser.translateFilterExpressionToJaninoExpression(
- "TIMESTAMPDIFF(SECONDS, dt1, dt2)",
Collections.emptyList());
+ "TIMESTAMPDIFF(SECONDS, dt1, dt2)",
+ Collections.emptyList(),
+ Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
.hasMessage("Statements can not be parsed.");
Assertions.assertThatThrownBy(
() -> {
TransformParser.translateFilterExpressionToJaninoExpression(
- "TIMESTAMPDIFF(QUARTER, dt1, dt2)",
Collections.emptyList());
+ "TIMESTAMPDIFF(QUARTER, dt1, dt2)",
+ Collections.emptyList(),
+ Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
.hasMessage(
@@ -437,14 +442,18 @@ public class TransformParserTest {
Assertions.assertThatThrownBy(
() -> {
TransformParser.translateFilterExpressionToJaninoExpression(
- "TIMESTAMPADD(SECONDS, dt1, dt2)",
Collections.emptyList());
+ "TIMESTAMPADD(SECONDS, dt1, dt2)",
+ Collections.emptyList(),
+ Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
.hasMessage("Statements can not be parsed.");
Assertions.assertThatThrownBy(
() -> {
TransformParser.translateFilterExpressionToJaninoExpression(
- "TIMESTAMPADD(QUARTER, dt1, dt2)",
Collections.emptyList());
+ "TIMESTAMPADD(QUARTER, dt1, dt2)",
+ Collections.emptyList(),
+ Collections.emptyMap());
})
.isExactlyInstanceOf(ParseException.class)
.hasMessage(
@@ -474,13 +483,13 @@ public class TransformParserTest {
List<String> expected =
Arrays.asList(
- "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='id', originalColumnNames=[id],
transformExpressionKey=null}",
- "ProjectionColumn{column=`name` STRING,
expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)',
originalColumnNames=[name], transformExpressionKey=null}",
- "ProjectionColumn{column=`newage` INT,
expression='`TB`.`age` + 1', scriptExpression='age + 1',
originalColumnNames=[age], transformExpressionKey=null}",
- "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3)
'newCreateTime', expression='createTime', scriptExpression='createTime',
originalColumnNames=[createTime], transformExpressionKey=null}",
- "ProjectionColumn{column=`newAddress` VARCHAR(50)
'newAddress', expression='address', scriptExpression='address',
originalColumnNames=[address], transformExpressionKey=null}",
- "ProjectionColumn{column=`deposits` DECIMAL(10, 2)
'deposit', expression='deposit', scriptExpression='deposit',
originalColumnNames=[deposit], transformExpressionKey=null}",
- "ProjectionColumn{column=`bmi` DOUBLE,
expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)',
scriptExpression='weight / height * height', originalColumnNames=[weight,
height, height], transformExpressionKey=null}");
+ "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='$0', originalColumnNames=[id],
columnNameMap={id=$0}}",
+ "ProjectionColumn{column=`name` STRING,
expression='UPPER(`TB`.`name`)', scriptExpression='upper($0)',
originalColumnNames=[name], columnNameMap={name=$0}}",
+ "ProjectionColumn{column=`newage` INT,
expression='`TB`.`age` + 1', scriptExpression='$0 + 1',
originalColumnNames=[age], columnNameMap={age=$0}}",
+ "ProjectionColumn{column=`newCreateTime` TIMESTAMP(3)
'newCreateTime', expression='createTime', scriptExpression='$0',
originalColumnNames=[createTime], columnNameMap={createTime=$0}}",
+ "ProjectionColumn{column=`newAddress` VARCHAR(50)
'newAddress', expression='address', scriptExpression='$0',
originalColumnNames=[address], columnNameMap={address=$0}}",
+ "ProjectionColumn{column=`deposits` DECIMAL(10, 2)
'deposit', expression='deposit', scriptExpression='$0',
originalColumnNames=[deposit], columnNameMap={deposit=$0}}",
+ "ProjectionColumn{column=`bmi` DOUBLE,
expression='`TB`.`weight` / (`TB`.`height` * `TB`.`height`)',
scriptExpression='$0 / $1 * $1', originalColumnNames=[weight, height, height],
columnNameMap={weight=$0, height=$1}}");
Assertions.assertThat(result).hasToString("[" + String.join(", ",
expected) + "]");
List<ProjectionColumn> metadataResult =
@@ -492,17 +501,17 @@ public class TransformParserTest {
List<String> metadataExpected =
Arrays.asList(
- "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='id', originalColumnNames=[id],
transformExpressionKey=null}",
- "ProjectionColumn{column=`name` STRING 'name',
expression='name', scriptExpression='name', originalColumnNames=[name],
transformExpressionKey=null}",
- "ProjectionColumn{column=`age` INT 'age',
expression='age', scriptExpression='age', originalColumnNames=[age],
transformExpressionKey=null}",
- "ProjectionColumn{column=`createTime` TIMESTAMP(3)
'newCreateTime', expression='createTime', scriptExpression='createTime',
originalColumnNames=[createTime], transformExpressionKey=null}",
- "ProjectionColumn{column=`address` VARCHAR(50)
'newAddress', expression='address', scriptExpression='address',
originalColumnNames=[address], transformExpressionKey=null}",
- "ProjectionColumn{column=`deposit` DECIMAL(10, 2)
'deposit', expression='deposit', scriptExpression='deposit',
originalColumnNames=[deposit], transformExpressionKey=null}",
- "ProjectionColumn{column=`weight` DOUBLE 'weight',
expression='weight', scriptExpression='weight', originalColumnNames=[weight],
transformExpressionKey=null}",
- "ProjectionColumn{column=`height` DOUBLE 'height',
expression='height', scriptExpression='height', originalColumnNames=[height],
transformExpressionKey=null}",
- "ProjectionColumn{column=`__namespace_name__` STRING
NOT NULL, expression='__namespace_name__',
scriptExpression='__namespace_name__',
originalColumnNames=[__namespace_name__], transformExpressionKey=null}",
- "ProjectionColumn{column=`__schema_name__` STRING NOT
NULL, expression='__schema_name__', scriptExpression='__schema_name__',
originalColumnNames=[__schema_name__], transformExpressionKey=null}",
- "ProjectionColumn{column=`__table_name__` STRING NOT
NULL, expression='__table_name__', scriptExpression='__table_name__',
originalColumnNames=[__table_name__], transformExpressionKey=null}");
+ "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='$0', originalColumnNames=[id],
columnNameMap={id=$0}}",
+ "ProjectionColumn{column=`name` STRING 'name',
expression='name', scriptExpression='$0', originalColumnNames=[name],
columnNameMap={name=$0}}",
+ "ProjectionColumn{column=`age` INT 'age',
expression='age', scriptExpression='$0', originalColumnNames=[age],
columnNameMap={age=$0}}",
+ "ProjectionColumn{column=`createTime` TIMESTAMP(3)
'newCreateTime', expression='createTime', scriptExpression='$0',
originalColumnNames=[createTime], columnNameMap={createTime=$0}}",
+ "ProjectionColumn{column=`address` VARCHAR(50)
'newAddress', expression='address', scriptExpression='$0',
originalColumnNames=[address], columnNameMap={address=$0}}",
+ "ProjectionColumn{column=`deposit` DECIMAL(10, 2)
'deposit', expression='deposit', scriptExpression='$0',
originalColumnNames=[deposit], columnNameMap={deposit=$0}}",
+ "ProjectionColumn{column=`weight` DOUBLE 'weight',
expression='weight', scriptExpression='$0', originalColumnNames=[weight],
columnNameMap={weight=$0}}",
+ "ProjectionColumn{column=`height` DOUBLE 'height',
expression='height', scriptExpression='$0', originalColumnNames=[height],
columnNameMap={height=$0}}",
+ "ProjectionColumn{column=`__namespace_name__` STRING
NOT NULL, expression='__namespace_name__', scriptExpression='$0',
originalColumnNames=[__namespace_name__],
columnNameMap={__namespace_name__=$0}}",
+ "ProjectionColumn{column=`__schema_name__` STRING NOT
NULL, expression='__schema_name__', scriptExpression='$0',
originalColumnNames=[__schema_name__], columnNameMap={__schema_name__=$0}}",
+ "ProjectionColumn{column=`__table_name__` STRING NOT
NULL, expression='__table_name__', scriptExpression='$0',
originalColumnNames=[__table_name__], columnNameMap={__table_name__=$0}}");
Assertions.assertThat(metadataResult)
.map(ProjectionColumn::toString)
.containsExactlyElementsOf(metadataExpected);
@@ -544,15 +553,15 @@ public class TransformParserTest {
List<String> expected =
Arrays.asList(
- "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='id', originalColumnNames=[id],
transformExpressionKey=null}",
- "ProjectionColumn{column=`name2` STRING,
expression='UPPER(`TB`.`name`)', scriptExpression='upper(name)',
originalColumnNames=[name], transformExpressionKey=null}",
- "ProjectionColumn{column=`sex2` STRING,
expression='UPPER(`TB`.`sex`)', scriptExpression='upper(sex)',
originalColumnNames=[sex], transformExpressionKey=null}",
- "ProjectionColumn{column=`address2` BINARY(50),
expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE
`TB`.`address` END', scriptExpression='(null != address ? address : address)',
originalColumnNames=[address, address, address], transformExpressionKey=null}",
- "ProjectionColumn{column=`phone2` VARBINARY(50),
expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE
`TB`.`phone` END', scriptExpression='(null != phone ? phone : phone)',
originalColumnNames=[phone, phone, phone], transformExpressionKey=null}",
- "ProjectionColumn{column=`deposit2` DECIMAL(10, 2),
expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE
`TB`.`deposit` END', scriptExpression='(null != deposit ? deposit : deposit)',
originalColumnNames=[deposit, deposit, deposit], transformExpressionKey=null}",
- "ProjectionColumn{column=`birthday2` TIMESTAMP(3),
expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE
`TB`.`birthday` END', scriptExpression='(null != birthday ? birthday :
birthday)', originalColumnNames=[birthday, birthday, birthday],
transformExpressionKey=null}",
- "ProjectionColumn{column=`birthday_ltz2`
TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN
`TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null !=
birthday_ltz ? birthday_ltz : birthday_ltz)',
originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz],
transformExpressionKey=null}",
- "ProjectionColumn{column=`update_time2` TIME(3),
expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time`
ELSE `TB`.`update_time` END', scriptExpression='(null != update_time ?
update_time : update_time)', originalColumnNames=[update_time, update_time,
update_time], transformExpressionKey=null}");
+ "ProjectionColumn{column=`id` INT 'id',
expression='id', scriptExpression='$0', originalColumnNames=[id],
columnNameMap={id=$0}}",
+ "ProjectionColumn{column=`name2` STRING,
expression='UPPER(`TB`.`name`)', scriptExpression='upper($0)',
originalColumnNames=[name], columnNameMap={name=$0}}",
+ "ProjectionColumn{column=`sex2` STRING,
expression='UPPER(`TB`.`sex`)', scriptExpression='upper($0)',
originalColumnNames=[sex], columnNameMap={sex=$0}}",
+ "ProjectionColumn{column=`address2` BINARY(50),
expression='CASE WHEN `TB`.`address` IS NOT NULL THEN `TB`.`address` ELSE
`TB`.`address` END', scriptExpression='(null != $0 ? $0 : $0)',
originalColumnNames=[address, address, address], columnNameMap={address=$0}}",
+ "ProjectionColumn{column=`phone2` VARBINARY(50),
expression='CASE WHEN `TB`.`phone` IS NOT NULL THEN `TB`.`phone` ELSE
`TB`.`phone` END', scriptExpression='(null != $0 ? $0 : $0)',
originalColumnNames=[phone, phone, phone], columnNameMap={phone=$0}}",
+ "ProjectionColumn{column=`deposit2` DECIMAL(10, 2),
expression='CASE WHEN `TB`.`deposit` IS NOT NULL THEN `TB`.`deposit` ELSE
`TB`.`deposit` END', scriptExpression='(null != $0 ? $0 : $0)',
originalColumnNames=[deposit, deposit, deposit], columnNameMap={deposit=$0}}",
+ "ProjectionColumn{column=`birthday2` TIMESTAMP(3),
expression='CASE WHEN `TB`.`birthday` IS NOT NULL THEN `TB`.`birthday` ELSE
`TB`.`birthday` END', scriptExpression='(null != $0 ? $0 : $0)',
originalColumnNames=[birthday, birthday, birthday],
columnNameMap={birthday=$0}}",
+ "ProjectionColumn{column=`birthday_ltz2`
TIMESTAMP_LTZ(3), expression='CASE WHEN `TB`.`birthday_ltz` IS NOT NULL THEN
`TB`.`birthday_ltz` ELSE `TB`.`birthday_ltz` END', scriptExpression='(null !=
$0 ? $0 : $0)', originalColumnNames=[birthday_ltz, birthday_ltz, birthday_ltz],
columnNameMap={birthday_ltz=$0}}",
+ "ProjectionColumn{column=`update_time2` TIME(3),
expression='CASE WHEN `TB`.`update_time` IS NOT NULL THEN `TB`.`update_time`
ELSE `TB`.`update_time` END', scriptExpression='(null != $0 ? $0 : $0)',
originalColumnNames=[update_time, update_time, update_time],
columnNameMap={update_time=$0}}");
Assertions.assertThat(result).hasToString("[" + String.join(", ",
expected) + "]");
}
@@ -655,6 +664,55 @@ public class TransformParserTest {
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval(id)),
4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval(id), \"bool\") &&
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\",
\"a\", \"z\", \"lie\"), \"\")");
}
+ @Test
+ public void testTranslateUdfFilterToJaninoExpressionWithColumnNameMap() {
+ Map<String, String> columnNameMap = new HashMap<>();
+ columnNameMap.put("a", "$0");
+ columnNameMap.put("b", "$1");
+ columnNameMap.put("a-b", "$2");
+
+ testFilterExpressionWithUdf(
+ "format(upper(a))",
+ "__instanceOfFormatFunctionClass.eval(upper($0))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "format(lower(b))",
+ "__instanceOfFormatFunctionClass.eval(lower($1))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "format(concat(a,b))",
+ "__instanceOfFormatFunctionClass.eval(concat($0, $1))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "format(SUBSTR(`a-b`,1))",
+ "__instanceOfFormatFunctionClass.eval(substr($2, 1))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "typeof(`a-b` like '^[a-zA-Z]')",
+ "__instanceOfTypeOfFunctionClass.eval(like($2,
\"^[a-zA-Z]\"))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "typeof(`a-b` not like '^[a-zA-Z]')",
+ "__instanceOfTypeOfFunctionClass.eval(notLike($2,
\"^[a-zA-Z]\"))",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "typeof(a-b-`a-b`)",
+ "__instanceOfTypeOfFunctionClass.eval($0 - $1 - $2)",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "typeof(a-b-2)",
+ "__instanceOfTypeOfFunctionClass.eval($0 - $1 - 2)",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "addone(addone(`a-b`)) > 4 OR typeof(a-b) <> 'bool' AND
format('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
+
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") &&
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\",
\"a\", \"z\", \"lie\"), \"\")",
+ columnNameMap);
+ testFilterExpressionWithUdf(
+ "ADDONE(ADDONE(`a-b`)) > 4 OR TYPEOF(a-b) <> 'bool' AND
FORMAT('from %s to %s is %s', 'a', 'z', 'lie') <> ''",
+
"greaterThan(__instanceOfAddOneFunctionClass.eval(__instanceOfAddOneFunctionClass.eval($2)),
4) || !valueEquals(__instanceOfTypeOfFunctionClass.eval($0 - $1), \"bool\") &&
!valueEquals(__instanceOfFormatFunctionClass.eval(\"from %s to %s is %s\",
\"a\", \"z\", \"lie\"), \"\")",
+ columnNameMap);
+ }
+
@Test
void testLargeNumericalLiterals() {
// For literals within [-2147483648, 2147483647] range, plain Integers
are OK
@@ -671,26 +729,64 @@ public class TransformParserTest {
Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
- "id > 9223372036854775808",
Collections.emptyList()))
+ "id > 9223372036854775808",
+ Collections.emptyList(),
+ Collections.emptyMap()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '9223372036854775808'
out of range");
Assertions.assertThatThrownBy(
() ->
TransformParser.translateFilterExpressionToJaninoExpression(
- "id < -9223372036854775809",
Collections.emptyList()))
+ "id < -9223372036854775809",
+ Collections.emptyList(),
+ Collections.emptyMap()))
.isExactlyInstanceOf(CalciteContextException.class)
.hasMessageContaining("Numeric literal '-9223372036854775809'
out of range");
}
+ @Test
+ public void testProjectionColumnsWithColumnNameMap() {
+ List<Column> testColumns =
+ Arrays.asList(
+ Column.physicalColumn("a", DataTypes.INT(), "a"),
+ Column.physicalColumn("b", DataTypes.INT(), "b"),
+ Column.physicalColumn("a-b", DataTypes.DOUBLE(),
"`a-b`"));
+
+ List<ProjectionColumn> result =
+ TransformParser.generateProjectionColumns(
+ "a, b, a-b as c, `a-b`, `a-b` AS d, `a-b`-1 AS e,
a-b+`a-b` AS f, `test-meta-col`, `test-meta-col`-a-b AS g",
+ testColumns,
+ Collections.emptyList(),
+ new SupportedMetadataColumn[] {new
TestMetadataColumn()});
+
+ List<String> expected =
+ Arrays.asList(
+ "ProjectionColumn{column=`a` INT 'a', expression='a',
scriptExpression='$0', originalColumnNames=[a], columnNameMap={a=$0}}",
+ "ProjectionColumn{column=`b` INT 'b', expression='b',
scriptExpression='$0', originalColumnNames=[b], columnNameMap={b=$0}}",
+ "ProjectionColumn{column=`c` INT, expression='`TB`.`a`
- `TB`.`b`', scriptExpression='$0 - $1', originalColumnNames=[a, b],
columnNameMap={a=$0, b=$1}}",
+ "ProjectionColumn{column=`a-b` DOUBLE '`a-b`',
expression='a-b', scriptExpression='$0', originalColumnNames=[a-b],
columnNameMap={a-b=$0}}",
+ "ProjectionColumn{column=`d` DOUBLE '`a-b`',
expression='a-b', scriptExpression='$0', originalColumnNames=[a-b],
columnNameMap={a-b=$0}}",
+ "ProjectionColumn{column=`e` DOUBLE,
expression='`TB`.`a-b` - 1', scriptExpression='$0 - 1',
originalColumnNames=[a-b], columnNameMap={a-b=$0}}",
+ "ProjectionColumn{column=`f` DOUBLE,
expression='`TB`.`a` - `TB`.`b` + `TB`.`a-b`', scriptExpression='$0 - $1 + $2',
originalColumnNames=[a, b, a-b], columnNameMap={a=$0, b=$1, a-b=$2}}",
+ "ProjectionColumn{column=`test-meta-col` INT NOT NULL,
expression='test-meta-col', scriptExpression='$0',
originalColumnNames=[test-meta-col], columnNameMap={test-meta-col=$0}}",
+ "ProjectionColumn{column=`g` INT,
expression='`TB`.`test-meta-col` - `TB`.`a` - `TB`.`b`', scriptExpression='$0 -
$1 - $2', originalColumnNames=[test-meta-col, a, b], columnNameMap={a=$1, b=$2,
test-meta-col=$0}}");
+ Assertions.assertThat(result).hasToString("[" + String.join(", ",
expected) + "]");
+ }
+
private void testFilterExpression(String expression, String
expressionExpect) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
- expression, Collections.emptyList());
+ expression, Collections.emptyList(),
Collections.emptyMap());
Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}
private void testFilterExpressionWithUdf(String expression, String
expressionExpect) {
+ testFilterExpressionWithUdf(expression, expressionExpect,
Collections.emptyMap());
+ }
+
+ private void testFilterExpressionWithUdf(
+ String expression, String expressionExpect, Map<String, String>
columnNameMap) {
String janinoExpression =
TransformParser.translateFilterExpressionToJaninoExpression(
expression,
@@ -703,7 +799,32 @@ public class TransformParserTest {
"org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass"),
new UserDefinedFunctionDescriptor(
"typeof",
-
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")));
+
"org.apache.flink.cdc.udf.examples.java.TypeOfFunctionClass")),
+ columnNameMap);
Assertions.assertThat(janinoExpression).isEqualTo(expressionExpect);
}
+
+ /** Test metadata column. */
+ private static class TestMetadataColumn implements SupportedMetadataColumn
{
+ @Override
+ public String getName() {
+ // Column name contains '-', which can be used to test column name
map.
+ return "test-meta-col";
+ }
+
+ @Override
+ public DataType getType() {
+ return DataTypes.INT();
+ }
+
+ @Override
+ public Class<?> getJavaClass() {
+ return Integer.class;
+ }
+
+ @Override
+ public Object read(Map<String, String> metadata) {
+ return 0;
+ }
+ }
}