This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 370a8870450e4cd827c5fd1c4aacef551c15193e
Author: xuyang <[email protected]>
AuthorDate: Mon Mar 23 15:17:37 2026 +0800

    [FLINK-39287][table-planner] Introduce FlinkRelMdImmutableColumns to 
implement the derivation logic of some simple nodes and update the logic to 
infer upsert key with immutable columns
---
 .../planner/connectors/DynamicSourceUtils.java     |   7 +
 .../table/planner/plan/metadata/FlinkMetadata.java |  15 +
 .../plan/metadata/FlinkRelMdImmutableColumns.java  | 311 ++++++++++++
 .../plan/metadata/FlinkRelMetadataQuery.java       |  21 +
 .../metadata/FlinkDefaultRelMetadataProvider.scala |   3 +-
 .../plan/metadata/FlinkRelMdUpsertKeys.scala       | 107 +++--
 .../planner/catalog/CatalogConstraintTest.java     | 137 ++++--
 .../metadata/FlinkRelMdImmutableColumnsTest.java   | 527 +++++++++++++++++++++
 .../utils/ImmutableColConstraintTestUtils.java     |  66 +++
 .../planner/plan/stream/sql/join/JoinTest.xml      |  66 +++
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |  28 ++
 .../plan/metadata/FlinkRelMdUpsertKeysTest.scala   | 192 +++++++-
 .../planner/plan/metadata/MetadataTestUtil.scala   |  52 ++
 .../planner/plan/stream/sql/join/JoinTest.scala    | 127 +++++
 14 files changed, 1584 insertions(+), 75 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index d293342abb4..a78f9f52ce0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -533,6 +533,7 @@ public final class DynamicSourceUtils {
                 hasChangelogMode && 
changelogMode.contains(RowKind.UPDATE_BEFORE);
         final boolean hasUpdateAfter =
                 hasChangelogMode && 
changelogMode.contains(RowKind.UPDATE_AFTER);
+        final boolean hasDelete = hasChangelogMode && 
changelogMode.contains(RowKind.DELETE);
         if (!hasUpdateBefore && hasUpdateAfter) {
             // only UPDATE_AFTER
             if (!schema.getPrimaryKey().isPresent()) {
@@ -564,6 +565,12 @@ public final class DynamicSourceUtils {
                                 tableDebugName));
             }
         }
+        if (hasDelete) {
+            if (schema.getImmutableColumns().isPresent()) {
+                throw new ValidationException(
+                        "The immutable constraint cannot be defined on the 
table with changelog mode [DELETE].");
+            }
+        }
     }
 
     private static void validateScanSourceForBatch(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java
index d7cf6cfa927..d35f2df3f3b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkMetadata.java
@@ -254,4 +254,19 @@ public abstract class FlinkMetadata {
             Set<ImmutableBitSet> getUpsertKeys(RelNode r, RelMetadataQuery mq);
         }
     }
+
+    /** Metadata about which combinations of columns are unmodified 
corresponding each pk. */
+    public interface ImmutableColumns extends Metadata {
+        Method METHOD = Types.lookupMethod(ImmutableColumns.class, 
"getImmutableColumns");
+
+        MetadataDef<ImmutableColumns> DEF =
+                MetadataDef.of(ImmutableColumns.class, 
ImmutableColumns.Handler.class, METHOD);
+
+        ImmutableBitSet getImmutableColumns();
+
+        /** Handler API. */
+        interface Handler extends MetadataHandler<ImmutableColumns> {
+            ImmutableBitSet getImmutableColumns(RelNode r, RelMetadataQuery 
mq);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java
new file mode 100644
index 00000000000..10dc43a5d67
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumns.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.metadata;
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.planner.plan.nodes.calcite.WatermarkAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalLookupJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.hep.HepRelVertex;
+import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.Exchange;
+import org.apache.calcite.rel.core.Filter;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.metadata.MetadataDef;
+import org.apache.calcite.rel.metadata.MetadataHandler;
+import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.Util;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** A metadata handler for {@link FlinkMetadata.ImmutableColumns}. */
+public class FlinkRelMdImmutableColumns implements 
MetadataHandler<FlinkMetadata.ImmutableColumns> {
+    static final FlinkRelMdImmutableColumns INSTANCE = new 
FlinkRelMdImmutableColumns();
+
+    public static final RelMetadataProvider SOURCE =
+            ReflectiveRelMetadataProvider.reflectiveSource(
+                    FlinkMetadata.ImmutableColumns.METHOD, INSTANCE);
+
+    // ~ Constructors 
-----------------------------------------------------------
+
+    private FlinkRelMdImmutableColumns() {}
+
+    // ~ Methods 
----------------------------------------------------------------
+
+    public MetadataDef<FlinkMetadata.ImmutableColumns> getDef() {
+        return FlinkMetadata.ImmutableColumns.DEF;
+    }
+
+    public ImmutableBitSet getImmutableColumns(TableScan rel, RelMetadataQuery 
mq) {
+        return getTableImmutableColumns(rel.getTable());
+    }
+
+    public ImmutableBitSet getImmutableColumns(Project rel, RelMetadataQuery 
mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(
+                rel, getProjectImmutableColumns(rel.getProjects(), 
rel.getInput(), fmq), fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(Filter rel, RelMetadataQuery 
mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(Calc rel, RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        List<RexNode> projects =
+                rel.getProgram().getProjectList().stream()
+                        .map(localRef -> 
rel.getProgram().expandLocalRef(localRef))
+                        .collect(Collectors.toList());
+        return guardByUpsertKeys(
+                rel, getProjectImmutableColumns(projects, rel.getInput(), 
fmq), fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(Exchange rel, RelMetadataQuery 
mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(
+            StreamPhysicalChangelogNormalize rel, RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(
+            StreamPhysicalMiniBatchAssigner rel, RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(
+            StreamPhysicalDropUpdateBefore rel, RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(WatermarkAssigner rel, 
RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, fmq.getImmutableColumns(rel.getInput()), 
fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(Join join, RelMetadataQuery mq) 
{
+        JoinRelType joinType = join.getJoinType();
+
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        int leftFieldCount = join.getLeft().getRowType().getFieldCount();
+
+        return unionJoinImmutableCols(
+                join,
+                joinType,
+                () -> fmq.getImmutableColumns(join.getLeft()),
+                () -> fmq.getImmutableColumns(join.getRight()),
+                leftFieldCount,
+                fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(CommonPhysicalLookupJoin join, 
RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        int leftFieldCount = join.getInput().getRowType().getFieldCount();
+
+        return unionJoinImmutableCols(
+                join,
+                join.joinType(),
+                () -> fmq.getImmutableColumns(join.getInput()),
+                // TODO support propagating immutable columns from the lookup 
side
+                () -> null, // rightImmutableColsSupplier
+                leftFieldCount,
+                fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(HepRelVertex rel, 
RelMetadataQuery mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(rel, 
fmq.getImmutableColumns(rel.getCurrentRel()), fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(RelSubset rel, RelMetadataQuery 
mq) {
+        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(mq);
+        return guardByUpsertKeys(
+                rel, fmq.getImmutableColumns(Util.first(rel.getBest(), 
rel.getOriginal())), fmq);
+    }
+
+    public ImmutableBitSet getImmutableColumns(RelNode rel, RelMetadataQuery 
mq) {
+        // Catch-all rule when none of the others apply.
+        // More nodes can be supported later, such as Expand, Aggregate, 
Window, Rank, etc.
+        return null;
+    }
+
+    /**
+     * Guards the immutable columns by verifying that the node has upsert 
keys. Immutable columns
+     * are only meaningful "within each pk"; if no upsert key exists, the 
result is cleared.
+     */
+    @Nullable
+    private ImmutableBitSet guardByUpsertKeys(
+            RelNode rel, @Nullable ImmutableBitSet immutableColumns, 
FlinkRelMetadataQuery fmq) {
+        if (immutableColumns == null || immutableColumns.isEmpty()) {
+            return immutableColumns;
+        }
+        Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(rel);
+        if (upsertKeys == null || upsertKeys.isEmpty()) {
+            return null;
+        }
+        return immutableColumns;
+    }
+
+    /**
+     * Unions left/right immutable columns for a join, respecting join type 
semantics:
+     *
+     * <ul>
+     *   <li>SEMI / ANTI: output contains only left-side columns → propagate 
left immutable only
+     *   <li>LEFT: right side may produce nulls → ignore right immutable
+     *   <li>RIGHT: left side may produce nulls → ignore left immutable
+     *   <li>FULL: both sides may produce nulls → ignore both
+     *   <li>INNER: both sides preserved
+     * </ul>
+     *
+     * <p>Right-side indices are shifted by leftFieldCount before union. The 
result is guarded by
+     * upsert keys.
+     */
+    @Nullable
+    private ImmutableBitSet unionJoinImmutableCols(
+            RelNode rel,
+            JoinRelType joinType,
+            Supplier<ImmutableBitSet> leftImmutableColsSupplier,
+            Supplier<ImmutableBitSet> rightImmutableColsSupplier,
+            int leftFieldCount,
+            FlinkRelMetadataQuery fmq) {
+        if (joinType == JoinRelType.SEMI || joinType == JoinRelType.ANTI) {
+            return guardByUpsertKeys(rel, leftImmutableColsSupplier.get(), 
fmq);
+        }
+
+        // nullable side's columns may flip between value/null → not immutable
+        ImmutableBitSet leftImmutableColumns =
+                joinType.generatesNullsOnLeft() ? null : 
leftImmutableColsSupplier.get();
+        ImmutableBitSet rightImmutableColumns =
+                joinType.generatesNullsOnRight() ? null : 
rightImmutableColsSupplier.get();
+
+        // shift right side indices by left field count
+        ImmutableBitSet shiftedRight =
+                rightImmutableColumns == null || 
rightImmutableColumns.isEmpty()
+                        ? rightImmutableColumns
+                        : ImmutableBitSet.of(
+                                rightImmutableColumns.toList().stream()
+                                        .map(i -> i + leftFieldCount)
+                                        .collect(Collectors.toList()));
+
+        // union left and right immutable columns
+        ImmutableBitSet result;
+        if (leftImmutableColumns != null && shiftedRight != null) {
+            result = leftImmutableColumns.union(shiftedRight);
+        } else {
+            result = 
Optional.ofNullable(leftImmutableColumns).orElse(shiftedRight);
+        }
+        return guardByUpsertKeys(rel, result, fmq);
+    }
+
+    @Nullable
+    private ImmutableBitSet getTableImmutableColumns(RelOptTable relOptTable) {
+        if (!(relOptTable instanceof TableSourceTable)) {
+            return null;
+        }
+
+        TableSourceTable tst = (TableSourceTable) relOptTable;
+        ResolvedSchema schema = 
tst.contextResolvedTable().getResolvedTable().getResolvedSchema();
+
+        if (schema.getPrimaryKey().isEmpty()) {
+            return null;
+        }
+
+        // use relOptTable's type which may be projected based on original 
schema
+        List<String> tableOutputFields = 
relOptTable.getRowType().getFieldNames();
+
+        // add pk
+        Set<String> allImmutableFieldsInSchema =
+                new HashSet<>(schema.getPrimaryKey().get().getColumns());
+        // add constraint for immutable columns
+        if (schema.getImmutableColumns().isPresent()) {
+            
allImmutableFieldsInSchema.addAll(schema.getImmutableColumns().get().getColumns());
+        }
+
+        Set<Integer> outputImmutableColumns =
+                allImmutableFieldsInSchema.stream()
+                        .flatMap(
+                                immutableField -> {
+                                    int fieldIdx = 
tableOutputFields.indexOf(immutableField);
+                                    if (fieldIdx >= 0) {
+                                        return Stream.of(fieldIdx);
+                                    } else {
+                                        return Stream.empty();
+                                    }
+                                })
+                        .collect(Collectors.toSet());
+
+        return ImmutableBitSet.of(outputImmutableColumns);
+    }
+
+    @Nullable
+    private ImmutableBitSet getProjectImmutableColumns(
+            List<RexNode> projects, RelNode inputNode, FlinkRelMetadataQuery 
fmq) {
+        ImmutableBitSet inputImmutableColumns = 
fmq.getImmutableColumns(inputNode);
+        if (inputImmutableColumns == null || inputImmutableColumns.isEmpty()) {
+            return inputImmutableColumns;
+        }
+
+        Map<Integer, List<Integer>> mapInToOutPos = new HashMap<>();
+        for (int i = 0; i < projects.size(); i++) {
+            RexNode projExpr = projects.get(i);
+            if (projExpr instanceof RexInputRef) {
+                mapInToOutPos
+                        .computeIfAbsent(
+                                ((RexInputRef) projExpr).getIndex(), k -> new 
ArrayList<>())
+                        .add(i);
+            }
+        }
+        return ImmutableBitSet.of(
+                inputImmutableColumns.toList().stream()
+                        .flatMap(in -> mapInToOutPos.getOrDefault(in, 
List.of()).stream())
+                        .collect(Collectors.toList()));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java
index 5c1cf8de5d3..562ae680a91 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMetadataQuery.java
@@ -50,6 +50,7 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery {
     private FlinkMetadata.ModifiedMonotonicity.Handler 
modifiedMonotonicityHandler;
     private FlinkMetadata.WindowProperties.Handler windowPropertiesHandler;
     private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler;
+    private FlinkMetadata.ImmutableColumns.Handler immutableColumnsHandler;
 
     /**
      * Returns an instance of FlinkRelMetadataQuery. It ensures that cycles do 
not occur while
@@ -85,6 +86,7 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery {
         this.modifiedMonotonicityHandler = 
HANDLERS.modifiedMonotonicityHandler;
         this.windowPropertiesHandler = HANDLERS.windowPropertiesHandler;
         this.upsertKeysHandler = HANDLERS.upsertKeysHandler;
+        this.immutableColumnsHandler = HANDLERS.immutableColumnsHandler;
     }
 
     /** Extended handlers. */
@@ -107,6 +109,8 @@ public class FlinkRelMetadataQuery extends RelMetadataQuery 
{
                 initialHandler(FlinkMetadata.WindowProperties.Handler.class);
         private FlinkMetadata.UpsertKeys.Handler upsertKeysHandler =
                 initialHandler(FlinkMetadata.UpsertKeys.Handler.class);
+        private FlinkMetadata.ImmutableColumns.Handler immutableColumnsHandler 
=
+                initialHandler(FlinkMetadata.ImmutableColumns.Handler.class);
     }
 
     /**
@@ -308,4 +312,21 @@ public class FlinkRelMetadataQuery extends 
RelMetadataQuery {
         }
         return getUpsertKeys(rel);
     }
+
+    /**
+     * Returns the columns that will never be updated upstream within each pk.
+     *
+     * @return the columns that will never be updated upstream within each pk, 
or null if this
+     *     information cannot be determined (whereas empty set indicates that 
all columns may be
+     *     updated)
+     */
+    public ImmutableBitSet getImmutableColumns(RelNode rel) {
+        for (; ; ) {
+            try {
+                return immutableColumnsHandler.getImmutableColumns(rel, this);
+            } catch (JaninoRelMetadataProvider.NoHandler e) {
+                immutableColumnsHandler = revise(e.relClass, 
FlinkMetadata.ImmutableColumns.DEF);
+            }
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala
index f5d4d93832a..135603d56c2 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkDefaultRelMetadataProvider.scala
@@ -48,7 +48,8 @@ object FlinkDefaultRelMetadataProvider {
       RelMdPredicates.SOURCE,
       FlinkRelMdCollation.SOURCE,
       RelMdExplainVisibility.SOURCE,
-      FlinkRelMdWindowProperties.SOURCE
+      FlinkRelMdWindowProperties.SOURCE,
+      FlinkRelMdImmutableColumns.SOURCE
     )
   )
 }
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
index ea52022297c..7890b116b26 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeys.scala
@@ -48,10 +48,13 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
   override def getDef: MetadataDef[UpsertKeys] = UpsertKeys.DEF
 
   def getUpsertKeys(rel: TableScan, mq: RelMetadataQuery): 
JSet[ImmutableBitSet] = {
-    rel.getTable match {
+    val baseKeys = rel.getTable match {
       case t: IntermediateRelTable => t.upsertKeys
       case _ => mq.getUniqueKeys(rel)
     }
+    enrichWithImmutableColumns(
+      baseKeys,
+      () => FlinkRelMetadataQuery.reuseOrCreate(mq).getImmutableColumns(rel))
   }
 
   def getUpsertKeys(rel: Project, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
=
@@ -81,11 +84,13 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
       () => 
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput))
 
   def getUpsertKeys(rel: Exchange, mq: RelMetadataQuery): 
JSet[ImmutableBitSet] = {
-    val keys = 
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput)
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    val upsertKeys = fmq.getUpsertKeys(rel.getInput)
+    val immutableColumns = fmq.getImmutableColumns(rel.getInput)
     rel.getDistribution.getType match {
       case RelDistribution.Type.HASH_DISTRIBUTED =>
-        filterKeys(keys, ImmutableBitSet.of(rel.getDistribution.getKeys))
-      case RelDistribution.Type.SINGLETON => keys
+        filterKeys(upsertKeys, 
ImmutableBitSet.of(rel.getDistribution.getKeys), immutableColumns)
+      case RelDistribution.Type.SINGLETON => upsertKeys
       case t => throw new UnsupportedOperationException("Unsupported 
distribution type: " + t)
     }
   }
@@ -95,19 +100,20 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
       case rank: StreamPhysicalRank if RankUtil.isDeduplication(rel) =>
         
ImmutableSet.of(ImmutableBitSet.of(rank.partitionKey.toArray.map(Integer.valueOf).toList))
       case _ =>
-        val inputKeys = filterKeys(
-          FlinkRelMetadataQuery
-            .reuseOrCreate(mq)
-            .getUpsertKeys(rel.getInput),
-          rel.partitionKey)
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+        val inputUpsertKeys = fmq.getUpsertKeys(rel.getInput)
+        val inputImmutableColumns = fmq.getImmutableColumns(rel.getInput)
+        val inputKeys = filterKeys(inputUpsertKeys, rel.partitionKey, 
inputImmutableColumns)
         FlinkRelMdUniqueKeys.INSTANCE.getRankUniqueKeys(rel, inputKeys)
     }
   }
 
-  def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] =
-    filterKeys(
-      FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput),
-      ImmutableBitSet.of(rel.getCollation.getKeys))
+  def getUpsertKeys(rel: Sort, mq: RelMetadataQuery): JSet[ImmutableBitSet] = {
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    val upsertKeys = fmq.getUpsertKeys(rel.getInput)
+    val immutableColumns = fmq.getImmutableColumns(rel.getInput)
+    filterKeys(upsertKeys, ImmutableBitSet.of(rel.getCollation.getKeys), 
immutableColumns)
+  }
 
   def getUpsertKeys(
       rel: StreamPhysicalChangelogNormalize,
@@ -206,20 +212,23 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
       rel: SingleRel,
       mq: RelMetadataQuery,
       distributionKeys: ImmutableBitSet*): JSet[ImmutableBitSet] = {
-    var inputKeys = 
FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(rel.getInput)
+    val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+    var inputUpsertKeys = fmq.getUpsertKeys(rel.getInput)
+    val inputImmutableColumns = fmq.getImmutableColumns(rel.getInput)
     for (distributionKey <- distributionKeys) {
-      inputKeys = filterKeys(inputKeys, distributionKey)
+      inputUpsertKeys = filterKeys(inputUpsertKeys, distributionKey, 
inputImmutableColumns)
     }
-    inputKeys
+    inputUpsertKeys
   }
 
   def getUpsertKeys(join: Join, mq: RelMetadataQuery): JSet[ImmutableBitSet] = 
{
     val joinInfo = join.analyzeCondition()
     join.getJoinType match {
       case JoinRelType.SEMI | JoinRelType.ANTI =>
-        filterKeys(
-          FlinkRelMetadataQuery.reuseOrCreate(mq).getUpsertKeys(join.getLeft),
-          joinInfo.leftSet())
+        val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
+        val leftInputUpsertKeys = fmq.getUpsertKeys(join.getLeft)
+        val leftInputImmutableColumns = fmq.getImmutableColumns(join.getLeft)
+        filterKeys(leftInputUpsertKeys, joinInfo.leftSet(), 
leftInputImmutableColumns)
       case _ =>
         getJoinUpsertKeys(joinInfo, join.getJoinType, join.getLeft, 
join.getRight, mq)
     }
@@ -348,6 +357,8 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
     val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq)
     val leftKeys = fmq.getUpsertKeys(left)
     val rightKeys = fmq.getUpsertKeys(right)
+    val leftImmutableColumns = fmq.getImmutableColumns(left)
+    val rightImmutableColumns = fmq.getImmutableColumns(right)
 
     FlinkRelMdUniqueKeys.INSTANCE.getJoinUniqueKeys(
       joinRelType,
@@ -356,8 +367,8 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
       // (the distribution keys), ensuring the result remains an upsert key.
       // Note: An Exchange typically applies this filtering already via 
fmq.getUpsertKeys(...).
       // We keep it here to be safe in case a join can appear without a 
preceding Exchange.
-      filterKeys(leftKeys, joinInfo.leftSet),
-      filterKeys(rightKeys, joinInfo.rightSet),
+      filterKeys(leftKeys, joinInfo.leftSet, leftImmutableColumns),
+      filterKeys(rightKeys, joinInfo.rightSet, rightImmutableColumns),
       isSideUnique(leftKeys, joinInfo.leftSet),
       isSideUnique(rightKeys, joinInfo.rightSet)
     )
@@ -392,17 +403,35 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
    *
    * Example:
    * - distributionKey = {k1}
-   * - keys = {{k1}, {k1, k2}, {k2}}
+   * - upsertKeys = {{k1}, {k1, k2}, {k2}}
+   * - immutableColumns = null
    * Result: {{k1}, {k1, k2}} (drops {k2})
+   *
+   * Example:
+   * - distributionKey = {k1, k3}
+   * - upsertKeys = {{k1}, {k1, k2}, {k1, k3}, {k2}}
+   * - immutableColumns = {k3}
+   * Result: {{k1}, {k1, k2}, {k1, k3}} (drops {k2})
    */
   private def filterKeys(
-      keys: JSet[ImmutableBitSet],
-      distributionKey: ImmutableBitSet): JSet[ImmutableBitSet] = {
-    if (keys != null) {
-      keys.filter(k => k.contains(distributionKey))
-    } else {
-      null
+      upsertKeys: JSet[ImmutableBitSet],
+      distributionKey: ImmutableBitSet,
+      immutableColumns: ImmutableBitSet): JSet[ImmutableBitSet] = {
+    if (upsertKeys == null) {
+      return null
     }
+
+    upsertKeys.filter(
+      upsertKey => {
+        val key =
+          if (immutableColumns == null) {
+            upsertKey
+          } else {
+            upsertKey.union(immutableColumns)
+          }
+
+        key.contains(distributionKey)
+      })
   }
 
   /*
@@ -426,6 +455,28 @@ class FlinkRelMdUpsertKeys private extends 
MetadataHandler[UpsertKeys] {
 
   // Catch-all rule when none of the others apply.
   def getUpsertKeys(rel: RelNode, mq: RelMetadataQuery): JSet[ImmutableBitSet] 
= null
+
+  /**
+   * Enriches the given upsert keys with immutable columns as an additional 
upsert key.
+   *
+   * If baseKeys is null or empty, returns as-is without invoking the supplier 
(immutable columns
+   * are meaningless without existing upsert keys).
+   */
+  private def enrichWithImmutableColumns(
+      baseKeys: JSet[ImmutableBitSet],
+      immutableColsSupplier: () => ImmutableBitSet): JSet[ImmutableBitSet] = {
+    if (baseKeys == null || baseKeys.isEmpty) {
+      return baseKeys
+    }
+    val immutableCols = immutableColsSupplier()
+    if (immutableCols != null && !immutableCols.isEmpty) {
+      val enriched = new util.HashSet[ImmutableBitSet](baseKeys)
+      enriched.add(immutableCols)
+      enriched
+    } else {
+      baseKeys
+    }
+  }
 }
 
 object FlinkRelMdUpsertKeys {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
index 738824398cd..82786f3d431 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogConstraintTest.java
@@ -22,11 +22,11 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.DefaultIndex;
-import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
@@ -39,6 +39,8 @@ import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for Catalog constraints. */
 public class CatalogConstraintTest {
@@ -58,32 +61,18 @@ public class CatalogConstraintTest {
 
     @BeforeEach
     void setup() {
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inBatchMode().build();
+        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
         tEnv = TableEnvironment.create(settings);
         catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).orElse(null);
         assertThat(catalog).isNotNull();
     }
 
-    @Test
-    void testWithPrimaryKey() throws Exception {
-        final Schema tableSchema =
-                Schema.newBuilder()
-                        .fromResolvedSchema(
-                                new ResolvedSchema(
-                                        Arrays.asList(
-                                                Column.physical("a", 
DataTypes.STRING()),
-                                                Column.physical("b", 
DataTypes.BIGINT().notNull()),
-                                                Column.physical("c", 
DataTypes.INT())),
-                                        Collections.emptyList(),
-                                        UniqueConstraint.primaryKey(
-                                                "primary_constraint",
-                                                
Collections.singletonList("b")),
-                                        Collections.singletonList(
-                                                DefaultIndex.newIndex("idx", 
List.of("a", "b"))),
-                                        
ImmutableColumnsConstraint.immutableColumns(
-                                                "immutable_constraint", 
List.of("b"))))
-                        .build();
-        Map<String, String> properties = buildCatalogTableProperties();
+    @ParameterizedTest()
+    @ValueSource(booleans = {true, false})
+    void testWithPrimaryKey(boolean containsPrimaryKey) throws Exception {
+        ResolvedSchema resolvedSchema = 
buildResolvedSchema(containsPrimaryKey);
+        final Schema tableSchema = 
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build();
+        Map<String, String> properties = buildCatalogTableProperties(true);
 
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
@@ -97,21 +86,62 @@ public class CatalogConstraintTest {
         RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from 
T1"));
         FlinkRelMetadataQuery mq =
                 
FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery());
+        if (containsPrimaryKey) {
+            
assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1)));
+        } else {
+            assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of());
+        }
+    }
+
+    @ParameterizedTest()
+    @ValueSource(booleans = {true, false})
+    void testWithImmutableColsConstraint(boolean 
containsImmutableColsConstraint) throws Exception {
+        final Schema.Builder schemaBuilder =
+                
Schema.newBuilder().fromResolvedSchema(buildResolvedSchema(true));
+        if (containsImmutableColsConstraint) {
+            schemaBuilder.immutableColumnsNamed("immutable_constraint", 
List.of("a"));
+        }
+        final Schema tableSchema = schemaBuilder.build();
+        Map<String, String> properties = buildCatalogTableProperties(false);
+
+        catalog.createTable(
+                new ObjectPath(databaseName, "T1"),
+                CatalogTable.newBuilder()
+                        .schema(tableSchema)
+                        .comment("")
+                        .options(properties)
+                        .build(),
+                false);
+
+        RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from 
T1"));
+
+        FlinkRelMetadataQuery mq =
+                
FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery());
+        // unique keys are not changed
         
assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1)));
+
+        if (containsImmutableColsConstraint) {
+            assertThat((Iterable<? extends Integer>) 
mq.getImmutableColumns(t1))
+                    .isEqualTo(ImmutableBitSet.of(0, 1));
+            assertThat(mq.getUpsertKeys(t1))
+                    .isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1), 
ImmutableBitSet.of(0, 1)));
+        } else {
+            assertThat((Iterable<? extends Integer>) 
mq.getImmutableColumns(t1))
+                    .isEqualTo(ImmutableBitSet.of(1));
+            
assertThat(mq.getUpsertKeys(t1)).isEqualTo(ImmutableSet.of(ImmutableBitSet.of(1)));
+        }
     }
 
     @Test
-    void testWithoutPrimaryKey() throws Exception {
-
+    void testImmutableColsConstraintDefinedOnSourceWithDelete() throws 
Exception {
         final Schema tableSchema =
                 Schema.newBuilder()
-                        .fromResolvedSchema(
-                                ResolvedSchema.of(
-                                        Column.physical("a", 
DataTypes.BIGINT()),
-                                        Column.physical("b", 
DataTypes.STRING()),
-                                        Column.physical("c", DataTypes.INT())))
+                        .fromResolvedSchema(buildResolvedSchema(true))
+                        .immutableColumnsNamed("immutable_constraint", 
List.of("a"))
                         .build();
-        Map<String, String> properties = buildCatalogTableProperties();
+
+        Map<String, String> properties = buildCatalogTableProperties(false);
+        properties.put("changelog-mode", "I,UA,UB,D");
 
         catalog.createTable(
                 new ObjectPath(databaseName, "T1"),
@@ -122,21 +152,44 @@ public class CatalogConstraintTest {
                         .build(),
                 false);
 
-        RelNode t1 = TableTestUtil.toRelNode(tEnv.sqlQuery("select * from 
T1"));
-        FlinkRelMetadataQuery mq =
-                
FlinkRelMetadataQuery.reuseOrCreate(t1.getCluster().getMetadataQuery());
-        assertThat(mq.getUniqueKeys(t1)).isEqualTo(ImmutableSet.of());
+        assertThatThrownBy(() -> TableTestUtil.toRelNode(tEnv.sqlQuery("select 
* from T1")))
+                .isInstanceOf(ValidationException.class)
+                .hasMessageContaining(
+                        "The immutable constraint cannot be defined "
+                                + "on the table with changelog mode 
[DELETE].");
     }
 
-    private Map<String, String> buildCatalogTableProperties() {
-        Map<String, String> properties = new HashMap<>();
-        properties.put("connector.type", "filesystem");
-        properties.put("connector.property-version", "1");
-        properties.put("connector.path", "/path/to/csv");
+    private ResolvedSchema buildResolvedSchema(boolean containsPrimaryKey) {
+        return containsPrimaryKey
+                ? new ResolvedSchema(
+                        Arrays.asList(
+                                Column.physical("a", DataTypes.STRING()),
+                                Column.physical("b", 
DataTypes.BIGINT().notNull()),
+                                Column.physical("c", DataTypes.INT())),
+                        Collections.emptyList(),
+                        UniqueConstraint.primaryKey(
+                                "primary_constraint", 
Collections.singletonList("b")),
+                        Collections.singletonList(DefaultIndex.newIndex("idx", 
List.of("a", "b"))),
+                        null)
+                : ResolvedSchema.of(
+                        Column.physical("a", DataTypes.BIGINT()),
+                        Column.physical("b", DataTypes.STRING()),
+                        Column.physical("c", DataTypes.INT()));
+    }
 
-        properties.put("format.type", "csv");
-        properties.put("format.property-version", "1");
-        properties.put("format.field-delimiter", ";");
+    private Map<String, String> buildCatalogTableProperties(boolean 
legacyTable) {
+        Map<String, String> properties = new HashMap<>();
+        if (legacyTable) {
+            properties.put("connector.type", "filesystem");
+            properties.put("connector.property-version", "1");
+            properties.put("connector.path", "/path/to/csv");
+
+            properties.put("format.type", "csv");
+            properties.put("format.property-version", "1");
+            properties.put("format.field-delimiter", ";");
+        } else {
+            properties.put("connector", "values");
+        }
 
         return properties;
     }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java
new file mode 100644
index 00000000000..02d49885739
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/metadata/FlinkRelMdImmutableColumnsTest.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.metadata;
+
+import org.apache.flink.table.planner.calcite.FlinkContext;
+import 
org.apache.flink.table.planner.plan.nodes.calcite.LogicalWatermarkAssigner;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalLookupJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalMiniBatchAssigner;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Calc;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Option;
+
+import static org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+/** Tests for {@link FlinkRelMdImmutableColumns}. */
+public class FlinkRelMdImmutableColumnsTest extends FlinkRelMdHandlerTestBase {
+
+    // 
-------------------------------------------------------------------------------------
+    // TableScan
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnTableScanWithImmutableCols() {
+        // Projected rowType (a,c,d): PK(a)=0, immutable(c)=1, immutable(d)=2 
→ {0, 1, 2}
+        assertEquals(
+                ImmutableBitSet.of(0, 1, 2),
+                mq().getImmutableColumns(tableWithImmutableColsLogicalScan()));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnTableScanWithPkOnly() {
+        // TableSourceTable1: PK(a,b)={0,1}, no immutable constraint → {0, 1}
+        RelNode scan = relBuilder().scan("TableSourceTable1").build();
+        assertEquals(ImmutableBitSet.of(0, 1), mq().getImmutableColumns(scan));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnTableScanWithSingleColPk() {
+        // TableSourceTable2: PK(b)={1} → {1}
+        RelNode scan = relBuilder().scan("TableSourceTable2").build();
+        assertEquals(ImmutableBitSet.of(1), mq().getImmutableColumns(scan));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnTableScanWithoutPk() {
+        // TableSourceTable3: no PK → null
+        RelNode scan = relBuilder().scan("TableSourceTable3").build();
+        assertNull(mq().getImmutableColumns(scan));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnNonTableSourceTableScan() {
+        // student uses MockMetaTable (not TableSourceTable) → null
+        assertNull(mq().getImmutableColumns(studentLogicalScan()));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Project
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnProjectKeepsAll() {
+        // Project: a(0), c(1), d(2) → output immutable = {0, 1, 2}
+        relBuilder().push(tableWithImmutableColsLogicalScan());
+        RelNode project =
+                relBuilder()
+                        .project(
+                                relBuilder().field(0), // a → out 0
+                                relBuilder().field(1), // c → out 1
+                                relBuilder().field(2)) // d → out 2
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(project));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnProjectDropsImmutableCols() {
+        // Project with only expressions (no direct field refs) → none tracked 
→ empty
+        relBuilder().push(tableWithImmutableColsLogicalScan());
+        RelNode project =
+                relBuilder()
+                        .project(
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.PLUS,
+                                                relBuilder().field(0),
+                                                relBuilder().literal(1)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(), mq().getImmutableColumns(project));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnProjectWithDuplicateRefs() {
+        // Project: a(0), a(0), c(1) → a maps to {0, 1}, c maps to {2} → {0, 
1, 2}
+        relBuilder().push(tableWithImmutableColsLogicalScan());
+        RelNode project =
+                relBuilder()
+                        .project(
+                                relBuilder().field(0), // a → out 0
+                                relBuilder().field(0), // a → out 1
+                                relBuilder().field(1)) // c → out 2
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(project));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnProjectWithExpression() {
+        // Project: a(0), a+1 (expression), c(1)
+        // Expressions (non-RexInputRef) are not tracked → a+1 is not immutable
+        relBuilder().push(tableWithImmutableColsLogicalScan());
+        RelNode project =
+                relBuilder()
+                        .project(
+                                relBuilder().field(0), // a → out 0
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.PLUS,
+                                                relBuilder().field(0),
+                                                relBuilder().literal(1)), // 
a+1 → out 1
+                                relBuilder().field(1)) // c → out 2
+                        .build();
+        // a→0, c→2 → {0, 2}
+        assertEquals(ImmutableBitSet.of(0, 2), 
mq().getImmutableColumns(project));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnProjectNullInput() {
+        // Project on student (MockMetaTable immutable = null) → null
+        relBuilder().push(studentLogicalScan());
+        RelNode project =
+                relBuilder().project(relBuilder().field(0), 
relBuilder().field(1)).build();
+        assertNull(mq().getImmutableColumns(project));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Filter
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnFilter() {
+        // Filter passes through immutable columns unchanged
+        relBuilder().push(tableWithImmutableColsLogicalScan());
+        RelNode filter =
+                relBuilder()
+                        .filter(
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.LESS_THAN,
+                                                relBuilder().field(0),
+                                                relBuilder().literal(100)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(filter));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnFilterNullInput() {
+        // Filter on student (MockMetaTable immutable = null) → null
+        assertNull(mq().getImmutableColumns(logicalFilter()));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Calc
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnCalc() {
+        // Calc with projection [a(0), c(2)] and filter a < 100
+        RelNode input = tableWithImmutableColsLogicalScan();
+        relBuilder().push(input);
+
+        RexNode proj0 = relBuilder().field(0); // a
+        RexNode proj1 = relBuilder().field(1); // c
+        List<RexNode> projects = Arrays.asList(proj0, proj1);
+
+        RexNode condition =
+                relBuilder()
+                        .call(
+                                SqlStdOperatorTable.LESS_THAN,
+                                relBuilder().field(0),
+                                relBuilder().literal(100));
+        List<RexNode> conditions = Collections.singletonList(condition);
+
+        // Build a temp project to get the output row type
+        RelNode tempProject = relBuilder().project(proj0, proj1).build();
+        RelDataType outputRowType = tempProject.getRowType();
+
+        Calc calc = createLogicalCalc(input, outputRowType, projects, 
conditions);
+
+        // Input immutable: {0, 1, 2} → a→0, c→1 → {0, 1}
+        assertEquals(ImmutableBitSet.of(0, 1), mq().getImmutableColumns(calc));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // WatermarkAssigner
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnWatermarkAssigner() {
+        // Build a WatermarkAssigner on top of the immutable scan, using 
rowtime column (index 3)
+        RelNode input = tableWithImmutableColsLogicalScan();
+        FlinkContext flinkContext = unwrapContext(cluster());
+        RexNode watermarkExpr =
+                flinkContext
+                        .getRexFactory()
+                        .createSqlToRexConverter(input.getRowType(), null)
+                        .convertToRexNode("rowtime - INTERVAL '10' SECOND");
+        RelNode watermarkAssigner =
+                LogicalWatermarkAssigner.create(
+                        cluster(), input, Collections.emptyList(), 3, 
watermarkExpr);
+        // Pass through: {0, 1, 2}
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(watermarkAssigner));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // MiniBatchAssigner
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnMiniBatchAssigner() {
+        RelNode input = tableWithImmutableColsStreamScan();
+        RelNode miniBatchAssigner =
+                new StreamPhysicalMiniBatchAssigner(cluster(), 
streamPhysicalTraits(), input);
+        // Pass through: {0, 1, 2}
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(miniBatchAssigner));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Exchange
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnExchange() {
+        RelNode scan = tableWithImmutableColsStreamScan();
+        FlinkRelDistribution hash = FlinkRelDistribution.hash(new int[] {0}, 
true);
+        RelNode exchange =
+                new StreamPhysicalExchange(
+                        cluster(), streamPhysicalTraits().replace(hash), scan, 
hash);
+        // Pass through: {0, 1, 2}
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(exchange));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // ChangelogNormalize
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnChangelogNormalize() {
+        RelNode scan = tableWithImmutableColsStreamScan();
+        RelNode changelogNormalize =
+                new StreamPhysicalChangelogNormalize(
+                        cluster(),
+                        streamPhysicalTraits(),
+                        scan,
+                        new int[] {0},
+                        null,
+                        false,
+                        new RexNode[] {});
+        // Pass through: {0, 1, 2}
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(changelogNormalize));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // DropUpdateBefore
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnDropUpdateBefore() {
+        RelNode scan = tableWithImmutableColsStreamScan();
+        RelNode dropUpdateBefore =
+                new StreamPhysicalDropUpdateBefore(cluster(), 
streamPhysicalTraits(), scan);
+        // Pass through: {0, 1, 2}
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(dropUpdateBefore));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Join
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnInnerJoin1() {
+        // Left: projected_table_source_table_with_immutable_cols 
(a,c,d,rowtime) →
+        // immutable={0,1,2}
+        // Right: TableSourceTable1 (a,b,c,d) → immutable={0,1}
+        // Right shifted by 4: {4,5}
+        // Union: {0,1,2,4,5}
+        RelNode join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .scan("TableSourceTable1")
+                        .join(
+                                JoinRelType.INNER,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2, 4, 5), 
mq().getImmutableColumns(join));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnInnerJoin2() {
+        // Left: projected_table_source_table_with_immutable_cols → {0,1,2}
+        // Right: projected_table_source_table_with_immutable_cols → {0,1,2} 
shifted by 4 → {4,5,6}
+        // Union: {0,1,2,4,5,6}
+        RelNode join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .join(
+                                JoinRelType.INNER,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2, 4, 5, 6), 
mq().getImmutableColumns(join));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnInnerJoinWhileOneSideNoUpsertKey() {
+        // Left: TableSourceTable3 → immutable & pk = null
+        // Right: projected_table_source_table_with_immutable_cols → {0,1,2} 
shifted by 7 → {7,8,9}
+        // Join has no upsert keys → guarded to null
+        RelNode join =
+                relBuilder()
+                        .scan("TableSourceTable3")
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .join(
+                                JoinRelType.INNER,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertNull(mq().getImmutableColumns(join));
+
+        // Left: projected_table_source_table_with_immutable_cols → {0,1,2}
+        // Right: TableSourceTable3 → immutable & pk = null
+        // Join has no upsert keys → guarded to null
+        join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .scan("TableSourceTable3")
+                        .join(
+                                JoinRelType.INNER,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+
+        assertNull(mq().getImmutableColumns(join));
+
+        // Left: TableSourceTable3 → immutable & pk = null
+        // Right: TableSourceTable3 → immutable & pk = null
+        // Join has no upsert keys → guarded to null
+        join =
+                relBuilder()
+                        .scan("TableSourceTable3")
+                        .scan("TableSourceTable3")
+                        .join(
+                                JoinRelType.INNER,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+
+        assertNull(mq().getImmutableColumns(join));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnLeftJoin() {
+        // LEFT JOIN: right side may produce nulls → ignore right immutable
+        // Left: projected_table_source_table_with_immutable_cols → {0,1,2}
+        // Result: {0,1,2}
+        RelNode join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .join(
+                                JoinRelType.LEFT,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(join));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnRightJoin() {
+        // RIGHT JOIN: left side may produce nulls → ignore left immutable
+        // Right: projected_table_source_table_with_immutable_cols → shifted 
by 4 → {4,5,6}
+        // Result: {4,5,6}
+        RelNode join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .join(
+                                JoinRelType.RIGHT,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertEquals(ImmutableBitSet.of(4, 5, 6), 
mq().getImmutableColumns(join));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnFullJoin() {
+        // FULL JOIN: both sides may produce nulls → both ignored → null
+        RelNode join =
+                relBuilder()
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        
.scan("projected_table_source_table_with_immutable_cols")
+                        .join(
+                                JoinRelType.FULL,
+                                relBuilder()
+                                        .call(
+                                                SqlStdOperatorTable.EQUALS,
+                                                relBuilder().field(2, 0, 0),
+                                                relBuilder().field(2, 1, 0)))
+                        .build();
+        assertNull(mq().getImmutableColumns(join));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Lookup Join
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnLookupJoinWithImmutableCols() {
+        // Left: projected_table_source_table_with_immutable_cols → {0,1,2}
+        // Right projected_table_source_table_with_immutable_cols = {0,1,2}, 
ignored
+        // Result: {0,1,2}
+        TableScan src = tableWithImmutableColsStreamScan();
+        StreamPhysicalLookupJoin lookupJoin =
+                getStreamLookupJoinsWithImmutableCols(
+                        src,
+                        src.getTable(),
+                        JoinInfo.of(ImmutableIntList.of(0), 
ImmutableIntList.of(0)),
+                        JoinRelType.INNER,
+                        Option.empty());
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(lookupJoin));
+
+        lookupJoin =
+                getStreamLookupJoinsWithImmutableCols(
+                        src,
+                        src.getTable(),
+                        JoinInfo.of(ImmutableIntList.of(0), 
ImmutableIntList.of(0)),
+                        JoinRelType.LEFT,
+                        Option.empty());
+        assertEquals(ImmutableBitSet.of(0, 1, 2), 
mq().getImmutableColumns(lookupJoin));
+
+        // join without lookup side's pk
+        // Result: null
+        lookupJoin =
+                getStreamLookupJoinsWithImmutableCols(
+                        src,
+                        src.getTable(),
+                        JoinInfo.of(ImmutableIntList.of(0), 
ImmutableIntList.of(1)),
+                        JoinRelType.LEFT,
+                        Option.empty());
+        assertNull(mq().getImmutableColumns(lookupJoin));
+    }
+
+    // 
-------------------------------------------------------------------------------------
+    // Default (catch-all)
+    // 
-------------------------------------------------------------------------------------
+
+    @Test
+    public void testGetImmutableColumnsOnDefault() {
+        // TestRel has no specific handler → catch-all returns null
+        assertNull(mq().getImmutableColumns(testRel()));
+    }
+
+    @Test
+    public void testGetImmutableColumnsOnValues() {
+        // LogicalValues has no specific handler → catch-all returns null
+        assertNull(mq().getImmutableColumns(logicalValues()));
+        assertNull(mq().getImmutableColumns(emptyValues()));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java
new file mode 100644
index 00000000000..cdf9e6efcd0
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ImmutableColConstraintTestUtils.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ImmutableColumnsConstraint;
+import org.apache.flink.table.catalog.ObjectPath;
+
+import java.util.List;
+
+/**
+ * Utils about {@link ImmutableColumnsConstraint} for tests.
+ *
+ * <p>This utils can be removed after we support syntax to define immutable 
columns constraint in
+ * DDL.
+ */
+public class ImmutableColConstraintTestUtils {
+
+    public static void addImmutableColConstraint(
+            Catalog catalog, String databaseName, String tableName, String... 
immutableCols)
+            throws Exception {
+        ObjectPath tablePath = new ObjectPath(databaseName, tableName);
+        CatalogTable originalTable = (CatalogTable) 
catalog.getTable(tablePath);
+        catalog.dropTable(tablePath, false);
+
+        Schema.UnresolvedImmutableColumns immutableColumns =
+                new Schema.UnresolvedImmutableColumns("imt", 
List.of(immutableCols));
+
+        Schema schema = originalTable.getUnresolvedSchema();
+        schema =
+                new Schema(
+                        schema.getColumns(),
+                        schema.getWatermarkSpecs(),
+                        schema.getPrimaryKey().orElse(null),
+                        schema.getIndexes(),
+                        immutableColumns);
+
+        CatalogTable newTable =
+                CatalogTable.newBuilder()
+                        .schema(schema)
+                        .comment(originalTable.getComment())
+                        .partitionKeys(originalTable.getPartitionKeys())
+                        .options(originalTable.getOptions())
+                        .build();
+
+        catalog.createTable(tablePath, newTable, false);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
index b79d0e2d3d6..eeebf3f02d8 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.xml
@@ -708,6 +708,72 @@ Sink(table=[default_catalog.default_database.sink], 
fields=[city_id, city_name,
          +- ChangelogNormalize(key=[id])
             +- Exchange(distribution=[hash[id]])
                +- TableSourceScan(table=[[default_catalog, default_database, 
source_city]], fields=[id, city_name])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinOutputUpsertKeyInSinkPkWhileJoinOnImmutableCols">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
city_id, city_name, city_detail])
++- LogicalProject(id=[$0], city_id=[$2], city_name=[$1], city_detail=[$4])
+   +- LogicalJoin(condition=[=($1, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
source_city]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
source_city_detail]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, 
city_name, city_detail], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE])
++- Calc(select=[id, city_id, city_name, city_detail], changelogMode=[I,UA])
+   +- Join(joinType=[InnerJoin], where=[=(city_name, city_name0)], select=[id, 
city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[HasUniqueKey], changelogMode=[I,UA])
+      :- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA])
+      :  +- DropUpdateBefore(changelogMode=[I,UA])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
source_city]], fields=[id, city_name], changelogMode=[I,UB,UA])
+      +- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA])
+         +- DropUpdateBefore(changelogMode=[I,UA])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
source_city_detail]], fields=[city_id, city_name, city_detail], 
changelogMode=[I,UB,UA])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, 
city_name, city_detail], conflictStrategy=[DEDUPLICATE])
++- Calc(select=[id, city_id, city_name, city_detail])
+   +- Join(joinType=[InnerJoin], where=[(city_name = city_name0)], select=[id, 
city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[HasUniqueKey])
+      :- Exchange(distribution=[hash[city_name]])
+      :  +- DropUpdateBefore
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
source_city]], fields=[id, city_name])
+      +- Exchange(distribution=[hash[city_name]])
+         +- DropUpdateBefore
+            +- TableSourceScan(table=[[default_catalog, default_database, 
source_city_detail]], fields=[city_id, city_name, city_detail])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testJoinOutputUpsertKeyInSinkPkWhileJoinOnPartOfImmutableCols">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.sink], fields=[id, 
city_id, city_name, city_detail])
++- LogicalProject(id=[$0], city_id=[$3], city_name=[$1], city_detail=[$6])
+   +- LogicalJoin(condition=[=($1, $4)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
source_city]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
source_city_detail]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, 
city_name, city_detail], conflictStrategy=[DEDUPLICATE], changelogMode=[NONE])
++- Calc(select=[id, city_id, city_name, city_detail], changelogMode=[I,UA])
+   +- Join(joinType=[InnerJoin], where=[=(city_name, city_name0)], select=[id, 
city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[HasUniqueKey], changelogMode=[I,UA])
+      :- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA])
+      :  +- DropUpdateBefore(changelogMode=[I,UA])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
source_city, project=[id, city_name], metadata=[]]], fields=[id, city_name], 
changelogMode=[I,UB,UA])
+      +- Exchange(distribution=[hash[city_name]], changelogMode=[I,UA])
+         +- DropUpdateBefore(changelogMode=[I,UA])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
source_city_detail, project=[city_id, city_name, city_detail], metadata=[]]], 
fields=[city_id, city_name, city_detail], changelogMode=[I,UB,UA])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.sink], fields=[id, city_id, 
city_name, city_detail], conflictStrategy=[DEDUPLICATE])
++- Calc(select=[id, city_id, city_name, city_detail])
+   +- Join(joinType=[InnerJoin], where=[(city_name = city_name0)], select=[id, 
city_name, city_id, city_name0, city_detail], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[HasUniqueKey])
+      :- Exchange(distribution=[hash[city_name]])
+      :  +- DropUpdateBefore
+      :     +- TableSourceScan(table=[[default_catalog, default_database, 
source_city, project=[id, city_name], metadata=[]]], fields=[id, city_name])
+      +- Exchange(distribution=[hash[city_name]])
+         +- DropUpdateBefore
+            +- TableSourceScan(table=[[default_catalog, default_database, 
source_city_detail, project=[city_id, city_name, city_detail], metadata=[]]], 
fields=[city_id, city_name, city_detail])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 3e8bf996630..88dd7214062 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -216,6 +216,15 @@ class FlinkRelMdHandlerTestBase {
   protected lazy val temporalTableStreamScan: StreamPhysicalDataStreamScan =
     createDataStreamScan(ImmutableList.of("TemporalTable4"), 
streamPhysicalTraits)
 
+  protected lazy val tableWithImmutableColsLogicalScan: LogicalTableScan =
+    createTableSourceTable(
+      ImmutableList.of("projected_table_source_table_with_immutable_cols"),
+      logicalTraits)
+  protected lazy val tableWithImmutableColsStreamScan: 
StreamPhysicalDataStreamScan =
+    createTableSourceTable(
+      ImmutableList.of("projected_table_source_table_with_immutable_cols"),
+      streamPhysicalTraits)
+
   private lazy val valuesType = relBuilder.getTypeFactory
     .builder()
     .add("a", SqlTypeName.BIGINT)
@@ -2694,6 +2703,25 @@ class FlinkRelMdHandlerTestBase {
     (batchLookupJoin, streamLookupJoin)
   }
 
+  protected def getStreamLookupJoinsWithImmutableCols(
+      leftInput: RelNode,
+      temporalTable: RelOptTable,
+      joinInfo: JoinInfo,
+      joinType: JoinRelType,
+      calcOnTemporalTable: Option[RexProgram]): StreamPhysicalLookupJoin = {
+    new StreamPhysicalLookupJoin(
+      cluster,
+      streamPhysicalTraits,
+      leftInput,
+      temporalTable,
+      calcOnTemporalTable,
+      joinInfo,
+      joinType,
+      Option.empty[RelHint],
+      false
+    )
+  }
+
   // select * from MyTable1 join MyTable4 on MyTable1.b = MyTable4.a
   protected lazy val logicalInnerJoinOnUniqueKeys: RelNode = relBuilder
     .scan("MyTable1")
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
index 3b7524e1bd3..996ef4f2de8 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdUpsertKeysTest.scala
@@ -17,17 +17,25 @@
  */
 package org.apache.flink.table.planner.plan.metadata
 
+import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalExpand
-import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan
+import 
org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.{StreamPhysicalExchange,
 StreamPhysicalOverAggregate, StreamPhysicalRank, StreamPhysicalTableSourceScan}
 import org.apache.flink.table.planner.plan.schema.TableSourceTable
-import org.apache.flink.table.planner.plan.utils.ExpandUtil
+import org.apache.flink.table.planner.plan.utils.{ExpandUtil, 
RankProcessStrategy}
+import org.apache.flink.table.runtime.operators.rank.{ConstantRankRange, 
RankType}
 
 import com.google.common.collect.{ImmutableList, ImmutableSet}
 import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataTypeFieldImpl
+import org.apache.calcite.rel.{RelCollations, RelFieldCollation, RelNode}
+import org.apache.calcite.rel.core.{JoinRelType, Window}
 import org.apache.calcite.rel.hint.RelHint
+import org.apache.calcite.rex.{RexInputRef, RexNode, RexWindowBounds}
 import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, LESS_THAN}
+import org.apache.calcite.sql.SqlWindow
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, LESS_THAN, MAX}
+import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.util.ImmutableBitSet
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -477,6 +485,182 @@ class FlinkRelMdUpsertKeysTest extends 
FlinkRelMdHandlerTestBase {
     assertEquals(toBitSet(Array(0)), mq.getUpsertKeys(intermediateScan).toSet)
   }
 
+  @Test
+  def testGetUpsertKeysOnTableScanWithImmutableCols(): Unit = {
+    // Immutable columns: {0, 1, 2} (PK 'a' + immutable 'c', 'd')
+    assertEquals(
+      toBitSet(Array(0), Array(0, 1, 2)),
+      mq.getUpsertKeys(tableWithImmutableColsLogicalScan).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnExchangeWithImmutableCols(): Unit = {
+    // Hash exchange on column 1 (c, immutable)
+    val hash1 = FlinkRelDistribution.hash(Array(1), requireStrict = true)
+    val exchange1 = new StreamPhysicalExchange(
+      cluster,
+      streamPhysicalTraits.replace(hash1),
+      tableWithImmutableColsStreamScan,
+      hash1)
+    assertEquals(toBitSet(Array(0), Array(0, 1, 2)), 
mq.getUpsertKeys(exchange1).toSet)
+
+    // Hash exchange on column 3 (rowtime, NOT immutable)
+    val hash3 = FlinkRelDistribution.hash(Array(3), requireStrict = true)
+    val exchange3 = new StreamPhysicalExchange(
+      cluster,
+      streamPhysicalTraits.replace(hash3),
+      tableWithImmutableColsStreamScan,
+      hash3)
+    assertEquals(toBitSet(), mq.getUpsertKeys(exchange3).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnSortWithImmutableCols(): Unit = {
+    // Sort on column 1 (c, immutable)
+    relBuilder.push(tableWithImmutableColsLogicalScan)
+    val sort1 = relBuilder.sort(relBuilder.field(1)).build()
+    assertEquals(toBitSet(Array(0), Array(0, 1, 2)), 
mq.getUpsertKeys(sort1).toSet)
+
+    // Sort on column 3 (rowtime, NOT immutable)
+    relBuilder.push(tableWithImmutableColsLogicalScan)
+    val sort3 = relBuilder.sort(relBuilder.field(3)).build()
+    assertEquals(toBitSet(), mq.getUpsertKeys(sort3).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnRankWithImmutableCols(): Unit = {
+    def buildRank(partitionKey: Int): RelNode = {
+      val hash = FlinkRelDistribution.hash(Array(partitionKey), requireStrict 
= true)
+      val exchange = new StreamPhysicalExchange(
+        cluster,
+        tableWithImmutableColsStreamScan.getTraitSet.replace(hash),
+        tableWithImmutableColsStreamScan,
+        hash)
+      new StreamPhysicalRank(
+        cluster,
+        streamPhysicalTraits,
+        exchange,
+        ImmutableBitSet.of(partitionKey),
+        RelCollations.of(2),
+        RankType.RANK,
+        new ConstantRankRange(1, 5),
+        new RelDataTypeFieldImpl("rk", 4, longType),
+        true,
+        RankProcessStrategy.UNDEFINED_STRATEGY,
+        false
+      )
+    }
+
+    // Rank partitioned by column 1 (c, immutable)
+    val rank1 = buildRank(1)
+    assertEquals(toBitSet(Array(0), Array(0, 1, 2)), 
mq.getUpsertKeys(rank1).toSet)
+
+    // Rank partitioned by column 3 (rowtime, NOT immutable)
+    val rank3 = buildRank(3)
+    assertEquals(toBitSet(), mq.getUpsertKeys(rank3).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnOverAggWithImmutableCols(): Unit = {
+    def buildOverAgg(partitionKey: Int): RelNode = {
+      val inputRowType = tableWithImmutableColsStreamScan.getRowType
+      val rowtimeType = inputRowType.getFieldList.get(3).getType
+
+      val group = new Window.Group(
+        ImmutableBitSet.of(partitionKey),
+        true,
+        RexWindowBounds.create(SqlWindow.createUnboundedPreceding(new 
SqlParserPos(0, 0)), null),
+        RexWindowBounds.create(SqlWindow.createCurrentRow(new SqlParserPos(0, 
0)), null),
+        RelCollations.of(
+          new RelFieldCollation(
+            2,
+            RelFieldCollation.Direction.ASCENDING,
+            RelFieldCollation.NullDirection.FIRST)),
+        ImmutableList.of(
+          new Window.RexWinAggCall(
+            MAX,
+            rowtimeType,
+            ImmutableList.of[RexNode](new RexInputRef(3, rowtimeType)),
+            0,
+            false,
+            false
+          )
+        )
+      )
+
+      val outputBuilder = typeFactory.builder()
+      inputRowType.getFieldList.forEach(f => outputBuilder.add(f.getName, 
f.getType))
+      outputBuilder.add("max_rowtime", rowtimeType)
+      val outputRowType = outputBuilder.build()
+
+      val logicalOverAgg = new FlinkLogicalOverAggregate(
+        cluster,
+        flinkLogicalTraits,
+        tableWithImmutableColsLogicalScan,
+        ImmutableList.of(),
+        outputRowType,
+        ImmutableList.of(group)
+      )
+
+      val hash = FlinkRelDistribution.hash(Array(partitionKey), requireStrict 
= true)
+      val exchange = new StreamPhysicalExchange(
+        cluster,
+        tableWithImmutableColsStreamScan.getTraitSet.replace(hash),
+        tableWithImmutableColsStreamScan,
+        hash)
+
+      new StreamPhysicalOverAggregate(
+        cluster,
+        streamPhysicalTraits,
+        exchange,
+        outputRowType,
+        logicalOverAgg
+      )
+    }
+
+    // Over agg partitioned by column 1 (c, immutable)
+    val over1 = buildOverAgg(1)
+    assertEquals(toBitSet(Array(0), Array(0, 1, 2)), 
mq.getUpsertKeys(over1).toSet)
+
+    // Over agg partitioned by column 3 (rowtime, NOT immutable)
+    val over3 = buildOverAgg(3)
+    assertEquals(toBitSet(), mq.getUpsertKeys(over3).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnSemiAntiJoinWithImmutableCols(): Unit = {
+    // SEMI join on left.c(1) = right.c(1)
+    // Left upsert keys: {{0}, {0,1,2}}, left immutable: {0,1,2}
+    val join1 = relBuilder
+      .scan("projected_table_source_table_with_immutable_cols")
+      .scan("projected_table_source_table_with_immutable_cols")
+      .join(
+        JoinRelType.SEMI,
+        relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 
1, 1)))
+      .build()
+    assertEquals(toBitSet(Array(0), Array(0, 1, 2)), 
mq.getUpsertKeys(join1).toSet)
+  }
+
+  @Test
+  def testGetUpsertKeysOnInnerJoinWithImmutableCols(): Unit = {
+    // Inner join on left.c(1) = right.c(1)
+    // Both sides: upsert keys = {{0}, {0,1,2}}, immutable = {0,1,2}
+    // filterKeys on both sides with join key {1}: both retain {{0}, {0,1,2}}
+    // Neither side is unique on {1}, so only concatenated keys survive
+    // Right shifted by 4: {{4}, {4,5,6}}
+    // Concat: {0}x{4}, {0}x{4,5,6}, {0,1,2}x{4}, {0,1,2}x{4,5,6}
+    val join = relBuilder
+      .scan("projected_table_source_table_with_immutable_cols")
+      .scan("projected_table_source_table_with_immutable_cols")
+      .join(
+        JoinRelType.INNER,
+        relBuilder.call(EQUALS, relBuilder.field(2, 0, 1), relBuilder.field(2, 
1, 1)))
+      .build()
+    assertEquals(
+      toBitSet(Array(0, 4), Array(0, 4, 5, 6), Array(0, 1, 2, 4), Array(0, 1, 
2, 4, 5, 6)),
+      mq.getUpsertKeys(join).toSet)
+  }
+
   private def toBitSet(keys: Array[Int]*): Set[ImmutableBitSet] = {
     keys.map(k => ImmutableBitSet.of(k: _*)).toSet
   }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
index 5e2ffd7c58e..8f5f94572e5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/metadata/MetadataTestUtil.scala
@@ -70,6 +70,9 @@ object MetadataTestUtil {
     rootSchema.add(
       "projected_table_source_table_with_partial_pk",
       createProjectedTableSourceTableWithPartialCompositePrimaryKey())
+    rootSchema.add(
+      "projected_table_source_table_with_immutable_cols",
+      createProjectedTableSourceTableWithImmutableCols())
     rootSchema
   }
 
@@ -477,6 +480,46 @@ object MetadataTestUtil {
       flinkContext)
   }
 
+  private def createProjectedTableSourceTableWithImmutableCols(): Table = {
+    val resolvedSchema = new ResolvedSchema(
+      util.Arrays.asList(
+        Column.physical("a", DataTypes.BIGINT().notNull()),
+        Column.physical("b", DataTypes.INT().notNull()),
+        Column.physical("c", DataTypes.STRING().notNull()),
+        Column.physical("d", DataTypes.BIGINT().notNull()),
+        Column.physical("rowtime", DataTypes.TIMESTAMP(3))
+      ),
+      Collections.emptyList(),
+      UniqueConstraint.primaryKey("PK_1", util.Arrays.asList("a")),
+      Collections.singletonList(DefaultIndex.newIndex("idx", 
Collections.singletonList("a"))),
+      ImmutableColumnsConstraint.immutableColumns("imt", 
util.Arrays.asList("c", "d")))
+
+    val catalogTable = getCatalogTable(resolvedSchema)
+
+    // projected: drop column b, keep a, c, d, rowtime
+    val typeFactory = new 
FlinkTypeFactory(Thread.currentThread().getContextClassLoader)
+    val rowType = typeFactory.buildRelNodeRowType(
+      Seq("a", "c", "d", "rowtime"),
+      Seq(
+        new BigIntType(false),
+        new VarCharType(false, 100),
+        new BigIntType(false),
+        new TimestampType(true, TimestampKind.ROWTIME, 3)))
+
+    new MockTableSourceTable(
+      rowType,
+      new TestTableSource(),
+      true,
+      ContextResolvedTable.temporary(
+        ObjectIdentifier.of(
+          "default_catalog",
+          "default_database",
+          "projected_table_source_table_with_immutable_cols"),
+        new ResolvedCatalogTable(catalogTable, resolvedSchema)
+      ),
+      flinkContext)
+  }
+
   private def getCatalogTable(resolvedSchema: ResolvedSchema) = {
     CatalogTable
       .newBuilder()
@@ -573,4 +616,13 @@ class MockTableSourceTable(
       call: SqlCall,
       parent: SqlNode,
       config: CalciteConnectionConfig): Boolean = false
+
+  def copy(newTableSource: DynamicTableSource, newRowType: RelDataType): 
MockTableSourceTable = {
+    new MockTableSourceTable(
+      newRowType,
+      newTableSource,
+      isStreamingMode,
+      contextResolvedTable,
+      flinkContext)
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
index c4d8e7962c0..f1c72583924 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/JoinTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.stream.sql.join
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.utils.{StreamTableTestUtil, TableFunc1, 
TableTestBase}
+import 
org.apache.flink.table.planner.utils.ImmutableColConstraintTestUtils.addImmutableColConstraint
 
 import org.assertj.core.api.Assertions.assertThatThrownBy
 import org.junit.jupiter.api.Test
@@ -510,6 +511,132 @@ class JoinTest extends TableTestBase {
     )
   }
 
+  @Test
+  def testJoinOutputUpsertKeyInSinkPkWhileJoinOnImmutableCols(): Unit = {
+    val catalog = 
util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+
+    util.tableEnv.executeSql("""
+                               |create table source_city (
+                               | id varchar,
+                               | city_name varchar,
+                               | primary key (id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+    addImmutableColConstraint(catalog, util.tableEnv.getCurrentDatabase, 
"source_city", "city_name")
+
+    util.tableEnv.executeSql("""
+                               |create table source_city_detail (
+                               | city_id varchar,
+                               | city_name varchar,
+                               | city_detail varchar,
+                               | primary key (city_id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+    addImmutableColConstraint(
+      catalog,
+      util.tableEnv.getCurrentDatabase,
+      "source_city_detail",
+      "city_name")
+
+    util.tableEnv.executeSql("""
+                               |create table sink (
+                               | id varchar,
+                               | city_id varchar,
+                               | city_name varchar,
+                               | city_detail varchar,
+                               | primary key (id, city_id) not enforced
+                               |) with (
+                               | 'connector' = 'values'
+                               | ,'sink-insert-only' = 'false'
+                               |)
+                               |""".stripMargin)
+
+    // verify UB should be dropped and no upsertMaterialize
+    util.verifyExplainInsert(
+      """
+        |insert into sink
+        |select t1.id, t2.city_id, t1.city_name, t2.city_detail
+        | from source_city t1
+        | join source_city_detail t2 on t1.city_name = t2.city_name
+        |on conflict do deduplicate
+        |""".stripMargin,
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
+  @Test
+  def testJoinOutputUpsertKeyInSinkPkWhileJoinOnPartOfImmutableCols(): Unit = {
+    val catalog = 
util.tableEnv.getCatalog(util.tableEnv.getCurrentCatalog).get()
+
+    util.tableEnv.executeSql("""
+                               |create table source_city (
+                               | id varchar,
+                               | city_name varchar,
+                               | city_no int,
+                               | primary key (id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+    addImmutableColConstraint(
+      catalog,
+      util.tableEnv.getCurrentDatabase,
+      "source_city",
+      "city_name",
+      "city_no")
+
+    util.tableEnv.executeSql("""
+                               |create table source_city_detail (
+                               | city_id varchar,
+                               | city_name varchar,
+                               | city_no int,
+                               | city_detail varchar,
+                               | primary key (city_id) not enforced
+                               |) with (
+                               | 'connector' = 'values',
+                               | 'changelog-mode' = 'I,UA,UB'
+                               |)
+                               |""".stripMargin)
+    addImmutableColConstraint(
+      catalog,
+      util.tableEnv.getCurrentDatabase,
+      "source_city_detail",
+      "city_name",
+      "city_no")
+
+    util.tableEnv.executeSql("""
+                               |create table sink (
+                               | id varchar,
+                               | city_id varchar,
+                               | city_name varchar,
+                               | city_detail varchar,
+                               | primary key (id, city_id) not enforced
+                               |) with (
+                               | 'connector' = 'values'
+                               | ,'sink-insert-only' = 'false'
+                               |)
+                               |""".stripMargin)
+
+    // verify UB should be dropped and no upsertMaterialize on sink
+    util.verifyExplainInsert(
+      """
+        |insert into sink
+        |select t1.id, t2.city_id, t1.city_name, t2.city_detail
+        | from source_city t1
+        | join source_city_detail t2 on t1.city_name = t2.city_name
+        |on conflict do deduplicate
+        |""".stripMargin,
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
   @Test
   def testInnerJoinWithFilterPushDown(): Unit = {
     util.verifyExecPlan("""


Reply via email to