This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58d7acc1f1164b6701fea7f24930a0a0ead1be2c
Author: xuyang <[email protected]>
AuthorDate: Thu Mar 19 16:20:59 2026 +0800

    [FLINK-39286][table] Introduce immutable columns constraint in schema
---
 .../flink/table/sql/CreateTableAsITCase.java       |   5 +-
 .../flink/table/sql/PlannerScalaFreeITCase.java    |   5 +-
 .../flink/table/sql/UsingRemoteJarITCase.java      |   5 +-
 .../table/tests/test_catalog_completeness.py       |   3 +-
 .../apache/flink/table/catalog/CatalogManager.java |   3 +-
 .../flink/table/catalog/DefaultSchemaResolver.java |  76 ++++++++-
 .../table/api/internal/ShowCreateUtilTest.java     |  51 +++++-
 .../catalog/CatalogBaseTableResolutionTest.java    |  17 +-
 .../flink/table/catalog/SchemaResolutionTest.java  |  95 ++++++++++-
 .../java/org/apache/flink/table/api/Schema.java    | 175 ++++++++++++++++++++-
 .../flink/table/catalog/CatalogPropertiesUtil.java |  27 ++++
 .../org/apache/flink/table/catalog/Constraint.java |  20 ++-
 .../table/catalog/ImmutableColumnsConstraint.java  |  97 ++++++++++++
 .../apache/flink/table/catalog/ResolvedSchema.java |  57 ++++++-
 .../apache/flink/table/utils/TableSchemaUtils.java |   3 +-
 .../table/catalog/CatalogPropertiesUtilTest.java   |   7 +-
 .../apache/flink/table/catalog/CatalogTest.java    |   6 +-
 .../flink/table/catalog/TestSchemaResolver.java    |  16 +-
 .../flink/table/utils/TableSchemaUtilsTest.java    |   9 +-
 ...aintMixin.java => AbstractConstraintMixin.java} |  24 +--
 .../nodes/exec/serde/CompiledPlanSerdeUtil.java    |   3 +
 ...n.java => ImmutableColumnsConstraintMixin.java} |  24 +--
 .../exec/serde/ResolvedSchemaJsonDeserializer.java |   7 +-
 .../exec/serde/ResolvedSchemaJsonSerializer.java   |   6 +
 .../nodes/exec/serde/UniqueConstraintMixin.java    |  12 +-
 .../planner/catalog/CatalogConstraintTest.java     |   5 +-
 .../planner/lineage/TableLineageUtilsTest.java     |   3 +-
 ...erializedTableNodeToOperationConverterTest.java |  13 +-
 .../plan/FlinkCalciteCatalogReaderTest.java        |   3 +-
 .../exec/serde/ContextResolvedTableSerdeTest.java  |   6 +-
 .../exec/serde/DynamicTableSinkSpecSerdeTest.java  |  12 +-
 .../serde/DynamicTableSourceSpecSerdeTest.java     |   9 +-
 .../exec/serde/ResolvedCatalogTableSerdeTest.java  |   8 +-
 .../serde/TemporalTableSourceSpecSerdeTest.java    |   3 +-
 .../VectorSearchTableSourceSpecSerdeTest.java      |   3 +-
 .../runtime/stream/sql/DataStreamJavaITCase.java   |   6 +-
 .../planner/plan/metadata/MetadataTestUtil.scala   |  15 +-
 .../catalog/TestFileSystemCatalogTest.java         |  12 +-
 38 files changed, 727 insertions(+), 124 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
index 3d86251def1..06fbe73dd66 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/CreateTableAsITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
@@ -46,7 +47,9 @@ public class CreateTableAsITCase extends SqlITCaseBase {
                     Collections.emptyList(),
                     UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("user_name")));
 
     private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
index 58d13ac5895..dbdbae5b551 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/PlannerScalaFreeITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.test.util.SQLJobSubmission;
@@ -53,7 +54,9 @@ public class PlannerScalaFreeITCase extends SqlITCaseBase {
                     Collections.emptyList(),
                     UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("user_name")));
 
     private static final DebeziumJsonDeserializationSchema 
DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(SINK_TABLE_SCHEMA);
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
index c49b045252c..0432aa76ecc 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/UsingRemoteJarITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 
@@ -47,7 +48,9 @@ public class UsingRemoteJarITCase extends HdfsITCaseBase {
                     Collections.emptyList(),
                     UniqueConstraint.primaryKey("pk", 
Collections.singletonList("user_name")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("user_name"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("user_name")));
 
     private static final DebeziumJsonDeserializationSchema 
USER_ORDER_DESERIALIZATION_SCHEMA =
             createDebeziumDeserializationSchema(USER_ORDER_SCHEMA);
diff --git a/flink-python/pyflink/table/tests/test_catalog_completeness.py b/flink-python/pyflink/table/tests/test_catalog_completeness.py
index a3e171fc384..c1ab64c96aa 100644
--- a/flink-python/pyflink/table/tests/test_catalog_completeness.py
+++ b/flink-python/pyflink/table/tests/test_catalog_completeness.py
@@ -253,7 +253,8 @@ class ResolvedSchemaAPICompletenessTests(PythonAPICompletenessTestCase, PyFlinkT
     @classmethod
     def excluded_methods(cls):
         # getIndexes are not needed in Python API as they are used internally
-        return {'getIndexes'}
+        # TODO FLINK-39319 add immutable column constraint in Python API
+        return {'getIndexes', 'getImmutableColumns', 'getImmutableColumnIndexes'}
 
 
 if __name__ == '__main__':
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index bc870857807..3bba4c05f4e 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -1984,7 +1984,8 @@ public final class CatalogManager implements CatalogRegistry, AutoCloseable {
                                     .collect(Collectors.toList()),
                             resolvedSchema.getWatermarkSpecs(),
                             resolvedSchema.getPrimaryKey().orElse(null),
-                            resolvedSchema.getIndexes());
+                            resolvedSchema.getIndexes(),
+                            resolvedSchema.getImmutableColumns().orElse(null));
             return new ResolvedCatalogView(
                     // pass a view that has the query parsed and
                     // validated already
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
index b44e8437420..4ce5177832d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.Schema.UnresolvedComputedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedImmutableColumns;
 import org.apache.flink.table.api.Schema.UnresolvedIndex;
 import org.apache.flink.table.api.Schema.UnresolvedMetadataColumn;
 import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
@@ -92,7 +93,12 @@ class DefaultSchemaResolver implements SchemaResolver {
 
         final List<Index> indexes = resolveIndexes(schema.getIndexes(), columnsWithRowtime);
 
-        return new ResolvedSchema(columnsWithRowtime, watermarkSpecs, primaryKey, indexes);
+        final ImmutableColumnsConstraint immutableColumns =
+                resolveImmutableColumns(
+                        schema.getImmutableColumns().orElse(null), primaryKey, columnsWithRowtime);
+
+        return new ResolvedSchema(
+                columnsWithRowtime, watermarkSpecs, primaryKey, indexes, immutableColumns);
     }
 
     // --------------------------------------------------------------------------------------------
@@ -459,6 +465,74 @@ class DefaultSchemaResolver implements SchemaResolver {
         }
     }
 
+    private @Nullable ImmutableColumnsConstraint resolveImmutableColumns(
+            @Nullable UnresolvedImmutableColumns unresolvedImmutableColumns,
+            @Nullable UniqueConstraint primaryKey,
+            List<Column> columns) {
+        if (unresolvedImmutableColumns == null) {
+            return null;
+        }
+
+        final ImmutableColumnsConstraint immutableColumns =
+                ImmutableColumnsConstraint.immutableColumns(
+                        unresolvedImmutableColumns.getConstraintName(),
+                        unresolvedImmutableColumns.getColumnNames());
+
+        validateImmutableColumns(immutableColumns, primaryKey, columns);
+
+        return immutableColumns;
+    }
+
+    private void validateImmutableColumns(
+            ImmutableColumnsConstraint immutableColumns,
+            @Nullable UniqueConstraint primaryKey,
+            List<Column> columns) {
+        if (primaryKey == null) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid immutable constraint '%s'. "
+                                    + "An immutable constraint must be defined on the table that contains primary key.",
+                            immutableColumns.getName()));
+        }
+
+        final Map<String, Column> columnsByNameLookup =
+                columns.stream().collect(Collectors.toMap(Column::getName, Function.identity()));
+
+        final Set<String> duplicateColumns =
+                immutableColumns.getColumns().stream()
+                        .filter(
+                                name ->
+                                        Collections.frequency(immutableColumns.getColumns(), name)
+                                                > 1)
+                        .collect(Collectors.toSet());
+
+        if (!duplicateColumns.isEmpty()) {
+            throw new ValidationException(
+                    String.format(
+                            "Invalid immutable constraint '%s'. "
+                                    + "An immutable constraint must not contain duplicate columns. "
+                                    + "Found: %s",
+                            immutableColumns.getName(), duplicateColumns));
+        }
+
+        for (String columnName : immutableColumns.getColumns()) {
+            Column column = columnsByNameLookup.get(columnName);
+            if (column == null) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid immutable constraint '%s'. Column '%s' does not exist.",
+                                immutableColumns.getName(), columnName));
+            }
+
+            if (!column.isPhysical()) {
+                throw new ValidationException(
+                        String.format(
+                                "Invalid immutable constraint '%s'. Column '%s' is not a physical column.",
+                                immutableColumns.getName(), columnName));
+            }
+        }
+    }
+
     private ResolvedExpression resolveExpression(
             List<Column> columns, Expression expression, @Nullable DataType outputDataType) {
         final LocalReferenceExpression[] localRefs =
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
index 15464afc7e5..51e1a5479fd 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/internal/ShowCreateUtilTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogView;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -73,13 +74,24 @@ class ShowCreateUtilTest {
                             Column.metadata("mt_column", DataTypes.STRING(), 
null, true)),
                     List.of(),
                     UniqueConstraint.primaryKey("pk", List.of("id")),
-                    List.of());
+                    List.of(),
+                    null);
 
     private static final ResolvedSchema TWO_COLUMNS_SCHEMA =
             ResolvedSchema.of(
                     Column.physical("id", DataTypes.INT()),
                     Column.physical("name", DataTypes.STRING()));
 
+    private static final ResolvedSchema 
TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS =
+            new ResolvedSchema(
+                    List.of(
+                            Column.physical("id", DataTypes.INT()),
+                            Column.physical("name", DataTypes.STRING())),
+                    List.of(),
+                    UniqueConstraint.primaryKey("pk", List.of("id")),
+                    List.of(),
+                    ImmutableColumnsConstraint.immutableColumns("imt", 
List.of("name")));
+
     @ParameterizedTest(name = "{index}: {2}")
     @MethodSource("argsForShowCreateTable")
     void showCreateTable(
@@ -213,6 +225,23 @@ class ShowCreateUtilTest {
                         + "COMMENT 'Table comment'\n"
                         + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n");
 
+        addTemporaryAndPermanent(
+                argList,
+                createResolvedTable(
+                        TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS,
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        TableDistribution.of(
+                                TableDistribution.Kind.RANGE, 2, 
Arrays.asList("1", "10")),
+                        "Table comment"),
+                "CREATE %sTABLE `catalogName`.`dbName`.`tableName` (\n"
+                        + "  `id` INT,\n"
+                        + "  `name` VARCHAR(2147483647),\n"
+                        + "  CONSTRAINT `pk` PRIMARY KEY (`id`) NOT ENFORCED\n"
+                        + ")\n"
+                        + "COMMENT 'Table comment'\n"
+                        + "DISTRIBUTED BY RANGE(`1`, `10`) INTO 2 BUCKETS\n");
+
         final Map<String, String> options = new HashMap<>();
         options.put("option_key_a", "option_value_a");
         options.put("option_key_b", "option_value_b");
@@ -315,6 +344,26 @@ class ShowCreateUtilTest {
                                 + "REFRESH_MODE = CONTINUOUS\n"
                                 + "AS SELECT 1\n"));
 
+        argList.add(
+                Arguments.of(
+                        createResolvedMaterialized(
+                                
TWO_COLUMNS_SCHEMA_WITH_PRIMARY_KEY_AND_IMMUTABLE_COLS,
+                                null,
+                                List.of(),
+                                null,
+                                IntervalFreshness.ofMinute("1"),
+                                RefreshMode.CONTINUOUS,
+                                "SELECT 1",
+                                "SELECT 1"),
+                        "CREATE MATERIALIZED TABLE 
`catalogName`.`dbName`.`materializedTableName` (\n"
+                                + "  `id` INT,\n"
+                                + "  `name` VARCHAR(2147483647),\n"
+                                + "  CONSTRAINT `pk` PRIMARY KEY (`id`) NOT 
ENFORCED\n"
+                                + ")\n"
+                                + "FRESHNESS = INTERVAL '1' MINUTE\n"
+                                + "REFRESH_MODE = CONTINUOUS\n"
+                                + "AS SELECT 1\n"));
+
         argList.add(
                 Arguments.of(
                         createResolvedMaterialized(
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 05ddc34a23b..92035ccc944 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -89,6 +89,7 @@ class CatalogBaseTableResolutionTest {
                     .watermark("ts", WATERMARK_SQL)
                     .primaryKeyNamed("primary_constraint", "id")
                     .indexNamed("idx", Collections.singletonList("id"))
+                    .immutableColumnsNamed("imt", 
Collections.singletonList("region"))
                     .build();
 
     private static final Schema MATERIALIZED_TABLE_SCHEMA =
@@ -101,6 +102,7 @@ class CatalogBaseTableResolutionTest {
                     .withComment("") // empty column comment
                     .primaryKeyNamed("primary_constraint", "id")
                     .indexNamed("idx", Collections.singletonList("id"))
+                    .immutableColumnsNamed("imt", 
Collections.singletonList("region"))
                     .build();
 
     private static final TableSchema LEGACY_TABLE_SCHEMA =
@@ -138,7 +140,9 @@ class CatalogBaseTableResolutionTest {
                     UniqueConstraint.primaryKey(
                             "primary_constraint", 
Collections.singletonList("id")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("region")));
 
     private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA =
             new ResolvedSchema(
@@ -152,7 +156,9 @@ class CatalogBaseTableResolutionTest {
                     UniqueConstraint.primaryKey(
                             "primary_constraint", 
Collections.singletonList("id")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("region")));
 
     private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER =
             new ContinuousRefreshHandler(
@@ -171,7 +177,8 @@ class CatalogBaseTableResolutionTest {
                             Column.physical("county", DataTypes.VARCHAR(200))),
                     Collections.emptyList(),
                     null,
-                    Collections.emptyList());
+                    Collections.emptyList(),
+                    null);
 
     @Test
     void testCatalogTableResolution() {
@@ -397,6 +404,8 @@ class CatalogBaseTableResolutionTest {
         properties.put("schema.primary-key.columns", "id");
         properties.put("schema.index.0.name", "idx");
         properties.put("schema.index.0.columns", "id");
+        properties.put("schema.immutable.name", "imt");
+        properties.put("schema.immutable.columns", "region");
         properties.put("partition.keys.0.name", "region");
         properties.put("partition.keys.1.name", "county");
         properties.put("version", "12");
@@ -424,6 +433,8 @@ class CatalogBaseTableResolutionTest {
         properties.put("schema.primary-key.columns", "id");
         properties.put("schema.index.0.name", "idx");
         properties.put("schema.index.0.columns", "id");
+        properties.put("schema.immutable.name", "imt");
+        properties.put("schema.immutable.columns", "region");
         properties.put("freshness-interval", "30");
         properties.put("freshness-unit", "SECOND");
         properties.put("logical-refresh-mode", "CONTINUOUS");
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
index 58510130796..3519ade2907 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/SchemaResolutionTest.java
@@ -90,6 +90,7 @@ class SchemaResolutionTest {
                     .watermark("ts", WATERMARK_SQL)
                     .columnByExpression("proctime", PROCTIME_SQL)
                     .indexNamed("idx", Collections.singletonList("counter"))
+                    .immutableColumnsNamed("imt", 
Collections.singletonList("payload"))
                     .build();
 
     // the type of ts_ltz is TIMESTAMP_LTZ
@@ -141,8 +142,9 @@ class SchemaResolutionTest {
                         UniqueConstraint.primaryKey(
                                 "primary_constraint", 
Collections.singletonList("id")),
                         Collections.singletonList(
-                                DefaultIndex.newIndex(
-                                        "idx", 
Collections.singletonList("counter"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("counter"))),
+                        ImmutableColumnsConstraint.immutableColumns(
+                                "imt", Collections.singletonList("payload")));
 
         final ResolvedSchema actualStreamSchema = resolveSchema(SCHEMA, true);
         {
@@ -172,7 +174,8 @@ class SchemaResolutionTest {
                                 WatermarkSpec.of("ts1", 
WATERMARK_RESOLVED_WITH_TS_LTZ)),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))),
+                        null);
 
         final ResolvedSchema actualStreamSchema = 
resolveSchema(SCHEMA_WITH_TS_LTZ, true);
         {
@@ -202,7 +205,8 @@ class SchemaResolutionTest {
                                                 DataTypes.TIMESTAMP_LTZ(1)))),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("ts_ltz"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("ts_ltz"))),
+                        null);
         final ResolvedSchema resolvedSchema =
                 resolveSchema(
                         Schema.newBuilder()
@@ -352,6 +356,40 @@ class SchemaResolutionTest {
                         .build(),
                 "Invalid index 'INDEX_ts'. "
                         + "Column 'ts' is not a physical column or a metadata 
column.");
+
+        // immutable constraints
+
+        testError(
+                Schema.newBuilder().column("id", 
DataTypes.INT()).immutableColumns("id").build(),
+                "An immutable constraint must be defined on the table that 
contains primary key.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT().notNull())
+                        .primaryKey("id")
+                        .immutableColumns("INVALID")
+                        .build(),
+                "Column 'INVALID' does not exist.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT().notNull())
+                        .column("orig_ts", DataTypes.TIMESTAMP(3))
+                        .columnByExpression("ts", COMPUTED_SQL)
+                        .primaryKey("id")
+                        .immutableColumns("ts")
+                        .build(),
+                "Column 'ts' is not a physical column.");
+
+        testError(
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT().notNull())
+                        .column("name", DataTypes.STRING())
+                        .primaryKey("id")
+                        .immutableColumns("name", "name")
+                        .build(),
+                "Invalid immutable constraint 'IMMUTABLE_COLUMNS_name_name'. "
+                        + "An immutable constraint must not contain duplicate 
columns. Found: [name]");
     }
 
     @Test
@@ -371,6 +409,7 @@ class SchemaResolutionTest {
                                         .indexNamed("idx", null)
                                         .build())
                 .hasMessageContaining("Index column names must not be null.");
+
         assertThatThrownBy(
                         () ->
                                 Schema.newBuilder()
@@ -380,6 +419,48 @@ class SchemaResolutionTest {
                 .hasMessageContaining("Index must be defined for at least a 
single column.");
     }
 
+    @Test
+    void testImmutableColumnsBuildingErrors() {
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT().notNull())
+                                        .primaryKey("a")
+                                        .immutableColumnsNamed(null, 
Collections.singletonList("a"))
+                                        .build())
+                .hasMessageContaining("Immutable constraint name must not be 
null.");
+
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT())
+                                        .primaryKey("a")
+                                        .immutableColumnsNamed("imt", 
(String[]) null)
+                                        .build())
+                .hasMessageContaining("Immutable column names must not be 
null.");
+
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT())
+                                        .primaryKey("a")
+                                        .immutableColumnsNamed("idx", 
Collections.emptyList())
+                                        .build())
+                .hasMessageContaining(
+                        "Immutable constraint must be defined for at least a 
single column.");
+
+        assertThatThrownBy(
+                        () ->
+                                Schema.newBuilder()
+                                        .column("a", DataTypes.INT().notNull())
+                                        .column("b", DataTypes.INT().notNull())
+                                        .primaryKey("a")
+                                        .immutableColumns("a")
+                                        .immutableColumns("b")
+                                        .build())
+                .hasMessageContaining("Multiple immutable constraints are not 
supported.");
+    }
+
     @Test
     void testUnresolvedSchemaString() {
         assertThat(SCHEMA.toString())
@@ -394,7 +475,8 @@ class SchemaResolutionTest {
                                 + "  `proctime` AS [PROCTIME()],\n"
                                 + "  WATERMARK FOR `ts` AS [ts - INTERVAL '5' 
SECOND],\n"
                                 + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED,\n"
-                                + "  INDEX `idx` (`counter`)\n"
+                                + "  INDEX `idx` (`counter`),\n"
+                                + "  CONSTRAINT `imt` COLUMNS (`payload`) 
IMMUTABLE NOT ENFORCED\n"
                                 + ")");
     }
 
@@ -413,7 +495,8 @@ class SchemaResolutionTest {
                                 + "  `proctime` TIMESTAMP_LTZ(3) NOT NULL 
*PROCTIME* AS PROCTIME(),\n"
                                 + "  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - 
INTERVAL '5' SECOND,\n"
                                 + "  CONSTRAINT `primary_constraint` PRIMARY 
KEY (`id`) NOT ENFORCED,\n"
-                                + "  INDEX `idx` (`counter`)\n"
+                                + "  INDEX `idx` (`counter`),\n"
+                                + "  CONSTRAINT `imt` COLUMNS (`payload`) 
IMMUTABLE NOT ENFORCED\n"
                                 + ")");
     }
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
index 654e919dfe5..d22edb91b25 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.Column.ComputedColumn;
 import org.apache.flink.table.catalog.Column.MetadataColumn;
 import org.apache.flink.table.catalog.Column.PhysicalColumn;
 import org.apache.flink.table.catalog.Constraint;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.Index;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.SchemaResolver;
@@ -80,7 +81,12 @@ public final class Schema {
 
     private final List<UnresolvedIndex> indexes;
 
-    /** Please use {@link #Schema(List, List, UnresolvedPrimaryKey, List)} 
instead. */
+    private final @Nullable UnresolvedImmutableColumns immutableColumns;
+
+    /**
+     * Please use {@link #Schema(List, List, UnresolvedPrimaryKey, List,
+     * UnresolvedImmutableColumns)} instead.
+     */
     @Deprecated
     public Schema(
             List<UnresolvedColumn> columns,
@@ -94,10 +100,20 @@ public final class Schema {
             List<UnresolvedWatermarkSpec> watermarkSpecs,
             @Nullable UnresolvedPrimaryKey primaryKey,
             List<UnresolvedIndex> indexes) {
+        this(columns, watermarkSpecs, primaryKey, indexes, null);
+    }
+
+    public Schema(
+            List<UnresolvedColumn> columns,
+            List<UnresolvedWatermarkSpec> watermarkSpecs,
+            @Nullable UnresolvedPrimaryKey primaryKey,
+            List<UnresolvedIndex> indexes,
+            @Nullable UnresolvedImmutableColumns immutableColumns) {
         this.columns = Collections.unmodifiableList(columns);
         this.watermarkSpecs = Collections.unmodifiableList(watermarkSpecs);
         this.primaryKey = primaryKey;
         this.indexes = Collections.unmodifiableList(indexes);
+        this.immutableColumns = immutableColumns;
     }
 
     /** Builder for configuring and creating instances of {@link Schema}. */
@@ -134,6 +150,10 @@ public final class Schema {
         return indexes;
     }
 
+    public Optional<UnresolvedImmutableColumns> getImmutableColumns() {
+        return Optional.ofNullable(immutableColumns);
+    }
+
     /** Resolves the given {@link Schema} to a validated {@link 
ResolvedSchema}. */
     public ResolvedSchema resolve(SchemaResolver resolver) {
         return resolver.resolve(this);
@@ -150,6 +170,9 @@ public final class Schema {
         if (!indexes.isEmpty()) {
             components.addAll(indexes);
         }
+        if (immutableColumns != null) {
+            components.add(immutableColumns);
+        }
         return components.stream()
                 .map(Objects::toString)
                 .map(s -> "  " + s)
@@ -168,12 +191,13 @@ public final class Schema {
         return columns.equals(schema.columns)
                 && watermarkSpecs.equals(schema.watermarkSpecs)
                 && Objects.equals(primaryKey, schema.primaryKey)
-                && indexes.equals(schema.indexes);
+                && indexes.equals(schema.indexes)
+                && Objects.equals(immutableColumns, schema.immutableColumns);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(columns, watermarkSpecs, primaryKey, indexes);
+        return Objects.hash(columns, watermarkSpecs, primaryKey, indexes, 
immutableColumns);
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -190,6 +214,8 @@ public final class Schema {
 
         private final List<UnresolvedIndex> indexes;
 
+        private @Nullable UnresolvedImmutableColumns immutableColumns;
+
         private Builder() {
             columns = new ArrayList<>();
             watermarkSpecs = new ArrayList<>();
@@ -206,6 +232,11 @@ public final class Schema {
                         unresolvedSchema.primaryKey.getColumnNames());
             }
             indexes.addAll(unresolvedSchema.indexes);
+            if (unresolvedSchema.immutableColumns != null) {
+                immutableColumnsNamed(
+                        unresolvedSchema.immutableColumns.getConstraintName(),
+                        unresolvedSchema.immutableColumns.getColumnNames());
+            }
             return this;
         }
 
@@ -215,6 +246,7 @@ public final class Schema {
             addResolvedWatermarkSpec(resolvedSchema.getWatermarkSpecs());
             
resolvedSchema.getPrimaryKey().ifPresent(this::addResolvedConstraint);
             addResolvedIndexes(resolvedSchema.getIndexes());
+            
resolvedSchema.getImmutableColumns().ifPresent(this::addResolvedConstraint);
             return this;
         }
 
@@ -672,9 +704,87 @@ public final class Schema {
             return this;
         }
 
+        /**
+         * Declares an immutable columns constraint for a list of given 
columns. Immutable columns
+         * constraint is used to identify which columns in a table are not 
allowed to be modified.
+         * Currently, this constraint is informational only and is not 
enforced. It can be utilized
+         * for optimization purposes. It is the responsibility of the data 
owner to ensure that
+         * these columns are unmodified.
+         *
+         * <p>The immutable columns will be assigned a generated name in the 
format {@code
+         * IMMUTABLE_COLUMNS_col1_col2}.
+         *
+         * @param columnNames columns that form the constraint for immutable 
columns
+         */
+        public Builder immutableColumns(String... columnNames) {
+            Preconditions.checkNotNull(columnNames, "Immutable column names 
must not be null.");
+            return immutableColumns(Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares an immutable columns constraint for a list of given 
columns. Immutable columns
+         * constraint is used to identify which columns in a table are not 
allowed to be modified.
+         * Currently, this constraint is informational only and is not 
enforced. It can be utilized
+         * for optimization purposes. It is the responsibility of the data 
owner to ensure that
+         * these columns are unmodified.
+         *
+         * <p>The immutable columns will be assigned a generated name in the 
format {@code
+         * IMMUTABLE_COLUMNS_col1_col2}.
+         *
+         * @param columnNames columns that form the constraint for immutable 
columns
+         */
+        public Builder immutableColumns(List<String> columnNames) {
+            Preconditions.checkNotNull(columnNames, "Immutable column names 
must not be null.");
+            final String generatedConstraintName =
+                    columnNames.stream().collect(Collectors.joining("_", 
"IMMUTABLE_COLUMNS_", ""));
+            return immutableColumnsNamed(generatedConstraintName, columnNames);
+        }
+
+        /**
+         * Declares an immutable columns constraint for a list of given 
columns. Immutable columns
+         * constraint is used to identify which columns in a table are not 
allowed to be modified.
+         * Currently, this constraint is informational only and is not 
enforced. It can be utilized
+         * for optimization purposes. It is the responsibility of the data 
owner to ensure that
+         * these columns are unmodified.
+         *
+         * @param constraintName name for the immutable columns constraint, 
can be used to reference
+         *     this constraint
+         * @param columnNames columns that form the constraint for immutable 
columns
+         */
+        public Builder immutableColumnsNamed(String constraintName, String... 
columnNames) {
+            Preconditions.checkNotNull(columnNames, "Immutable column names 
must not be null.");
+            return immutableColumnsNamed(constraintName, 
Arrays.asList(columnNames));
+        }
+
+        /**
+         * Declares an immutable columns constraint for a list of given 
columns. Immutable columns
+         * constraint is used to identify which columns in a table are not 
allowed to be modified.
+         * Currently, this constraint is informational only and is not 
enforced. It can be utilized
+         * for optimization purposes. It is the responsibility of the data 
owner to ensure that
+         * these columns are unmodified.
+         *
+         * @param constraintName name for the immutable columns constraint, 
can be used to reference
+         *     this constraint
+         * @param columnNames columns that form the constraint for immutable 
columns
+         */
+        public Builder immutableColumnsNamed(String constraintName, 
List<String> columnNames) {
+            Preconditions.checkState(
+                    immutableColumns == null, "Multiple immutable constraints 
are not supported.");
+            Preconditions.checkNotNull(
+                    constraintName, "Immutable constraint name must not be 
null.");
+            Preconditions.checkArgument(
+                    !StringUtils.isNullOrWhitespaceOnly(constraintName),
+                    "Immutable constraint name must not be empty.");
+            Preconditions.checkArgument(
+                    columnNames != null && !columnNames.isEmpty(),
+                    "Immutable constraint must be defined for at least a 
single column.");
+            immutableColumns = new UnresolvedImmutableColumns(constraintName, 
columnNames);
+            return this;
+        }
+
         /** Returns an instance of an unresolved {@link Schema}. */
         public Schema build() {
-            return new Schema(columns, watermarkSpecs, primaryKey, indexes);
+            return new Schema(columns, watermarkSpecs, primaryKey, indexes, 
immutableColumns);
         }
 
         // 
----------------------------------------------------------------------------------------
@@ -709,9 +819,13 @@ public final class Schema {
                                             s.getRowtimeAttribute(), 
s.getWatermarkExpression())));
         }
 
-        private void addResolvedConstraint(UniqueConstraint constraint) {
+        private void addResolvedConstraint(Constraint constraint) {
             if (constraint.getType() == Constraint.ConstraintType.PRIMARY_KEY) 
{
-                primaryKeyNamed(constraint.getName(), constraint.getColumns());
+                primaryKeyNamed(constraint.getName(), ((UniqueConstraint) 
constraint).getColumns());
+            } else if (constraint.getType() == 
Constraint.ConstraintType.IMMUTABLE_COLUMNS) {
+                immutableColumnsNamed(
+                        constraint.getName(),
+                        ((ImmutableColumnsConstraint) 
constraint).getColumns());
             } else {
                 throw new IllegalArgumentException("Unsupported constraint 
type.");
             }
@@ -1173,4 +1287,53 @@ public final class Schema {
             return Objects.hash(indexName, columnNames);
         }
     }
+
+    /**
+     * Declaration of a list of immutable columns that will be resolved to 
{@link
+     * ImmutableColumnsConstraint} during schema resolution.
+     */
+    @PublicEvolving
+    public static final class UnresolvedImmutableColumns extends 
UnresolvedConstraint {
+
+        private final List<String> columnNames;
+
+        public UnresolvedImmutableColumns(String constraintName, List<String> 
columnNames) {
+            super(constraintName);
+            this.columnNames = columnNames;
+        }
+
+        public List<String> getColumnNames() {
+            return columnNames;
+        }
+
+        @Override
+        public String toString() {
+            return String.format(
+                    "%s COLUMNS (%s) IMMUTABLE NOT ENFORCED",
+                    super.toString(),
+                    columnNames.stream()
+                            .map(EncodingUtils::escapeIdentifier)
+                            .collect(Collectors.joining(", ")));
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            if (!super.equals(o)) {
+                return false;
+            }
+            UnresolvedImmutableColumns that = (UnresolvedImmutableColumns) o;
+            return columnNames.equals(that.columnNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(super.hashCode(), columnNames);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
index 16e9ba8c224..67b72bdeb68 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java
@@ -376,6 +376,8 @@ public final class CatalogPropertiesUtil {
 
     private static final String PRIMARY_KEY = "primary-key";
 
+    private static final String IMMUTABLE = "immutable";
+
     private static final String COLUMNS = "columns";
 
     private static final String PARTITION = "partition";
@@ -399,6 +401,10 @@ public final class CatalogPropertiesUtil {
 
     private static final String PRIMARY_KEY_COLUMNS = compoundKey(PRIMARY_KEY, 
COLUMNS);
 
+    private static final String IMMUTABLE_NAME = compoundKey(IMMUTABLE, NAME);
+
+    private static final String IMMUTABLE_COLUMNS = compoundKey(IMMUTABLE, 
COLUMNS);
+
     private static final String INDEX = "index";
 
     private static final String INDEX_NAME = "name";
@@ -515,6 +521,8 @@ public final class CatalogPropertiesUtil {
 
         deserializePrimaryKey(map, schemaKey, builder);
 
+        deserializeImmutableCols(map, schemaKey, builder);
+
         deserializeIndexes(map, schemaKey, builder);
 
         return builder.build();
@@ -545,6 +553,17 @@ public final class CatalogPropertiesUtil {
         }
     }
 
+    private static void deserializeImmutableCols(
+            Map<String, String> map, String schemaKey, Builder builder) {
+        final String constraintNameKey = compoundKey(schemaKey, 
IMMUTABLE_NAME);
+        final String columnsKey = compoundKey(schemaKey, IMMUTABLE_COLUMNS);
+        if (map.containsKey(constraintNameKey)) {
+            final String constraintName = getValue(map, constraintNameKey);
+            final String[] columns = getValue(map, columnsKey, s -> 
s.split(","));
+            builder.immutableColumnsNamed(constraintName, columns);
+        }
+    }
+
     private static void deserializeWatermark(
             Map<String, String> map, String schemaKey, Builder builder) {
         final String watermarkKey = compoundKey(schemaKey, WATERMARK);
@@ -654,6 +673,8 @@ public final class CatalogPropertiesUtil {
 
         schema.getPrimaryKey().ifPresent(pk -> serializePrimaryKey(map, pk));
 
+        schema.getImmutableColumns().ifPresent(ics -> 
serializeImmutableCols(map, ics));
+
         serializeIndexes(map, schema.getIndexes());
     }
 
@@ -679,6 +700,12 @@ public final class CatalogPropertiesUtil {
                 String.join(",", constraint.getColumns()));
     }
 
+    private static void serializeImmutableCols(
+            Map<String, String> map, ImmutableColumnsConstraint constraint) {
+        map.put(compoundKey(SCHEMA, IMMUTABLE_NAME), constraint.getName());
+        map.put(compoundKey(SCHEMA, IMMUTABLE_COLUMNS), String.join(",", 
constraint.getColumns()));
+    }
+
     private static void serializeWatermarkSpecs(
             Map<String, String> map, List<WatermarkSpec> specs, SqlFactory 
sqlFactory) {
         if (!specs.isEmpty()) {
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
index f0c363e9f02..1f0c4359c64 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Constraint.java
@@ -46,19 +46,23 @@ public interface Constraint {
     /**
      * Type of the constraint.
      *
-     * <p>Unique constraints:
-     *
      * <ul>
-     *   <li>UNIQUE - is satisfied if and only if there do not exist two rows 
that have same
-     *       non-null values in the unique columns
-     *   <li>PRIMARY KEY - additionally to UNIQUE constraint, it requires none 
of the values in
-     *       specified columns be a null value. Moreover there can be only a 
single PRIMARY KEY
-     *       defined for a Table.
+     *   <li>Unique constraints:
+     *       <ul>
+     *         <li>UNIQUE - is satisfied if and only if there do not exist two 
rows that have same
+     *             non-null values in the unique columns
+     *         <li>PRIMARY KEY - additionally to UNIQUE constraint, it 
requires none of the values
+     *             in specified columns be a null value. Moreover there can be 
only a single PRIMARY
+     *             KEY defined for a Table.
+     *       </ul>
+     *   <li>Immutable constraint - is satisfied if and only if the specified 
columns are never
+     *       modified.
      * </ul>
      */
     @PublicEvolving
     enum ConstraintType {
         PRIMARY_KEY,
-        UNIQUE_KEY
+        UNIQUE_KEY,
+        IMMUTABLE_COLUMNS
     }
 }
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java
new file mode 100644
index 00000000000..e1426d0b659
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ImmutableColumnsConstraint.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.utils.EncodingUtils;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Immutable columns constraint is used to identify which columns in a table 
are not allowed to be
+ * modified.
+ *
+ * @see ConstraintType
+ */
+@PublicEvolving
+public final class ImmutableColumnsConstraint extends AbstractConstraint {
+
+    private final List<String> columns;
+    private final ConstraintType type;
+
+    /** Creates a non enforced {@link ConstraintType#IMMUTABLE_COLUMNS} 
constraint. */
+    public static ImmutableColumnsConstraint immutableColumns(String name, 
List<String> columns) {
+        return new ImmutableColumnsConstraint(
+                name, false, ConstraintType.IMMUTABLE_COLUMNS, columns);
+    }
+
+    private ImmutableColumnsConstraint(
+            String name, boolean enforced, ConstraintType type, List<String> 
columns) {
+        super(name, enforced);
+
+        this.columns = columns;
+        this.type = type;
+        // Report the rejected argument: getType() is constant and would hide it.
+        if (type != ConstraintType.IMMUTABLE_COLUMNS) {
+            throw new IllegalStateException("Unknown key type: " + type);
+        }
+    }
+
+    public List<String> getColumns() {
+        return columns;
+    }
+
+    @Override
+    public ConstraintType getType() {
+        return ConstraintType.IMMUTABLE_COLUMNS;
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format(
+                "CONSTRAINT %s COLUMNS (%s) IMMUTABLE%s",
+                EncodingUtils.escapeIdentifier(getName()),
+                columns.stream()
+                        .map(EncodingUtils::escapeIdentifier)
+                        .collect(Collectors.joining(", ")),
+                isEnforced() ? "" : " NOT ENFORCED");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        ImmutableColumnsConstraint that = (ImmutableColumnsConstraint) o;
+        return Objects.equals(columns, that.columns);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), columns);
+    }
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
index 916733b44b7..b2871b6e56a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedSchema.java
@@ -64,8 +64,12 @@ public final class ResolvedSchema {
     private final List<WatermarkSpec> watermarkSpecs;
     private final @Nullable UniqueConstraint primaryKey;
     private final List<Index> indexes;
+    private final @Nullable ImmutableColumnsConstraint immutableColumns;
 
-    /** Please use {@link #ResolvedSchema(List, List, UniqueConstraint, List)} 
instead. */
+    /**
+     * Please use {@link #ResolvedSchema(List, List, UniqueConstraint, List,
+     * ImmutableColumnsConstraint)} instead.
+     */
     @Deprecated
     public ResolvedSchema(
             List<Column> columns,
@@ -79,16 +83,31 @@ public final class ResolvedSchema {
             List<WatermarkSpec> watermarkSpecs,
             @Nullable UniqueConstraint primaryKey,
             List<Index> indexes) {
+        this(columns, watermarkSpecs, primaryKey, indexes, null);
+    }
+
+    public ResolvedSchema(
+            List<Column> columns,
+            List<WatermarkSpec> watermarkSpecs,
+            @Nullable UniqueConstraint primaryKey,
+            List<Index> indexes,
+            @Nullable ImmutableColumnsConstraint immutableColumns) {
         this.columns = Preconditions.checkNotNull(columns, "Columns must not 
be null.");
         this.watermarkSpecs =
                 Preconditions.checkNotNull(watermarkSpecs, "Watermark specs 
must not be null.");
         this.primaryKey = primaryKey;
         this.indexes = Preconditions.checkNotNull(indexes, "Indexes must not 
be null.");
+        this.immutableColumns = immutableColumns;
+
+        Preconditions.checkArgument(
+                primaryKey != null || immutableColumns == null,
+                "Immutable constraint must be defined on the table that 
contains primary key.");
     }
 
     /** Shortcut for a resolved schema of only columns. */
     public static ResolvedSchema of(List<Column> columns) {
-        return new ResolvedSchema(columns, Collections.emptyList(), null, 
Collections.emptyList());
+        return new ResolvedSchema(
+                columns, Collections.emptyList(), null, 
Collections.emptyList(), null);
     }
 
     /** Shortcut for a resolved schema of only columns. */
@@ -106,7 +125,8 @@ public final class ResolvedSchema {
                 IntStream.range(0, columnNames.size())
                         .mapToObj(i -> Column.physical(columnNames.get(i), 
columnDataTypes.get(i)))
                         .collect(Collectors.toList());
-        return new ResolvedSchema(columns, Collections.emptyList(), null, 
Collections.emptyList());
+        return new ResolvedSchema(
+                columns, Collections.emptyList(), null, 
Collections.emptyList(), null);
     }
 
     /** Shortcut for a resolved schema of only physical columns. */
@@ -180,6 +200,11 @@ public final class ResolvedSchema {
         return indexes;
     }
 
+    /** Returns the constraint about immutable columns if it has been defined. 
*/
+    public Optional<ImmutableColumnsConstraint> getImmutableColumns() {
+        return Optional.ofNullable(immutableColumns);
+    }
+
     /**
      * Returns the primary key indexes in the {@link 
#toPhysicalRowDataType()}, if any, otherwise
      * returns an empty array.
@@ -196,6 +221,24 @@ public final class ResolvedSchema {
                 .orElseGet(() -> new int[] {});
     }
 
+    /**
+     * Returns the indexes of columns about the immutable constraint in the 
{@link
+     * #toPhysicalRowDataType()}, if any, otherwise returns an empty array.
+     */
+    public int[] getImmutableColumnIndexes() {
+        final List<String> columns =
+                getColumns().stream()
+                        .filter(Column::isPhysical)
+                        .map(Column::getName)
+                        .collect(Collectors.toList());
+        return getImmutableColumns()
+                .map(ImmutableColumnsConstraint::getColumns)
+                .map(
+                        immutableColumns ->
+                                
immutableColumns.stream().mapToInt(columns::indexOf).toArray())
+                .orElseGet(() -> new int[] {});
+    }
+
     /**
      * Converts all columns of this schema into a (possibly nested) row data 
type.
      *
@@ -253,6 +296,9 @@ public final class ResolvedSchema {
             components.add(primaryKey);
         }
         components.addAll(indexes);
+        if (immutableColumns != null) {
+            components.add(immutableColumns);
+        }
         return components.stream()
                 .map(Objects::toString)
                 .map(s -> "  " + s)
@@ -271,12 +317,13 @@ public final class ResolvedSchema {
         return Objects.equals(columns, that.columns)
                 && Objects.equals(watermarkSpecs, that.watermarkSpecs)
                 && Objects.equals(primaryKey, that.primaryKey)
-                && Objects.equals(indexes, that.indexes);
+                && Objects.equals(indexes, that.indexes)
+                && Objects.equals(immutableColumns, that.immutableColumns);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(columns, watermarkSpecs, primaryKey, indexes);
+        return Objects.hash(columns, watermarkSpecs, primaryKey, indexes, 
immutableColumns);
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
index ad541ff7d26..21cb1900f77 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/TableSchemaUtils.java
@@ -127,7 +127,8 @@ public class TableSchemaUtils {
                         .collect(Collectors.toList()),
                 resolvedSchema.getWatermarkSpecs(),
                 resolvedSchema.getPrimaryKey().orElse(null),
-                resolvedSchema.getIndexes());
+                resolvedSchema.getIndexes(),
+                resolvedSchema.getImmutableColumns().orElse(null));
     }
 
     /**
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
index 6800f8e2ced..43eb735c4c9 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogPropertiesUtilTest.java
@@ -120,6 +120,7 @@ class CatalogPropertiesUtilTest {
                         .column("f2", 
DataTypes.STRING().getLogicalType().asSerializableString())
                         .primaryKey("f1")
                         .indexNamed("f1", Collections.singletonList("f1"))
+                        .immutableColumns("f2")
                         .build();
 
         final TableDistribution hashDist =
@@ -137,8 +138,12 @@ class CatalogPropertiesUtilTest {
         List<Index> indexes =
                 Collections.singletonList(
                         DefaultIndex.newIndex("f1", 
Collections.singletonList("f1")));
+        final ImmutableColumnsConstraint immutableColumns =
+                ImmutableColumnsConstraint.immutableColumns(
+                        "IMMUTABLE_COLUMNS_f2", 
Collections.singletonList("f2"));
         final ResolvedSchema resolvedSchema =
-                new ResolvedSchema(columns, Collections.emptyList(), 
primaryKey, indexes);
+                new ResolvedSchema(
+                        columns, Collections.emptyList(), primaryKey, indexes, 
immutableColumns);
 
         return Stream.of(
                 new ResolvedCatalogTable(
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
index 937e99c46ef..228b5485f01 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/CatalogTest.java
@@ -1623,7 +1623,8 @@ public abstract class CatalogTest {
                 Collections.emptyList(),
                 null,
                 Collections.singletonList(
-                        DefaultIndex.newIndex("idx", 
Collections.singletonList("first"))));
+                        DefaultIndex.newIndex("idx", 
Collections.singletonList("first"))),
+                null);
     }
 
     protected ResolvedSchema createAnotherSchema() {
@@ -1635,7 +1636,8 @@ public abstract class CatalogTest {
                 Collections.emptyList(),
                 null,
                 Collections.singletonList(
-                        DefaultIndex.newIndex("idx", 
Collections.singletonList("first"))));
+                        DefaultIndex.newIndex("idx", 
Collections.singletonList("first"))),
+                null);
     }
 
     protected List<String> createPartitionKeys() {
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
index 454691c0921..5ba20ef1fbc 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/catalog/TestSchemaResolver.java
@@ -51,7 +51,10 @@ public class TestSchemaResolver implements SchemaResolver {
 
         final List<Index> indexes = resolveIndexes(schema.getIndexes());
 
-        return new ResolvedSchema(columns, watermarkSpecs, primaryKey, 
indexes);
+        final ImmutableColumnsConstraint immutableColumns =
+                
resolveImmutableColumns(schema.getImmutableColumns().orElse(null));
+
+        return new ResolvedSchema(columns, watermarkSpecs, primaryKey, 
indexes, immutableColumns);
     }
 
     private List<Index> resolveIndexes(List<Schema.UnresolvedIndex> 
unresolvedIndexes) {
@@ -124,6 +127,17 @@ public class TestSchemaResolver implements SchemaResolver {
                 unresolvedPrimaryKey.getConstraintName(), 
unresolvedPrimaryKey.getColumnNames());
     }
 
+    private @Nullable ImmutableColumnsConstraint resolveImmutableColumns(
+            @Nullable Schema.UnresolvedImmutableColumns 
unresolvedImmutableColumns) {
+        if (unresolvedImmutableColumns == null) {
+            return null;
+        }
+
+        return ImmutableColumnsConstraint.immutableColumns(
+                unresolvedImmutableColumns.getConstraintName(),
+                unresolvedImmutableColumns.getColumnNames());
+    }
+
     private ResolvedExpression resolveExpression(Expression expression) {
         if (expression instanceof ResolvedExpression) {
             return (ResolvedExpression) expression;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
index 97e6664834b..4bfa97673cb 100644
--- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/TableSchemaUtilsTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.WatermarkSpec;
@@ -104,7 +105,9 @@ class TableSchemaUtilsTest {
                                 WatermarkSpec.of("t", 
ResolvedExpressionMock.of(rowTimeType, "t"))),
                         UniqueConstraint.primaryKey("test-pk", 
Collections.singletonList("id")),
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("id"))),
+                        ImmutableColumnsConstraint.immutableColumns(
+                                "test-imt", Collections.singletonList("t")));
         
assertThat(TableSchemaUtils.removeTimeAttributeFromResolvedSchema(schema))
                 .isEqualTo(
                         new ResolvedSchema(
@@ -124,6 +127,8 @@ class TableSchemaUtilsTest {
                                         "test-pk", 
Collections.singletonList("id")),
                                 Collections.singletonList(
                                         DefaultIndex.newIndex(
-                                                "idx", 
Collections.singletonList("id")))));
+                                                "idx", 
Collections.singletonList("id"))),
+                                ImmutableColumnsConstraint.immutableColumns(
+                                        "test-imt", 
Collections.singletonList("t"))));
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java
similarity index 62%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java
index 62501577f0f..20560ef2afb 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AbstractConstraintMixin.java
@@ -19,41 +19,21 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.Constraint.ConstraintType;
-import org.apache.flink.table.catalog.UniqueConstraint;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.List;
-
-/** Mixin for {@link UniqueConstraint}. */
+/** Base mixin for {@code AbstractConstraint} subclasses. */
 @Internal
-abstract class UniqueConstraintMixin {
+abstract class AbstractConstraintMixin {
 
     static final String NAME = "name";
     static final String ENFORCED = "enforced";
-    static final String TYPE = "type";
-    static final String COLUMNS = "columns";
-
-    @JsonCreator
-    private UniqueConstraintMixin(
-            @JsonProperty(NAME) String name,
-            @JsonProperty(ENFORCED) boolean enforced,
-            @JsonProperty(TYPE) ConstraintType type,
-            @JsonProperty(COLUMNS) List<String> columns) {}
 
     @JsonProperty(NAME)
     public abstract String getName();
 
-    @JsonProperty(TYPE)
-    public abstract ConstraintType getType();
-
     @JsonProperty(ENFORCED)
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     public abstract boolean isEnforced();
-
-    @JsonProperty(COLUMNS)
-    public abstract List<String> getColumns();
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java
index cb227b3fd75..bb0ea8a35e2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/CompiledPlanSerdeUtil.java
@@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ContextResolvedModel;
 import org.apache.flink.table.catalog.ContextResolvedTable;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogModel;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
@@ -230,6 +231,8 @@ public class CompiledPlanSerdeUtil {
     private static void registerMixins(SimpleModule module) {
         module.setMixInAnnotation(WatermarkSpec.class, 
WatermarkSpecMixin.class);
         module.setMixInAnnotation(UniqueConstraint.class, 
UniqueConstraintMixin.class);
+        module.setMixInAnnotation(
+                ImmutableColumnsConstraint.class, 
ImmutableColumnsConstraintMixin.class);
         module.setMixInAnnotation(DefaultIndex.class, DefaultIndexMixin.class);
     }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java
similarity index 68%
copy from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
copy to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java
index 62501577f0f..4957b9728a1 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ImmutableColumnsConstraintMixin.java
@@ -19,40 +19,30 @@
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.catalog.Constraint.ConstraintType;
-import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.Constraint;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
 
-/** Mixin for {@link UniqueConstraint}. */
+/** Mixin for {@link ImmutableColumnsConstraint}. */
 @Internal
-abstract class UniqueConstraintMixin {
+abstract class ImmutableColumnsConstraintMixin extends AbstractConstraintMixin 
{
 
-    static final String NAME = "name";
-    static final String ENFORCED = "enforced";
     static final String TYPE = "type";
     static final String COLUMNS = "columns";
 
     @JsonCreator
-    private UniqueConstraintMixin(
+    private ImmutableColumnsConstraintMixin(
             @JsonProperty(NAME) String name,
             @JsonProperty(ENFORCED) boolean enforced,
-            @JsonProperty(TYPE) ConstraintType type,
+            @JsonProperty(TYPE) Constraint.ConstraintType type,
             @JsonProperty(COLUMNS) List<String> columns) {}
 
-    @JsonProperty(NAME)
-    public abstract String getName();
-
     @JsonProperty(TYPE)
-    public abstract ConstraintType getType();
-
-    @JsonProperty(ENFORCED)
-    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-    public abstract boolean isEnforced();
+    public abstract Constraint.ConstraintType getType();
 
     @JsonProperty(COLUMNS)
     public abstract List<String> getColumns();
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
index 7795facd1e4..82c84ff9acc 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonDeserializer.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.Index;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -39,6 +40,7 @@ import static org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanS
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.deserializeList;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.CompiledPlanSerdeUtil.deserializeListOrEmpty;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.COLUMNS;
+import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.IMMUTABLE_COLUMNS;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.INDEXES;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.PRIMARY_KEY;
 import static 
org.apache.flink.table.planner.plan.nodes.exec.serde.ResolvedSchemaJsonSerializer.WATERMARK_SPECS;
@@ -69,7 +71,10 @@ final class ResolvedSchemaJsonDeserializer extends StdDeserializer<ResolvedSchem
                 deserializeFieldOrNull(jsonNode, PRIMARY_KEY, 
UniqueConstraint.class, codec, ctx);
         List<Index> indexes =
                 deserializeListOrEmpty(jsonNode, INDEXES, DefaultIndex.class, 
codec, ctx);
+        ImmutableColumnsConstraint immutableColumns =
+                deserializeFieldOrNull(
+                        jsonNode, IMMUTABLE_COLUMNS, 
ImmutableColumnsConstraint.class, codec, ctx);
 
-        return new ResolvedSchema(columns, watermarkSpecs, primaryKey, 
indexes);
+        return new ResolvedSchema(columns, watermarkSpecs, primaryKey, 
indexes, immutableColumns);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
index f3d0c83df2b..1cd450e37d2 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedSchemaJsonSerializer.java
@@ -43,6 +43,7 @@ final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
     static final String WATERMARK_SPECS = "watermarkSpecs";
     static final String PRIMARY_KEY = "primaryKey";
     static final String INDEXES = "indexes";
+    static final String IMMUTABLE_COLUMNS = "immutableColumns";
 
     ResolvedSchemaJsonSerializer() {
         super(ResolvedSchema.class);
@@ -67,6 +68,11 @@ final class ResolvedSchemaJsonSerializer extends StdSerializer<ResolvedSchema> {
                 jsonGenerator, PRIMARY_KEY, resolvedSchema.getPrimaryKey(), 
serializerProvider);
         serializeListIfNotEmpty(
                 jsonGenerator, INDEXES, resolvedSchema.getIndexes(), 
serializerProvider);
+        serializeOptionalField(
+                jsonGenerator,
+                IMMUTABLE_COLUMNS,
+                resolvedSchema.getImmutableColumns(),
+                serializerProvider);
 
         jsonGenerator.writeEndObject();
     }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
index 62501577f0f..46ecd9012e9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/UniqueConstraintMixin.java
@@ -23,17 +23,14 @@ import org.apache.flink.table.catalog.Constraint.ConstraintType;
 import org.apache.flink.table.catalog.UniqueConstraint;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
 
 /** Mixin for {@link UniqueConstraint}. */
 @Internal
-abstract class UniqueConstraintMixin {
+abstract class UniqueConstraintMixin extends AbstractConstraintMixin {
 
-    static final String NAME = "name";
-    static final String ENFORCED = "enforced";
     static final String TYPE = "type";
     static final String COLUMNS = "columns";
 
@@ -44,16 +41,9 @@ abstract class UniqueConstraintMixin {
             @JsonProperty(TYPE) ConstraintType type,
             @JsonProperty(COLUMNS) List<String> columns) {}
 
-    @JsonProperty(NAME)
-    public abstract String getName();
-
     @JsonProperty(TYPE)
     public abstract ConstraintType getType();
 
-    @JsonProperty(ENFORCED)
-    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
-    public abstract boolean isEnforced();
-
     @JsonProperty(COLUMNS)
     public abstract List<String> getColumns();
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
index 042bff697a0..738824398cd 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -78,7 +79,9 @@ public class CatalogConstraintTest {
                                                 "primary_constraint",
                                                 
Collections.singletonList("b")),
                                         Collections.singletonList(
-                                                DefaultIndex.newIndex("idx", 
List.of("a", "b")))))
+                                                DefaultIndex.newIndex("idx", 
List.of("a", "b"))),
+                                        
ImmutableColumnsConstraint.immutableColumns(
+                                                "immutable_constraint", 
List.of("b"))))
                         .build();
         Map<String, String> properties = buildCatalogTableProperties();
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
index 35d300ea35e..b0ffe87df9a 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/lineage/TableLineageUtilsTest.java
@@ -54,7 +54,8 @@ class TableLineageUtilsTest {
                             Column.physical("c", DataTypes.BOOLEAN())),
                     Collections.emptyList(),
                     null,
-                    Collections.emptyList());
+                    Collections.emptyList(),
+                    null);
     private static final Schema CATALOG_TABLE_SCHEMA =
             
Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build();
     private static final Map<String, String> TABLE_OPTIONS = new HashMap<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 6f246c2fbb1..dd00b8c8506 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.TableDistribution;
 import org.apache.flink.table.catalog.TableDistribution.Kind;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -1469,9 +1470,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         Column.physical("shop_id", 
DataTypes.BIGINT()),
                                         Column.physical("user_id", 
DataTypes.INT().notNull())),
                                 List.of(),
-                                
org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
-                                        "PK_user_id", List.of("user_id")),
-                                List.of())),
+                                UniqueConstraint.primaryKey("PK_user_id", 
List.of("user_id")),
+                                List.of(),
+                                null)),
                 Arguments.of(
                         operation
                                 + "MATERIALIZED TABLE users_shops (PRIMARY 
KEY(user_id) NOT ENFORCED)"
@@ -1482,9 +1483,9 @@ class SqlMaterializedTableNodeToOperationConverterTest
                                         Column.physical("shop_id", 
DataTypes.INT().notNull()),
                                         Column.physical("user_id", 
DataTypes.INT().notNull())),
                                 List.of(),
-                                
org.apache.flink.table.catalog.UniqueConstraint.primaryKey(
-                                        "PK_user_id", List.of("user_id")),
-                                List.of())));
+                                UniqueConstraint.primaryKey("PK_user_id", 
List.of("user_id")),
+                                List.of(),
+                                null)));
     }
 
     /** Boilerplate CatalogMaterializedTable builder for tests. */
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
index 302bcae32b6..ce8a16f534b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReaderTest.java
@@ -102,7 +102,8 @@ class FlinkCalciteCatalogReaderTest {
                         Collections.emptyList(),
                         Collections.emptyList(),
                         null,
-                        Collections.emptyList());
+                        Collections.emptyList(),
+                        null);
         final CatalogTable catalogTable =
                 ConnectorCatalogTable.source(
                         new TestTableSource(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
index 6417922b107..0ba42e3dd68 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ContextResolvedTableSerdeTest.java
@@ -83,7 +83,8 @@ public class ContextResolvedTableSerdeTest {
                     Collections.emptyList(),
                     null,
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                    null);
     private static final Schema CATALOG_TABLE_SCHEMA =
             
Schema.newBuilder().fromResolvedSchema(CATALOG_TABLE_RESOLVED_SCHEMA).build();
 
@@ -416,7 +417,8 @@ public class ContextResolvedTableSerdeTest {
                                 null,
                                 Collections.singletonList(
                                         DefaultIndex.newIndex(
-                                                "idx", 
Collections.singletonList("a"))));
+                                                "idx", 
Collections.singletonList("a"))),
+                                null);
                 final ContextResolvedTable spec =
                         ContextResolvedTable.permanent(
                                 PERMANENT_TABLE_IDENTIFIER,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
index 461b1346e7b..cc151c09382 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
@@ -90,7 +90,8 @@ class DynamicTableSinkSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
         final CatalogTable catalogTable1 =
                 CatalogTable.newBuilder()
                         
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema1).build())
@@ -122,7 +123,8 @@ class DynamicTableSinkSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
         final CatalogTable catalogTable2 =
                 CatalogTable.newBuilder()
                         
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema2).build())
@@ -160,7 +162,8 @@ class DynamicTableSinkSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
         final CatalogTable catalogTable3 =
                 CatalogTable.newBuilder()
                         
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema3).build())
@@ -195,7 +198,8 @@ class DynamicTableSinkSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
         final CatalogTable catalogTable4 =
                 CatalogTable.newBuilder()
                         
.schema(Schema.newBuilder().fromResolvedSchema(resolvedSchema4).build())
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index c32eaae35ce..6fd5942f19e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -107,7 +107,8 @@ public class DynamicTableSourceSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
 
         final CatalogTable catalogTable1 =
                 CatalogTable.newBuilder()
@@ -146,7 +147,8 @@ public class DynamicTableSourceSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
 
         final CatalogTable catalogTable2 =
                 CatalogTable.newBuilder()
@@ -393,7 +395,8 @@ public class DynamicTableSourceSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+                        null);
 
         return new ResolvedCatalogTable(
                 CatalogTable.newBuilder()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java
index 9e6162c9ccc..323221bca37 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ResolvedCatalogTableSerdeTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
 import org.apache.flink.table.catalog.ExternalCatalogTable;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.TableDistribution;
@@ -96,7 +97,9 @@ class ResolvedCatalogTableSerdeTest {
                     Collections.singletonList(WatermarkSpec.of("b", 
REX_NODE_EXPRESSION)),
                     UniqueConstraint.primaryKey("myPrimaryKey", 
Arrays.asList("a", "c")),
                     Collections.singletonList(
-                            DefaultIndex.newIndex("idx", 
Collections.singletonList("b"))));
+                            DefaultIndex.newIndex("idx", 
Collections.singletonList("b"))),
+                    ImmutableColumnsConstraint.immutableColumns(
+                            "imt", Collections.singletonList("d")));
 
     private static final ResolvedCatalogTable FULL_RESOLVED_CATALOG_TABLE =
             new ResolvedCatalogTable(
@@ -126,7 +129,8 @@ class ResolvedCatalogTableSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+                        null);
 
         return Stream.of(
                 FULL_RESOLVED_CATALOG_TABLE,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
index 7eb6132a75d..78804d8d0a2 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/TemporalTableSourceSpecSerdeTest.java
@@ -79,7 +79,8 @@ public class TemporalTableSourceSpecSerdeTest {
                         Collections.emptyList(),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", Collections.singletonList("a"))));
+                                DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+                        null);
 
         final CatalogTable catalogTable1 =
                 CatalogTable.newBuilder()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
index 8604882b7de..cef6487cc8b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/VectorSearchTableSourceSpecSerdeTest.java
@@ -78,7 +78,8 @@ public class VectorSearchTableSourceSpecSerdeTest {
                         Collections.singletonList(Column.physical("a", DataTypes.BIGINT())),
                         Collections.emptyList(),
                         null,
-                        Collections.emptyList());
+                        Collections.emptyList(),
+                        null);
 
         final CatalogTable catalogTable1 =
                 CatalogTable.newBuilder()
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index 94ebdba3d04..98310727df5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -370,7 +370,8 @@ class DataStreamJavaITCase {
                                                 TIMESTAMP_LTZ(3), "`SOURCE_WATERMARK`()"))),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", Collections.singletonList("f0")))));
+                                DefaultIndex.newIndex("idx", Collections.singletonList("f0"))),
+                        null));
 
         tableEnv.createTemporaryView("t", table);
 
@@ -644,7 +645,8 @@ class DataStreamJavaITCase {
                                                 TIMESTAMP(3), "`SOURCE_WATERMARK`()"))),
                         null,
                         Collections.singletonList(
-                                DefaultIndex.newIndex("idx", Collections.singletonList("f0")))));
+                                DefaultIndex.newIndex("idx", Collections.singletonList("f0"))),
+                        null));
 
         final DataStream<Long> rowtimeStream =
                 tableEnv.toDataStream(table)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index e8c321236c2..5e2ffd7c58e 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -321,7 +321,8 @@ object MetadataTestUtil {
       ),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "d")),
-      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))))
+      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+      null)
 
     val catalogTable = getCatalogTable(resolvedSchema)
 
@@ -366,7 +367,8 @@ object MetadataTestUtil {
       ),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")),
-      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))))
+      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+      null)
 
     val typeFactory = new FlinkTypeFactory(Thread.currentThread().getContextClassLoader)
     val rowType = typeFactory.buildRelNodeRowType(
@@ -394,7 +396,8 @@ object MetadataTestUtil {
       ),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("b")),
-      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))))
+      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+      null)
 
     val catalogTable = getCatalogTable(resolvedSchema)
 
@@ -424,7 +427,8 @@ object MetadataTestUtil {
       ),
       Collections.emptyList(),
       null,
-      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))))
+      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+      null)
 
     val catalogTable = getCatalogTable(resolvedSchema)
 
@@ -451,7 +455,8 @@ object MetadataTestUtil {
         Column.physical("b", DataTypes.BIGINT().notNull())),
       Collections.emptyList(),
       UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a", "b")),
-      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))))
+      Collections.singletonList(DefaultIndex.newIndex("idx", Collections.singletonList("a"))),
+      null)
 
     val catalogTable = getCatalogTable(resolvedSchema)
 
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
index 9be9499428c..7a117ef58c2 100644
--- a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshMode;
 import org.apache.flink.table.catalog.CatalogMaterializedTable.RefreshStatus;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.IntervalFreshness;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -69,13 +70,20 @@ public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
                     Column.physical("age", DataTypes.INT()),
                     Column.physical("tss", DataTypes.TIMESTAMP(3)),
                     Column.physical("partition", DataTypes.VARCHAR(10)));
-    private static final UniqueConstraint CONSTRAINTS =
+    private static final UniqueConstraint PK_CONSTRAINT =
             UniqueConstraint.primaryKey("primary_constraint", Collections.singletonList("id"));
+    private static final ImmutableColumnsConstraint IMMUTABLE_COLS_CONSTRAINT =
+            ImmutableColumnsConstraint.immutableColumns(
+                    "imt_constraint", Collections.singletonList("name"));
     private static final List<String> PARTITION_KEYS = Collections.singletonList("partition");
 
     private static final ResolvedSchema CREATE_RESOLVED_SCHEMA =
             new ResolvedSchema(
-                    CREATE_COLUMNS, Collections.emptyList(), CONSTRAINTS, Collections.emptyList());
+                    CREATE_COLUMNS,
+                    Collections.emptyList(),
+                    PK_CONSTRAINT,
+                    Collections.emptyList(),
+                    IMMUTABLE_COLS_CONSTRAINT);
 
     private static final Schema CREATE_SCHEMA =
             Schema.newBuilder().fromResolvedSchema(CREATE_RESOLVED_SCHEMA).build();

Reply via email to