This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 5e2188387 [FLINK-39232] Loosen transformed schema merging validation check (#4315)
5e2188387 is described below
commit 5e218838751cada0b684d84884a3cb5fc87e2afe
Author: yuxiqian <[email protected]>
AuthorDate: Mon Mar 16 21:10:51 2026 +0800
[FLINK-39232] Loosen transformed schema merging validation check (#4315)
---
.../flink/cdc/common/utils/SchemaMergingUtils.java | 134 -------
.../flink/FlinkPipelineBatchComposerITCase.java | 67 ----
.../flink/FlinkPipelineComposerITCase.java | 72 ----
.../flink/FlinkPipelineComposerLenientITCase.java | 70 ----
.../flink/FlinkPipelineTransformITCase.java | 284 ++-------------
.../tests/SchemaEvolvingTransformE2eITCase.java | 7 +-
.../cdc/pipeline/tests/TransformE2eITCase.java | 14 +-
.../operators/transform/PostTransformOperator.java | 157 ++++----
.../transform/PostTransformOperatorTest.java | 396 ++-------------------
pom.xml | 1 -
10 files changed, 141 insertions(+), 1061 deletions(-)
diff --git
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
index 0d95f050b..50870edb5 100644
---
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
+++
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java
@@ -63,7 +63,6 @@ import org.apache.flink.cdc.common.types.VariantType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.cdc.common.types.variant.Variant;
-import
org.apache.flink.shaded.guava31.com.google.common.collect.ArrayListMultimap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.Streams;
import org.apache.flink.shaded.guava31.com.google.common.io.BaseEncoding;
@@ -320,139 +319,6 @@ public class SchemaMergingUtils {
return coercedRow;
}
- /**
- * Try to merge given {@link Schema}s and ensure they're identical. The
only difference allowed
- * is nullability, string and varchar precision, default value, and
comments.
- */
- public static Schema strictlyMergeSchemas(List<Schema> schemas) {
- Preconditions.checkArgument(
- !schemas.isEmpty(), "Trying to merge transformed schemas %s,
but got empty list");
- if (schemas.size() == 1) {
- return schemas.get(0);
- }
-
- List<List<String>> primaryKeys =
- schemas.stream()
- .map(Schema::primaryKeys)
- .filter(p -> !p.isEmpty())
- .distinct()
- .collect(Collectors.toList());
- List<List<String>> partitionKeys =
- schemas.stream()
- .map(Schema::partitionKeys)
- .filter(p -> !p.isEmpty())
- .distinct()
- .collect(Collectors.toList());
- List<Map<String, String>> options =
- schemas.stream()
- .map(Schema::options)
- .filter(p -> !p.isEmpty())
- .distinct()
- .collect(Collectors.toList());
- List<List<String>> columnNames =
- schemas.stream()
- .map(Schema::getColumnNames)
- .distinct()
- .collect(Collectors.toList());
-
- Preconditions.checkArgument(
- primaryKeys.size() <= 1,
- "Trying to merge transformed schemas %s, but got more than one
primary key configurations: %s",
- schemas,
- primaryKeys);
- Preconditions.checkArgument(
- partitionKeys.size() <= 1,
- "Trying to merge transformed schemas %s, but got more than one
partition key configurations: %s",
- schemas,
- partitionKeys);
- Preconditions.checkArgument(
- options.size() <= 1,
- "Trying to merge transformed schemas %s, but got more than one
option configurations: %s",
- schemas,
- options);
- Preconditions.checkArgument(
- columnNames.size() == 1,
- "Trying to merge transformed schemas %s, but got more than one
column name views: %s",
- schemas,
- columnNames);
-
- int arity = columnNames.get(0).size();
-
- ArrayListMultimap<Integer, DataType> toBeMergedColumnTypes =
- ArrayListMultimap.create(arity, 1);
- for (Schema schema : schemas) {
- List<DataType> columnTypes = schema.getColumnDataTypes();
- for (int colIndex = 0; colIndex < columnTypes.size(); colIndex++) {
- toBeMergedColumnTypes.put(colIndex, columnTypes.get(colIndex));
- }
- }
-
- List<String> mergedColumnNames = columnNames.iterator().next();
- List<DataType> mergedColumnTypes = new ArrayList<>(arity);
- for (int i = 0; i < arity; i++) {
-
mergedColumnTypes.add(strictlyMergeDataTypes(toBeMergedColumnTypes.get(i)));
- }
-
- List<Column> mergedColumns = new ArrayList<>();
- for (int i = 0; i < mergedColumnNames.size(); i++) {
- mergedColumns.add(
- Column.physicalColumn(mergedColumnNames.get(i),
mergedColumnTypes.get(i)));
- }
-
- return Schema.newBuilder()
- .primaryKey(primaryKeys.isEmpty() ? Collections.emptyList() :
primaryKeys.get(0))
- .partitionKey(
- partitionKeys.isEmpty() ? Collections.emptyList() :
partitionKeys.get(0))
- .options(options.isEmpty() ? Collections.emptyMap() :
options.get(0))
- .setColumns(mergedColumns)
- .build();
- }
-
- private static DataType strictlyMergeDataTypes(List<DataType> dataTypes) {
- Preconditions.checkArgument(
- !dataTypes.isEmpty(),
- "Trying to merge transformed data types %s, but got empty
list");
-
- List<DataType> simpleMergeTypes =
- dataTypes.stream().distinct().collect(Collectors.toList());
- if (simpleMergeTypes.size() == 1) {
- return simpleMergeTypes.get(0);
- }
-
- List<DataTypeRoot> typeRoots =
- dataTypes.stream()
- .map(DataType::getTypeRoot)
- .distinct()
- .collect(Collectors.toList());
- Preconditions.checkArgument(
- typeRoots.size() == 1,
- "Trying to merge types %s, but got more than one type root:
%s",
- dataTypes,
- typeRoots);
-
- // Decay types to the most
- DataType type = dataTypes.get(0);
-
- if (type.is(DataTypeRoot.CHAR)) {
- return DataTypes.CHAR(CharType.MAX_LENGTH);
- } else if (type.is(DataTypeRoot.VARCHAR)) {
- return DataTypes.STRING();
- } else if (type.is(DataTypeRoot.BINARY)) {
- return DataTypes.BINARY(BinaryType.MAX_LENGTH);
- } else if (type.is(DataTypeRoot.VARBINARY)) {
- return DataTypes.VARBINARY(VarBinaryType.MAX_LENGTH);
- } else if (type.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
- return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
- } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
- return DataTypes.TIMESTAMP_TZ(ZonedTimestampType.MAX_PRECISION);
- } else if (type.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
- return
DataTypes.TIMESTAMP_LTZ(LocalZonedTimestampType.MAX_PRECISION);
- } else {
- throw new IllegalArgumentException(
- "Unable to merge data types with different precision: " +
dataTypes);
- }
- }
-
@VisibleForTesting
static boolean isDataTypeCompatible(@Nullable DataType currentType,
DataType upcomingType) {
// If two types are identical, they're compatible of course.
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
index 0074039c9..5e521c3df 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineBatchComposerITCase.java
@@ -613,73 +613,6 @@ public class FlinkPipelineBatchComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, 20, +I, 2], op=INSERT, meta=({op_ts=2})}");
}
- @ParameterizedTest
- @EnumSource
- void testTransformTwiceInBatchMode(ValuesDataSink.SinkApi sinkApi) throws
Exception {
- FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
- // Setup value source
- Configuration sourceConfig = new Configuration();
- sourceConfig.set(
- ValuesDataSourceOptions.EVENT_SET_ID,
- ValuesDataSourceHelper.EventSetId.TRANSFORM_BATCH_TABLE);
- SourceDef sourceDef =
- new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
-
- // Setup value sink
- Configuration sinkConfig = new Configuration();
- sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
- sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
- SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
-
- // Setup transform
- TransformDef transformDef1 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'1') as col12",
- "col1 = '1' OR col1 = '999'",
- "col1",
- "col12",
- "key1=value1",
- "",
- null);
- TransformDef transformDef2 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'2') as col12",
- "col1 = '2'",
- null,
- null,
- null,
- "",
- null);
- // Setup pipeline
- Configuration pipelineConfig = new Configuration();
- pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
- pipelineConfig.set(
- PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE,
RuntimeExecutionMode.BATCH);
- PipelineDef pipelineDef =
- new PipelineDef(
- sourceDef,
- sinkDef,
- Collections.emptyList(),
- new ArrayList<>(Arrays.asList(transformDef1,
transformDef2)),
- Collections.emptyList(),
- pipelineConfig);
-
- // Execute the pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
- execution.execute();
-
- // Check the order and content of all received events
- String[] outputEvents = outCaptor.toString().trim().split("\n");
- assertThat(outputEvents)
- .containsExactly(
-
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING},
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}");
- }
-
@ParameterizedTest
@EnumSource
void testOneToOneRoutingInBatchMode(ValuesDataSink.SinkApi sinkApi) throws
Exception {
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
index fd2368b6c..d01d987d4 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java
@@ -423,78 +423,6 @@ class FlinkPipelineComposerITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ,
20, -U, 5], after=[2, x, 20, +U, 5], op=UPDATE, meta=({op_ts=5})}");
}
- @ParameterizedTest
- @EnumSource
- void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
- FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
- // Setup value source
- Configuration sourceConfig = new Configuration();
- sourceConfig.set(
- ValuesDataSourceOptions.EVENT_SET_ID,
- ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
- SourceDef sourceDef =
- new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
-
- // Setup value sink
- Configuration sinkConfig = new Configuration();
- sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
- sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
- SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
-
- // Setup transform
- TransformDef transformDef1 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'1') as col12",
- "col1 = '1' OR col1 = '999'",
- "col1",
- "col12",
- "key1=value1",
- "",
- null);
- TransformDef transformDef2 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'2') as col12",
- "col1 = '2'",
- null,
- null,
- null,
- "",
- null);
- // Setup pipeline
- Configuration pipelineConfig = new Configuration();
- pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
- pipelineConfig.set(
- PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR,
SchemaChangeBehavior.EVOLVE);
- PipelineDef pipelineDef =
- new PipelineDef(
- sourceDef,
- sinkDef,
- Collections.emptyList(),
- new ArrayList<>(Arrays.asList(transformDef1,
transformDef2)),
- Collections.emptyList(),
- pipelineConfig);
-
- // Execute the pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
- execution.execute();
-
- // Check the order and content of all received events
- String[] outputEvents = outCaptor.toString().trim().split("\n");
- assertThat(outputEvents)
- .containsExactly(
-
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING},
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
-
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=AFTER,
existedColumnName=col2}]}",
-
"RenameColumnEvent{tableId=default_namespace.default_schema.table1,
nameMapping={col2=newCol2, col3=newCol3}}",
-
"DropColumnEvent{tableId=default_namespace.default_schema.table1,
droppedColumnNames=[newCol2]}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1, 1,
11], after=[], op=DELETE, meta=({op_ts=4})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, ,
22], after=[2, x, 22], op=UPDATE, meta=({op_ts=5})}");
- }
-
@ParameterizedTest
@EnumSource
void testOneToOneRouting(ValuesDataSink.SinkApi sinkApi) throws Exception {
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
index ede3c4b17..7348749b7 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java
@@ -465,76 +465,6 @@ class FlinkPipelineComposerLenientITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2,
null, 20, -U, null, null, ], after=[2, null, 20, +U, null, null, x], op=UPDATE,
meta=({op_ts=5})}");
}
- @ParameterizedTest
- @EnumSource
- void testTransformTwice(ValuesDataSink.SinkApi sinkApi) throws Exception {
- FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
-
- // Setup value source
- Configuration sourceConfig = new Configuration();
- sourceConfig.set(
- ValuesDataSourceOptions.EVENT_SET_ID,
- ValuesDataSourceHelper.EventSetId.TRANSFORM_TABLE);
- SourceDef sourceDef =
- new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source",
sourceConfig);
-
- // Setup value sink
- Configuration sinkConfig = new Configuration();
- sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
- sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
- SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value
Sink", sinkConfig);
-
- // Setup transform
- TransformDef transformDef1 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'1') as col12",
- "col1 = '1' OR col1 = '999'",
- "col1",
- "col12",
- "key1=value1",
- "",
- null);
- TransformDef transformDef2 =
- new TransformDef(
- "default_namespace.default_schema.table1",
- "*,concat(col1,'2') as col12",
- "col1 = '2'",
- null,
- null,
- null,
- "",
- null);
- // Setup pipeline
- Configuration pipelineConfig = new Configuration();
- pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
-
- PipelineDef pipelineDef =
- new PipelineDef(
- sourceDef,
- sinkDef,
- Collections.emptyList(),
- new ArrayList<>(Arrays.asList(transformDef1,
transformDef2)),
- Collections.emptyList(),
- pipelineConfig);
-
- // Execute the pipeline
- PipelineExecution execution = composer.compose(pipelineDef);
- execution.execute();
-
- // Check the order and content of all received events
- String[] outputEvents =
outCaptor.toString().trim().split(LINE_SEPARATOR);
- assertThat(outputEvents)
- .containsExactly(
-
"CreateTableEvent{tableId=default_namespace.default_schema.table1,
schema=columns={`col1` STRING NOT NULL,`col2` STRING,`col12` STRING},
primaryKeys=col1, partitionKeys=col12, options=({key1=value1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[1, 1, 11], op=INSERT, meta=({op_ts=1})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[],
after=[2, 2, 22], op=INSERT, meta=({op_ts=2})}",
-
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`col3` STRING, position=LAST,
existedColumnName=null}]}",
-
"AddColumnEvent{tableId=default_namespace.default_schema.table1,
addedColumns=[ColumnWithPosition{column=`newCol2` STRING, position=LAST,
existedColumnName=null}, ColumnWithPosition{column=`newCol3` STRING,
position=LAST, existedColumnName=null}]}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[1,
null, 11, null, null, 1], after=[], op=DELETE, meta=({op_ts=4})}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2,
null, 22, null, null, ], after=[2, null, 22, null, null, x], op=UPDATE,
meta=({op_ts=5})}");
- }
-
@Test
void testOneToOneRouting() throws Exception {
FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
diff --git
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e4da2fca3..2842729cd 100644
---
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -253,20 +253,11 @@ class FlinkPipelineTransformITCase {
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.\\.*",
- "*, 'YOUNG' AS category",
- "age < 20",
+ "*, CASE WHEN age < 20 THEN 'YOUNG' WHEN age
>= 20 THEN 'OLD' END AS category",
null,
null,
null,
null,
- null),
- new TransformDef(
- "default_namespace.default_schema.\\.*",
- "*, 'OLD' AS category",
- "age >= 20",
- null,
- null,
- null,
null,
null)),
Arrays.asList(
@@ -288,17 +279,8 @@ class FlinkPipelineTransformITCase {
Arrays.asList(
new TransformDef(
"default_namespace.default_schema.\\.*",
- "id,age,'Juvenile' AS roleName",
- "age < 18",
- null,
- null,
- null,
+ "id,age, CASE WHEN age < 18 THEN 'Juvenile'
WHEN age >= 18 THEN name END AS roleName",
null,
- null),
- new TransformDef(
- "default_namespace.default_schema.\\.*",
- "id,age,name AS roleName",
- "age >= 18",
null,
null,
null,
@@ -317,42 +299,7 @@ class FlinkPipelineTransformITCase {
@ParameterizedTest
@EnumSource
- void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws
Exception {
- runGenericTransformTest(
- sinkApi,
- Arrays.asList(
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "*,'Juvenile' AS roleName",
- "age < 18",
- null,
- null,
- null,
- null,
- null),
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "id,name,age,description,name AS roleName",
- "age >= 18",
- null,
- null,
- null,
- null,
- null)),
- Arrays.asList(
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age`
TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}"));
- }
-
- @ParameterizedTest
- @EnumSource
- void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi)
throws Exception {
+ void testMissingProjection(ValuesDataSink.SinkApi sinkApi) throws
Exception {
runGenericTransformTest(
sinkApi,
Arrays.asList(
@@ -364,170 +311,14 @@ class FlinkPipelineTransformITCase {
null,
null,
null,
- null),
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "id,UPPER(name) AS name,age,description",
- "age >= 18",
- null,
- null,
- null,
- null,
- null)),
- Arrays.asList(
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description`
STRING}, primaryKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
- }
-
- @ParameterizedTest
- @EnumSource
- @Disabled("to be fixed in FLINK-37132")
- void testMultiTransformSchemaColumnsCompatibilityWithNullProjection(
- ValuesDataSink.SinkApi sinkApi) {
- TransformDef nullProjection =
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- null,
- "age < 18",
- null,
- null,
- null,
- null,
- null);
-
- assertThatThrownBy(
- () ->
- runGenericTransformTest(
- sinkApi,
- Arrays.asList(
- nullProjection,
- new TransformDef(
-
"default_namespace.default_schema.mytable2",
- // reference part
column
- "id,UPPER(name) AS
name",
- "age >= 18",
- null,
- null,
- null,
- null,
- null)),
- Collections.emptyList()))
- .rootCause()
- .isExactlyInstanceOf(IllegalStateException.class)
- .hasMessage(
- "Unable to merge schema columns={`id` BIGINT,`name`
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
- + "and columns={`id` BIGINT,`name` STRING},
primaryKeys=id, options=() with different column counts.");
- }
-
- @ParameterizedTest
- @EnumSource
- @Disabled("to be fixed in FLINK-37132")
- void testMultiTransformSchemaColumnsCompatibilityWithEmptyProjection(
- ValuesDataSink.SinkApi sinkApi) {
- TransformDef emptyProjection =
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "",
- "age < 18",
- null,
- null,
- null,
- null,
- null);
-
- assertThatThrownBy(
- () ->
- runGenericTransformTest(
- sinkApi,
- Arrays.asList(
- emptyProjection,
- new TransformDef(
-
"default_namespace.default_schema.mytable2",
- // reference part
column
- "id,UPPER(name) AS
name",
- "age >= 18",
- null,
- null,
- null,
- null,
- null)),
- Collections.emptyList()))
- .rootCause()
- .isExactlyInstanceOf(IllegalStateException.class)
- .hasMessage(
- "Unable to merge schema columns={`id` BIGINT,`name`
VARCHAR(255),`age` TINYINT,`description` STRING}, primaryKeys=id, options=() "
- + "and columns={`id` BIGINT,`name` STRING},
primaryKeys=id, options=() with different column counts.");
- }
-
- @ParameterizedTest
- @EnumSource
- void
testMultiTransformWithNullEmptyAsteriskProjections(ValuesDataSink.SinkApi
sinkApi)
- throws Exception {
- TransformDef nullProjection =
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- null,
- "age < 18",
- null,
- null,
- null,
- null,
- null);
-
- TransformDef emptyProjection =
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "",
- "age < 18",
- null,
- null,
- null,
- null,
- null);
-
- TransformDef asteriskProjection =
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- "*",
- "age < 18",
- null,
- null,
- null,
- null,
- null);
-
- runGenericTransformTest(
- sinkApi,
- Arrays.asList(
- // Setting projection as null, '', or * should be
equivalent
- nullProjection,
- emptyProjection,
- asteriskProjection,
- new TransformDef(
- "default_namespace.default_schema.mytable2",
- // reference all column
- "id,UPPER(name) AS name,age,description",
- "age >= 18",
- null,
- null,
- null,
- null,
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}",
-
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` STRING,`age` TINYINT,`description`
STRING}, primaryKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, DERRIDA, 25, student], op=INSERT, meta=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
DERRIDA, 25, student], after=[], op=DELETE, meta=()}"));
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age`
TINYINT,`description` STRING}, primaryKeys=id, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student], op=INSERT, meta=()}"));
}
/** This tests if transform generates metadata info correctly. */
@@ -1237,47 +1028,49 @@ class FlinkPipelineTransformITCase {
@ParameterizedTest
@EnumSource
- void testTransformMergingIncompatibleRules(ValuesDataSink.SinkApi
apiVersion) {
- Assertions.assertThatThrownBy(
- () ->
- runGenericTransformTest(
- apiVersion,
- Arrays.asList(
- new TransformDef(
- "\\.*.\\.*.mytable1",
- "*, 'rule_1_matched'
AS rule_1_matched",
- "id > 0",
- null,
- "id",
- null,
- null,
- null),
- new TransformDef(
- "\\.*.\\.*.\\.*",
- "*, 'rule_fallback' AS
rule_fallback",
- null,
- null,
- "id",
- null,
- null,
- null)),
- Collections.emptyList()))
- .rootCause()
- .isExactlyInstanceOf(IllegalArgumentException.class)
- .hasMessage(
- "Trying to merge transformed schemas [columns={`id`
INT,`name` STRING,`age` INT,`rule_1_matched` STRING}, primaryKeys=id,
partitionKeys=id, options=(), columns={`id` INT,`name` STRING,`age`
INT,`rule_fallback` STRING}, primaryKeys=id, partitionKeys=id, options=()], but
got more than one column name views: [[id, name, age, rule_1_matched], [id,
name, age, rule_fallback]]");
+ void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion)
throws Exception {
+ runGenericTransformTest(
+ apiVersion,
+ Arrays.asList(
+ new TransformDef(
+ "\\.*.\\.*.mytable1",
+ "*, 'rule_1_matched' AS rule_1_matched",
+ null,
+ null,
+ "id",
+ null,
+ null,
+ null),
+ new TransformDef(
+ "\\.*.\\.*.\\.*",
+ "*, 'rule_fallback' AS rule_fallback",
+ null,
+ null,
+ "id",
+ null,
+ null,
+ null)),
+ Arrays.asList(
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched`
STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE,
meta=()}",
+
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age`
TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id,
partitionKeys=id, options=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[3, Carol, 15, student, rule_fallback], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[],
after=[4, Derrida, 25, student, rule_fallback], op=INSERT, meta=()}",
+
"DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4,
Derrida, 25, student, rule_fallback], after=[], op=DELETE, meta=()}"));
}
@ParameterizedTest
@EnumSource
- void testTransformWithFallbackRules(ValuesDataSink.SinkApi apiVersion)
throws Exception {
+ void testTransformFilterWithFallbackRules(ValuesDataSink.SinkApi
apiVersion) throws Exception {
runGenericTransformTest(
apiVersion,
Arrays.asList(
new TransformDef(
"\\.*.\\.*.mytable1",
"*, 'rule_1_matched' AS rule_1_matched",
- null,
+ "id > 1",
null,
"id",
null,
@@ -1294,7 +1087,6 @@ class FlinkPipelineTransformITCase {
null)),
Arrays.asList(
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1,
schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`rule_1_matched`
STRING}, primaryKeys=id, partitionKeys=id, options=()}",
-
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[1, Alice, 18, rule_1_matched], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[],
after=[2, Bob, 20, rule_1_matched], op=INSERT, meta=()}",
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2,
Bob, 20, rule_1_matched], after=[2, Bob, 30, rule_1_matched], op=UPDATE,
meta=()}",
"CreateTableEvent{tableId=default_namespace.default_schema.mytable2,
schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age`
TINYINT,`description` STRING,`rule_fallback` STRING}, primaryKeys=id,
partitionKeys=id, options=()}",
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
index 5410eb903..766621971 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java
@@ -270,11 +270,7 @@ class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
+ (triggerError ? " error.on.schema.change:
true\n" : "\n")
+ "transform:\n"
+ " - source-table: %s.\\.*\n"
- + " projection: CAST(id AS VARCHAR) || ' ->
' || name AS uid, *, id * id AS id_square, 'age < 20' as tag\n"
- + " filter: age < 20\n"
- + " - source-table: %s.\\.*\n"
- + " projection: CAST(id AS VARCHAR) || ' ->
' || name AS uid, *, 0 - id * id AS id_square, 'age >= 20' as tag\n"
- + " filter: age >= 20\n"
+ + " projection: CAST(id AS VARCHAR) || ' ->
' || name AS uid, *, CASE WHEN age < 20 THEN id * id WHEN age >= 20 THEN 0 - id
* id END AS id_square, CASE WHEN age < 20 THEN 'age < 20' WHEN age >= 20 THEN
'age >= 20' END as tag\n"
+ (mergeTable
? String.format(
"route:\n"
@@ -292,7 +288,6 @@ class SchemaEvolvingTransformE2eITCase extends
PipelineTestEnvironment {
dbName,
mergeTable ? "(members|new_members)" : "members",
dbName,
- dbName,
behavior,
parallelism);
submitPipelineJob(pipelineJob);
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index 925bc6059..6b2e7b6dd 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -188,11 +188,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
+ "\n"
+ "transform:\n"
+ " - source-table: %s.\\.*\n"
- + " projection: ID, VERSION, 'Type-A' AS
CATEGORY\n"
- + " filter: ID > 1008\n"
- + " - source-table: %s.\\.*\n"
- + " projection: ID, VERSION, 'Type-B' AS
CATEGORY\n"
- + " filter: ID <= 1008\n"
+ + " projection: ID, VERSION, CASE WHEN ID >
1008 THEN 'Type-A' WHEN ID <= 1008 THEN 'Type-B' END AS CATEGORY\n"
+ "\n"
+ "pipeline:\n"
+ " execution.runtime-mode: %s\n"
@@ -203,7 +199,6 @@ class TransformE2eITCase extends PipelineTestEnvironment {
startupMode,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName(),
runtimeMode,
parallelism);
submitPipelineJob(pipelineJob);
@@ -612,11 +607,7 @@ class TransformE2eITCase extends PipelineTestEnvironment {
+ " type: values\n"
+ "transform:\n"
+ " - source-table: %s.TABLEALPHA\n"
- + " projection: ID, VERSION, PRICEALPHA,
AGEALPHA, 'Juvenile' AS ROLENAME\n"
- + " filter: AGEALPHA < 18\n"
- + " - source-table: %s.TABLEALPHA\n"
- + " projection: ID, VERSION, PRICEALPHA,
AGEALPHA, NAMEALPHA AS ROLENAME\n"
- + " filter: AGEALPHA >= 18\n"
+ + " projection: ID, VERSION, PRICEALPHA,
AGEALPHA, CASE WHEN AGEALPHA < 18 THEN 'Juvenile' WHEN AGEALPHA >= 18 THEN
NAMEALPHA END AS ROLENAME\n"
+ "pipeline:\n"
+ " execution.runtime-mode: %s\n"
+ " parallelism: %d",
@@ -626,7 +617,6 @@ class TransformE2eITCase extends PipelineTestEnvironment {
startupMode,
transformTestDatabase.getDatabaseName(),
transformTestDatabase.getDatabaseName(),
- transformTestDatabase.getDatabaseName(),
runtimeMode,
parallelism);
submitPipelineJob(pipelineJob);
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 36e348e09..8683a07b8 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.schema.Selectors;
import org.apache.flink.cdc.common.udf.UserDefinedFunctionContext;
-import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
import
org.apache.flink.cdc.runtime.operators.transform.converter.PostTransformConverters;
@@ -43,6 +42,9 @@ import
org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
+import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import
org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
@@ -56,7 +58,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
@@ -88,6 +89,8 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
projectionProcessors;
private transient Table<TableId, PostTransformer,
TransformFilterProcessor> filterProcessors;
+ private transient LoadingCache<TableId, Optional<PostTransformer>>
transformersCache;
+
public static PostTransformOperatorBuilder newBuilder() {
return new PostTransformOperatorBuilder();
}
@@ -116,6 +119,16 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
initializeUdf();
this.transformers = createTransformers();
+ this.transformersCache =
+ CacheBuilder.newBuilder()
+ .maximumSize(1024)
+ .build(
+ new CacheLoader<>() {
+ @Override
+ public Optional<PostTransformer>
load(TableId tableId) {
+ return
getEffectiveTransformer(tableId);
+ }
+ });
}
@Override
@@ -162,26 +175,26 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
ChangeEvent changeEvent = (ChangeEvent) event;
TableId tableId = changeEvent.tableId();
- List<PostTransformer> transformers = getEffectiveTransformers(tableId);
+ Optional<PostTransformer> transformer =
transformersCache.getUnchecked(tableId);
// Short-circuit if there's no effective transformers.
- if (transformers.isEmpty()) {
+ if (transformer.isEmpty()) {
output.collect(element);
return;
}
if (event instanceof CreateTableEvent) {
- processCreateTableEvent((CreateTableEvent) event, transformers)
+ processCreateTableEvent((CreateTableEvent) event,
transformer.get())
.map(StreamRecord::new)
.ifPresent(output::collect);
invalidateCache(tableId);
} else if (event instanceof SchemaChangeEvent) {
- processSchemaChangeEvent((SchemaChangeEvent) event, transformers)
+ processSchemaChangeEvent((SchemaChangeEvent) event,
transformer.get())
.map(StreamRecord::new)
.ifPresent(output::collect);
invalidateCache(tableId);
} else if (event instanceof DataChangeEvent) {
- processDataChangeEvent((DataChangeEvent) event, transformers)
+ processDataChangeEvent((DataChangeEvent) event, transformer.get())
.map(StreamRecord::new)
.ifPresent(output::collect);
} else {
@@ -197,18 +210,12 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
* Apply effective transform rules to {@link CreateTableEvent}s based on
effective transformers.
*/
private Optional<Event> processCreateTableEvent(
- CreateTableEvent event, List<PostTransformer>
effectiveTransformers) {
+ CreateTableEvent event, PostTransformer effectiveTransformer) {
TableId tableId = event.tableId();
Schema preSchema = event.getSchema();
- // Apply transform rules and verify we can get a deterministic post
schema
- List<Schema> schemas =
- effectiveTransformers.stream()
- .map(trans -> transformSchema(preSchema, trans))
- .collect(Collectors.toList());
-
Schema postSchema =
-
SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+ SchemaUtils.ensurePkNonNull(transformSchema(preSchema,
effectiveTransformer));
// Update transform info map
postTransformInfoMap.put(
@@ -216,11 +223,10 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
// Update "if-table-has-been–wildcard–matched" map
boolean wildcardMatched =
- effectiveTransformers.stream()
- .map(PostTransformer::getProjection)
- .flatMap(this::optionalToStream)
- .map(TransformProjection::getProjection)
- .anyMatch(TransformParser::hasAsterisk);
+ effectiveTransformer.getProjection().isPresent()
+ && TransformParser.hasAsterisk(
+
effectiveTransformer.getProjection().get().getProjection());
+
hasAsteriskMap.put(tableId, wildcardMatched);
projectedColumnsMap.put(
tableId,
@@ -236,7 +242,7 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
* transformers and existing {@link PostTransformChangeInfo}.
*/
private Optional<Event> processSchemaChangeEvent(
- SchemaChangeEvent event, List<PostTransformer>
effectiveTransformers) {
+ SchemaChangeEvent event, PostTransformer effectiveTransformer) {
TableId tableId = event.tableId();
PostTransformChangeInfo info =
checkNotNull(postTransformInfoMap.get(tableId));
@@ -244,14 +250,8 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
Schema prevPreSchema = info.getPreTransformedSchema();
Schema nextPreSchema =
SchemaUtils.applySchemaChangeEvent(prevPreSchema, event);
- // Apply transform rules and verify we can get a deterministic post
schema
- List<Schema> schemas =
- effectiveTransformers.stream()
- .map(trans -> transformSchema(nextPreSchema, trans))
- .collect(Collectors.toList());
-
Schema nextPostSchema =
-
SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+ SchemaUtils.ensurePkNonNull(transformSchema(nextPreSchema,
effectiveTransformer));
// Update transform info map
postTransformInfoMap.put(
@@ -274,7 +274,7 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
/** Apply projection rules to given {@link DataChangeEvent}. */
private Optional<Event> processDataChangeEvent(
- DataChangeEvent event, List<PostTransformer>
effectiveTransformers) {
+ DataChangeEvent event, PostTransformer effectiveTransformer) {
TableId tableId = event.tableId();
PostTransformChangeInfo info =
checkNotNull(postTransformInfoMap.get(tableId));
@@ -285,50 +285,39 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
String beforeOp = event.opTypeString(false);
String afterOp = event.opTypeString(true);
-
- for (PostTransformer transformer : effectiveTransformers) {
- TransformProjectionProcessor projectionProcessor =
- getProjectionProcessor(tableId, transformer);
- TransformFilterProcessor filterProcessor =
getFilterProcessor(tableId, transformer);
-
- RecordData beforeRow = null;
- RecordData afterRow = null;
- boolean filterPassed = true;
-
- if (event.before() != null) {
- context.opType = beforeOp;
- Tuple2<BinaryRecordData, Boolean> result =
- transformRecord(
- event.before(),
- info,
- projectionProcessor,
- filterProcessor,
- context);
- beforeRow = result.f0;
- filterPassed = result.f1;
- }
-
- if (event.after() != null) {
- context.opType = afterOp;
- Tuple2<BinaryRecordData, Boolean> result =
- transformRecord(
- event.after(), info, projectionProcessor,
filterProcessor, context);
- afterRow = result.f0;
- filterPassed = result.f1;
- }
-
- if (filterPassed) {
- DataChangeEvent finalEvent =
- DataChangeEvent.projectRecords(event, beforeRow,
afterRow);
- if (transformer.getPostTransformConverter().isPresent()) {
- return transformer
- .getPostTransformConverter()
- .get()
- .convert(finalEvent)
- .map(Event.class::cast);
- } else {
- return Optional.of(finalEvent);
- }
+ TransformProjectionProcessor projectionProcessor =
+ getProjectionProcessor(tableId, effectiveTransformer);
+ TransformFilterProcessor filterProcessor =
+ getFilterProcessor(tableId, effectiveTransformer);
+ RecordData beforeRow = null;
+ RecordData afterRow = null;
+ boolean filterPassed = true;
+ if (event.before() != null) {
+ context.opType = beforeOp;
+ Tuple2<BinaryRecordData, Boolean> result =
+ transformRecord(
+ event.before(), info, projectionProcessor,
filterProcessor, context);
+ beforeRow = result.f0;
+ filterPassed = result.f1;
+ }
+ if (event.after() != null) {
+ context.opType = afterOp;
+ Tuple2<BinaryRecordData, Boolean> result =
+ transformRecord(
+ event.after(), info, projectionProcessor,
filterProcessor, context);
+ afterRow = result.f0;
+ filterPassed = result.f1;
+ }
+ if (filterPassed) {
+ DataChangeEvent finalEvent = DataChangeEvent.projectRecords(event,
beforeRow, afterRow);
+ if (effectiveTransformer.getPostTransformConverter().isPresent()) {
+ return effectiveTransformer
+ .getPostTransformConverter()
+ .get()
+ .convert(finalEvent)
+ .map(Event.class::cast);
+ } else {
+ return Optional.of(finalEvent);
}
}
@@ -397,22 +386,14 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
// Convenience methods for coping with transient fields.
// -------------------
- /** Obtain effective transformers based on given {@link TableId}. */
- private List<PostTransformer> getEffectiveTransformers(TableId tableId) {
- List<PostTransformer> effectiveTransformers = new ArrayList<>();
+ /** Obtain effective transformer based on given {@link TableId}. */
+ private Optional<PostTransformer> getEffectiveTransformer(TableId tableId)
{
for (PostTransformer transformer : transformers) {
if (transformer.getSelectors().isMatch(tableId)) {
- effectiveTransformers.add(transformer);
-
- // Transform module works with "First-match" rule. If we have
met an uncondition
- // transform rule (without any filtering expression), then any
following transform
- // rule will not be effective.
- if (!transformer.getFilter().isPresent()) {
- break;
- }
+ return Optional.of(transformer);
}
}
- return effectiveTransformers;
+ return Optional.empty();
}
/**
@@ -546,10 +527,4 @@ public class PostTransformOperator extends
AbstractStreamOperatorAdapter<Event>
udfDescriptors.clear();
udfFunctionInstances.clear();
}
-
- /** Backport of {@code Optional#stream} before Java 11. */
- @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private <T> Stream<T> optionalToStream(Optional<T> optional) {
- return optional.map(Stream::of).orElseGet(Stream::empty);
- }
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 22e0cfbd3..ddd5aeb76 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -468,112 +468,6 @@ class PostTransformOperatorTest {
transformFunctionEventEventOperatorTestHarness.close();
}
- @Test
- void testDataChangeEventTransformTwice() throws Exception {
- PostTransformOperator transform =
- PostTransformOperator.newBuilder()
- .addTransform(
- CUSTOMERS_TABLEID.identifier(),
- "*, concat(col1, '1') col12",
- "col1 = '1'")
- .addTransform(
- CUSTOMERS_TABLEID.identifier(),
- "*, concat(col1, '2') col12",
- "col1 = '2'")
- .build();
- RegularEventOperatorTestHarness<PostTransformOperator, Event>
- transformFunctionEventEventOperatorTestHarness =
- RegularEventOperatorTestHarness.with(transform, 1);
- // Initialization
- transformFunctionEventEventOperatorTestHarness.open();
- // Create table
- CreateTableEvent createTableEvent =
- new CreateTableEvent(CUSTOMERS_TABLEID, CUSTOMERS_SCHEMA);
- BinaryRecordDataGenerator recordDataGenerator =
- new BinaryRecordDataGenerator(((RowType)
CUSTOMERS_SCHEMA.toRowDataType()));
- // Insert
- DataChangeEvent insertEvent =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"), new
BinaryStringData("2"), null
- }));
- DataChangeEvent insertEventExpect =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- new BinaryStringData("11")
- }));
- // Insert
- DataChangeEvent insertEvent2 =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("2"), new
BinaryStringData("2"), null
- }));
- DataChangeEvent insertEvent2Expect =
- DataChangeEvent.insertEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("2"),
- new BinaryStringData("2"),
- new BinaryStringData("22")
- }));
- // Update
- DataChangeEvent updateEvent =
- DataChangeEvent.updateEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"), new
BinaryStringData("2"), null
- }),
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"), new
BinaryStringData("3"), null
- }));
- DataChangeEvent updateEventExpect =
- DataChangeEvent.updateEvent(
- CUSTOMERS_TABLEID,
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("2"),
- new BinaryStringData("11")
- }),
- recordDataGenerator.generate(
- new Object[] {
- new BinaryStringData("1"),
- new BinaryStringData("3"),
- new BinaryStringData("11")
- }));
-
- transform.processElement(new StreamRecord<>(createTableEvent));
- Assertions.assertThat(
-
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(
- new StreamRecord<>(
- new CreateTableEvent(CUSTOMERS_TABLEID,
CUSTOMERS_SCHEMA)));
- transform.processElement(new StreamRecord<>(insertEvent));
- Assertions.assertThat(
-
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(insertEventExpect));
- transform.processElement(new StreamRecord<>(insertEvent2));
- Assertions.assertThat(
-
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(insertEvent2Expect));
- transform.processElement(new StreamRecord<>(updateEvent));
- Assertions.assertThat(
-
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
- .isEqualTo(new StreamRecord<>(updateEventExpect));
- transformFunctionEventEventOperatorTestHarness.close();
- }
-
@Test
void testDataChangeEventTransformProjectionDataTypeConvert() throws
Exception {
PostTransformOperator transform =
@@ -1259,21 +1153,12 @@ class PostTransformOperatorTest {
.addTransform(
TIMESTAMPDIFF_TABLEID.identifier(),
"col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP,
CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as second_diff,"
- + " TIMESTAMPDIFF(MINUTE,
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as minute_diff,"
- + " TIMESTAMPDIFF(HOUR,
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as hour_diff,"
- + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP,
CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as day_diff,"
- + " TIMESTAMPDIFF(MONTH,
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as month_diff,"
- + " TIMESTAMPDIFF(YEAR,
LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) as year_diff",
- "col1='1'")
- .addTransform(
- TIMESTAMPDIFF_TABLEID.identifier(),
- "col1, TIMESTAMPDIFF(SECOND, LOCALTIMESTAMP,
CAST(NOW() AS TIMESTAMP)) as second_diff,"
- + " TIMESTAMPDIFF(MINUTE,
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as minute_diff,"
- + " TIMESTAMPDIFF(HOUR,
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as hour_diff,"
- + " TIMESTAMPDIFF(DAY, LOCALTIMESTAMP,
CAST(NOW() AS TIMESTAMP)) as day_diff,"
- + " TIMESTAMPDIFF(MONTH,
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as month_diff,"
- + " TIMESTAMPDIFF(YEAR,
LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) as year_diff",
- "col1='2'")
+ + " CASE WHEN col1='1' THEN
TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP))
WHEN col1='2' THEN TIMESTAMPDIFF(MINUTE, LOCALTIMESTAMP, CAST(NOW() AS
TIMESTAMP)) END as minute_diff,"
+ + " CASE WHEN col1='1' THEN
TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN
col1='2' THEN TIMESTAMPDIFF(HOUR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END
as hour_diff,"
+ + " CASE WHEN col1='1' THEN
TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN
col1='2' THEN TIMESTAMPDIFF(DAY, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END
as day_diff,"
+ + " CASE WHEN col1='1' THEN
TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN
col1='2' THEN TIMESTAMPDIFF(MONTH, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP))
END as month_diff,"
+ + " CASE WHEN col1='1' THEN
TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(CURRENT_TIMESTAMP AS TIMESTAMP)) WHEN
col1='2' THEN TIMESTAMPDIFF(YEAR, LOCALTIMESTAMP, CAST(NOW() AS TIMESTAMP)) END
as year_diff",
+ null)
.addTimezone("Asia/Shanghai")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1336,51 +1221,11 @@ class PostTransformOperatorTest {
.addTransform(
TIMESTAMPDIFF_DATA_TABLEID.identifier(),
"col1, time_interval_unit,"
- + " TIMESTAMPDIFF(SECOND,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(SECOND,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(SECOND,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(SECOND,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='SECOND'")
- .addTransform(
- TIMESTAMPDIFF_DATA_TABLEID.identifier(),
- "col1, time_interval_unit,"
- + " TIMESTAMPDIFF(MINUTE,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(MINUTE,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(MINUTE,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(MINUTE,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='MINUTE'")
- .addTransform(
- TIMESTAMPDIFF_DATA_TABLEID.identifier(),
- "col1, time_interval_unit,"
- + " TIMESTAMPDIFF(HOUR,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(HOUR,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(HOUR,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(HOUR,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='HOUR'")
- .addTransform(
- TIMESTAMPDIFF_DATA_TABLEID.identifier(),
- "col1, time_interval_unit,"
- + " TIMESTAMPDIFF(DAY,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(DAY,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(DAY,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(DAY,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='DAY'")
- .addTransform(
- TIMESTAMPDIFF_DATA_TABLEID.identifier(),
- "col1, time_interval_unit,"
- + " TIMESTAMPDIFF(MONTH,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(MONTH,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(MONTH,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(MONTH,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='MONTH'")
- .addTransform(
- TIMESTAMPDIFF_DATA_TABLEID.identifier(),
- "col1, time_interval_unit,"
- + " TIMESTAMPDIFF(YEAR,
start_timestamp, end_timestamp) as timestamp_timestamp,"
- + " TIMESTAMPDIFF(YEAR,
start_timestamp, end_timestamp_ltz) as timestamp_timestamp_ltz,"
- + " TIMESTAMPDIFF(YEAR,
start_timestamp_ltz, end_timestamp) as timestamp_ltz_timestamp,"
- + " TIMESTAMPDIFF(YEAR,
start_timestamp_ltz, end_timestamp_ltz) as timestamp_ltz_timestamp_ltz",
- "time_interval_unit='YEAR'")
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp,
end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE,
start_timestamp, end_timestamp) WHEN time_interval_unit='HOUR' THEN
TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp) WHEN
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp,
end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH,
start_timestamp, end_times [...]
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp,
end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE,
start_timestamp, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN
TIMESTAMPDIFF(HOUR, start_timestamp, end_timestamp_ltz) WHEN
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp,
end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH,
start_times [...]
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz,
end_timestamp) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE,
start_timestamp_ltz, end_timestamp) WHEN time_interval_unit='HOUR' THEN
TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp) WHEN
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz,
end_timestamp) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MONTH,
start_times [...]
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPDIFF(SECOND, start_timestamp_ltz,
end_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN TIMESTAMPDIFF(MINUTE,
start_timestamp_ltz, end_timestamp_ltz) WHEN time_interval_unit='HOUR' THEN
TIMESTAMPDIFF(HOUR, start_timestamp_ltz, end_timestamp_ltz) WHEN
time_interval_unit='DAY' THEN TIMESTAMPDIFF(DAY, start_timestamp_ltz,
end_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN TIMESTAMPDIFF(MO [...]
+ null)
.addTimezone("UTC")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1676,22 +1521,14 @@ class PostTransformOperatorTest {
PostTransformOperator.newBuilder()
.addTransform(
TIMESTAMPADD_TABLEID.identifier(),
- "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, 1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
- + " DATE_FORMAT(TIMESTAMPADD(MINUTE,
1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
- + " DATE_FORMAT(TIMESTAMPADD(HOUR, 1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
- + " DATE_FORMAT(TIMESTAMPADD(DAY, 1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
- + " DATE_FORMAT(TIMESTAMPADD(MONTH, 1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
- + " DATE_FORMAT(TIMESTAMPADD(YEAR, 1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
- "col1='1'")
- .addTransform(
- TIMESTAMPADD_TABLEID.identifier(),
- "col1, DATE_FORMAT(TIMESTAMPADD(SECOND, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as second_add,"
- + " DATE_FORMAT(TIMESTAMPADD(MINUTE,
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as minute_add,"
- + " DATE_FORMAT(TIMESTAMPADD(HOUR, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as hour_add,"
- + " DATE_FORMAT(TIMESTAMPADD(DAY, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as day_add,"
- + " DATE_FORMAT(TIMESTAMPADD(MONTH,
-1, TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as month_add,"
- + " DATE_FORMAT(TIMESTAMPADD(YEAR, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') as year_add",
- "col1='2'")
+ "col1, "
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(SECOND, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(SECOND, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as second_add,"
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(MINUTE, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MINUTE, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as minute_add,"
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(HOUR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(HOUR, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as hour_add,"
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(DAY, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(DAY, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as day_add,"
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(MONTH, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(MONTH, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as month_add,"
+ + "CASE WHEN col1='1' THEN
DATE_FORMAT(TIMESTAMPADD(YEAR, 1, TO_TIMESTAMP('2024-10-01 00:00:00')),
'yyyy-MM-dd HH:mm:ss') WHEN col1='2' THEN DATE_FORMAT(TIMESTAMPADD(YEAR, -1,
TO_TIMESTAMP('2024-10-01 00:00:00')), 'yyyy-MM-dd HH:mm:ss') END as year_add",
+ null)
.addTimezone("UTC")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -1771,39 +1608,9 @@ class PostTransformOperatorTest {
.addTransform(
TIMESTAMPADD_DATA_TABLEID.identifier(),
"col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(SECOND,
interval_value, time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(SECOND,
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='SECOND'")
- .addTransform(
- TIMESTAMPADD_DATA_TABLEID.identifier(),
- "col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(MINUTE,
interval_value, time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(MINUTE,
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='MINUTE'")
- .addTransform(
- TIMESTAMPADD_DATA_TABLEID.identifier(),
- "col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(HOUR, interval_value,
time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(HOUR, interval_value,
time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='HOUR'")
- .addTransform(
- TIMESTAMPADD_DATA_TABLEID.identifier(),
- "col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(DAY, interval_value,
time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(DAY, interval_value,
time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='DAY'")
- .addTransform(
- TIMESTAMPADD_DATA_TABLEID.identifier(),
- "col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(MONTH,
interval_value, time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(MONTH,
interval_value, time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='MONTH'")
- .addTransform(
- TIMESTAMPADD_DATA_TABLEID.identifier(),
- "col1, time_interval_unit, interval_value,"
- + " TIMESTAMPADD(YEAR, interval_value,
time_point_timestamp) as time_point_timestamp,"
- + " TIMESTAMPADD(YEAR, interval_value,
time_point_timestamp_ltz) as time_point_timestamp_ltz",
- "time_interval_unit='YEAR'")
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value,
time_point_timestamp) WHEN time_interval_unit='MINUTE' THEN
TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp) WHEN
time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value,
time_point_timestamp) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY,
interval_value, time_point_timestamp) WHEN time_interval_unit='MONTH' THEN
TIMESTAMPADD(MONTH, interval [...]
+ + " CASE WHEN
time_interval_unit='SECOND' THEN TIMESTAMPADD(SECOND, interval_value,
time_point_timestamp_ltz) WHEN time_interval_unit='MINUTE' THEN
TIMESTAMPADD(MINUTE, interval_value, time_point_timestamp_ltz) WHEN
time_interval_unit='HOUR' THEN TIMESTAMPADD(HOUR, interval_value,
time_point_timestamp_ltz) WHEN time_interval_unit='DAY' THEN TIMESTAMPADD(DAY,
interval_value, time_point_timestamp_ltz) WHEN time_interval_unit='MONTH' THEN
TIMESTAMPADD [...]
+ null)
.addTimezone("UTC")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
@@ -2135,153 +1942,18 @@ class PostTransformOperatorTest {
.addTransform(
CAST_TABLEID.identifier(),
"col1"
- + ",cast(col1 as int) as castInt"
- + ",cast(col1 as boolean) as
castBoolean"
- + ",cast(col1 as tinyint) as
castTinyint"
- + ",cast(col1 as smallint) as
castSmallint"
- + ",cast(col1 as bigint) as castBigint"
- + ",cast(col1 as float) as castFloat"
- + ",cast(col1 as double) as castDouble"
- + ",cast(col1 as char) as castChar"
- + ",cast(col1 as varchar) as
castVarchar"
- + ",cast(col1 as DECIMAL(4,2)) as
castDecimal"
- + ", castTimestamp",
- "col1 = '1'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",cast(castInt as int) as castInt"
- + ",cast(castInt as boolean) as
castBoolean"
- + ",cast(castInt as tinyint) as
castTinyint"
- + ",cast(castInt as smallint) as
castSmallint"
- + ",cast(castInt as bigint) as
castBigint"
- + ",cast(castInt as float) as
castFloat"
- + ",cast(castInt as double) as
castDouble"
- + ",cast(castInt as char) as castChar"
- + ",cast(castInt as varchar) as
castVarchar"
- + ",cast(castInt as DECIMAL(4,2)) as
castDecimal"
- + ", castTimestamp",
- "col1 = '2'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",cast(castBoolean as int) as
castInt"
- + ",cast(castBoolean as boolean) as
castBoolean"
- + ",cast(castBoolean as tinyint) as
castTinyint"
- + ",cast(castBoolean as smallint) as
castSmallint"
- + ",cast(castBoolean as bigint) as
castBigint"
- + ",castFloat"
- + ",castDouble"
- + ",cast(castBoolean as char) as
castChar"
- + ",cast(castBoolean as varchar) as
castVarchar"
- + ",castDecimal"
- + ", castTimestamp",
- "col1 = '3'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",cast(castTinyint as int) as
castInt"
- + ",cast(castTinyint as boolean) as
castBoolean"
- + ",cast(castTinyint as tinyint) as
castTinyint"
- + ",cast(castTinyint as smallint) as
castSmallint"
- + ",cast(castTinyint as bigint) as
castBigint"
- + ",cast(castTinyint as float) as
castFloat"
- + ",cast(castTinyint as double) as
castDouble"
- + ",cast(castTinyint as char) as
castChar"
- + ",cast(castTinyint as varchar) as
castVarchar"
- + ",cast(castTinyint as DECIMAL(4,2))
as castDecimal"
- + ", castTimestamp",
- "col1 = '4'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",cast(castSmallint as int) as
castInt"
- + ",cast(castSmallint as boolean) as
castBoolean"
- + ",cast(castSmallint as tinyint) as
castTinyint"
- + ",cast(castSmallint as smallint) as
castSmallint"
- + ",cast(castSmallint as bigint) as
castBigint"
- + ",cast(castSmallint as float) as
castFloat"
- + ",cast(castSmallint as double) as
castDouble"
- + ",cast(castSmallint as char) as
castChar"
- + ",cast(castSmallint as varchar) as
castVarchar"
- + ",cast(castSmallint as DECIMAL(4,2))
as castDecimal"
- + ", castTimestamp",
- "col1 = '5'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",cast(castBigint as int) as castInt"
- + ",cast(castBigint as boolean) as
castBoolean"
- + ",cast(castBigint as tinyint) as
castTinyint"
- + ",cast(castBigint as smallint) as
castSmallint"
- + ",cast(castBigint as bigint) as
castBigint"
- + ",cast(castBigint as float) as
castFloat"
- + ",cast(castBigint as double) as
castDouble"
- + ",cast(castBigint as char) as
castChar"
- + ",cast(castBigint as varchar) as
castVarchar"
- + ",cast(castBigint as DECIMAL(4,2))
as castDecimal"
- + ", castTimestamp",
- "col1 = '6'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",castInt"
- + ",cast(castFloat as boolean) as
castBoolean"
- + ",castTinyint"
- + ",castSmallint"
- + ",castBigint"
- + ",cast(castFloat as float) as
castFloat"
- + ",cast(castFloat as double) as
castDouble"
- + ",cast(castFloat as char) as
castChar"
- + ",cast(castFloat as varchar) as
castVarchar"
- + ",cast(castFloat as DECIMAL(4,2)) as
castDecimal"
- + ", castTimestamp",
- "col1 = '7'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",castInt"
- + ",cast(castDouble as boolean) as
castBoolean"
- + ",castTinyint"
- + ",castSmallint"
- + ",castBigint"
- + ",cast(castDouble as float) as
castFloat"
- + ",cast(castDouble as double) as
castDouble"
- + ",cast(castDouble as char) as
castChar"
- + ",cast(castDouble as varchar) as
castVarchar"
- + ",cast(castDouble as DECIMAL(4,2))
as castDecimal"
- + ", castTimestamp",
- "col1 = '8'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",castInt"
- + ",cast(castDecimal as boolean) as
castBoolean"
- + ",castTinyint"
- + ",castSmallint"
- + ",castBigint"
- + ",cast(castDecimal as float) as
castFloat"
- + ",cast(castDecimal as double) as
castDouble"
- + ",cast(castDecimal as char) as
castChar"
- + ",cast(castDecimal as varchar) as
castVarchar"
- + ",cast(castDecimal as DECIMAL(4,2))
as castDecimal"
- + ", castTimestamp",
- "col1 = '9'")
- .addTransform(
- CAST_TABLEID.identifier(),
- "col1"
- + ",castInt"
- + ",castBoolean"
- + ",castTinyint"
- + ",castSmallint"
- + ",castBigint"
- + ",castFloat"
- + ",castDouble"
- + ",castChar"
- + ",cast(castTimestamp as varchar) as
castVarchar"
- + ",castDecimal"
- + ",cast('1970-01-01T00:00:01.234' as
TIMESTAMP(3)) as castTimestamp",
- "col1 = '10'")
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as int) WHEN col1 = '2' THEN cast(castInt as int) WHEN col1 = '3'
THEN cast(castBoolean as int) WHEN col1 = '4' THEN cast(castTinyint as int)
WHEN col1 = '5' THEN cast(castSmallint as int) WHEN col1 = '6' THEN
cast(castBigint as int) WHEN col1 = '7' THEN castInt WHEN col1 = '8' THEN
castInt WHEN col1 = '9' THEN castInt WHEN col1 = '10' THEN castInt END as
castInt"
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as boolean) WHEN col1 = '2' THEN cast(castInt as boolean) WHEN col1 =
'3' THEN cast(castBoolean as boolean) WHEN col1 = '4' THEN cast(castTinyint as
boolean) WHEN col1 = '5' THEN cast(castSmallint as boolean) WHEN col1 = '6'
THEN cast(castBigint as boolean) WHEN col1 = '7' THEN cast(castFloat as
boolean) WHEN col1 = '8' THEN cast(castDouble as boolean) WHEN col1 = '9' THEN
cast(castDecimal as boolean) WHEN co [...]
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as tinyint) WHEN col1 = '2' THEN cast(castInt as tinyint) WHEN col1 =
'3' THEN cast(castBoolean as tinyint) WHEN col1 = '4' THEN cast(castTinyint as
tinyint) WHEN col1 = '5' THEN cast(castSmallint as tinyint) WHEN col1 = '6'
THEN cast(castBigint as tinyint) WHEN col1 = '7' THEN castTinyint WHEN col1 =
'8' THEN castTinyint WHEN col1 = '9' THEN castTinyint WHEN col1 = '10' THEN
castTinyint END as castTinyint"
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as smallint) WHEN col1 = '2' THEN cast(castInt as smallint) WHEN col1
= '3' THEN cast(castBoolean as smallint) WHEN col1 = '4' THEN cast(castTinyint
as smallint) WHEN col1 = '5' THEN cast(castSmallint as smallint) WHEN col1 =
'6' THEN cast(castBigint as smallint) WHEN col1 = '7' THEN castSmallint WHEN
col1 = '8' THEN castSmallint WHEN col1 = '9' THEN castSmallint WHEN col1 = '10'
THEN castSmallint END as cast [...]
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as bigint) WHEN col1 = '2' THEN cast(castInt as bigint) WHEN col1 =
'3' THEN cast(castBoolean as bigint) WHEN col1 = '4' THEN cast(castTinyint as
bigint) WHEN col1 = '5' THEN cast(castSmallint as bigint) WHEN col1 = '6' THEN
cast(castBigint as bigint) WHEN col1 = '7' THEN castBigint WHEN col1 = '8' THEN
castBigint WHEN col1 = '9' THEN castBigint WHEN col1 = '10' THEN castBigint END
as castBigint"
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as float) WHEN col1 = '2' THEN cast(castInt as float) WHEN col1 = '3'
THEN castFloat WHEN col1 = '4' THEN cast(castTinyint as float) WHEN col1 = '5'
THEN cast(castSmallint as float) WHEN col1 = '6' THEN cast(castBigint as float)
WHEN col1 = '7' THEN cast(castFloat as float) WHEN col1 = '8' THEN
cast(castDouble as float) WHEN col1 = '9' THEN cast(castDecimal as float) WHEN
col1 = '10' THEN castFloat END as castFloat"
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as double) WHEN col1 = '2' THEN cast(castInt as double) WHEN col1 =
'3' THEN castDouble WHEN col1 = '4' THEN cast(castTinyint as double) WHEN col1
= '5' THEN cast(castSmallint as double) WHEN col1 = '6' THEN cast(castBigint as
double) WHEN col1 = '7' THEN cast(castFloat as double) WHEN col1 = '8' THEN
cast(castDouble as double) WHEN col1 = '9' THEN cast(castDecimal as double)
WHEN col1 = '10' THEN castDouble [...]
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as char) WHEN col1 = '2' THEN cast(castInt as char) WHEN col1 = '3'
THEN cast(castBoolean as char) WHEN col1 = '4' THEN cast(castTinyint as char)
WHEN col1 = '5' THEN cast(castSmallint as char) WHEN col1 = '6' THEN
cast(castBigint as char) WHEN col1 = '7' THEN cast(castFloat as char) WHEN col1
= '8' THEN cast(castDouble as char) WHEN col1 = '9' THEN cast(castDecimal as
char) WHEN col1 = '10' THEN castChar END [...]
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as varchar) WHEN col1 = '2' THEN cast(castInt as varchar) WHEN col1 =
'3' THEN cast(castBoolean as varchar) WHEN col1 = '4' THEN cast(castTinyint as
varchar) WHEN col1 = '5' THEN cast(castSmallint as varchar) WHEN col1 = '6'
THEN cast(castBigint as varchar) WHEN col1 = '7' THEN cast(castFloat as
varchar) WHEN col1 = '8' THEN cast(castDouble as varchar) WHEN col1 = '9' THEN
cast(castDecimal as varchar) WHEN co [...]
+ + ",CASE WHEN col1 = '1' THEN
cast(col1 as DECIMAL(4,2)) WHEN col1 = '2' THEN cast(castInt as DECIMAL(4,2))
WHEN col1 = '3' THEN castDecimal WHEN col1 = '4' THEN cast(castTinyint as
DECIMAL(4,2)) WHEN col1 = '5' THEN cast(castSmallint as DECIMAL(4,2)) WHEN col1
= '6' THEN cast(castBigint as DECIMAL(4,2)) WHEN col1 = '7' THEN cast(castFloat
as DECIMAL(4,2)) WHEN col1 = '8' THEN cast(castDouble as DECIMAL(4,2)) WHEN
col1 = '9' THEN cast(castDecimal a [...]
+ + ",CASE WHEN col1 = '1' THEN
castTimestamp WHEN col1 = '2' THEN castTimestamp WHEN col1 = '3' THEN
castTimestamp WHEN col1 = '4' THEN castTimestamp WHEN col1 = '5' THEN
castTimestamp WHEN col1 = '6' THEN castTimestamp WHEN col1 = '7' THEN
castTimestamp WHEN col1 = '8' THEN castTimestamp WHEN col1 = '9' THEN
castTimestamp WHEN col1 = '10' THEN cast('1970-01-01T00:00:01.234' as
TIMESTAMP(3)) END as castTimestamp",
+ null)
.addTimezone("UTC")
.build();
RegularEventOperatorTestHarness<PostTransformOperator, Event>
diff --git a/pom.xml b/pom.xml
index 53220c9ef..fb431f0af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -366,7 +366,6 @@ limitations under the License.
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.15</version>
- <scope>test</scope>
</dependency>
</dependencies>