This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 1bf40f09cc9fd387df4219d93fdce034666425a5 Author: MOBIN <18814118...@163.com> AuthorDate: Thu Nov 14 11:41:56 2024 +0800 [FLINK-36093][transform] Fix preTransformoperator wrongly filters columns belong to different transforms This closes #3572 --- .../flink/FlinkPipelineTransformITCase.java | 101 ++++++++++++++- .../cdc/pipeline/tests/TransformE2eITCase.java | 76 ++++++++++++ .../operators/transform/PreTransformOperator.java | 137 ++++++++++++--------- .../operators/transform/PreTransformProcessor.java | 69 ++--------- .../transform/PreTransformOperatorTest.java | 121 ++++++++++++++++++ 5 files changed, 388 insertions(+), 116 deletions(-) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java index 5c19ae484..25fd3baad 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java @@ -255,6 +255,105 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, OLD], after=[], op=DELETE, meta=()}")); } + @ParameterizedTest + @EnumSource + void testMultipleTransformWithDiffRefColumn(ValuesDataSink.SinkApi sinkApi) throws Exception { + runGenericTransformTest( + sinkApi, + Arrays.asList( + new TransformDef( + "default_namespace.default_schema.\\.*", + "id,age,'Juvenile' AS roleName", + "age < 18", + null, + null, + null, + null), + new TransformDef( + "default_namespace.default_schema.\\.*", + "id,age,name AS roleName", + "age >= 18", + null, + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`age` INT,`roleName` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 18, Alice], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 20, Bob], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, 20, Bob], after=[2, 30, Bob], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`age` TINYINT,`roleName` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, 15, Juvenile], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, 25, Derrida], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, 25, Derrida], after=[], op=DELETE, meta=()}")); + } + + @ParameterizedTest + @EnumSource + void testMultiTransformWithAsterisk(ValuesDataSink.SinkApi sinkApi) throws Exception { + runGenericTransformTest( + sinkApi, + Arrays.asList( + new TransformDef( + "default_namespace.default_schema.mytable2", + "*,'Juvenile' AS roleName", + "age < 18", + null, + null, + null, + null), + new TransformDef( + "default_namespace.default_schema.mytable2", + "id,name,age,description,name AS roleName", + "age >= 18", + null, + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`roleName` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, Juvenile], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, Derrida], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, Derrida], after=[], op=DELETE, meta=()}")); + } + + @ParameterizedTest + @EnumSource + void testMultiTransformMissingProjection(ValuesDataSink.SinkApi sinkApi) throws Exception { + runGenericTransformTest( + sinkApi, + Arrays.asList( + new TransformDef( + "default_namespace.default_schema.mytable2", + null, + "age < 18", + null, + null, + null, + null), + new TransformDef( + "default_namespace.default_schema.mytable2", + "id,UPPER(name) AS name,age,description", + "age >= 18", + null, + null, + null, + null)), + Arrays.asList( + "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20], after=[2, Bob, 30], op=UPDATE, meta=()}", + "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT,`name` STRING,`age` TINYINT,`description` STRING}, primaryKeys=id, options=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, DERRIDA, 25, student], op=INSERT, meta=()}", + "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, DERRIDA, 25, student], after=[], op=DELETE, meta=()}")); + } + /** This tests if transform generates metadata info correctly. */ @ParameterizedTest @EnumSource @@ -1297,7 +1396,7 @@ class FlinkPipelineTransformITCase { "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}", // Alter column type stage - "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}", + "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}", "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}", diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index e8603991e..04bd13368 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -597,6 +597,82 @@ public class TransformE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.TABLEBETA, before=[2011, 11, Big Sur, 21, Eva, 3011, 11], after=[], op=DELETE, meta=()}"); } + @Test + public void testMultipleTransformWithDiffRefColumn() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.TABLEALPHA\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "sink:\n" + + " type: values\n" + + "transform:\n" + + " - source-table: %s.TABLEALPHA\n" + + " projection: ID, VERSION, PRICEALPHA, AGEALPHA, 'Juvenile' AS ROLENAME\n" + + " filter: AGEALPHA < 18\n" + + " - source-table: %s.TABLEALPHA\n" + + " projection: ID, VERSION, PRICEALPHA, AGEALPHA, NAMEALPHA AS ROLENAME\n" + + " filter: AGEALPHA >= 18\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + transformTestDatabase.getDatabaseName(), + transformTestDatabase.getDatabaseName(), + transformTestDatabase.getDatabaseName(), + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path valuesCdcJar = TestUtils.getResource("values-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, valuesCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + waitUntilSpecificEvent( + String.format( + "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`ROLENAME` STRING}, primaryKeys=ID, options=()}", + transformTestDatabase.getDatabaseName()), + 60000L); + + validateEvents( + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1008, 8, 199, 17, Juvenile], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave], op=INSERT, meta=()}"); + + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + transformTestDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;"); + stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 25, 'IINA');"); + stat.execute("DELETE FROM TABLEALPHA WHERE id=1011;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + validateEvents( + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob], after=[1009, 100, 0, 18, Bob], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 25, IINA], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1011, 11, 59, 20, Dave], after=[], op=DELETE, meta=()}"); + } + @Test public void testTransformWithCast() throws Exception { String pipelineJob = diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java index ed5b57595..af7b5773a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.cdc.common.utils.SchemaUtils; @@ -46,6 +47,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -253,7 +255,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) { TableId tableId = event.tableId(); PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId); - List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames(); Schema originalSchema = SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event); @@ -284,6 +285,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> SchemaUtils.applySchemaChangeEvent( tableChangeInfo.getPreTransformedSchema(), schemaChangeEvent.get()); } + cachePreTransformProcessor(tableId, originalSchema); preTransformChangeInfoMap.put( tableId, PreTransformChangeInfo.of(tableId, originalSchema, preTransformedSchema)); return schemaChangeEvent; @@ -314,8 +316,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) { TableId tableId = createTableEvent.tableId(); - PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId); - cacheTransformRuleInfo(createTableEvent); for (Tuple2<Selectors, SchemaMetadataTransform> transform : schemaMetadataTransformers) { Selectors selectors = transform.f0; if (selectors.isMatch(tableId)) { @@ -327,27 +327,74 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> } } + cachePreTransformProcessor(tableId, createTableEvent.getSchema()); + if (preTransformProcessorMap.containsKey(tableId)) { + return preTransformProcessorMap + .get(tableId) + .preTransformCreateTableEvent(createTableEvent); + } + return createTableEvent; + } + + private void cachePreTransformProcessor(TableId tableId, Schema tableSchema) { + LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>(); + boolean hasMatchTransform = false; for (PreTransformer transform : transforms) { - Selectors selectors = transform.getSelectors(); - if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) { + if (!transform.getSelectors().isMatch(tableId)) { + continue; + } + if (!transform.getProjection().isPresent()) { + processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null); + hasMatchTransform = true; + } else { TransformProjection transformProjection = transform.getProjection().get(); - TransformFilter transformFilter = transform.getFilter().orElse(null); if (transformProjection.isValid()) { - if (!preTransformProcessorMap.containsKey(tableId)) { - preTransformProcessorMap.put( - tableId, - new PreTransformProcessor( - tableChangeInfo, transformProjection, transformFilter)); - } - PreTransformProcessor preTransformProcessor = - preTransformProcessorMap.get(tableId); - // TODO: Currently this code wrongly filters out rows that weren't referenced in - // the first matching transform rule but in the following transform rules. - return preTransformProcessor.preTransformCreateTableEvent(createTableEvent); + processProjectionTransform( + tableId, tableSchema, referencedColumnsSet, transform); + hasMatchTransform = true; } } } - return createTableEvent; + if (!hasMatchTransform) { + processProjectionTransform(tableId, tableSchema, referencedColumnsSet, null); + } + } + + public void processProjectionTransform( + TableId tableId, + Schema tableSchema, + LinkedHashSet<Column> referencedColumnsSet, + @Nullable PreTransformer transform) { + // If this TableId isn't presented in any transform block, it should behave like a "*" + // projection and should be regarded as asterisk-ful. + if (transform == null) { + referencedColumnsSet.addAll(tableSchema.getColumns()); + hasAsteriskMap.put(tableId, true); + } else { + TransformProjection transformProjection = transform.getProjection().get(); + boolean hasAsterisk = TransformParser.hasAsterisk(transformProjection.getProjection()); + if (hasAsterisk) { + referencedColumnsSet.addAll(tableSchema.getColumns()); + hasAsteriskMap.put(tableId, true); + } else { + TransformFilter transformFilter = transform.getFilter().orElse(null); + List<Column> referencedColumns = + TransformParser.generateReferencedColumns( + transformProjection.getProjection(), + transformFilter != null ? transformFilter.getExpression() : null, + tableSchema.getColumns()); + // update referenced columns of other projections of the same tableId, if any + referencedColumnsSet.addAll(referencedColumns); + hasAsteriskMap.putIfAbsent(tableId, false); + } + } + + PreTransformChangeInfo tableChangeInfo = + PreTransformChangeInfo.of( + tableId, + tableSchema, + tableSchema.copy(new ArrayList<>(referencedColumnsSet))); + preTransformProcessorMap.put(tableId, new PreTransformProcessor(tableChangeInfo)); } private Schema transformSchemaMetaData( @@ -371,46 +418,22 @@ public class PreTransformOperator extends AbstractStreamOperator<Event> return schemaBuilder.build(); } - private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) - throws Exception { - TableId tableId = dataChangeEvent.tableId(); - for (PreTransformer transform : transforms) { - Selectors selectors = transform.getSelectors(); - - if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) { - TransformProjection transformProjection = transform.getProjection().get(); - TransformFilter transformFilter = transform.getFilter().orElse(null); - if (transformProjection.isValid()) { - return processProjection(transformProjection, transformFilter, dataChangeEvent); - } + private DataChangeEvent processDataChangeEvent(DataChangeEvent dataChangeEvent) { + if (!transforms.isEmpty()) { + PreTransformProcessor preTransformProcessor = + preTransformProcessorMap.get(dataChangeEvent.tableId()); + BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); + BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after(); + if (before != null) { + BinaryRecordData projectedBefore = + preTransformProcessor.processFillDataField(before); + dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); } - } - return dataChangeEvent; - } - - private DataChangeEvent processProjection( - TransformProjection transformProjection, - @Nullable TransformFilter transformFilter, - DataChangeEvent dataChangeEvent) { - TableId tableId = dataChangeEvent.tableId(); - PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId); - if (!preTransformProcessorMap.containsKey(tableId) - || !preTransformProcessorMap.get(tableId).hasTableChangeInfo()) { - preTransformProcessorMap.put( - tableId, - new PreTransformProcessor( - tableChangeInfo, transformProjection, transformFilter)); - } - PreTransformProcessor preTransformProcessor = preTransformProcessorMap.get(tableId); - BinaryRecordData before = (BinaryRecordData) dataChangeEvent.before(); - BinaryRecordData after = (BinaryRecordData) dataChangeEvent.after(); - if (before != null) { - BinaryRecordData projectedBefore = preTransformProcessor.processFillDataField(before); - dataChangeEvent = DataChangeEvent.projectBefore(dataChangeEvent, projectedBefore); - } - if (after != null) { - BinaryRecordData projectedAfter = preTransformProcessor.processFillDataField(after); - dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter); + if (after != null) { + BinaryRecordData projectedAfter = preTransformProcessor.processFillDataField(after); + dataChangeEvent = DataChangeEvent.projectAfter(dataChangeEvent, projectedAfter); + } + return dataChangeEvent; } return dataChangeEvent; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java index 1ede3ce77..d4f6fec7d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformProcessor.java @@ -22,11 +22,8 @@ import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; -import javax.annotation.Nullable; - import java.util.ArrayList; import java.util.List; @@ -43,23 +40,9 @@ import java.util.List; */ public class PreTransformProcessor { private PreTransformChangeInfo tableChangeInfo; - private TransformProjection transformProjection; - private @Nullable TransformFilter transformFilter; - private List<Boolean> cachedProjectionColumnsState; - public PreTransformProcessor( - PreTransformChangeInfo tableChangeInfo, - TransformProjection transformProjection, - @Nullable TransformFilter transformFilter) { + public PreTransformProcessor(PreTransformChangeInfo tableChangeInfo) { this.tableChangeInfo = tableChangeInfo; - this.transformProjection = transformProjection; - this.transformFilter = transformFilter; - this.cachedProjectionColumnsState = - cacheIsProjectionColumnMap(tableChangeInfo, transformProjection); - } - - public boolean hasTableChangeInfo() { - return this.tableChangeInfo != null; } /** @@ -69,32 +52,24 @@ public class PreTransformProcessor { * will be sent to downstream, and (D, E) column along with corresponding data will be trimmed. */ public CreateTableEvent preTransformCreateTableEvent(CreateTableEvent createTableEvent) { - List<Column> preTransformColumns = - TransformParser.generateReferencedColumns( - transformProjection.getProjection(), - transformFilter != null ? transformFilter.getExpression() : null, - createTableEvent.getSchema().getColumns()); - Schema schema = createTableEvent.getSchema().copy(preTransformColumns); + Schema schema = + createTableEvent + .getSchema() + .copy(tableChangeInfo.getPreTransformedSchema().getColumns()); return new CreateTableEvent(createTableEvent.tableId(), schema); } public BinaryRecordData processFillDataField(BinaryRecordData data) { List<Object> valueList = new ArrayList<>(); List<Column> columns = tableChangeInfo.getPreTransformedSchema().getColumns(); - for (int i = 0; i < columns.size(); i++) { - if (cachedProjectionColumnsState.get(i)) { - valueList.add(null); - } else { - valueList.add( - getValueFromBinaryRecordData( - columns.get(i).getName(), - data, - tableChangeInfo.getSourceSchema().getColumns(), - tableChangeInfo.getSourceFieldGetters())); - } + valueList.add( + getValueFromBinaryRecordData( + columns.get(i).getName(), + data, + tableChangeInfo.getSourceSchema().getColumns(), + tableChangeInfo.getSourceFieldGetters())); } - return tableChangeInfo .getPreTransformedRecordDataGenerator() .generate(valueList.toArray(new Object[0])); @@ -113,26 +88,4 @@ public class PreTransformProcessor { } return null; } - - private List<Boolean> cacheIsProjectionColumnMap( - PreTransformChangeInfo tableChangeInfo, TransformProjection transformProjection) { - List<Boolean> cachedMap = new ArrayList<>(); - if (!hasTableChangeInfo()) { - return cachedMap; - } - - for (Column column : tableChangeInfo.getPreTransformedSchema().getColumns()) { - boolean isProjectionColumn = false; - for (ProjectionColumn projectionColumn : transformProjection.getProjectionColumns()) { - if (column.getName().equals(projectionColumn.getColumnName()) - && projectionColumn.isValidTransformedProjectionColumn()) { - isProjectionColumn = true; - break; - } - } - cachedMap.add(isProjectionColumn); - } - - return cachedMap; - } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java index cc257401e..0e3553e8c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperatorTest.java @@ -164,6 +164,23 @@ public class PreTransformOperatorTest { .primaryKey("sid") .build(); + private static final Schema MULTITRANSFORM_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .physicalColumn("sex", DataTypes.STRING()) + .primaryKey("id") + .build(); + + private static final Schema EXPECTED_MULTITRANSFORM_SCHEMA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.INT()) + .physicalColumn("age", DataTypes.INT()) + .physicalColumn("name", DataTypes.STRING()) + .primaryKey("id") + .build(); + @Test void testEventTransform() throws Exception { PreTransformOperator transform = @@ -526,4 +543,108 @@ public class PreTransformOperatorTest { new StreamRecord<>( new CreateTableEvent(METADATA_TABLEID, EXPECTED_METADATA_SCHEMA))); } + + @Test + void testMultiTransformWithDiffRefColumns() throws Exception { + PreTransformOperator transform = + PreTransformOperator.newBuilder() + .addTransform( + CUSTOMERS_TABLEID.identifier(), + "id, 'Juvenile' as roleName", + "age < 18", + "id", + null, + null) + .addTransform( + CUSTOMERS_TABLEID.identifier(), + "id, name as roleName", + "age >= 18", + "id", + null, + null) + .build(); + EventOperatorTestHarness<PreTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent( + CUSTOMERS_TABLEID, EXPECTED_MULTITRANSFORM_SCHEMA))); + } + + @Test + void testMultiTransformWithAsterisk() throws Exception { + PreTransformOperator transform = + PreTransformOperator.newBuilder() + .addTransform( + CUSTOMERS_TABLEID.identifier(), + "*, 'Juvenile' as roleName", + "age < 18", + "id", + null, + null) + .addTransform( + CUSTOMERS_TABLEID.identifier(), + "id, age, name, sex, 'Juvenile' as roleName", + "age >= 18", + "id", + null, + null) + .build(); + EventOperatorTestHarness<PreTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA))); + } + + @Test + void testMultiTransformMissingProjection() throws Exception { + PreTransformOperator transform = + PreTransformOperator.newBuilder() + .addTransform( + CUSTOMERS_TABLEID.identifier(), null, "age < 18", "id", null, null) + .addTransform( + CUSTOMERS_TABLEID.identifier(), + "id, age, UPPER(name) as name, sex", + "age >= 18", + "id", + null, + null) + .build(); + EventOperatorTestHarness<PreTransformOperator, Event> + transformFunctionEventEventOperatorTestHarness = + new EventOperatorTestHarness<>(transform, 1); + // Initialization + transformFunctionEventEventOperatorTestHarness.open(); + // Create table + CreateTableEvent createTableEvent = + new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA); + + transform.processElement(new StreamRecord<>(createTableEvent)); + Assertions.assertThat( + transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll()) + .isEqualTo( + new StreamRecord<>( + new CreateTableEvent(CUSTOMERS_TABLEID, MULTITRANSFORM_SCHEMA))); + } }