This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d5be4488 [FLINK-37837][core] Always add create.table to 
include.schema.changes option (#4262)
8d5be4488 is described below

commit 8d5be4488c6ce647d808e92fd5c437c88f1b3f59
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 20 17:46:05 2026 +0800

    [FLINK-37837][core] Always add create.table to include.schema.changes 
option (#4262)
---
 .../docs/core-concept/schema-evolution.md          |  2 +
 docs/content/docs/core-concept/schema-evolution.md |  2 +
 .../cli/parser/YamlPipelineDefinitionParser.java   | 14 +++++++
 .../parser/YamlPipelineDefinitionParserTest.java   | 46 ++++++++++++++++++++++
 4 files changed, 64 insertions(+)

diff --git a/docs/content.zh/docs/core-concept/schema-evolution.md 
b/docs/content.zh/docs/core-concept/schema-evolution.md
index eb9835ba6..c227e8390 100644
--- a/docs/content.zh/docs/core-concept/schema-evolution.md
+++ b/docs/content.zh/docs/core-concept/schema-evolution.md
@@ -90,6 +90,8 @@ pipeline:
 
 > 在 Lenient 模式下,`TruncateTableEvent` 和 `DropTableEvent` 
 > 默认会被忽略。在任何其他模式下,默认不会忽略任何事件。
 
+> `CreateTableEvent` 是所有后续 schema 变更处理的基础。当显式指定 `include.schema.changes` 
时,`create.table` 会被自动添加,除非用户通过 `exclude.schema.changes` 明确将其排除。
+
 以下是可配置架构变更事件类型的完整列表:
 
 | 事件类型                | 注释           |
diff --git a/docs/content/docs/core-concept/schema-evolution.md 
b/docs/content/docs/core-concept/schema-evolution.md
index 05d5c9314..2668e1398 100644
--- a/docs/content/docs/core-concept/schema-evolution.md
+++ b/docs/content/docs/core-concept/schema-evolution.md
@@ -88,6 +88,8 @@ This could be achieved by setting `include.schema.changes` 
and `exclude.schema.c
 
 > In Lenient mode, `TruncateTableEvent` and `DropTableEvent` will be ignored 
 > by default. In any other mode, no events will be ignored by default.
 
+> `CreateTableEvent` is the foundation for all subsequent schema change 
processing. When `include.schema.changes` is explicitly specified, 
`create.table` will be automatically added unless the user explicitly excludes 
it via `exclude.schema.changes`.
+
 Here's a full list of configurable schema change event types:
 
 | Event Type          | Description                  |
diff --git 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 79886ea08..79f51a705 100644
--- 
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++ 
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import org.apache.flink.cdc.common.utils.Preconditions;
 import org.apache.flink.cdc.common.utils.StringUtils;
 import org.apache.flink.cdc.composer.definition.ModelDef;
@@ -227,6 +228,19 @@ public class YamlPipelineDefinitionParser implements 
PipelineDefinitionParser {
             Arrays.stream(SchemaChangeEventTypeFamily.ALL)
                     .map(SchemaChangeEventType::getTag)
                     .forEach(includedSETypes::add);
+        } else {
+            // CreateTableEvent is always required as the foundation for all 
subsequent processing.
+            // Automatically add it if not explicitly excluded by user.
+            // Use resolveSchemaEvolutionTag to properly handle both exact 
tags and family tags.
+            boolean createTableExplicitlyExcluded =
+                    excludedSETypes.stream()
+                            .flatMap(
+                                    tag -> 
ChangeEventUtils.resolveSchemaEvolutionTag(tag).stream())
+                            .anyMatch(type -> type == 
SchemaChangeEventType.CREATE_TABLE);
+            if (!createTableExplicitlyExcluded
+                    && 
!includedSETypes.contains(SchemaChangeEventType.CREATE_TABLE.getTag())) {
+                
includedSETypes.add(SchemaChangeEventType.CREATE_TABLE.getTag());
+            }
         }
 
         if (excludedFieldNotPresent && 
SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 8349c2977..01bb4e5be 100644
--- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -271,6 +271,52 @@ class YamlPipelineDefinitionParserTest {
                         TRUNCATE_TABLE));
     }
 
+    /**
+     * Test that CreateTableEvent is automatically added when user specifies 
include.schema.changes
+     * without create.table. This ensures the foundational CreateTableEvent is 
always included for
+     * proper schema handling. See FLINK-37837.
+     */
+    @Test
+    void testCreateTableAutoAddedToIncludedSchemaChanges() throws Exception {
+        // Test case 1: User specifies only add.column, create.table should be 
auto-added
+        testSchemaEvolutionTypesParsing(
+                "evolve", "[add.column]", null, ImmutableSet.of(ADD_COLUMN, 
CREATE_TABLE));
+
+        // Test case 2: User specifies column family, create.table should be 
auto-added
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                "[column]",
+                null,
+                ImmutableSet.of(
+                        ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN, 
RENAME_COLUMN, CREATE_TABLE));
+
+        // Test case 3: User explicitly excludes create.table, should NOT 
auto-add
+        testSchemaEvolutionTypesParsing(
+                "evolve", "[add.column]", "[create.table]", 
ImmutableSet.of(ADD_COLUMN));
+
+        // Test case 4: User excludes via "create" family tag, should NOT 
auto-add
+        testSchemaEvolutionTypesParsing(
+                "evolve", "[add.column]", "[create]", 
ImmutableSet.of(ADD_COLUMN));
+
+        // Test case 5: User excludes via "table" family tag, should NOT 
auto-add
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                "[add.column, alter.column.type]",
+                "[table]",
+                ImmutableSet.of(ADD_COLUMN, ALTER_COLUMN_TYPE));
+
+        // Test case 6: User already includes create.table, no duplicate 
should be added
+        testSchemaEvolutionTypesParsing(
+                "evolve",
+                "[add.column, create.table]",
+                null,
+                ImmutableSet.of(ADD_COLUMN, CREATE_TABLE));
+
+        // Test case 7: Lenient mode with specified include, create.table 
should be auto-added
+        testSchemaEvolutionTypesParsing(
+                "lenient", "[add.column]", null, ImmutableSet.of(ADD_COLUMN, 
CREATE_TABLE));
+    }
+
     private void testSchemaEvolutionTypesParsing(
             String behavior, String included, String excluded, 
Set<SchemaChangeEventType> expected)
             throws Exception {

Reply via email to