This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 6fb19e612bae340228a428b0a5ab00f676242460
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Thu Oct 10 13:51:13 2024 +0800

    [FLINK-36461][transform] Fix schema evolution failure with un-transformed tables
    
    This closes #3721.
---
 .../apache/flink/cdc/common/utils/SchemaUtils.java | 19 +++++
 .../flink/FlinkPipelineTransformITCase.java        | 95 ++++++++++++++++++++++
 .../operators/transform/PostTransformOperator.java | 11 ++-
 .../operators/transform/PreTransformOperator.java  | 56 ++++++-------
 4 files changed, 148 insertions(+), 33 deletions(-)

diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
index ba9dd4212..eb6bdbad4 100644
--- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
+++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaUtils.java
@@ -376,6 +376,25 @@ public class SchemaUtils {
         return oldSchema.copy(columns);
     }
 
+    /**
+     * Determines whether the given schema change event {@code event} should be sent downstream,
+     * based on whether the given transform rule contains an asterisk and which columns are
+     * referenced.
+     *
+     * <p>For example, if {@code hasAsterisk} is false, then all {@code AddColumnEvent} and {@code
+     * DropColumnEvent} should be ignored, since an asterisk-less transform must not emit schema
+     * change events that change the number of downstream columns.
+     *
+     * <p>Also, {@code referencedColumns} is used to determine whether the schema change event
+     * affects any referenced columns: if a column has been projected out of the downstream
+     * schema, its corresponding schema change events should not be emitted either.
+     *
+     * <p>When {@code hasAsterisk} is true, things are simpler, since no schema change events need
+     * to be filtered out. All we need to do is adjust each {@code AddColumnEvent}'s insertion
+     * position, replacing `FIRST` / `LAST` with column-relative position indicators. This is
+     * necessary because extra calculated columns might have been added, so the `FIRST` / `LAST`
+     * positions may differ downstream.
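+     *
+     * <p>A rough sketch of the intended behavior, with hypothetical column names and events
+     * (not taken from the actual test suite):
+     *
+     * <pre>{@code
+     * // Asterisk-less projection "id, name": an AddColumnEvent must not change the number
+     * // of downstream columns, so it is swallowed.
+     * transformSchemaChangeEvent(false, Arrays.asList("id", "name"), addColumnEvent);
+     * // => Optional.empty()
+     *
+     * // Asterisk-ful projection "*, id + 1 AS id1": the event is forwarded, but a FIRST /
+     * // LAST insertion position is rewritten relative to an existing column, e.g. FIRST
+     * // becomes BEFORE `id`.
+     * transformSchemaChangeEvent(true, Arrays.asList("id", "name"), addColumnEvent);
+     * }</pre>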
+     */
     public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
             boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
         Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e16a2745b..abea0c943 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -1227,6 +1227,101 @@ class FlinkPipelineTransformITCase {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testTransformUnmatchedSchemaEvolution() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "foo.bar.baz", // This doesn't match given tableId
+                                        "*",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
index 83dd07c53..8a607ffb5 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
@@ -44,6 +44,7 @@ import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +243,8 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
 
     private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
         TableId tableId = event.tableId();
+        List<String> columnNamesBeforeChange = Collections.emptyList();
+
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
             Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
                     createTableEvent.getSchema().getColumnNames().stream()
                             .filter(projectedColumnsSet::contains)
                             .collect(Collectors.toList()));
+        } else {
+            columnNamesBeforeChange =
+                    getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
         }
 
         Schema schema;
@@ -304,9 +310,12 @@ public class PostTransformOperator extends AbstractStreamOperator<Event>
 
         if (event instanceof CreateTableEvent) {
             return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+        } else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // See comments in PreTransformOperator#cacheChangeSchema method.
+            return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
         } else {
             return SchemaUtils.transformSchemaChangeEvent(
-                    hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+                    false, projectedColumnsMap.get(tableId), event);
         }
     }
 
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
index 5e5c4c708..ed5b57595 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
@@ -27,7 +27,6 @@ import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -50,7 +49,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -71,7 +69,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
     private List<UserDefinedFunctionDescriptor> udfDescriptors;
     private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
     private Map<TableId, Boolean> hasAsteriskMap;
-    private Map<TableId, List<String>> referencedColumnsMap;
 
     public static PreTransformOperator.Builder newBuilder() {
         return new PreTransformOperator.Builder();
@@ -163,7 +160,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
         }
         this.preTransformProcessorMap = new ConcurrentHashMap<>();
         this.hasAsteriskMap = new ConcurrentHashMap<>();
-        this.referencedColumnsMap = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -186,8 +182,7 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
                         new CreateTableEvent(
                                 stateTableChangeInfo.getTableId(),
                                 stateTableChangeInfo.getPreTransformedSchema());
-                // hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
-                // from a checkpoint.
+                // hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
                 cacheTransformRuleInfo(restoredCreateTableEvent);
 
                // Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -258,12 +253,32 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+        List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
+
         Schema originalSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
         Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-        Optional<SchemaChangeEvent> schemaChangeEvent =
-                SchemaUtils.transformSchemaChangeEvent(
-                        hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+        Optional<SchemaChangeEvent> schemaChangeEvent;
+        if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // If this TableId is asterisk-ful, we should use the latest upstream schema as the
+            // referenced columns when performing schema evolution, not the original ones captured
+            // when the table was created. If hasAsteriskMap has no entry for this TableId, the
+            // table isn't referenced by any transform rule and should be regarded as asterisk-ful
+            // by default.
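+            // For instance, in the ITCase added above, "foo.bar.baz" matches no incoming table,
+            // so default_namespace.default_schema.mytable1 is treated as asterisk-ful and its
+            // schema change events are evolved against its latest upstream schema.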
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            true, columnNamesBeforeChange, event);
+        } else {
+            // Otherwise, use the pre-transformed columns to decide whether the given schema
+            // change event should be passed downstream: it is only forwarded if the affected
+            // column is present in the pre-transformed schema.
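+            // For instance, under a hypothetical asterisk-less projection "id, name", a schema
+            // change event that only touches the projected-out column `age` is swallowed here.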
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            false,
+                            tableChangeInfo.getPreTransformedSchema().getColumnNames(),
+                            event);
+        }
         if (schemaChangeEvent.isPresent()) {
             preTransformedSchema =
                     SchemaUtils.applySchemaChangeEvent(
@@ -276,26 +291,8 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 
     private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
         TableId tableId = createTableEvent.tableId();
-        Set<String> referencedColumnsSet =
-                transforms.stream()
-                        .filter(t -> t.getSelectors().isMatch(tableId))
-                        .flatMap(
-                                rule ->
-                                        TransformParser.generateReferencedColumns(
-                                                rule.getProjection()
-                                                        .map(TransformProjection::getProjection)
-                                                        .orElse(null),
-                                                rule.getFilter()
-                                                        .map(TransformFilter::getExpression)
-                                                        .orElse(null),
-                                                createTableEvent.getSchema().getColumns())
-                                                .stream())
-                        .map(Column::getName)
-                        .collect(Collectors.toSet());
-
         boolean notTransformed =
                 transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
-
-
         if (notTransformed) {
            // If this TableId isn't present in any transform block, it should behave like a "*"
            // projection and should be regarded as asterisk-ful.
@@ -313,11 +310,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 
             hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
         }
-        referencedColumnsMap.put(
-                createTableEvent.tableId(),
-                createTableEvent.getSchema().getColumnNames().stream()
-                        .filter(referencedColumnsSet::contains)
-                        .collect(Collectors.toList()));
     }
 
    private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {
