This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 8d5be4488 [FLINK-37837][core] Always add create.table to
include.schema.changes option (#4262)
8d5be4488 is described below
commit 8d5be4488c6ce647d808e92fd5c437c88f1b3f59
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 20 17:46:05 2026 +0800
[FLINK-37837][core] Always add create.table to include.schema.changes
option (#4262)
---
.../docs/core-concept/schema-evolution.md | 2 +
docs/content/docs/core-concept/schema-evolution.md | 2 +
.../cli/parser/YamlPipelineDefinitionParser.java | 14 +++++++
.../parser/YamlPipelineDefinitionParserTest.java | 46 ++++++++++++++++++++++
4 files changed, 64 insertions(+)
diff --git a/docs/content.zh/docs/core-concept/schema-evolution.md
b/docs/content.zh/docs/core-concept/schema-evolution.md
index eb9835ba6..c227e8390 100644
--- a/docs/content.zh/docs/core-concept/schema-evolution.md
+++ b/docs/content.zh/docs/core-concept/schema-evolution.md
@@ -90,6 +90,8 @@ pipeline:
> 在 Lenient 模式下,`TruncateTableEvent` 和 `DropTableEvent`
> 默认会被忽略。在任何其他模式下,默认不会忽略任何事件。
+> `CreateTableEvent` 是所有后续 schema 变更处理的基础。当显式指定 `include.schema.changes`
时,`create.table` 会被自动添加,除非用户通过 `exclude.schema.changes` 明确将其排除。
+
以下是可配置架构变更事件类型的完整列表:
| 事件类型 | 注释 |
diff --git a/docs/content/docs/core-concept/schema-evolution.md
b/docs/content/docs/core-concept/schema-evolution.md
index 05d5c9314..2668e1398 100644
--- a/docs/content/docs/core-concept/schema-evolution.md
+++ b/docs/content/docs/core-concept/schema-evolution.md
@@ -88,6 +88,8 @@ This could be achieved by setting `include.schema.changes`
and `exclude.schema.c
> In Lenient mode, `TruncateTableEvent` and `DropTableEvent` will be ignored
> by default. In any other mode, no events will be ignored by default.
+> `CreateTableEvent` is the foundation for all subsequent schema change
processing. When `include.schema.changes` is explicitly specified,
`create.table` will be automatically added unless the user explicitly excludes
it via `exclude.schema.changes`.
+
Here's a full list of configurable schema change event types:
| Event Type | Description |
diff --git
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
index 79886ea08..79f51a705 100644
---
a/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
+++
b/flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
@@ -21,6 +21,7 @@ import
org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.utils.ChangeEventUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.composer.definition.ModelDef;
@@ -227,6 +228,19 @@ public class YamlPipelineDefinitionParser implements
PipelineDefinitionParser {
Arrays.stream(SchemaChangeEventTypeFamily.ALL)
.map(SchemaChangeEventType::getTag)
.forEach(includedSETypes::add);
+ } else {
+ // CreateTableEvent is always required as the foundation for all
subsequent processing.
+ // Automatically add it if not explicitly excluded by user.
+ // Use resolveSchemaEvolutionTag to properly handle both exact
tags and family tags.
+ boolean createTableExplicitlyExcluded =
+ excludedSETypes.stream()
+ .flatMap(
+ tag ->
ChangeEventUtils.resolveSchemaEvolutionTag(tag).stream())
+ .anyMatch(type -> type ==
SchemaChangeEventType.CREATE_TABLE);
+ if (!createTableExplicitlyExcluded
+ &&
!includedSETypes.contains(SchemaChangeEventType.CREATE_TABLE.getTag())) {
+
includedSETypes.add(SchemaChangeEventType.CREATE_TABLE.getTag());
+ }
}
if (excludedFieldNotPresent &&
SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
diff --git
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index 8349c2977..01bb4e5be 100644
---
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -271,6 +271,52 @@ class YamlPipelineDefinitionParserTest {
TRUNCATE_TABLE));
}
+ /**
+ * Test that CreateTableEvent is automatically added when user specifies
include.schema.changes
+ * without create.table. This ensures the foundational CreateTableEvent is
always included for
+ * proper schema handling. See FLINK-37837.
+ */
+ @Test
+ void testCreateTableAutoAddedToIncludedSchemaChanges() throws Exception {
+ // Test case 1: User specifies only add.column, create.table should be
auto-added
+ testSchemaEvolutionTypesParsing(
+ "evolve", "[add.column]", null, ImmutableSet.of(ADD_COLUMN,
CREATE_TABLE));
+
+ // Test case 2: User specifies column family, create.table should be
auto-added
+ testSchemaEvolutionTypesParsing(
+ "evolve",
+ "[column]",
+ null,
+ ImmutableSet.of(
+ ADD_COLUMN, ALTER_COLUMN_TYPE, DROP_COLUMN,
RENAME_COLUMN, CREATE_TABLE));
+
+ // Test case 3: User explicitly excludes create.table, should NOT
auto-add
+ testSchemaEvolutionTypesParsing(
+ "evolve", "[add.column]", "[create.table]",
ImmutableSet.of(ADD_COLUMN));
+
+ // Test case 4: User excludes via "create" family tag, should NOT
auto-add
+ testSchemaEvolutionTypesParsing(
+ "evolve", "[add.column]", "[create]",
ImmutableSet.of(ADD_COLUMN));
+
+ // Test case 5: User excludes via "table" family tag, should NOT
auto-add
+ testSchemaEvolutionTypesParsing(
+ "evolve",
+ "[add.column, alter.column.type]",
+ "[table]",
+ ImmutableSet.of(ADD_COLUMN, ALTER_COLUMN_TYPE));
+
+ // Test case 6: User already includes create.table, no duplicate
should be added
+ testSchemaEvolutionTypesParsing(
+ "evolve",
+ "[add.column, create.table]",
+ null,
+ ImmutableSet.of(ADD_COLUMN, CREATE_TABLE));
+
+ // Test case 7: Lenient mode with specified include, create.table
should be auto-added
+ testSchemaEvolutionTypesParsing(
+ "lenient", "[add.column]", null, ImmutableSet.of(ADD_COLUMN,
CREATE_TABLE));
+ }
+
private void testSchemaEvolutionTypesParsing(
String behavior, String included, String excluded,
Set<SchemaChangeEventType> expected)
throws Exception {