This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 6fb19e612bae340228a428b0a5ab00f676242460
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Oct 10 13:51:13 2024 +0800

    [FLINK-36461][transform] Fix schema evolution failure with un-transformed tables
    
    This closes #3721.
---
 .../apache/flink/cdc/common/utils/SchemaUtils.java | 19 +++++
 .../flink/FlinkPipelineTransformITCase.java        | 95 ++++++++++++++++++++++
 .../operators/transform/PostTransformOperator.java | 11 ++-
 .../operators/transform/PreTransformOperator.java  | 56 ++++++-------
 4 files changed, 148 insertions(+), 33 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index ba9dd4212..eb6bdbad4 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -376,6 +376,25 @@ public class SchemaUtils {
         return oldSchema.copy(columns);
     }
 
+    /**
+     * Determines whether the given schema change event {@code event} should be sent downstream,
+     * based on whether the transform rule contains an asterisk and which columns it references.
+     *
+     * <p>For example, if {@code hasAsterisk} is false, all {@code AddColumnEvent} and {@code
+     * DropColumnEvent} should be ignored, since an asterisk-less transform must not emit schema
+     * change events that change the number of downstream columns.
+     *
+     * <p>{@code referencedColumns} is used to determine whether the schema change event affects
+     * any referenced columns: once a column has been projected out of the downstream schema, its
+     * corresponding schema change events should not be emitted either.
+     *
+     * <p>When {@code hasAsterisk} is true, things are simpler since no schema change events need
+     * to be filtered out. All that is required is to rewrite each {@code AddColumnEvent}'s
+     * insertion position, replacing `FIRST` / `LAST` with column-relative position indicators.
+     * This is necessary since extra calculated columns might have been added, so the `FIRST` /
+     * `LAST` positions might differ.
+     */
     public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
             boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
         Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e16a2745b..abea0c943 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1227,6 +1227,101 @@ class FlinkPipelineTransformITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testTransformUnmatchedSchemaEvolution() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "foo.bar.baz", // This doesn't match the given tableId
+                                        "*",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 83dd07c53..8a607ffb5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -44,6 +44,7 @@ import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +243,8 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
 
     private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
         TableId tableId = event.tableId();
+        List<String> columnNamesBeforeChange = Collections.emptyList();
+
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
             Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                             createTableEvent.getSchema().getColumnNames().stream()
                                     .filter(projectedColumnsSet::contains)
                                     .collect(Collectors.toList()));
+        } else {
+            columnNamesBeforeChange =
+                    getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
         }
 
         Schema schema;
@@ -304,9 +310,12 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
 
         if (event instanceof CreateTableEvent) {
             return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+        } else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // See comments in PreTransformOperator#cacheChangeSchema method.
+            return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
         } else {
             return SchemaUtils.transformSchemaChangeEvent(
-                    hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+                    false, projectedColumnsMap.get(tableId), event);
         }
     }
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 5e5c4c708..ed5b57595 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -27,7 +27,6 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -50,7 +49,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -71,7 +69,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
     private List<UserDefinedFunctionDescriptor> udfDescriptors;
     private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
     private Map<TableId, Boolean> hasAsteriskMap;
-    private Map<TableId, List<String>> referencedColumnsMap;
 
     public static PreTransformOperator.Builder newBuilder() {
         return new PreTransformOperator.Builder();
@@ -163,7 +160,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
-        this.referencedColumnsMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -186,8 +182,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                     new CreateTableEvent(
                             stateTableChangeInfo.getTableId(),
                             stateTableChangeInfo.getPreTransformedSchema());
-            // hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
-            // from a checkpoint.
+            // hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
            cacheTransformRuleInfo(restoredCreateTableEvent);
 
             // Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -258,12 +253,32 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+        List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
+
         Schema originalSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
         Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-        Optional<SchemaChangeEvent> schemaChangeEvent =
-                SchemaUtils.transformSchemaChangeEvent(
-                        hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+        Optional<SchemaChangeEvent> schemaChangeEvent;
+        if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // If this TableId is asterisk-ful, we should use the latest upstream schema as the
+            // referenced columns to perform schema evolution, rather than the original ones
+            // captured when the table was created. If hasAsteriskMap has no entry for this
+            // TableId, it has not been referenced by any transform rule and should be regarded
+            // as asterisk-ful by default.
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
+        } else {
+            // Otherwise, use the pre-transformed columns to decide whether the given schema
+            // change event should be passed downstream: it is forwarded only if it affects a
+            // column that is present in the pre-transformed schema.
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            false,
+                            tableChangeInfo.getPreTransformedSchema().getColumnNames(),
+                            event);
+        }
         if (schemaChangeEvent.isPresent()) {
             preTransformedSchema =
                     SchemaUtils.applySchemaChangeEvent(
@@ -276,26 +291,8 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 
     private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
         TableId tableId = createTableEvent.tableId();
-        Set<String> referencedColumnsSet =
-                transforms.stream()
-                        .filter(t -> t.getSelectors().isMatch(tableId))
-                        .flatMap(
-                                rule ->
-                                        TransformParser.generateReferencedColumns(
-                                                        rule.getProjection()
-                                                                .map(TransformProjection::getProjection)
-                                                                .orElse(null),
-                                                        rule.getFilter()
-                                                                .map(TransformFilter::getExpression)
-                                                                .orElse(null),
-                                                        createTableEvent.getSchema().getColumns())
-                                                .stream())
-                        .map(Column::getName)
-                        .collect(Collectors.toSet());
-
         boolean notTransformed =
                 transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
-
         if (notTransformed) {
             // If this TableId isn't present in any transform block, it should behave like a "*"
             // projection and should be regarded as asterisk-ful.
@@ -313,11 +310,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
             hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
         }
-        referencedColumnsMap.put(
-                createTableEvent.tableId(),
-                createTableEvent.getSchema().getColumnNames().stream()
-                        .filter(referencedColumnsSet::contains)
-                        .collect(Collectors.toList()));
     }
 
     private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
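
Editor's note: for readers skimming the diff, the contract described by the new SchemaUtils
Javadoc can be exercised directly. Below is a minimal sketch (not part of this commit) of how
the two hasAsterisk modes treat an AddColumnEvent. The event and column factory methods are
public flink-cdc-common APIs, but the exact constructor shapes used here are assumptions
inferred from this diff, so treat it as illustrative rather than authoritative.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.SchemaUtils;

public class TransformSchemaChangeEventSketch {
    public static void main(String[] args) {
        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");

        // An AddColumnEvent appending a `gender` TINYINT column (position defaults to LAST).
        SchemaChangeEvent addColumn =
                new AddColumnEvent(
                        tableId,
                        Arrays.asList(
                                new AddColumnEvent.ColumnWithPosition(
                                        Column.physicalColumn("gender", DataTypes.TINYINT()))));
        List<String> referencedColumns = Arrays.asList("id", "name", "age");

        // Asterisk-ful transform: the event is forwarded, with FIRST / LAST rewritten into
        // column-relative positions against the referenced (upstream) columns.
        Optional<SchemaChangeEvent> kept =
                SchemaUtils.transformSchemaChangeEvent(true, referencedColumns, addColumn);

        // Asterisk-less transform: adding a column would change the downstream column count,
        // so the event is swallowed and Optional.empty() comes back.
        Optional<SchemaChangeEvent> swallowed =
                SchemaUtils.transformSchemaChangeEvent(false, referencedColumns, addColumn);

        System.out.println("asterisk-ful:  " + kept.isPresent());      // expected: true
        System.out.println("asterisk-less: " + swallowed.isPresent()); // expected: false
    }
}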
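The same decision now drives both operators. As a condensed view of the branch that
PreTransformOperator#cacheChangeSchema and PostTransformOperator#cacheSchema share after this
patch: the free-standing class and the helper name `dispatch` below are invented for
illustration, but the two branches mirror the patched code above.

import java.util.Map;
import java.util.Optional;

import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.runtime.operators.transform.PreTransformChangeInfo;

class AsteriskDispatchSketch {

    // Tables that no transform rule matches never get a hasAsteriskMap entry, so
    // getOrDefault(tableId, true) treats them as asterisk-ful: their schema change events
    // evolve against the latest upstream schema instead of a stale column list recorded at
    // table creation. That default is the essence of the FLINK-36461 fix.
    static Optional<SchemaChangeEvent> dispatch(
            Map<TableId, Boolean> hasAsteriskMap,
            PreTransformChangeInfo tableChangeInfo,
            SchemaChangeEvent event) {
        TableId tableId = event.tableId();
        if (hasAsteriskMap.getOrDefault(tableId, true)) {
            // Asterisk-ful (or un-transformed) table: forward the event, evolving against
            // the current upstream columns.
            return SchemaUtils.transformSchemaChangeEvent(
                    true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
        }
        // Explicit projection: forward the event only if it touches a column that survived
        // pre-transformation.
        return SchemaUtils.transformSchemaChangeEvent(
                false, tableChangeInfo.getPreTransformedSchema().getColumnNames(), event);
    }
}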