This is an automated email from the ASF dual-hosted git repository. xuyangzhong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 58d7acc1f1164b6701fea7f24930a0a0ead1be2c Author: xuyang <[email protected]> AuthorDate: Thu Mar 19 16:20:59 2026 +0800 [FLINK-39286][table] Introduce immutable columns constraint in schema --- .../flink/table/sql/CreateTableAsITCase.java | 5 +- .../flink/table/sql/PlannerScalaFreeITCase.java | 5 +- .../flink/table/sql/UsingRemoteJarITCase.java | 5 +- .../table/tests/test_catalog_completeness.py | 3 +- .../apache/flink/table/catalog/CatalogManager.java | 3 +- .../flink/table/catalog/DefaultSchemaResolver.java | 76 ++++++++- .../table/api/internal/ShowCreateUtilTest.java | 51 +++++- .../catalog/CatalogBaseTableResolutionTest.java | 17 +- .../flink/table/catalog/SchemaResolutionTest.java | 95 ++++++++++- .../java/org/apache/flink/table/api/Schema.java | 175 ++++++++++++++++++++- .../flink/table/catalog/CatalogPropertiesUtil.java | 27 ++++ .../org/apache/flink/table/catalog/Constraint.java | 20 ++- .../table/catalog/ImmutableColumnsConstraint.java | 97 ++++++++++++ .../apache/flink/table/catalog/ResolvedSchema.java | 57 ++++++- .../apache/flink/table/utils/TableSchemaUtils.java | 3 +- .../table/catalog/CatalogPropertiesUtilTest.java | 7 +- .../apache/flink/table/catalog/CatalogTest.java | 6 +- .../flink/table/catalog/TestSchemaResolver.java | 16 +- .../flink/table/utils/TableSchemaUtilsTest.java | 9 +- ...aintMixin.java => AbstractConstraintMixin.java} | 24 +-- .../nodes/exec/serde/CompiledPlanSerdeUtil.java | 3 + ...n.java => ImmutableColumnsConstraintMixin.java} | 24 +-- .../exec/serde/ResolvedSchemaJsonDeserializer.java | 7 +- .../exec/serde/ResolvedSchemaJsonSerializer.java | 6 + .../nodes/exec/serde/UniqueConstraintMixin.java | 12 +- .../planner/catalog/CatalogConstraintTest.java | 5 +- .../planner/lineage/TableLineageUtilsTest.java | 3 +- ...erializedTableNodeToOperationConverterTest.java | 13 +- .../plan/FlinkCalciteCatalogReaderTest.java | 3 +- .../exec/serde/ContextResolvedTableSerdeTest.java | 6 +- .../exec/serde/DynamicTableSinkSpecSerdeTest.java | 12 +- .../serde/DynamicTableSourceSpecSerdeTest.java | 9 +- .../exec/serde/ResolvedCatalogTableSerdeTest.java | 8 +- .../serde/TemporalTableSourceSpecSerdeTest.java | 3 +- .../VectorSearchTableSourceSpecSerdeTest.java | 3 +- .../runtime/stream/sql/DataStreamJavaITCase.java | 6 +- .../planner/plan/metadata/MetadataTestUtil.scala | 15 +- .../catalog/TestFileSystemCatalogTest.java | 12 +- 38 files changed, 727 insertions(+), 124 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java index 3d86251def1..06fbe73dd66 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.test.util.SQLJobSubmission; @@ -46,7 +47,9 @@ public class CreateTableAsITCase extends SqlITCaseBase { Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("user_name")))); + DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("user_name"))); private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java index 58d13ac5895..dbdbae5b551 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.test.util.SQLJobSubmission; @@ -53,7 +54,9 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase { Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("user_name")))); + DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("user_name"))); private static final DebeziumJsonDeserializationSchema DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA); diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java index c49b045252c..0432aa76ecc 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -47,7 +48,9 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase { Collections.emptyList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("user_name")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("user_name")))); + DefaultIndex.newIndex("idx", Collections.singletonList("user_name"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("user_name"))); private static final DebeziumJsonDeserializationSchema USER_ORDER_DESERIALIZATION_SCHEMA = createDebeziumDeserializationSchema(USER_ORDER_SCHEMA); diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py index a3e171fc384..c1ab64c96aa 100644 --- a/flink-python/pyflink/table/tests/test_catalog_completeness.py +++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py @@ -253,7 +253,8 @@ class ResolvedSchemaAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkT @classmethod def excluded_methods(cls): # getIndexes are not needed in Python API as they are used internally - return {'getIndexes'} + # TODO FLINK-39319 add immutable column constraint in Python API + return {'getIndexes', 'getImmutableColumns', 'getImmutableColumnIndexes'} if __name__ == '__main__': diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index bc870857807..3bba4c05f4e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -1984,7 +1984,8 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable { .collect(Collectors.toList()), resolvedSchema.getWatermarkSpecs(), resolvedSchema.getPrimaryKey().orElse(null), - resolvedSchema.getIndexes()); + resolvedSchema.getIndexes(), + resolvedSchema.getImmutableColumns().orElse(null)); return new ResolvedCatalogView( // pass a view that has the query parsed and // validated already diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java index b44e8437420..4ce5177832d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java @@ -21,6 +21,7 @@ package org.apache.flink.table.catalog; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.Schema.UnresolvedComputedColumn; +import org.apache.flink.table.api.Schema.UnresolvedImmutableColumns; import org.apache.flink.table.api.Schema.UnresolvedIndex; import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn; import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn; @@ -92,7 +93,12 @@ class DefaultSchemaResolver implements SchemaResolver { final List<Index> indexes = resolveIndexes(schema.getIndexes(), columnsWithRowtime); - return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey, indexes); + final ImmutableColumnsConstraint immutableColumns = + resolveImmutableColumns( + schema.getImmutableColumns().orElse(null), primaryKey, columnsWithRowtime); + + return new ResolvedSchema( + columnsWithRowtime, watermarkSpecs, primaryKey, indexes, immutableColumns); } // -------------------------------------------------------------------------------------------- @@ -459,6 +465,74 @@ class DefaultSchemaResolver implements SchemaResolver { } } + private @Nullable ImmutableColumnsConstraint resolveImmutableColumns( + @Nullable UnresolvedImmutableColumns unresolvedImmutableColumns, + @Nullable UniqueConstraint primaryKey, + List<Column> columns) { + if (unresolvedImmutableColumns == null) { + return null; + } + + final ImmutableColumnsConstraint immutableColumns = + ImmutableColumnsConstraint.immutableColumns( + unresolvedImmutableColumns.getConstraintName(), + unresolvedImmutableColumns.getColumnNames()); + + validateImmutableColumns(immutableColumns, primaryKey, columns); + + return immutableColumns; + } + + private void validateImmutableColumns( + ImmutableColumnsConstraint immutableColumns, + @Nullable UniqueConstraint primaryKey, + List<Column> columns) { + if (primaryKey == null) { + throw new ValidationException( + String.format( + "Invalid immutable constraint '%s'. " + + "An immutable constraint must be defined on the table that contains primary key.", + immutableColumns.getName())); + } + + final Map<String, Column> columnsByNameLookup = + columns.stream().collect(Collectors.toMap(Column::getName, Function.identity())); + + final Set<String> duplicateColumns = + immutableColumns.getColumns().stream() + .filter( + name -> + Collections.frequency(immutableColumns.getColumns(), name) + > 1) + .collect(Collectors.toSet()); + + if (!duplicateColumns.isEmpty()) { + throw new ValidationException( + String.format( + "Invalid immutable constraint '%s'. " + + "An immutable constraint must not contain duplicate columns. " + + "Found: %s", + immutableColumns.getName(), duplicateColumns)); + } + + for (String columnName : immutableColumns.getColumns()) { + Column column = columnsByNameLookup.get(columnName); + if (column == null) { + throw new ValidationException( + String.format( + "Invalid immutable constraint '%s'. Column '%s' does not exist.", + immutableColumns.getName(), columnName)); + } + + if (!column.isPhysical()) { + throw new ValidationException( + String.format( + "Invalid immutable constraint '%s'. Column '%s' is not a physical column.", + immutableColumns.getName(), columnName)); + } + } + } + private ResolvedExpression resolveExpression( List<Column> columns, Expression expression, @Nullable DataType outputDataType) { final LocalReferenceExpression[] localRefs = diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java index 15464afc7e5..51e1a5479fd 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java @@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -73,13 +74,24 @@ class ShowCreateUtilTest { Column.metadata("mt_column", DataTypes.STRING(), null, true)), List.of(), UniqueConstraint.primaryKey("pk", List.of("id")), - List.of()); + List.of(), + null); private static final ResolvedSchema TWO_COLUMNS_SCHEMA = ResolvedSchema.of( Column.physical("id", DataTypes.INT()), Column.physical("name", DataTypes.STRING())); + private static final ResolvedSchema TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS = + new ResolvedSchema( + List.of( + Column.physical("id", DataTypes.INT()), + Column.physical("name", DataTypes.STRING())), + List.of(), + UniqueConstraint.primaryKey("pk", List.of("id")), + List.of(), + ImmutableColumnsConstraint.immutableColumns("imt", List.of("name"))); + @ParameterizedTest(name = "{index}: {2}") @MethodSource("argsForShowCreateTable") void showCreateTable( @@ -213,6 +225,23 @@ class ShowCreateUtilTest { + "COMMENT 'Table comment'\n" + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n"); + addTemporaryAndPermanent( + argList, + createResolvedTable( + TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS, + Collections.emptyMap(), + Collections.emptyList(), + TableDistribution.of( + TableDistribution.Kind.RANGE, 2, Arrays.asList("1", "10")), + "Table comment"), + "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647),\n" + + " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n" + + ")\n" + + "COMMENT 'Table comment'\n" + + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n"); + final Map<String, String> options = new HashMap<>(); options.put("option_key_a", "option_value_a"); options.put("option_key_b", "option_value_b"); @@ -315,6 +344,26 @@ class ShowCreateUtilTest { + "REFRESH_MODE = CONTINUOUS\n" + "AS SELECT 1\n")); + argList.add( + Arguments.of( + createResolvedMaterialized( + TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS, + null, + List.of(), + null, + IntervalFreshness.ofMinute("1"), + RefreshMode.CONTINUOUS, + "SELECT 1", + "SELECT 1"), + "CREATE MATERIALIZED TABLE `catalogName`.`dbName`.`materializedTableName` (\n" + + " `id` INT,\n" + + " `name` VARCHAR(2147483647),\n" + + " CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n" + + ")\n" + + "FRESHNESS = INTERVAL '1' MINUTE\n" + + "REFRESH_MODE = CONTINUOUS\n" + + "AS SELECT 1\n")); + argList.add( Arguments.of( createResolvedMaterialized( diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index 05ddc34a23b..92035ccc944 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -89,6 +89,7 @@ class CatalogBaseTableResolutionTest { .watermark("ts", WATERMARK_SQL) .primaryKeyNamed("primary_constraint", "id") .indexNamed("idx", Collections.singletonList("id")) + .immutableColumnsNamed("imt", Collections.singletonList("region")) .build(); private static final Schema MATERIALIZED_TABLE_SCHEMA = @@ -101,6 +102,7 @@ class CatalogBaseTableResolutionTest { .withComment("") // empty column comment .primaryKeyNamed("primary_constraint", "id") .indexNamed("idx", Collections.singletonList("id")) + .immutableColumnsNamed("imt", Collections.singletonList("region")) .build(); private static final TableSchema LEGACY_TABLE_SCHEMA = @@ -138,7 +140,9 @@ class CatalogBaseTableResolutionTest { UniqueConstraint.primaryKey( "primary_constraint", Collections.singletonList("id")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("id")))); + DefaultIndex.newIndex("idx", Collections.singletonList("id"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("region"))); private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA = new ResolvedSchema( @@ -152,7 +156,9 @@ class CatalogBaseTableResolutionTest { UniqueConstraint.primaryKey( "primary_constraint", Collections.singletonList("id")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("id")))); + DefaultIndex.newIndex("idx", Collections.singletonList("id"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("region"))); private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER = new ContinuousRefreshHandler( @@ -171,7 +177,8 @@ class CatalogBaseTableResolutionTest { Column.physical("county", DataTypes.VARCHAR(200))), Collections.emptyList(), null, - Collections.emptyList()); + Collections.emptyList(), + null); @Test void testCatalogTableResolution() { @@ -397,6 +404,8 @@ class CatalogBaseTableResolutionTest { properties.put("schema.primary-key.columns", "id"); properties.put("schema.index.0.name", "idx"); properties.put("schema.index.0.columns", "id"); + properties.put("schema.immutable.name", "imt"); + properties.put("schema.immutable.columns", "region"); properties.put("partition.keys.0.name", "region"); properties.put("partition.keys.1.name", "county"); properties.put("version", "12"); @@ -424,6 +433,8 @@ class CatalogBaseTableResolutionTest { properties.put("schema.primary-key.columns", "id"); properties.put("schema.index.0.name", "idx"); properties.put("schema.index.0.columns", "id"); + properties.put("schema.immutable.name", "imt"); + properties.put("schema.immutable.columns", "region"); properties.put("freshness-interval", "30"); properties.put("freshness-unit", "SECOND"); properties.put("logical-refresh-mode", "CONTINUOUS"); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java index 58510130796..3519ade2907 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java @@ -90,6 +90,7 @@ class SchemaResolutionTest { .watermark("ts", WATERMARK_SQL) .columnByExpression("proctime", PROCTIME_SQL) .indexNamed("idx", Collections.singletonList("counter")) + .immutableColumnsNamed("imt", Collections.singletonList("payload")) .build(); // the type of ts_ltz is TIMESTAMP_LTZ @@ -141,8 +142,9 @@ class SchemaResolutionTest { UniqueConstraint.primaryKey( "primary_constraint", Collections.singletonList("id")), Collections.singletonList( - DefaultIndex.newIndex( - "idx", Collections.singletonList("counter")))); + DefaultIndex.newIndex("idx", Collections.singletonList("counter"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("payload"))); final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA, true); { @@ -172,7 +174,8 @@ class SchemaResolutionTest { WatermarkSpec.of("ts1", WATERMARK_RESOLVED_WITH_TS_LTZ)), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("id")))); + DefaultIndex.newIndex("idx", Collections.singletonList("id"))), + null); final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, true); { @@ -202,7 +205,8 @@ class SchemaResolutionTest { DataTypes.TIMESTAMP_LTZ(1)))), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("ts_ltz")))); + DefaultIndex.newIndex("idx", Collections.singletonList("ts_ltz"))), + null); final ResolvedSchema resolvedSchema = resolveSchema( Schema.newBuilder() @@ -352,6 +356,40 @@ class SchemaResolutionTest { .build(), "Invalid index 'INDEX_ts'. " + "Column 'ts' is not a physical column or a metadata column."); + + // immutable constraints + + testError( + Schema.newBuilder().column("id", DataTypes.INT()).immutableColumns("id").build(), + "An immutable constraint must be defined on the table that contains primary key."); + + testError( + Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .primaryKey("id") + .immutableColumns("INVALID") + .build(), + "Column 'INVALID' does not exist."); + + testError( + Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .column("orig_ts", DataTypes.TIMESTAMP(3)) + .columnByExpression("ts", COMPUTED_SQL) + .primaryKey("id") + .immutableColumns("ts") + .build(), + "Column 'ts' is not a physical column."); + + testError( + Schema.newBuilder() + .column("id", DataTypes.INT().notNull()) + .column("name", DataTypes.STRING()) + .primaryKey("id") + .immutableColumns("name", "name") + .build(), + "Invalid immutable constraint 'IMMUTABLE_COLUMNS_name_name'. " + + "An immutable constraint must not contain duplicate columns. Found: [name]"); } @Test @@ -371,6 +409,7 @@ class SchemaResolutionTest { .indexNamed("idx", null) .build()) .hasMessageContaining("Index column names must not be null."); + assertThatThrownBy( () -> Schema.newBuilder() @@ -380,6 +419,48 @@ class SchemaResolutionTest { .hasMessageContaining("Index must be defined for at least a single column."); } + @Test + void testImmutableColumnsBuildingErrors() { + assertThatThrownBy( + () -> + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .primaryKey("a") + .immutableColumnsNamed(null, Collections.singletonList("a")) + .build()) + .hasMessageContaining("Immutable constraint name must not be null."); + + assertThatThrownBy( + () -> + Schema.newBuilder() + .column("a", DataTypes.INT()) + .primaryKey("a") + .immutableColumnsNamed("imt", (String[]) null) + .build()) + .hasMessageContaining("Immutable column names must not be null."); + + assertThatThrownBy( + () -> + Schema.newBuilder() + .column("a", DataTypes.INT()) + .primaryKey("a") + .immutableColumnsNamed("idx", Collections.emptyList()) + .build()) + .hasMessageContaining( + "Immutable constraint must be defined for at least a single column."); + + assertThatThrownBy( + () -> + Schema.newBuilder() + .column("a", DataTypes.INT().notNull()) + .column("b", DataTypes.INT().notNull()) + .primaryKey("a") + .immutableColumns("a") + .immutableColumns("b") + .build()) + .hasMessageContaining("Multiple immutable constraints are not supported."); + } + @Test void testUnresolvedSchemaString() { assertThat(SCHEMA.toString()) @@ -394,7 +475,8 @@ class SchemaResolutionTest { + " `proctime` AS [PROCTIME()],\n" + " WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n" + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED,\n" - + " INDEX `idx` (`counter`)\n" + + " INDEX `idx` (`counter`),\n" + + " CONSTRAINT `imt` COLUMNS (`payload`) IMMUTABLE NOT ENFORCED\n" + ")"); } @@ -413,7 +495,8 @@ class SchemaResolutionTest { + " `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n" + " WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n" + " CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED,\n" - + " INDEX `idx` (`counter`)\n" + + " INDEX `idx` (`counter`),\n" + + " CONSTRAINT `imt` COLUMNS (`payload`) IMMUTABLE NOT ENFORCED\n" + ")"); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java index 654e919dfe5..d22edb91b25 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java @@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.Column.ComputedColumn; import org.apache.flink.table.catalog.Column.MetadataColumn; import org.apache.flink.table.catalog.Column.PhysicalColumn; import org.apache.flink.table.catalog.Constraint; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.Index; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.SchemaResolver; @@ -80,7 +81,12 @@ public final class Schema { private final List<UnresolvedIndex> indexes; - /** Please use {@link #Schema(List, List, UnresolvedPrimaryKey, List)} instead. */ + private final @Nullable UnresolvedImmutableColumns immutableColumns; + + /** + * Please use {@link #Schema(List, List, UnresolvedPrimaryKey, List, + * UnresolvedImmutableColumns)} instead. + */ @Deprecated public Schema( List<UnresolvedColumn> columns, @@ -94,10 +100,20 @@ public final class Schema { List<UnresolvedWatermarkSpec> watermarkSpecs, @Nullable UnresolvedPrimaryKey primaryKey, List<UnresolvedIndex> indexes) { + this(columns, watermarkSpecs, primaryKey, indexes, null); + } + + public Schema( + List<UnresolvedColumn> columns, + List<UnresolvedWatermarkSpec> watermarkSpecs, + @Nullable UnresolvedPrimaryKey primaryKey, + List<UnresolvedIndex> indexes, + @Nullable UnresolvedImmutableColumns immutableColumns) { this.columns = Collections.unmodifiableList(columns); this.watermarkSpecs = Collections.unmodifiableList(watermarkSpecs); this.primaryKey = primaryKey; this.indexes = Collections.unmodifiableList(indexes); + this.immutableColumns = immutableColumns; } /** Builder for configuring and creating instances of {@link Schema}. */ @@ -134,6 +150,10 @@ public final class Schema { return indexes; } + public Optional<UnresolvedImmutableColumns> getImmutableColumns() { + return Optional.ofNullable(immutableColumns); + } + /** Resolves the given {@link Schema} to a validated {@link ResolvedSchema}. */ public ResolvedSchema resolve(SchemaResolver resolver) { return resolver.resolve(this); @@ -150,6 +170,9 @@ public final class Schema { if (!indexes.isEmpty()) { components.addAll(indexes); } + if (immutableColumns != null) { + components.add(immutableColumns); + } return components.stream() .map(Objects::toString) .map(s -> " " + s) @@ -168,12 +191,13 @@ public final class Schema { return columns.equals(schema.columns) && watermarkSpecs.equals(schema.watermarkSpecs) && Objects.equals(primaryKey, schema.primaryKey) - && indexes.equals(schema.indexes); + && indexes.equals(schema.indexes) + && Objects.equals(immutableColumns, schema.immutableColumns); } @Override public int hashCode() { - return Objects.hash(columns, watermarkSpecs, primaryKey, indexes); + return Objects.hash(columns, watermarkSpecs, primaryKey, indexes, immutableColumns); } // -------------------------------------------------------------------------------------------- @@ -190,6 +214,8 @@ public final class Schema { private final List<UnresolvedIndex> indexes; + private @Nullable UnresolvedImmutableColumns immutableColumns; + private Builder() { columns = new ArrayList<>(); watermarkSpecs = new ArrayList<>(); @@ -206,6 +232,11 @@ public final class Schema { unresolvedSchema.primaryKey.getColumnNames()); } indexes.addAll(unresolvedSchema.indexes); + if (unresolvedSchema.immutableColumns != null) { + immutableColumnsNamed( + unresolvedSchema.immutableColumns.getConstraintName(), + unresolvedSchema.immutableColumns.getColumnNames()); + } return this; } @@ -215,6 +246,7 @@ public final class Schema { addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs()); resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint); addResolvedIndexes(resolvedSchema.getIndexes()); + resolvedSchema.getImmutableColumns().ifPresent(this::addResolvedConstraint); return this; } @@ -672,9 +704,87 @@ public final class Schema { return this; } + /** + * Declares an immutable columns constraint for a list of given columns. Immutable columns + * constraint is used to identify which columns in a table are not allowed to be modified. + * Currently, this constraint is informational only and is not enforced. It can be utilized + * for optimization purposes. It is the responsibility of the data owner to ensure that + * these columns are unmodified. + * + * <p>The immutable columns will be assigned a generated name in the format {@code + * IMMUTABLE_COLUMNS_col1_col2}. + * + * @param columnNames columns that form the constraint for immutable columns + */ + public Builder immutableColumns(String... columnNames) { + Preconditions.checkNotNull(columnNames, "Immutable column names must not be null."); + return immutableColumns(Arrays.asList(columnNames)); + } + + /** + * Declares an immutable columns constraint for a list of given columns. Immutable columns + * constraint is used to identify which columns in a table are not allowed to be modified. + * Currently, this constraint is informational only and is not enforced. It can be utilized + * for optimization purposes. It is the responsibility of the data owner to ensure that + * these columns are unmodified. + * + * <p>The immutable columns will be assigned a generated name in the format {@code + * IMMUTABLE_COLUMNS_col1_col2}. + * + * @param columnNames columns that form the constraint for immutable columns + */ + public Builder immutableColumns(List<String> columnNames) { + Preconditions.checkNotNull(columnNames, "Immutable column names must not be null."); + final String generatedConstraintName = + columnNames.stream().collect(Collectors.joining("_", "IMMUTABLE_COLUMNS_", "")); + return immutableColumnsNamed(generatedConstraintName, columnNames); + } + + /** + * Declares an immutable columns constraint for a list of given columns. Immutable columns + * constraint is used to identify which columns in a table are not allowed to be modified. + * Currently, this constraint is informational only and is not enforced. It can be utilized + * for optimization purposes. It is the responsibility of the data owner to ensure that + * these columns are unmodified. + * + * @param constraintName name for the immutable columns constraint, can be used to reference + * this constraint + * @param columnNames columns that form the constraint for immutable columns + */ + public Builder immutableColumnsNamed(String constraintName, String... columnNames) { + Preconditions.checkNotNull(columnNames, "Immutable column names must not be null."); + return immutableColumnsNamed(constraintName, Arrays.asList(columnNames)); + } + + /** + * Declares an immutable columns constraint for a list of given columns. Immutable columns + * constraint is used to identify which columns in a table are not allowed to be modified. + * Currently, this constraint is informational only and is not enforced. It can be utilized + * for optimization purposes. It is the responsibility of the data owner to ensure that + * these columns are unmodified. + * + * @param constraintName name for the immutable columns constraint, can be used to reference + * this constraint + * @param columnNames columns that form the constraint for immutable columns + */ + public Builder immutableColumnsNamed(String constraintName, List<String> columnNames) { + Preconditions.checkState( + immutableColumns == null, "Multiple immutable constraints are not supported."); + Preconditions.checkNotNull( + constraintName, "Immutable constraint name must not be null."); + Preconditions.checkArgument( + !StringUtils.isNullOrWhitespaceOnly(constraintName), + "Immutable constraint name must not be empty."); + Preconditions.checkArgument( + columnNames != null && !columnNames.isEmpty(), + "Immutable constraint must be defined for at least a single column."); + immutableColumns = new UnresolvedImmutableColumns(constraintName, columnNames); + return this; + } + /** Returns an instance of an unresolved {@link Schema}. */ public Schema build() { - return new Schema(columns, watermarkSpecs, primaryKey, indexes); + return new Schema(columns, watermarkSpecs, primaryKey, indexes, immutableColumns); } // ---------------------------------------------------------------------------------------- @@ -709,9 +819,13 @@ public final class Schema { s.getRowtimeAttribute(), s.getWatermarkExpression()))); } - private void addResolvedConstraint(UniqueConstraint constraint) { + private void addResolvedConstraint(Constraint constraint) { if (constraint.getType() == Constraint.ConstraintType.PRIMARY_KEY) { - primaryKeyNamed(constraint.getName(), constraint.getColumns()); + primaryKeyNamed(constraint.getName(), ((UniqueConstraint) constraint).getColumns()); + } else if (constraint.getType() == Constraint.ConstraintType.IMMUTABLE_COLUMNS) { + immutableColumnsNamed( + constraint.getName(), + ((ImmutableColumnsConstraint) constraint).getColumns()); } else { throw new IllegalArgumentException("Unsupported constraint type."); } @@ -1173,4 +1287,53 @@ public final class Schema { return Objects.hash(indexName, columnNames); } } + + /** + * Declaration of a list of immutable columns that will be resolved to {@link + * ImmutableColumnsConstraint} during schema resolution. + */ + @PublicEvolving + public static final class UnresolvedImmutableColumns extends UnresolvedConstraint { + + private final List<String> columnNames; + + public UnresolvedImmutableColumns(String constraintName, List<String> columnNames) { + super(constraintName); + this.columnNames = columnNames; + } + + public List<String> getColumnNames() { + return columnNames; + } + + @Override + public String toString() { + return String.format( + "%s COLUMNS (%s) IMMUTABLE NOT ENFORCED", + super.toString(), + columnNames.stream() + .map(EncodingUtils::escapeIdentifier) + .collect(Collectors.joining(", "))); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + UnresolvedImmutableColumns that = (UnresolvedImmutableColumns) o; + return columnNames.equals(that.columnNames); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), columnNames); + } + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java index 16e9ba8c224..67b72bdeb68 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java @@ -376,6 +376,8 @@ public final class CatalogPropertiesUtil { private static final String PRIMARY_KEY = "primary-key"; + private static final String IMMUTABLE = "immutable"; + private static final String COLUMNS = "columns"; private static final String PARTITION = "partition"; @@ -399,6 +401,10 @@ public final class CatalogPropertiesUtil { private static final String PRIMARY_KEY_COLUMNS = compoundKey(PRIMARY_KEY, COLUMNS); + private static final String IMMUTABLE_NAME = compoundKey(IMMUTABLE, NAME); + + private static final String IMMUTABLE_COLUMNS = compoundKey(IMMUTABLE, COLUMNS); + private static final String INDEX = "index"; private static final String INDEX_NAME = "name"; @@ -515,6 +521,8 @@ public final class CatalogPropertiesUtil { deserializePrimaryKey(map, schemaKey, builder); + deserializeImmutableCols(map, schemaKey, builder); + deserializeIndexes(map, schemaKey, builder); return builder.build(); @@ -545,6 +553,17 @@ public final class CatalogPropertiesUtil { } } + private static void deserializeImmutableCols( + Map<String, String> map, String schemaKey, Builder builder) { + final String constraintNameKey = compoundKey(schemaKey, IMMUTABLE_NAME); + final String columnsKey = compoundKey(schemaKey, IMMUTABLE_COLUMNS); + if (map.containsKey(constraintNameKey)) { + final String constraintName = getValue(map, constraintNameKey); + final String[] columns = getValue(map, columnsKey, s -> s.split(",")); + builder.immutableColumnsNamed(constraintName, columns); + } + } + private static void deserializeWatermark( Map<String, String> map, String schemaKey, Builder builder) { final String watermarkKey = compoundKey(schemaKey, WATERMARK); @@ -654,6 +673,8 @@ public final class CatalogPropertiesUtil { schema.getPrimaryKey().ifPresent(pk -> serializePrimaryKey(map, pk)); + schema.getImmutableColumns().ifPresent(ics -> serializeImmutableCols(map, ics)); + serializeIndexes(map, schema.getIndexes()); } @@ -679,6 +700,12 @@ public final class CatalogPropertiesUtil { String.join(",", constraint.getColumns())); } + private static void serializeImmutableCols( + Map<String, String> map, ImmutableColumnsConstraint constraint) { + map.put(compoundKey(SCHEMA, IMMUTABLE_NAME), constraint.getName()); + map.put(compoundKey(SCHEMA, IMMUTABLE_COLUMNS), String.join(",", constraint.getColumns())); + } + private static void serializeWatermarkSpecs( Map<String, String> map, List<WatermarkSpec> specs, SqlFactory sqlFactory) { if (!specs.isEmpty()) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java index f0c363e9f02..1f0c4359c64 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java @@ -46,19 +46,23 @@ public interface Constraint { /** * Type of the constraint. * - * <p>Unique constraints: - * * <ul> - * <li>UNIQUE - is satisfied if and only if there do not exist two rows that have same - * non-null values in the unique columns - * <li>PRIMARY KEY - additionally to UNIQUE constraint, it requires none of the values in - * specified columns be a null value. Moreover there can be only a single PRIMARY KEY - * defined for a Table. + * <li>Unique constraints: + * <ul> + * <li>UNIQUE - is satisfied if and only if there do not exist two rows that have same + * non-null values in the unique columns + * <li>PRIMARY KEY - additionally to UNIQUE constraint, it requires none of the values + * in specified columns be a null value. Moreover there can be only a single PRIMARY + * KEY defined for a Table. + * </ul> + * <li>Immutable constraint - is satisfied iff these specific columns are not allowed to be + * modified. * </ul> */ @PublicEvolving enum ConstraintType { PRIMARY_KEY, - UNIQUE_KEY + UNIQUE_KEY, + IMMUTABLE_COLUMNS } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java new file mode 100644 index 00000000000..e1426d0b659 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.utils.EncodingUtils; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Immutable columns constraint is used to identify which columns in a table are not allowed to be + * modified. + * + * @see ConstraintType + */ +@PublicEvolving +public final class ImmutableColumnsConstraint extends AbstractConstraint { + + private final List<String> columns; + private final ConstraintType type; + + /** Creates a non enforced {@link ConstraintType#IMMUTABLE_COLUMNS} constraint. */ + public static ImmutableColumnsConstraint immutableColumns(String name, List<String> columns) { + return new ImmutableColumnsConstraint( + name, false, ConstraintType.IMMUTABLE_COLUMNS, columns); + } + + private ImmutableColumnsConstraint( + String name, boolean enforced, ConstraintType type, List<String> columns) { + super(name, enforced); + + this.columns = columns; + this.type = type; + + if (type != ConstraintType.IMMUTABLE_COLUMNS) { + throw new IllegalStateException("Unknown key type: " + getType()); + } + } + + public List<String> getColumns() { + return columns; + } + + @Override + public ConstraintType getType() { + return ConstraintType.IMMUTABLE_COLUMNS; + } + + @Override + public String asSummaryString() { + return String.format( + "CONSTRAINT %s COLUMNS (%s) IMMUTABLE%s", + EncodingUtils.escapeIdentifier(getName()), + columns.stream() + .map(EncodingUtils::escapeIdentifier) + .collect(Collectors.joining(", ")), + isEnforced() ? "" : " NOT ENFORCED"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ImmutableColumnsConstraint that = (ImmutableColumnsConstraint) o; + return Objects.equals(columns, that.columns); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), columns); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java index 916733b44b7..b2871b6e56a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java @@ -64,8 +64,12 @@ public final class ResolvedSchema { private final List<WatermarkSpec> watermarkSpecs; private final @Nullable UniqueConstraint primaryKey; private final List<Index> indexes; + private final @Nullable ImmutableColumnsConstraint immutableColumns; - /** Please use {@link #ResolvedSchema(List, List, UniqueConstraint, List)} instead. */ + /** + * Please use {@link #ResolvedSchema(List, List, UniqueConstraint, List, + * ImmutableColumnsConstraint)} instead. + */ @Deprecated public ResolvedSchema( List<Column> columns, @@ -79,16 +83,31 @@ public final class ResolvedSchema { List<WatermarkSpec> watermarkSpecs, @Nullable UniqueConstraint primaryKey, List<Index> indexes) { + this(columns, watermarkSpecs, primaryKey, indexes, null); + } + + public ResolvedSchema( + List<Column> columns, + List<WatermarkSpec> watermarkSpecs, + @Nullable UniqueConstraint primaryKey, + List<Index> indexes, + @Nullable ImmutableColumnsConstraint immutableColumns) { this.columns = Preconditions.checkNotNull(columns, "Columns must not be null."); this.watermarkSpecs = Preconditions.checkNotNull(watermarkSpecs, "Watermark specs must not be null."); this.primaryKey = primaryKey; this.indexes = Preconditions.checkNotNull(indexes, "Indexes must not be null."); + this.immutableColumns = immutableColumns; + + Preconditions.checkArgument( + primaryKey != null || immutableColumns == null, + "Immutable constraint must be defined on the table that contains primary key."); } /** Shortcut for a resolved schema of only columns. */ public static ResolvedSchema of(List<Column> columns) { - return new ResolvedSchema(columns, Collections.emptyList(), null, Collections.emptyList()); + return new ResolvedSchema( + columns, Collections.emptyList(), null, Collections.emptyList(), null); } /** Shortcut for a resolved schema of only columns. */ @@ -106,7 +125,8 @@ public final class ResolvedSchema { IntStream.range(0, columnNames.size()) .mapToObj(i -> Column.physical(columnNames.get(i), columnDataTypes.get(i))) .collect(Collectors.toList()); - return new ResolvedSchema(columns, Collections.emptyList(), null, Collections.emptyList()); + return new ResolvedSchema( + columns, Collections.emptyList(), null, Collections.emptyList(), null); } /** Shortcut for a resolved schema of only physical columns. */ @@ -180,6 +200,11 @@ public final class ResolvedSchema { return indexes; } + /** Returns the constraint about immutable columns if it has been defined. */ + public Optional<ImmutableColumnsConstraint> getImmutableColumns() { + return Optional.ofNullable(immutableColumns); + } + /** * Returns the primary key indexes in the {@link #toPhysicalRowDataType()}, if any, otherwise * returns an empty array. @@ -196,6 +221,24 @@ public final class ResolvedSchema { .orElseGet(() -> new int[] {}); } + /** + * Returns the indexes of columns about the immutable constraint in the {@link + * #toPhysicalRowDataType()}, if any, otherwise returns an empty array. + */ + public int[] getImmutableColumnIndexes() { + final List<String> columns = + getColumns().stream() + .filter(Column::isPhysical) + .map(Column::getName) + .collect(Collectors.toList()); + return getImmutableColumns() + .map(ImmutableColumnsConstraint::getColumns) + .map( + immutableColumns -> + immutableColumns.stream().mapToInt(columns::indexOf).toArray()) + .orElseGet(() -> new int[] {}); + } + /** * Converts all columns of this schema into a (possibly nested) row data type. * @@ -253,6 +296,9 @@ public final class ResolvedSchema { components.add(primaryKey); } components.addAll(indexes); + if (immutableColumns != null) { + components.add(immutableColumns); + } return components.stream() .map(Objects::toString) .map(s -> " " + s) @@ -271,12 +317,13 @@ public final class ResolvedSchema { return Objects.equals(columns, that.columns) && Objects.equals(watermarkSpecs, that.watermarkSpecs) && Objects.equals(primaryKey, that.primaryKey) - && Objects.equals(indexes, that.indexes); + && Objects.equals(indexes, that.indexes) + && Objects.equals(immutableColumns, that.immutableColumns); } @Override public int hashCode() { - return Objects.hash(columns, watermarkSpecs, primaryKey, indexes); + return Objects.hash(columns, watermarkSpecs, primaryKey, indexes, immutableColumns); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java index ad541ff7d26..21cb1900f77 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java @@ -127,7 +127,8 @@ public class TableSchemaUtils { .collect(Collectors.toList()), resolvedSchema.getWatermarkSpecs(), resolvedSchema.getPrimaryKey().orElse(null), - resolvedSchema.getIndexes()); + resolvedSchema.getIndexes(), + resolvedSchema.getImmutableColumns().orElse(null)); } /** diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java index 6800f8e2ced..43eb735c4c9 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java @@ -120,6 +120,7 @@ class CatalogPropertiesUtilTest { .column("f2", DataTypes.STRING().getLogicalType().asSerializableString()) .primaryKey("f1") .indexNamed("f1", Collections.singletonList("f1")) + .immutableColumns("f2") .build(); final TableDistribution hashDist = @@ -137,8 +138,12 @@ class CatalogPropertiesUtilTest { List<Index> indexes = Collections.singletonList( DefaultIndex.newIndex("f1", Collections.singletonList("f1"))); + final ImmutableColumnsConstraint immutableColumns = + ImmutableColumnsConstraint.immutableColumns( + "IMMUTABLE_COLUMNS_f2", Collections.singletonList("f2")); final ResolvedSchema resolvedSchema = - new ResolvedSchema(columns, Collections.emptyList(), primaryKey, indexes); + new ResolvedSchema( + columns, Collections.emptyList(), primaryKey, indexes, immutableColumns); return Stream.of( new ResolvedCatalogTable( diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java index 937e99c46ef..228b5485f01 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java @@ -1623,7 +1623,8 @@ public abstract class CatalogTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("first")))); + DefaultIndex.newIndex("idx", Collections.singletonList("first"))), + null); } protected ResolvedSchema createAnotherSchema() { @@ -1635,7 +1636,8 @@ public abstract class CatalogTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("first")))); + DefaultIndex.newIndex("idx", Collections.singletonList("first"))), + null); } protected List<String> createPartitionKeys() { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java index 454691c0921..5ba20ef1fbc 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java @@ -51,7 +51,10 @@ public class TestSchemaResolver implements SchemaResolver { final List<Index> indexes = resolveIndexes(schema.getIndexes()); - return new ResolvedSchema(columns, watermarkSpecs, primaryKey, indexes); + final ImmutableColumnsConstraint immutableColumns = + resolveImmutableColumns(schema.getImmutableColumns().orElse(null)); + + return new ResolvedSchema(columns, watermarkSpecs, primaryKey, indexes, immutableColumns); } private List<Index> resolveIndexes(List<Schema.UnresolvedIndex> unresolvedIndexes) { @@ -124,6 +127,17 @@ public class TestSchemaResolver implements SchemaResolver { unresolvedPrimaryKey.getConstraintName(), unresolvedPrimaryKey.getColumnNames()); } + private @Nullable ImmutableColumnsConstraint resolveImmutableColumns( + @Nullable Schema.UnresolvedImmutableColumns unresolvedImmutableColumns) { + if (unresolvedImmutableColumns == null) { + return null; + } + + return ImmutableColumnsConstraint.immutableColumns( + unresolvedImmutableColumns.getConstraintName(), + unresolvedImmutableColumns.getColumnNames()); + } + private ResolvedExpression resolveExpression(Expression expression) { if (expression instanceof ResolvedExpression) { return (ResolvedExpression) expression; diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java index 97e6664834b..4bfa97673cb 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java @@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.WatermarkSpec; @@ -104,7 +105,9 @@ class TableSchemaUtilsTest { WatermarkSpec.of("t", ResolvedExpressionMock.of(rowTimeType, "t"))), UniqueConstraint.primaryKey("test-pk", Collections.singletonList("id")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("id")))); + DefaultIndex.newIndex("idx", Collections.singletonList("id"))), + ImmutableColumnsConstraint.immutableColumns( + "test-imt", Collections.singletonList("t"))); assertThat(TableSchemaUtils.removeTimeAttributeFromResolvedSchema(schema)) .isEqualTo( new ResolvedSchema( @@ -124,6 +127,8 @@ class TableSchemaUtilsTest { "test-pk", Collections.singletonList("id")), Collections.singletonList( DefaultIndex.newIndex( - "idx", Collections.singletonList("id"))))); + "idx", Collections.singletonList("id"))), + ImmutableColumnsConstraint.immutableColumns( + "test-imt", Collections.singletonList("t")))); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java similarity index 62% copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java index 62501577f0f..20560ef2afb 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java @@ -19,41 +19,21 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.catalog.Constraint.ConstraintType; -import org.apache.flink.table.catalog.UniqueConstraint; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -import java.util.List; - -/** Mixin for {@link UniqueConstraint}. */ +/** Base mixin for {@code AbstractConstraint} subclasses. */ @Internal -abstract class UniqueConstraintMixin { +abstract class AbstractConstraintMixin { static final String NAME = "name"; static final String ENFORCED = "enforced"; - static final String TYPE = "type"; - static final String COLUMNS = "columns"; - - @JsonCreator - private UniqueConstraintMixin( - @JsonProperty(NAME) String name, - @JsonProperty(ENFORCED) boolean enforced, - @JsonProperty(TYPE) ConstraintType type, - @JsonProperty(COLUMNS) List<String> columns) {} @JsonProperty(NAME) public abstract String getName(); - @JsonProperty(TYPE) - public abstract ConstraintType getType(); - @JsonProperty(ENFORCED) @JsonInclude(JsonInclude.Include.NON_DEFAULT) public abstract boolean isEnforced(); - - @JsonProperty(COLUMNS) - public abstract List<String> getColumns(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java index cb227b3fd75..bb0ea8a35e2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ContextResolvedModel; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogModel; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -230,6 +231,8 @@ public class CompiledPlanSerdeUtil { private static void registerMixins(SimpleModule module) { module.setMixInAnnotation(WatermarkSpec.class, WatermarkSpecMixin.class); module.setMixInAnnotation(UniqueConstraint.class, UniqueConstraintMixin.class); + module.setMixInAnnotation( + ImmutableColumnsConstraint.class, ImmutableColumnsConstraintMixin.class); module.setMixInAnnotation(DefaultIndex.class, DefaultIndexMixin.class); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java similarity index 68% copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java index 62501577f0f..4957b9728a1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java @@ -19,40 +19,30 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.catalog.Constraint.ConstraintType; -import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.Constraint; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; -/** Mixin for {@link UniqueConstraint}. */ +/** Mixin for {@link ImmutableColumnsConstraint}. */ @Internal -abstract class UniqueConstraintMixin { +abstract class ImmutableColumnsConstraintMixin extends AbstractConstraintMixin { - static final String NAME = "name"; - static final String ENFORCED = "enforced"; static final String TYPE = "type"; static final String COLUMNS = "columns"; @JsonCreator - private UniqueConstraintMixin( + private ImmutableColumnsConstraintMixin( @JsonProperty(NAME) String name, @JsonProperty(ENFORCED) boolean enforced, - @JsonProperty(TYPE) ConstraintType type, + @JsonProperty(TYPE) Constraint.ConstraintType type, @JsonProperty(COLUMNS) List<String> columns) {} - @JsonProperty(NAME) - public abstract String getName(); - @JsonProperty(TYPE) - public abstract ConstraintType getType(); - - @JsonProperty(ENFORCED) - @JsonInclude(JsonInclude.Include.NON_DEFAULT) - public abstract boolean isEnforced(); + public abstract Constraint.ConstraintType getType(); @JsonProperty(COLUMNS) public abstract List<String> getColumns(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java index 7795facd1e4..82c84ff9acc 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java @@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; import org.apache.flink.annotation.Internal; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.Index; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -39,6 +40,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanS import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.deserializeList; import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.deserializeListOrEmpty; import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.COLUMNS; +import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.IMMUTABLE_COLUMNS; import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.INDEXES; import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.PRIMARY_KEY; import static org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.WATERMARK_SPECS; @@ -69,7 +71,10 @@ final class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchem deserializeFieldOrNull(jsonNode, PRIMARY_KEY, UniqueConstraint.class, codec, ctx); List<Index> indexes = deserializeListOrEmpty(jsonNode, INDEXES, DefaultIndex.class, codec, ctx); + ImmutableColumnsConstraint immutableColumns = + deserializeFieldOrNull( + jsonNode, IMMUTABLE_COLUMNS, ImmutableColumnsConstraint.class, codec, ctx); - return new ResolvedSchema(columns, watermarkSpecs, primaryKey, indexes); + return new ResolvedSchema(columns, watermarkSpecs, primaryKey, indexes, immutableColumns); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java index f3d0c83df2b..1cd450e37d2 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java @@ -43,6 +43,7 @@ final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> { static final String WATERMARK_SPECS = "watermarkSpecs"; static final String PRIMARY_KEY = "primaryKey"; static final String INDEXES = "indexes"; + static final String IMMUTABLE_COLUMNS = "immutableColumns"; ResolvedSchemaJsonSerializer() { super(ResolvedSchema.class); @@ -67,6 +68,11 @@ final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> { jsonGenerator, PRIMARY_KEY, resolvedSchema.getPrimaryKey(), serializerProvider); serializeListIfNotEmpty( jsonGenerator, INDEXES, resolvedSchema.getIndexes(), serializerProvider); + serializeOptionalField( + jsonGenerator, + IMMUTABLE_COLUMNS, + resolvedSchema.getImmutableColumns(), + serializerProvider); jsonGenerator.writeEndObject(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java index 62501577f0f..46ecd9012e9 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java @@ -23,17 +23,14 @@ import org.apache.flink.table.catalog.Constraint.ConstraintType; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; /** Mixin for {@link UniqueConstraint}. */ @Internal -abstract class UniqueConstraintMixin { +abstract class UniqueConstraintMixin extends AbstractConstraintMixin { - static final String NAME = "name"; - static final String ENFORCED = "enforced"; static final String TYPE = "type"; static final String COLUMNS = "columns"; @@ -44,16 +41,9 @@ abstract class UniqueConstraintMixin { @JsonProperty(TYPE) ConstraintType type, @JsonProperty(COLUMNS) List<String> columns) {} - @JsonProperty(NAME) - public abstract String getName(); - @JsonProperty(TYPE) public abstract ConstraintType getType(); - @JsonProperty(ENFORCED) - @JsonInclude(JsonInclude.Include.NON_DEFAULT) - public abstract boolean isEnforced(); - @JsonProperty(COLUMNS) public abstract List<String> getColumns(); } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java index 042bff697a0..738824398cd 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; @@ -78,7 +79,9 @@ public class CatalogConstraintTest { "primary_constraint", Collections.singletonList("b")), Collections.singletonList( - DefaultIndex.newIndex("idx", List.of("a", "b"))))) + DefaultIndex.newIndex("idx", List.of("a", "b"))), + ImmutableColumnsConstraint.immutableColumns( + "immutable_constraint", List.of("b")))) .build(); Map<String, String> properties = buildCatalogTableProperties(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java index 35d300ea35e..b0ffe87df9a 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java @@ -54,7 +54,8 @@ class TableLineageUtilsTest { Column.physical("c", DataTypes.BOOLEAN())), Collections.emptyList(), null, - Collections.emptyList()); + Collections.emptyList(), + null); private static final Schema CATALOG_TABLE_SCHEMA = Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build(); private static final Map<String, String> TABLE_OPTIONS = new HashMap<>(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index 6f246c2fbb1..dd00b8c8506 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -39,6 +39,7 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableDistribution; import org.apache.flink.table.catalog.TableDistribution.Kind; +import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.WatermarkSpec; import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; @@ -1469,9 +1470,9 @@ class SqlMaterializedTableNodeToOperationConverterTest Column.physical("shop_id", DataTypes.BIGINT()), Column.physical("user_id", DataTypes.INT().notNull())), List.of(), - org.apache.flink.table.catalog.UniqueConstraint.primaryKey( - "PK_user_id", List.of("user_id")), - List.of())), + UniqueConstraint.primaryKey("PK_user_id", List.of("user_id")), + List.of(), + null)), Arguments.of( operation + "MATERIALIZED TABLE users_shops (PRIMARY KEY(user_id) NOT ENFORCED)" @@ -1482,9 +1483,9 @@ class SqlMaterializedTableNodeToOperationConverterTest Column.physical("shop_id", DataTypes.INT().notNull()), Column.physical("user_id", DataTypes.INT().notNull())), List.of(), - org.apache.flink.table.catalog.UniqueConstraint.primaryKey( - "PK_user_id", List.of("user_id")), - List.of()))); + UniqueConstraint.primaryKey("PK_user_id", List.of("user_id")), + List.of(), + null))); } /** Boilerplate CatalogMaterializedTable builder for tests. */ diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java index 302bcae32b6..ce8a16f534b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java @@ -102,7 +102,8 @@ class FlinkCalciteCatalogReaderTest { Collections.emptyList(), Collections.emptyList(), null, - Collections.emptyList()); + Collections.emptyList(), + null); final CatalogTable catalogTable = ConnectorCatalogTable.source( new TestTableSource( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java index 6417922b107..0ba42e3dd68 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java @@ -83,7 +83,8 @@ public class ContextResolvedTableSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); private static final Schema CATALOG_TABLE_SCHEMA = Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build(); @@ -416,7 +417,8 @@ public class ContextResolvedTableSerdeTest { null, Collections.singletonList( DefaultIndex.newIndex( - "idx", Collections.singletonList("a")))); + "idx", Collections.singletonList("a"))), + null); final ContextResolvedTable spec = ContextResolvedTable.permanent( PERMANENT_TABLE_IDENTIFIER, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index 461b1346e7b..cc151c09382 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -90,7 +90,8 @@ class DynamicTableSinkSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable1 = CatalogTable.newBuilder() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema1).build()) @@ -122,7 +123,8 @@ class DynamicTableSinkSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable2 = CatalogTable.newBuilder() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema2).build()) @@ -160,7 +162,8 @@ class DynamicTableSinkSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable3 = CatalogTable.newBuilder() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema3).build()) @@ -195,7 +198,8 @@ class DynamicTableSinkSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable4 = CatalogTable.newBuilder() .schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema4).build()) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index c32eaae35ce..6fd5942f19e 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -107,7 +107,8 @@ public class DynamicTableSourceSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable1 = CatalogTable.newBuilder() @@ -146,7 +147,8 @@ public class DynamicTableSourceSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable2 = CatalogTable.newBuilder() @@ -393,7 +395,8 @@ public class DynamicTableSourceSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); return new ResolvedCatalogTable( CatalogTable.newBuilder() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java index 9e6162c9ccc..323221bca37 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java @@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.DefaultIndex; import org.apache.flink.table.catalog.ExternalCatalogTable; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableDistribution; @@ -96,7 +97,9 @@ class ResolvedCatalogTableSerdeTest { Collections.singletonList(WatermarkSpec.of("b", REX_NODE_EXPRESSION)), UniqueConstraint.primaryKey("myPrimaryKey", Arrays.asList("a", "c")), Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("b")))); + DefaultIndex.newIndex("idx", Collections.singletonList("b"))), + ImmutableColumnsConstraint.immutableColumns( + "imt", Collections.singletonList("d"))); private static final ResolvedCatalogTable FULL_RESOLVED_CATALOG_TABLE = new ResolvedCatalogTable( @@ -126,7 +129,8 @@ class ResolvedCatalogTableSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); return Stream.of( FULL_RESOLVED_CATALOG_TABLE, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java index 7eb6132a75d..78804d8d0a2 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java @@ -79,7 +79,8 @@ public class TemporalTableSourceSpecSerdeTest { Collections.emptyList(), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("a")))); + DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null); final CatalogTable catalogTable1 = CatalogTable.newBuilder() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java index 8604882b7de..cef6487cc8b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java @@ -78,7 +78,8 @@ public class VectorSearchTableSourceSpecSerdeTest { Collections.singletonList(Column.physical("a", DataTypes.BIGINT())), Collections.emptyList(), null, - Collections.emptyList()); + Collections.emptyList(), + null); final CatalogTable catalogTable1 = CatalogTable.newBuilder() diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java index 94ebdba3d04..98310727df5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java @@ -370,7 +370,8 @@ class DataStreamJavaITCase { TIMESTAMP_LTZ(3), "`SOURCE_WATERMARK`()"))), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("f0"))))); + DefaultIndex.newIndex("idx", Collections.singletonList("f0"))), + null)); tableEnv.createTemporaryView("t", table); @@ -644,7 +645,8 @@ class DataStreamJavaITCase { TIMESTAMP(3), "`SOURCE_WATERMARK`()"))), null, Collections.singletonList( - DefaultIndex.newIndex("idx", Collections.singletonList("f0"))))); + DefaultIndex.newIndex("idx", Collections.singletonList("f0"))), + null)); final DataStream<Long> rowtimeStream = tableEnv.toDataStream(table) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala index e8c321236c2..5e2ffd7c58e 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala @@ -321,7 +321,8 @@ object MetadataTestUtil { ), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "d")), - Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a")))) + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null) val catalogTable = getCatalogTable(resolvedSchema) @@ -366,7 +367,8 @@ object MetadataTestUtil { ), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")), - Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a")))) + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null) val typeFactory = new FlinkTypeFactory(Thread.currentThread().getContextClassLoader) val rowType = typeFactory.buildRelNodeRowType( @@ -394,7 +396,8 @@ object MetadataTestUtil { ), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("b")), - Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a")))) + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null) val catalogTable = getCatalogTable(resolvedSchema) @@ -424,7 +427,8 @@ object MetadataTestUtil { ), Collections.emptyList(), null, - Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a")))) + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null) val catalogTable = getCatalogTable(resolvedSchema) @@ -451,7 +455,8 @@ object MetadataTestUtil { Column.physical("b", DataTypes.BIGINT().notNull())), Collections.emptyList(), UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")), - Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a")))) + Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))), + null) val catalogTable = getCatalogTable(resolvedSchema) diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java index 9be9499428c..7a117ef58c2 100644 --- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java @@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode; import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ImmutableColumnsConstraint; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; @@ -69,13 +70,20 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase { Column.physical("age", DataTypes.INT()), Column.physical("tss", DataTypes.TIMESTAMP(3)), Column.physical("partition", DataTypes.VARCHAR(10))); - private static final UniqueConstraint CONSTRAINTS = + private static final UniqueConstraint PK_CONSTRAINT = UniqueConstraint.primaryKey("primary_constraint", Collections.singletonList("id")); + private static final ImmutableColumnsConstraint IMMUTABLE_COLS_CONSTRAINT = + ImmutableColumnsConstraint.immutableColumns( + "imt_constraint", Collections.singletonList("name")); private static final List<String> PARTITION_KEYS = Collections.singletonList("partition"); private static final ResolvedSchema CREATE_RESOLVED_SCHEMA = new ResolvedSchema( - CREATE_COLUMNS, Collections.emptyList(), CONSTRAINTS, Collections.emptyList()); + CREATE_COLUMNS, + Collections.emptyList(), + PK_CONSTRAINT, + Collections.emptyList(), + IMMUTABLE_COLS_CONSTRAINT); private static final Schema CREATE_SCHEMA = Schema.newBuilder().fromResolvedSchema(CREATE_RESOLVED_SCHEMA).build();
