This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 50d46e11bd3 [FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
50d46e11bd3 is described below

commit 50d46e11bd37ea3f2c19dd8f9a77c2b2dad4b7aa
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Feb 28 15:08:06 2025 +0100

    [FLINK-37475] Drop ChangelogNormalize for piping from upsert source to sink
    
    * Add information about DELETE to ChangelogMode
    * Adapt FlinkChangelogModeInferenceProgram to remove ChangelogNormalize
      if possible
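    
    A minimal sketch (an assumption for illustration, not part of this commit) of how a
    sink connector could use the new ChangelogMode.upsert(boolean) shortcut added here;
    the connector class name is hypothetical:
    
        import org.apache.flink.table.connector.ChangelogMode;
        import org.apache.flink.table.connector.sink.DynamicTableSink;
    
        public class MyUpsertSink implements DynamicTableSink {
            @Override
            public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
                // upsert(true): DELETEs may carry only the key fields, which may allow the
                // planner to pipe an upsert source directly into this sink without a
                // ChangelogNormalize node.
                // upsert(false): the sink requires DELETEs that contain the full row.
                return ChangelogMode.upsert(true);
            }
    
            // getSinkRuntimeProvider(), copy() and asSummaryString() omitted for brevity
        }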
---
 .../flink/table/connector/ChangelogMode.java       |  50 ++-
 .../ChangelogNormalizeRequirementResolver.java     | 179 +++++++++
 .../StreamNonDeterministicUpdatePlanVisitor.java   |  21 +-
 .../flink/table/planner/plan/trait/DeleteKind.java |  35 ++
 .../table/planner/plan/utils/FlinkRelUtil.java     |  19 +
 .../FlinkChangelogModeInferenceProgram.scala       | 419 +++++++++++++++++++--
 .../table/planner/plan/trait/DeleteKindTrait.scala | 100 +++++
 .../planner/plan/trait/DeleteKindTraitDef.scala    |  50 +++
 .../planner/plan/utils/ChangelogPlanUtils.scala    |  18 +-
 .../planner/factories/TestValuesTableFactory.java  |  50 ++-
 .../nodes/exec/stream/DeletesByKeyPrograms.java    | 307 +++++++++++++++
 .../exec/stream/DeletesByKeySemanticTests.java     |  39 ++
 .../sql/ChangelogNormalizeOptimizationTest.java    | 301 +++++++++++++++
 .../runtime/stream/sql/DataStreamJavaITCase.java   |   4 +-
 .../sql/ChangelogNormalizeOptimizationTest.xml     | 418 ++++++++++++++++++++
 .../planner/plan/stream/sql/TableScanTest.xml      |  14 +-
 .../plan/stream/sql/NonDeterministicDagTest.scala  |   4 +-
 .../planner/plan/stream/sql/TableScanTest.scala    |   9 +-
 .../flink/table/planner/utils/TableTestBase.scala  |  11 +-
 19 files changed, 1969 insertions(+), 79 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
index 9d4a5fd500f..467d374d183 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/ChangelogMode.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * The set of changes contained in a changelog.
@@ -43,6 +44,15 @@ public final class ChangelogMode {
                     .addContainedKind(RowKind.INSERT)
                     .addContainedKind(RowKind.UPDATE_AFTER)
                     .addContainedKind(RowKind.DELETE)
+                    .keyOnlyDeletes(true)
+                    .build();
+
+    private static final ChangelogMode UPSERT_WITH_FULL_DELETES =
+            ChangelogMode.newBuilder()
+                    .addContainedKind(RowKind.INSERT)
+                    .addContainedKind(RowKind.UPDATE_AFTER)
+                    .addContainedKind(RowKind.DELETE)
+                    .keyOnlyDeletes(false)
                     .build();
 
     private static final ChangelogMode ALL =
@@ -54,11 +64,13 @@ public final class ChangelogMode {
                     .build();
 
     private final Set<RowKind> kinds;
+    private final boolean keyOnlyDeletes;
 
-    private ChangelogMode(Set<RowKind> kinds) {
+    private ChangelogMode(Set<RowKind> kinds, boolean keyOnlyDeletes) {
         Preconditions.checkArgument(
                 kinds.size() > 0, "At least one kind of row should be 
contained in a changelog.");
         this.kinds = Collections.unmodifiableSet(kinds);
+        this.keyOnlyDeletes = keyOnlyDeletes;
     }
 
     /** Shortcut for a simple {@link RowKind#INSERT}-only changelog. */
@@ -71,7 +83,21 @@ public final class ChangelogMode {
      * contain {@link RowKind#UPDATE_BEFORE} rows.
      */
     public static ChangelogMode upsert() {
-        return UPSERT;
+        return upsert(true);
+    }
+
+    /**
+     * Shortcut for an upsert changelog that describes idempotent updates on a key and thus does
+     * not contain {@link RowKind#UPDATE_BEFORE} rows.
+     *
+     * @param keyOnlyDeletes Tells the system whether the emitted {@link RowKind#DELETE} rows
+     *     contain only the key fields.
+     */
+    public static ChangelogMode upsert(boolean keyOnlyDeletes) {
+        if (keyOnlyDeletes) {
+            return UPSERT;
+        } else {
+            return UPSERT_WITH_FULL_DELETES;
+        }
     }
 
     /** Shortcut for a changelog that can contain all {@link RowKind}s. */
@@ -96,6 +122,10 @@ public final class ChangelogMode {
         return kinds.size() == 1 && kinds.contains(kind);
     }
 
+    public boolean keyOnlyDeletes() {
+        return keyOnlyDeletes;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -115,7 +145,13 @@ public final class ChangelogMode {
 
     @Override
     public String toString() {
-        return kinds.toString();
+        if (!keyOnlyDeletes) {
+            return kinds.toString();
+        } else {
+            return kinds.stream()
+                    .map(kind -> kind == RowKind.DELETE ? "~D" : 
kind.toString())
+                    .collect(Collectors.joining(", ", "[", "]"));
+        }
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -125,6 +161,7 @@ public final class ChangelogMode {
     public static class Builder {
 
         private final Set<RowKind> kinds = EnumSet.noneOf(RowKind.class);
+        private boolean keyOnlyDeletes = false;
 
         private Builder() {
             // default constructor to allow a fluent definition
@@ -135,8 +172,13 @@ public final class ChangelogMode {
             return this;
         }
 
+        public Builder keyOnlyDeletes(boolean flag) {
+            this.keyOnlyDeletes = flag;
+            return this;
+        }
+
         public ChangelogMode build() {
-            return new ChangelogMode(kinds);
+            return new ChangelogMode(kinds, keyOnlyDeletes);
         }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
new file mode 100644
index 00000000000..2191fd51940
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/ChangelogNormalizeRequirementResolver.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.planner.connectors.DynamicSourceUtils;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalCalcBase;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalChangelogNormalize;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalTableSourceScan;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalWatermarkAssigner;
+import org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
+import org.apache.flink.table.planner.plan.trait.UpdateKindTraitDef$;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Checks if it is safe to remove ChangelogNormalize as part of {@link
+ * FlinkChangelogModeInferenceProgram}. It checks:
+ *
+ * <ul>
+ *   <li>that no filter has been pushed into the changelog normalize
+ *   <li>that we don't need to produce UPDATE_BEFORE
+ *   <li>that we don't access any metadata columns
+ * </ul>
+ */
+public class ChangelogNormalizeRequirementResolver {
+
+    /** Checks whether the ChangelogNormalize is still required and thus cannot be safely removed. */
+    public static boolean isRequired(StreamPhysicalChangelogNormalize normalize) {
+        if (normalize.filterCondition() != null) {
+            return true;
+        }
+        if (!Objects.equals(
+                
normalize.getTraitSet().getTrait(UpdateKindTraitDef$.MODULE$.INSTANCE()),
+                UpdateKindTrait$.MODULE$.ONLY_UPDATE_AFTER())) {
+            return true;
+        }
+
+        // check if metadata columns are accessed
+        final RelNode input = normalize.getInput();
+
+        return visit(input, bitSetForAllOutputColumns(input));
+    }
+
+    private static ImmutableBitSet bitSetForAllOutputColumns(RelNode input) {
+        return ImmutableBitSet.builder().set(0, 
input.getRowType().getFieldCount()).build();
+    }
+
+    private static boolean visit(final RelNode rel, final ImmutableBitSet 
requiredColumns) {
+        if (rel instanceof StreamPhysicalCalcBase) {
+            return visitCalc((StreamPhysicalCalcBase) rel, requiredColumns);
+        } else if (rel instanceof StreamPhysicalTableSourceScan) {
+            return visitTableSourceScan((StreamPhysicalTableSourceScan) rel, 
requiredColumns);
+        } else if (rel instanceof StreamPhysicalWatermarkAssigner
+                || rel instanceof StreamPhysicalExchange) {
+            // require all input columns
+            final RelNode input = ((SingleRel) rel).getInput();
+            return visit(input, bitSetForAllOutputColumns(input));
+        } else {
+            // these nodes should not be in an input of a changelog normalize
+            // StreamPhysicalChangelogNormalize
+            // StreamPhysicalDropUpdateBefore
+            // StreamPhysicalUnion
+            // StreamPhysicalSort
+            // StreamPhysicalLimit
+            // StreamPhysicalSortLimit
+            // StreamPhysicalTemporalSort
+            // StreamPhysicalWindowTableFunction
+            // StreamPhysicalWindowRank
+            // StreamPhysicalWindowDeduplicate
+            // StreamPhysicalRank
+            // StreamPhysicalOverAggregateBase
+            // CommonPhysicalJoin
+            // StreamPhysicalMatch
+            // StreamPhysicalMiniBatchAssigner
+            // StreamPhysicalExpand
+            // StreamPhysicalWindowAggregateBase
+            // StreamPhysicalGroupAggregateBase
+            // StreamPhysicalSink
+            // StreamPhysicalLegacySink
+            // StreamPhysicalCorrelateBase
+            // StreamPhysicalLookupJoin
+            // StreamPhysicalValues
+            // StreamPhysicalDataStreamScan
+            // StreamPhysicalLegacyTableSourceScan
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Unsupported to visit node %s. The node either 
should not be pushed"
+                                    + " through the changelog normalize or is 
not supported yet.",
+                            rel.getClass().getSimpleName()));
+        }
+    }
+
+    private static boolean visitTableSourceScan(
+            StreamPhysicalTableSourceScan tableScan, ImmutableBitSet 
requiredColumns) {
+        if (!(tableScan.tableSource() instanceof SupportsReadingMetadata)) {
+            // source does not have metadata, no need to check
+            return false;
+        }
+        final TableSourceTable sourceTable = 
tableScan.getTable().unwrap(TableSourceTable.class);
+        assert sourceTable != null;
+        // check if requiredColumns contain metadata column
+        final List<Column.MetadataColumn> metadataColumns =
+                DynamicSourceUtils.extractMetadataColumns(
+                        
sourceTable.contextResolvedTable().getResolvedSchema());
+        final Set<String> metaColumnSet =
+                
metadataColumns.stream().map(Column::getName).collect(Collectors.toSet());
+        final List<String> columns = tableScan.getRowType().getFieldNames();
+        for (int index = 0; index < columns.size(); index++) {
+            String column = columns.get(index);
+            if (metaColumnSet.contains(column) && requiredColumns.get(index)) {
+                // we require metadata column, therefore, we cannot remove the 
changelog normalize
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean visitCalc(StreamPhysicalCalcBase calc, 
ImmutableBitSet requiredColumns) {
+        // evaluate required columns from input
+        final List<RexNode> projects =
+                calc.getProgram().getProjectList().stream()
+                        .map(expr -> calc.getProgram().expandLocalRef(expr))
+                        .collect(Collectors.toList());
+        final Map<Integer, List<Integer>> outFromSourcePos =
+                FlinkRelUtil.extractSourceMapping(projects);
+        final List<Integer> conv2Inputs =
+                requiredColumns.toList().stream()
+                        .map(
+                                out ->
+                                        
Optional.ofNullable(outFromSourcePos.get(out))
+                                                .orElseThrow(
+                                                        () ->
+                                                                new 
IllegalStateException(
+                                                                        
String.format(
+                                                                               
 "Invalid pos:%d over projection:%s",
+                                                                               
 out,
+                                                                               
 calc
+                                                                               
         .getProgram()))))
+                        .flatMap(Collection::stream)
+                        .filter(index -> index != -1)
+                        .distinct()
+                        .collect(Collectors.toList());
+        return visit(calc.getInput(), ImmutableBitSet.of(conv2Inputs));
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
index 98a1a6782cf..aaa4c4ff845 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java
@@ -62,6 +62,7 @@ import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalW
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
+import org.apache.flink.table.planner.plan.utils.FlinkRelUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.JoinUtil;
 import org.apache.flink.table.planner.plan.utils.OverAggregateUtil;
@@ -78,7 +79,6 @@ import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
-import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.sql.SqlExplainLevel;
 import org.apache.calcite.util.ImmutableBitSet;
 
@@ -297,7 +297,8 @@ public class StreamNonDeterministicUpdatePlanVisitor {
                     calc.getProgram().getProjectList().stream()
                             .map(expr -> 
calc.getProgram().expandLocalRef(expr))
                             .collect(Collectors.toList());
-            Map<Integer, List<Integer>> outFromSourcePos = 
extractSourceMapping(projects);
+            Map<Integer, List<Integer>> outFromSourcePos =
+                    FlinkRelUtil.extractSourceMapping(projects);
             List<Integer> conv2Inputs =
                     requireDeterminism.toList().stream()
                             .map(
@@ -812,22 +813,6 @@ public class StreamNonDeterministicUpdatePlanVisitor {
         return transmitDeterminismRequirement(rel, inputRequireDeterminism);
     }
 
-    /** Extracts the out from source field index mapping of the given 
projects. */
-    private Map<Integer, List<Integer>> extractSourceMapping(final 
List<RexNode> projects) {
-        Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
-
-        for (int index = 0; index < projects.size(); index++) {
-            RexNode expr = projects.get(index);
-            mapOutFromInPos.put(
-                    index,
-                    FlinkRexUtil.findAllInputRefs(expr).stream()
-                            .mapToInt(RexSlot::getIndex)
-                            .boxed()
-                            .collect(Collectors.toList()));
-        }
-        return mapOutFromInPos;
-    }
-
     private void checkNonDeterministicRexProgram(
             final ImmutableBitSet requireDeterminism,
             final RexProgram program,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
new file mode 100644
index 00000000000..8f864f7ac48
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/trait/DeleteKind.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.trait;
+
+/** Lists all kinds of {@link ModifyKind#DELETE} operations. */
+public enum DeleteKind {
+
+    /** This kind indicates that operators do not emit {@link ModifyKind#DELETE} operations. */
+    NONE,
+
+    /**
+     * This kind indicates that operators can emit deletes with the key only. The rest of the row
+     * may not be present.
+     */
+    DELETE_BY_KEY,
+
+    /** This kind indicates that operators should emit deletes with the full row. */
+    FULL_DELETE
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
index a920a6133ff..ee39cdb807b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/FlinkRelUtil.java
@@ -28,12 +28,15 @@ import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexProgram;
 import org.apache.calcite.rex.RexProgramBuilder;
+import org.apache.calcite.rex.RexSlot;
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.rex.RexVisitorImpl;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 /** Utilities for {@link RelNode}. */
@@ -170,6 +173,22 @@ public class FlinkRelUtil {
         return array;
     }
 
+    /** Extracts the mapping from output field index to source field indices for the given projects. */
+    public static Map<Integer, List<Integer>> extractSourceMapping(final List<RexNode> projects) {
+        Map<Integer, List<Integer>> mapOutFromInPos = new HashMap<>();
+
+        for (int index = 0; index < projects.size(); index++) {
+            RexNode expr = projects.get(index);
+            mapOutFromInPos.put(
+                    index,
+                    FlinkRexUtil.findAllInputRefs(expr).stream()
+                            .mapToInt(RexSlot::getIndex)
+                            .boxed()
+                            .collect(Collectors.toList()));
+        }
+        return mapOutFromInPos;
+    }
+
     /**
      * An InputRefCounter that count every inputRef's reference count number, 
every reference will
      * be counted, e.g., '$0 + 1' & '$0 + 2' will count 2 instead of 1.
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
index 620cd5802ab..988bcbc3554 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala
@@ -23,9 +23,11 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
 import org.apache.flink.table.connector.ChangelogMode
 import org.apache.flink.table.planner.plan.`trait`._
+import org.apache.flink.table.planner.plan.`trait`.DeleteKindTrait.{deleteOnKeyOrNone, fullDeleteOrNone, DELETE_BY_KEY, FULL_DELETE}
 import org.apache.flink.table.planner.plan.`trait`.UpdateKindTrait.{beforeAfterOrNone, onlyAfterOrNone, BEFORE_AND_AFTER, ONLY_UPDATE_AFTER}
 import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.physical.stream._
+import org.apache.flink.table.planner.plan.optimize.ChangelogNormalizeRequirementResolver
 import org.apache.flink.table.planner.plan.utils._
 import org.apache.flink.table.planner.plan.utils.RankProcessStrategy.{AppendFastStrategy, RetractStrategy, UpdateFastStrategy}
 import org.apache.flink.table.planner.sinks.DataStreamTableSink
@@ -70,12 +72,33 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     }
 
     val updateKindTraitVisitor = new SatisfyUpdateKindTraitVisitor(context)
-    val finalRoot = requiredUpdateKindTraits.flatMap {
+    val updateRoot = requiredUpdateKindTraits.flatMap {
       requiredUpdateKindTrait =>
         updateKindTraitVisitor.visit(rootWithModifyKindSet, 
requiredUpdateKindTrait)
     }
 
-    // step3: sanity check and return non-empty root
+    // step3: satisfy DeleteKind trait
+    val requiredDeleteKindTraits = if (rootModifyKindSet.contains(ModifyKind.DELETE)) {
+      root match {
+        case _: StreamPhysicalSink =>
+          // try DELETE_BY_KEY first, and then FULL_DELETE
+          Seq(DeleteKindTrait.DELETE_BY_KEY, DeleteKindTrait.FULL_DELETE)
+        case _ =>
+          // for non-sink nodes prefer full deletes
+          Seq(DeleteKindTrait.FULL_DELETE)
+      }
+    } else {
+      // there are no deletes
+      Seq(DeleteKindTrait.NONE)
+    }
+
+    val deleteKindTraitVisitor = new SatisfyDeleteKindTraitVisitor(context)
+    val finalRoot = requiredDeleteKindTraits.flatMap {
+      requiredDeleteKindTrait =>
+        deleteKindTraitVisitor.visit(updateRoot.head, requiredDeleteKindTrait)
+    }
+
+    // step4: sanity check and return non-empty root
     if (finalRoot.isEmpty) {
       val plan = FlinkRelOptUtil.toString(root, withChangelogTraits = true)
       throw new TableException(
@@ -489,13 +512,15 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
      *
      * @param rel
      *   the node who should satisfy the requiredTrait
-     * @param requiredTrait
+     * @param requiredUpdateTrait
      *   the required UpdateKindTrait
      * @return
      *   A converted node which satisfies required traits by input nodes of 
current node. Or None if
      *   required traits cannot be satisfied.
      */
-    def visit(rel: StreamPhysicalRel, requiredTrait: UpdateKindTrait): 
Option[StreamPhysicalRel] =
+    def visit(
+        rel: StreamPhysicalRel,
+        requiredUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] =
       rel match {
         case sink: StreamPhysicalSink =>
           val sinkRequiredTraits = inferSinkRequiredTraits(sink)
@@ -534,10 +559,10 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             _: StreamPhysicalWindowAggregate =>
           // Aggregate, TableAggregate, Limit, GroupWindowAggregate, 
WindowAggregate,
           // and WindowTableAggregate requires update_before if there are 
updates
-          val requiredChildTrait = 
beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
-          val children = visitChildren(rel, requiredChildTrait)
+          val requiredChildUpdateTrait = 
beforeAfterOrNone(getModifyKindSet(rel.getInput(0)))
+          val children = visitChildren(rel, requiredChildUpdateTrait)
           // use requiredTrait as providedTrait, because they should support 
all kinds of UpdateKind
-          createNewNode(rel, children, requiredTrait)
+          createNewNode(rel, children, requiredUpdateTrait)
 
         case _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
             _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
@@ -546,14 +571,14 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           // WindowRank, WindowDeduplicate, Deduplicate, TemporalSort, CEP, 
OverAggregate,
           // and IntervalJoin, WindowJoin require nothing about UpdateKind.
           val children = visitChildren(rel, UpdateKindTrait.NONE)
-          createNewNode(rel, children, requiredTrait)
+          createNewNode(rel, children, requiredUpdateTrait)
 
         case rank: StreamPhysicalRank =>
           val rankStrategies =
             RankProcessStrategy.analyzeRankProcessStrategies(rank, 
rank.partitionKey, rank.orderKey)
           visitRankStrategies(
             rankStrategies,
-            requiredTrait,
+            requiredUpdateTrait,
             rankStrategy => rank.copy(rankStrategy))
 
         case sortLimit: StreamPhysicalSortLimit =>
@@ -563,16 +588,16 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             sortLimit.getCollation)
           visitRankStrategies(
             rankStrategies,
-            requiredTrait,
+            requiredUpdateTrait,
             rankStrategy => sortLimit.copy(rankStrategy))
 
         case sort: StreamPhysicalSort =>
           val requiredChildTrait = 
beforeAfterOrNone(getModifyKindSet(sort.getInput))
           val children = visitChildren(sort, requiredChildTrait)
-          createNewNode(sort, children, requiredTrait)
+          createNewNode(sort, children, requiredUpdateTrait)
 
         case join: StreamPhysicalJoin =>
-          val onlyAfterByParent = requiredTrait.updateKind == 
UpdateKind.ONLY_UPDATE_AFTER
+          val onlyAfterByParent = requiredUpdateTrait.updateKind == 
UpdateKind.ONLY_UPDATE_AFTER
           val children = join.getInputs.zipWithIndex.map {
             case (child, childOrdinal) =>
               val physicalChild = child.asInstanceOf[StreamPhysicalRel]
@@ -592,7 +617,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           if (children.exists(_.isEmpty)) {
             None
           } else {
-            createNewNode(join, Some(children.flatten.toList), requiredTrait)
+            createNewNode(join, Some(children.flatten.toList), 
requiredUpdateTrait)
           }
 
         case temporalJoin: StreamPhysicalTemporalJoin =>
@@ -601,7 +626,8 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
           // the left input required trait depends on it's parent in temporal 
join
           // the left input will send message to parent
-          val requiredUpdateBeforeByParent = requiredTrait.updateKind == 
UpdateKind.BEFORE_AND_AFTER
+          val requiredUpdateBeforeByParent =
+            requiredUpdateTrait.updateKind == UpdateKind.BEFORE_AND_AFTER
           val leftInputModifyKindSet = getModifyKindSet(left)
           val leftRequiredTrait = if (requiredUpdateBeforeByParent) {
             beforeAfterOrNone(leftInputModifyKindSet)
@@ -630,7 +656,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
         case calc: StreamPhysicalCalcBase =>
           if (
-            requiredTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
+            requiredUpdateTrait == UpdateKindTrait.ONLY_UPDATE_AFTER &&
             calc.getProgram.getCondition != null
           ) {
             // we don't expect filter to satisfy ONLY_UPDATE_AFTER update kind,
@@ -639,7 +665,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             None
           } else {
             // otherwise, forward UpdateKind requirement
-            visitChildren(rel, requiredTrait) match {
+            visitChildren(rel, requiredUpdateTrait) match {
               case None => None
               case Some(children) =>
                 val childTrait = 
children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
@@ -652,7 +678,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalWatermarkAssigner |
             _: StreamPhysicalWindowTableFunction =>
           // transparent forward requiredTrait to children
-          visitChildren(rel, requiredTrait) match {
+          visitChildren(rel, requiredUpdateTrait) match {
             case None => None
             case Some(children) =>
               val childTrait = 
children.head.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
@@ -666,7 +692,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
               val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
                 UpdateKindTrait.NONE
               } else {
-                requiredTrait
+                requiredUpdateTrait
               }
               this.visit(child, requiredChildTrait)
           }.toList
@@ -704,7 +730,7 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           val children = visitChildren(normalize, 
UpdateKindTrait.ONLY_UPDATE_AFTER)
           // use requiredTrait as providedTrait,
           // because changelog normalize supports all kinds of UpdateKind
-          createNewNode(rel, children, requiredTrait)
+          createNewNode(rel, children, requiredUpdateTrait)
 
         case ts: StreamPhysicalTableSourceScan =>
           // currently only support BEFORE_AND_AFTER if source produces updates
@@ -712,11 +738,11 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           val newSource = createNewNode(rel, Some(List()), providedTrait)
           if (
             providedTrait.equals(UpdateKindTrait.BEFORE_AND_AFTER) &&
-            requiredTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
+            requiredUpdateTrait.equals(UpdateKindTrait.ONLY_UPDATE_AFTER)
           ) {
             // requiring only-after, but the source is CDC source, then drop 
update_before manually
             val dropUB = new StreamPhysicalDropUpdateBefore(rel.getCluster, 
rel.getTraitSet, rel)
-            createNewNode(dropUB, newSource.map(s => List(s)), requiredTrait)
+            createNewNode(dropUB, newSource.map(s => List(s)), 
requiredUpdateTrait)
           } else {
             newSource
           }
@@ -730,9 +756,9 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
             // we can't drop UPDATE_BEFORE if it is required by other parent 
blocks
             UpdateKindTrait.BEFORE_AND_AFTER
           } else {
-            requiredTrait
+            requiredUpdateTrait
           }
-          if (!providedTrait.satisfies(requiredTrait)) {
+          if (!providedTrait.satisfies(requiredUpdateTrait)) {
             // require ONLY_AFTER but can only provide BEFORE_AND_AFTER
             None
           } else {
@@ -744,12 +770,13 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
           val children = process.getInputs.map {
             case child: StreamPhysicalRel =>
               val childModifyKindSet = getModifyKindSet(child)
-              val requiredChildTrait = if (childModifyKindSet.isInsertOnly) {
-                UpdateKindTrait.NONE
-              } else {
-                UpdateKindTrait.BEFORE_AND_AFTER
-              }
-              this.visit(child, requiredChildTrait)
+              val requiredUpdateChildTrait =
+                if (childModifyKindSet.isInsertOnly) {
+                  UpdateKindTrait.NONE
+                } else {
+                  UpdateKindTrait.BEFORE_AND_AFTER
+                }
+              this.visit(child, requiredUpdateChildTrait)
           }.toList
           createNewNode(rel, Some(children.flatten), UpdateKindTrait.NONE)
 
@@ -761,15 +788,15 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
 
     private def visitChildren(
         parent: StreamPhysicalRel,
-        requiredChildrenTrait: UpdateKindTrait): 
Option[List[StreamPhysicalRel]] = {
+        requiredChildrenUpdateTrait: UpdateKindTrait): 
Option[List[StreamPhysicalRel]] = {
       val newChildren = for (child <- parent.getInputs) yield {
-        this.visit(child.asInstanceOf[StreamPhysicalRel], 
requiredChildrenTrait) match {
+        this.visit(child.asInstanceOf[StreamPhysicalRel], 
requiredChildrenUpdateTrait) match {
           case None =>
             // return None if one of the children can't satisfy
             return None
           case Some(newChild) =>
-            val providedTrait = 
newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
-            if (!providedTrait.satisfies(requiredChildrenTrait)) {
+            val providedUpdateTrait = 
newChild.getTraitSet.getTrait(UpdateKindTraitDef.INSTANCE)
+            if (!providedUpdateTrait.satisfies(requiredChildrenUpdateTrait)) {
               // the provided trait can't satisfy required trait, thus we 
should return None.
               return None
             }
@@ -782,13 +809,13 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     private def createNewNode(
         node: StreamPhysicalRel,
         childrenOption: Option[List[StreamPhysicalRel]],
-        providedTrait: UpdateKindTrait): Option[StreamPhysicalRel] = 
childrenOption match {
+        providedUpdateTrait: UpdateKindTrait): Option[StreamPhysicalRel] = 
childrenOption match {
       case None =>
         None
       case Some(children) =>
         val modifyKindSetTrait = 
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
         val nodeDescription = node.getRelDetailedDescription
-        val isUpdateKindValid = providedTrait.updateKind match {
+        val isUpdateKindValid = providedUpdateTrait.updateKind match {
           case UpdateKind.NONE =>
             !modifyKindSetTrait.modifyKindSet.contains(ModifyKind.UPDATE)
           case UpdateKind.BEFORE_AND_AFTER | UpdateKind.ONLY_UPDATE_AFTER =>
@@ -796,17 +823,19 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         }
         if (!isUpdateKindValid) {
           throw new TableException(
-            s"UpdateKindTrait $providedTrait conflicts with " +
+            s"UpdateKindTrait $providedUpdateTrait conflicts with " +
               s"ModifyKindSetTrait $modifyKindSetTrait. " +
               s"This is a bug in planner, please file an issue. \n" +
               s"Current node is $nodeDescription.")
         }
-        val newTraitSet = node.getTraitSet.plus(providedTrait)
+
+        val newTraitSet = node.getTraitSet.plus(providedUpdateTrait)
         Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel])
     }
 
     /**
      * Try all possible rank strategies and return the first viable new node.
+     *
      * @param rankStrategies
      *   all possible supported rank strategy by current node
      * @param requiredUpdateKindTrait
@@ -932,10 +961,324 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
     }
   }
 
-  // 
-------------------------------------------------------------------------------------------
+  /**
+   * A visitor which will try to satisfy the required [[DeleteKindTrait]] from the root.
+   *
+   * <p>After being traversed by this visitor, every node should have a correct [[DeleteKindTrait]],
+   * or the visitor returns None if the planner cannot satisfy the required [[DeleteKindTrait]].
+   */
+  private class SatisfyDeleteKindTraitVisitor(private val context: 
StreamOptimizeContext) {
+
+    /**
+     * Try to satisfy the required [[DeleteKindTrait]] from root.
+     *
+     * <p>Each node will first require a DeleteKindTrait from its children. The required
+     * DeleteKindTrait may come from the node's parent, or from the node itself, depending on
+     * whether the node destroys the trait provided by its children or passes it through.
+     *
+     * <p>If the node passes the children's DeleteKindTrait without destroying it, it returns a
+     * new node with new inputs and the forwarded DeleteKindTrait.
+     *
+     * <p>If the node destroys the children's DeleteKindTrait, then the node itself needs to be
+     * converted, or a new node should be generated to satisfy the required trait.
+     *
+     * @param rel
+     *   the node which should satisfy the requiredTrait
+     * @param requiredTrait
+     *   the required DeleteKindTrait
+     * @return
+     *   A converted node which satisfies the required traits by the input nodes of the current
+     *   node, or None if the required traits cannot be satisfied.
+     */
+    def visit(rel: StreamPhysicalRel, requiredTrait: DeleteKindTrait): 
Option[StreamPhysicalRel] =
+      rel match {
+        case sink: StreamPhysicalSink =>
+          val sinkRequiredTraits = inferSinkRequiredTraits(sink)
+          visitSink(sink, sinkRequiredTraits)
+
+        case sink: StreamPhysicalLegacySink[_] =>
+          val childModifyKindSet = getModifyKindSet(sink.getInput)
+          val fullDelete = fullDeleteOrNone(childModifyKindSet)
+          visitSink(sink, Seq(fullDelete))
+
+        case _: StreamPhysicalGroupAggregate | _: 
StreamPhysicalGroupTableAggregate |
+            _: StreamPhysicalLimit | _: StreamPhysicalPythonGroupAggregate |
+            _: StreamPhysicalPythonGroupTableAggregate | _: 
StreamPhysicalGroupWindowAggregateBase |
+            _: StreamPhysicalWindowAggregate | _: StreamPhysicalSort | _: 
StreamPhysicalRank |
+            _: StreamPhysicalSortLimit | _: StreamPhysicalTemporalJoin |
+            _: StreamPhysicalCorrelateBase | _: StreamPhysicalLookupJoin |
+            _: StreamPhysicalWatermarkAssigner | _: 
StreamPhysicalWindowTableFunction |
+            _: StreamPhysicalWindowRank | _: StreamPhysicalWindowDeduplicate |
+            _: StreamPhysicalTemporalSort | _: StreamPhysicalMatch |
+            _: StreamPhysicalOverAggregate | _: StreamPhysicalIntervalJoin |
+            _: StreamPhysicalPythonOverAggregate | _: StreamPhysicalWindowJoin 
|
+            _: StreamPhysicalProcessTableFunction =>
+          // if not explicitly supported, all operators require full deletes if there are updates
+          // ProcessTableFunction currently only consumes full deletes or insert-only
+          val children = rel.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              this.visit(child, fullDeleteOrNone(childModifyKindSet))
+          }.toList
+          createNewNode(rel, Some(children.flatten), 
fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case join: StreamPhysicalJoin =>
+          val children = join.getInputs.zipWithIndex.map {
+            case (child, childOrdinal) =>
+              val physicalChild = child.asInstanceOf[StreamPhysicalRel]
+              val supportsDeleteByKey = 
join.inputUniqueKeyContainsJoinKey(childOrdinal)
+              val inputModifyKindSet = getModifyKindSet(physicalChild)
+              if (supportsDeleteByKey && requiredTrait == DELETE_BY_KEY) {
+                this
+                  .visit(physicalChild, deleteOnKeyOrNone(inputModifyKindSet))
+                  .orElse(this.visit(physicalChild, 
fullDeleteOrNone(inputModifyKindSet)))
+              } else {
+                this.visit(physicalChild, fullDeleteOrNone(inputModifyKindSet))
+              }
+          }
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val childRels = children.flatten.toList
+            if (childRels.exists(r => getDeleteKind(r) == 
DeleteKind.DELETE_BY_KEY)) {
+              createNewNode(join, Some(childRels), 
deleteOnKeyOrNone(getModifyKindSet(rel)))
+            } else {
+              createNewNode(join, Some(childRels), 
fullDeleteOrNone(getModifyKindSet(rel)))
+            }
+          }
+
+        case calc: StreamPhysicalCalcBase =>
+          if (
+            requiredTrait == DeleteKindTrait.DELETE_BY_KEY &&
+            calc.getProgram.getCondition != null
+          ) {
+            // this can be further improved by checking if the filter 
condition is on the key
+            None
+          } else {
+            // otherwise, forward DeleteKind requirement
+            visitChildren(rel, requiredTrait) match {
+              case None => None
+              case Some(children) =>
+                val childTrait = 
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+                createNewNode(rel, Some(children), childTrait)
+            }
+          }
+
+        case _: StreamPhysicalExchange | _: StreamPhysicalExpand |
+            _: StreamPhysicalMiniBatchAssigner | _: 
StreamPhysicalDropUpdateBefore =>
+          // transparent forward requiredTrait to children
+          visitChildren(rel, requiredTrait) match {
+            case None => None
+            case Some(children) =>
+              val childTrait = 
children.head.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+              createNewNode(rel, Some(children), childTrait)
+          }
+
+        case union: StreamPhysicalUnion =>
+          val children = union.getInputs.map {
+            case child: StreamPhysicalRel =>
+              val childModifyKindSet = getModifyKindSet(child)
+              val requiredChildTrait = if 
(!childModifyKindSet.contains(ModifyKind.DELETE)) {
+                DeleteKindTrait.NONE
+              } else {
+                requiredTrait
+              }
+              this.visit(child, requiredChildTrait)
+          }.toList
+
+          if (children.exists(_.isEmpty)) {
+            None
+          } else {
+            val deleteKinds = children.flatten
+              .map(_.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE))
+            // union can just forward changes, it can't actively satisfy another changelog mode
+            val providedTrait = if (deleteKinds.forall(k => DeleteKindTrait.NONE == k)) {
+              // if all the children are NONE, union is NONE
+              DeleteKindTrait.NONE
+            } else {
+              // otherwise, merge delete kinds
+              val merged = deleteKinds
+                .map(_.deleteKind)
+                .reduce {
+                  (l, r) =>
+                    (l, r) match {
+                      case (DeleteKind.NONE, r: DeleteKind) => r
+                      case (l: DeleteKind, DeleteKind.NONE) => l
+                      case (l: DeleteKind, r: DeleteKind) =>
+                        if (l == r) {
+                          l
+                        } else {
+                          // if any of the union input produces DELETE_BY_KEY, 
the union produces
+                          // delete by key
+                          DeleteKind.DELETE_BY_KEY
+                        }
+                    }
+                }
+              new DeleteKindTrait(merged)
+            }
+            createNewNode(union, Some(children.flatten), providedTrait)
+          }
+
+        case normalize: StreamPhysicalChangelogNormalize =>
+          // if
+          // 1. we don't need to produce UPDATE_BEFORE,
+          // 2. children can satisfy the required delete trait,
+          // 3. the normalize doesn't have a filter condition that we would lose,
+          // 4. we don't use metadata columns
+          // we can skip ChangelogNormalize
+          if (!ChangelogNormalizeRequirementResolver.isRequired(normalize)) {
+            visitChildren(normalize, requiredTrait) match {
+              case Some(children) =>
+                val input = children.head match {
+                  case exchange: StreamPhysicalExchange =>
+                    exchange.getInput
+                  case _ =>
+                    normalize.getInput
+                }
+                return Some(input.asInstanceOf[StreamPhysicalRel])
+              case _ =>
+            }
+          }
+          val childModifyKindTrait = getModifyKindSet(rel.getInput(0))
+
+          // prefer delete by key, but accept both
+          val children = visitChildren(normalize, 
deleteOnKeyOrNone(childModifyKindTrait))
+            .orElse(visitChildren(normalize, 
fullDeleteOrNone(childModifyKindTrait)))
+
+          // changelog normalize produces full deletes
+          createNewNode(rel, children, fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case ts: StreamPhysicalTableSourceScan =>
+          // derive the provided delete kind from the source's changelog mode
+          val providedTrait = DeleteKindTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
+          createNewNode(rel, Some(List()), providedTrait)
+
+        case _: StreamPhysicalDataStreamScan | _: 
StreamPhysicalLegacyTableSourceScan |
+            _: StreamPhysicalValues =>
+          createNewNode(rel, Some(List()), DeleteKindTrait.NONE)
+
+        case _: StreamPhysicalIntermediateTableScan =>
+          createNewNode(rel, Some(List()), 
fullDeleteOrNone(getModifyKindSet(rel)))
+
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"Unsupported visit for ${rel.getClass.getSimpleName}")
+
+      }
+
+    private def visitChildren(
+        parent: StreamPhysicalRel,
+        requiredChildrenTrait: DeleteKindTrait): 
Option[List[StreamPhysicalRel]] = {
+      val newChildren = for (child <- parent.getInputs) yield {
+        this.visit(child.asInstanceOf[StreamPhysicalRel], 
requiredChildrenTrait) match {
+          case None =>
+            // return None if one of the children can't satisfy
+            return None
+          case Some(newChild) =>
+            val providedTrait = 
newChild.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+            if (!providedTrait.satisfies(requiredChildrenTrait)) {
+              // the provided trait can't satisfy required trait, thus we 
should return None.
+              return None
+            }
+            newChild
+        }
+      }
+      Some(newChildren.toList)
+    }
+
+    private def createNewNode(
+        node: StreamPhysicalRel,
+        childrenOption: Option[List[StreamPhysicalRel]],
+        providedDeleteTrait: DeleteKindTrait): Option[StreamPhysicalRel] = 
childrenOption match {
+      case None =>
+        None
+      case Some(children) =>
+        val modifyKindSetTrait = 
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
+        val nodeDescription = node.getRelDetailedDescription
+        val isDeleteKindValid = providedDeleteTrait.deleteKind match {
+          case DeleteKind.NONE =>
+            !modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE)
+          case DeleteKind.DELETE_BY_KEY | DeleteKind.FULL_DELETE =>
+            modifyKindSetTrait.modifyKindSet.contains(ModifyKind.DELETE)
+        }
+        if (!isDeleteKindValid) {
+          throw new TableException(
+            s"DeleteKindTrait $providedDeleteTrait conflicts with " +
+              s"ModifyKindSetTrait $modifyKindSetTrait. " +
+              s"This is a bug in planner, please file an issue. \n" +
+              s"Current node is $nodeDescription.")
+        }
+        val newTraitSet = node.getTraitSet.plus(providedDeleteTrait)
+        Some(node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel])
+    }
+
+    private def visitSink(
+        sink: StreamPhysicalRel,
+        sinkRequiredTraits: Seq[DeleteKindTrait]): Option[StreamPhysicalRel] = 
{
+      val children = sinkRequiredTraits.flatMap(t => visitChildren(sink, t))
+      if (children.isEmpty) {
+        None
+      } else {
+        val sinkTrait = sink.getTraitSet.plus(DeleteKindTrait.NONE)
+        Some(sink.copy(sinkTrait, 
children.head).asInstanceOf[StreamPhysicalRel])
+      }
+    }
+
+    /**
+     * Infers the sink required traits from the sink node and its input. The sink required traits
+     * are based on the sink node's changelog mode; the only exception is when the sink's pk(s) are
+     * not exactly the same as the changeLogUpsertKeys and the sink's changelog mode is DELETE_BY_KEY.
+     */
+    private def inferSinkRequiredTraits(sink: StreamPhysicalSink): 
Seq[DeleteKindTrait] = {
+      val childModifyKindSet = getModifyKindSet(sink.getInput)
+      val sinkChangelogMode = 
sink.tableSink.getChangelogMode(childModifyKindSet.toChangelogMode)
+
+      val sinkDeleteTrait = 
DeleteKindTrait.fromChangelogMode(sinkChangelogMode)
+
+      val fullDelete = fullDeleteOrNone(childModifyKindSet)
+      if (sinkDeleteTrait.equals(DeleteKindTrait.DELETE_BY_KEY)) {
+        if (areUpsertKeysDifferentFromPk(sink)) {
+          Seq(fullDelete)
+        } else {
+          Seq(sinkDeleteTrait, fullDelete)
+        }
+      } else {
+        Seq(fullDelete)
+      }
+    }
+
+    // 
-------------------------------------------------------------------------------------------
+
+    private def areUpsertKeysDifferentFromPk(sink: StreamPhysicalSink) = {
+      // if the sink's pk(s) do not exactly match the input changeLogUpsertKeys, then it will fall
+      // back to beforeAndAfter mode for correctness
+      var upsertKeyDifferentFromPk: Boolean = false
+      val sinkDefinedPks = 
sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes
+
+      if (sinkDefinedPks.nonEmpty) {
+        val sinkPks = ImmutableBitSet.of(sinkDefinedPks: _*)
+        val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery)
+        val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput)
+        // if the input is UA only and primary key != upsert key (the upsert key can be null), we
+        // should fall back to beforeAndAfter.
+        // Notice: even if the sink pk(s) contain the input upsert key we cannot optimize to UA
+        // only; this differs from batch jobs' unique key inference
+        if (changeLogUpsertKeys == null || 
!changeLogUpsertKeys.exists(_.equals(sinkPks))) {
+          upsertKeyDifferentFromPk = true
+        }
+      }
+      upsertKeyDifferentFromPk
+    }
+  }
 
   private def getModifyKindSet(node: RelNode): ModifyKindSet = {
     val modifyKindSetTrait = 
node.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
     modifyKindSetTrait.modifyKindSet
   }
+
+  private def getDeleteKind(node: RelNode): DeleteKind = {
+    val deleteKindTrait = 
node.getTraitSet.getTrait(DeleteKindTraitDef.INSTANCE)
+    deleteKindTrait.deleteKind
+  }
 }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
new file mode 100644
index 00000000000..b23063bd359
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTrait.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.`trait`
+
+import org.apache.flink.table.connector.ChangelogMode
+import org.apache.flink.types.RowKind
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTrait, RelTraitDef}
+
+/** DeleteKindTrait is used to describe the kind of delete operation. */
+class DeleteKindTrait(val deleteKind: DeleteKind) extends RelTrait {
+
+  override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
+    case other: DeleteKindTrait =>
+      // should totally match
+      other.deleteKind == this.deleteKind
+    case _ => false
+  }
+
+  override def getTraitDef: RelTraitDef[_ <: RelTrait] = 
DeleteKindTraitDef.INSTANCE
+
+  override def register(planner: RelOptPlanner): Unit = {}
+
+  override def hashCode(): Int = deleteKind.hashCode()
+
+  override def equals(obj: Any): Boolean = obj match {
+    case t: DeleteKindTrait => this.deleteKind.equals(t.deleteKind)
+    case _ => false
+  }
+
+  override def toString: String = s"[${deleteKind.toString}]"
+}
+
+object DeleteKindTrait {
+
+  /** A [[DeleteKindTrait]] that indicates the node does not support delete operations. */
+  val NONE = new DeleteKindTrait(DeleteKind.NONE)
+
+  /** A [[DeleteKindTrait]] that indicates the node supports deletes by key only. */
+  val DELETE_BY_KEY = new DeleteKindTrait(DeleteKind.DELETE_BY_KEY)
+
+  /** A [[DeleteKindTrait]] that indicates the node requires deletes with full records. */
+  val FULL_DELETE = new DeleteKindTrait(DeleteKind.FULL_DELETE)
+
+  /**
+   * Returns DELETE_BY_KEY [[DeleteKindTrait]] if there are delete changes. 
Otherwise, returns NONE
+   * [[DeleteKindTrait]].
+   */
+  def deleteOnKeyOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
+    val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
+      DeleteKind.DELETE_BY_KEY
+    } else {
+      DeleteKind.NONE
+    }
+    new DeleteKindTrait(deleteKind)
+  }
+
+  /**
+   * Returns FULL_DELETE [[DeleteKindTrait]] if there are delete changes. 
Otherwise, returns NONE
+   * [[DeleteKindTrait]].
+   */
+  def fullDeleteOrNone(modifyKindSet: ModifyKindSet): DeleteKindTrait = {
+    val deleteKind = if (modifyKindSet.contains(ModifyKind.DELETE)) {
+      DeleteKind.FULL_DELETE
+    } else {
+      DeleteKind.NONE
+    }
+    new DeleteKindTrait(deleteKind)
+  }
+
+  /** Creates an instance of [[DeleteKindTrait]] from the given 
[[ChangelogMode]]. */
+  def fromChangelogMode(changelogMode: ChangelogMode): DeleteKindTrait = {
+    val hasDelete = changelogMode.contains(RowKind.DELETE)
+    if (!hasDelete) {
+      DeleteKindTrait.NONE
+    } else {
+      val hasDeleteOnKey = changelogMode.keyOnlyDeletes()
+      if (hasDeleteOnKey) {
+        DeleteKindTrait.DELETE_BY_KEY
+      } else {
+        DeleteKindTrait.FULL_DELETE
+      }
+    }
+  }
+}
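
For orientation, here is a minimal sketch in plain Java of the mapping that fromChangelogMode above performs (the class name is invented for illustration; only ChangelogMode calls that appear elsewhere in this change are used): a mode containing DELETE resolves to DELETE_BY_KEY when keyOnlyDeletes is set, to FULL_DELETE otherwise, and to NONE when DELETE is absent.

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    // Invented illustration class; not part of this commit.
    public class DeleteKindMappingSketch {
        public static void main(String[] args) {
            ChangelogMode keyOnlyDeletes =
                    ChangelogMode.newBuilder()
                            .addContainedKind(RowKind.INSERT)
                            .addContainedKind(RowKind.UPDATE_AFTER)
                            .addContainedKind(RowKind.DELETE)
                            .keyOnlyDeletes(true)
                            .build();

            // DeleteKindTrait.fromChangelogMode(keyOnlyDeletes) -> DELETE_BY_KEY.
            // With keyOnlyDeletes(false) the result would be FULL_DELETE,
            // and without RowKind.DELETE it would be NONE.
            System.out.println(keyOnlyDeletes.contains(RowKind.DELETE)); // true
            System.out.println(keyOnlyDeletes.keyOnlyDeletes());         // true
        }
    }
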
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
new file mode 100644
index 00000000000..5d374c48bf1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/trait/DeleteKindTraitDef.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.`trait`
+
+import org.apache.calcite.plan.{RelOptPlanner, RelTraitDef}
+import org.apache.calcite.rel.RelNode
+
+/** Definition of a trait containing [[DeleteKindTrait]]. */
+class DeleteKindTraitDef extends RelTraitDef[DeleteKindTrait] {
+
+  override def getTraitClass: Class[DeleteKindTrait] = classOf[DeleteKindTrait]
+
+  override def getSimpleName: String = this.getClass.getSimpleName
+
+  override def convert(
+      planner: RelOptPlanner,
+      rel: RelNode,
+      toTrait: DeleteKindTrait,
+      allowInfiniteCostConverters: Boolean): RelNode = {
+    rel.copy(rel.getTraitSet.plus(toTrait), rel.getInputs)
+  }
+
+  override def canConvert(
+      planner: RelOptPlanner,
+      fromTrait: DeleteKindTrait,
+      toTrait: DeleteKindTrait): Boolean = {
+    throw new UnsupportedOperationException("DeleteKindTrait conversion is not 
supported for now.")
+  }
+
+  override def getDefault: DeleteKindTrait = DeleteKindTrait.FULL_DELETE
+}
+
+object DeleteKindTraitDef {
+  val INSTANCE = new DeleteKindTraitDef()
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
index 72eb3ecfc4d..ae90a5d6d7f 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ChangelogPlanUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.connector.ChangelogMode
-import org.apache.flink.table.planner.plan.`trait`.{ModifyKind, 
ModifyKindSetTraitDef, UpdateKind, UpdateKindTraitDef}
+import org.apache.flink.table.planner.plan.`trait`.{DeleteKind, 
DeleteKindTraitDef, ModifyKind, ModifyKindSetTraitDef, UpdateKind, 
UpdateKindTraitDef}
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel
 import 
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram
 import org.apache.flink.types.RowKind
@@ -85,6 +85,11 @@ object ChangelogPlanUtils {
     val updateKind = node.getTraitSet
       .getTrait(UpdateKindTraitDef.INSTANCE)
       .updateKind
+    val deleteKind = Option(
+      node.getTraitSet
+        .getTrait(DeleteKindTraitDef.INSTANCE))
+      .map(_.deleteKind)
+      .getOrElse(DeleteKind.NONE)
 
     if (modifyKindSet.isEmpty) {
       None
@@ -102,6 +107,11 @@ object ChangelogPlanUtils {
           modeBuilder.addContainedKind(RowKind.UPDATE_BEFORE)
         }
       }
+
+      if (deleteKind == DeleteKind.DELETE_BY_KEY) {
+        modeBuilder.keyOnlyDeletes(true)
+      }
+
       Some(modeBuilder.build())
     }
   }
@@ -121,7 +131,11 @@ object ChangelogPlanUtils {
         kinds += "UA"
       }
       if (mode.contains(RowKind.DELETE)) {
-        kinds += "D"
+        if (mode.keyOnlyDeletes()) {
+          kinds += "PD"
+        } else {
+          kinds += "D"
+        }
       }
       kinds.mkString(",")
   }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 659e6b11209..f75cb778597 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -457,6 +457,22 @@ public final class TestValuesTableFactory
                     .withDescription(
                             "Option to determine whether or not to require the 
distribution bucket count");
 
+    private static final ConfigOption<Boolean> SINK_SUPPORTS_DELETE_BY_KEY =
+            ConfigOptions.key("sink.supports-delete-by-key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Option to determine whether deletes must contain the"
+                                    + " entire row or whether a delete by key is sufficient.");
+
+    private static final ConfigOption<Boolean> SOURCE_PRODUCES_DELETE_BY_KEY =
+            ConfigOptions.key("source.produces-delete-by-key")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Option to determine whether the source produces deletes containing"
+                                    + " the entire row or only the key fields.");
+
     private static final ConfigOption<Integer> SOURCE_NUM_ELEMENT_TO_SKIP =
             ConfigOptions.key("source.num-element-to-skip")
                     .intType()
@@ -504,7 +520,10 @@ public final class TestValuesTableFactory
 
         helper.validate();
 
-        ChangelogMode changelogMode = 
parseChangelogMode(helper.getOptions().get(CHANGELOG_MODE));
+        ChangelogMode changelogMode =
+                parseChangelogMode(
+                        helper.getOptions().get(CHANGELOG_MODE),
+                        
helper.getOptions().get(SOURCE_PRODUCES_DELETE_BY_KEY));
         String runtimeSource = helper.getOptions().get(RUNTIME_SOURCE);
         boolean isBounded = helper.getOptions().get(BOUNDED);
         boolean isFinite = helper.getOptions().get(TERMINATING);
@@ -749,6 +768,7 @@ public final class TestValuesTableFactory
                 
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 
         boolean requireBucketCount = 
helper.getOptions().get(SINK_BUCKET_COUNT_REQUIRED);
+        boolean supportsDeleteByKey = 
helper.getOptions().get(SINK_SUPPORTS_DELETE_BY_KEY);
         if (sinkClass.equals("DEFAULT")) {
             int rowTimeIndex =
                     validateAndExtractRowtimeIndex(
@@ -765,7 +785,8 @@ public final class TestValuesTableFactory
                     changelogMode,
                     rowTimeIndex,
                     tableSchema,
-                    requireBucketCount);
+                    requireBucketCount,
+                    supportsDeleteByKey);
         } else {
             try {
                 return InstantiationUtil.instantiate(
@@ -816,6 +837,8 @@ public final class TestValuesTableFactory
                         ENABLE_WATERMARK_PUSH_DOWN,
                         SINK_DROP_LATE_EVENT,
                         SINK_BUCKET_COUNT_REQUIRED,
+                        SINK_SUPPORTS_DELETE_BY_KEY,
+                        SOURCE_PRODUCES_DELETE_BY_KEY,
                         SOURCE_NUM_ELEMENT_TO_SKIP,
                         SOURCE_SLEEP_AFTER_ELEMENTS,
                         SOURCE_SLEEP_TIME,
@@ -916,6 +939,10 @@ public final class TestValuesTableFactory
     }
 
     private ChangelogMode parseChangelogMode(String string) {
+        return parseChangelogMode(string, false);
+    }
+
+    private ChangelogMode parseChangelogMode(String string, boolean 
producesDeleteByKey) {
         ChangelogMode.Builder builder = ChangelogMode.newBuilder();
         for (String split : string.split(",")) {
             switch (split.trim()) {
@@ -935,6 +962,7 @@ public final class TestValuesTableFactory
                     throw new IllegalArgumentException("Invalid ChangelogMode 
string: " + string);
             }
         }
+        builder.keyOnlyDeletes(producesDeleteByKey);
         return builder.build();
     }
 
@@ -1621,7 +1649,7 @@ public final class TestValuesTableFactory
             implements SupportsWatermarkPushDown, SupportsSourceWatermark {
         private final String tableName;
 
-        private WatermarkStrategy<RowData> watermarkStrategy;
+        private WatermarkStrategy<RowData> watermarkStrategy = 
WatermarkStrategy.noWatermarks();
 
         private TestValuesScanTableSourceWithWatermarkPushDown(
                 DataType producedDataType,
@@ -2207,6 +2235,7 @@ public final class TestValuesTableFactory
         private final int rowtimeIndex;
         private final TableSchema tableSchema;
         private final boolean requireBucketCount;
+        private final boolean supportsDeleteByKey;
 
         private TestValuesTableSink(
                 DataType consumedDataType,
@@ -2220,7 +2249,8 @@ public final class TestValuesTableFactory
                 @Nullable ChangelogMode changelogModeEnforced,
                 int rowtimeIndex,
                 TableSchema tableSchema,
-                boolean requireBucketCount) {
+                boolean requireBucketCount,
+                boolean supportsDeleteByKey) {
             this.consumedDataType = consumedDataType;
             this.primaryKeyIndices = primaryKeyIndices;
             this.tableName = tableName;
@@ -2233,10 +2263,19 @@ public final class TestValuesTableFactory
             this.rowtimeIndex = rowtimeIndex;
             this.tableSchema = tableSchema;
             this.requireBucketCount = requireBucketCount;
+            this.supportsDeleteByKey = supportsDeleteByKey;
         }
 
         @Override
         public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            final ChangelogMode mode = getMode(requestedMode);
+            final ChangelogMode.Builder builder = ChangelogMode.newBuilder();
+            mode.getContainedKinds().forEach(builder::addContainedKind);
+            builder.keyOnlyDeletes(supportsDeleteByKey);
+            return builder.build();
+        }
+
+        private ChangelogMode getMode(ChangelogMode requestedMode) {
             // if param [changelogModeEnforced] is passed in, return it 
directly
             if (changelogModeEnforced != null) {
                 return changelogModeEnforced;
@@ -2376,7 +2415,8 @@ public final class TestValuesTableFactory
                     changelogModeEnforced,
                     rowtimeIndex,
                     tableSchema,
-                    requireBucketCount);
+                    requireBucketCount,
+                    supportsDeleteByKey);
         }
 
         @Override
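
To see the two new options end to end, the following is a minimal sketch (the 'values' connector and both option names are test-scoped, so this assumes the flink-table-planner test classes are on the classpath; table names are invented for illustration). With a key-only-delete source feeding a sink that accepts deletes by key, the explained plan should no longer contain a ChangelogNormalize node.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DeleteByKeyOptionsSketch {
        public static void main(String[] args) {
            TableEnvironment env =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql(
                    "CREATE TABLE src (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ("
                            + " 'connector' = 'values',"
                            + " 'changelog-mode' = 'I,UA,D',"
                            + " 'source.produces-delete-by-key' = 'true')");
            env.executeSql(
                    "CREATE TABLE snk (id INT PRIMARY KEY NOT ENFORCED, name STRING) WITH ("
                            + " 'connector' = 'values',"
                            + " 'sink-changelog-mode-enforced' = 'I,UA,D',"
                            + " 'sink.supports-delete-by-key' = 'true')");
            // The CHANGELOG_MODE detail renders key-only deletes as 'PD' in the plan.
            env.executeSql("EXPLAIN CHANGELOG_MODE INSERT INTO snk SELECT * FROM src").print();
        }
    }
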
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
new file mode 100644
index 00000000000..6599f649950
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeyPrograms.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.test.program.SinkTestStep;
+import org.apache.flink.table.test.program.SourceTestStep;
+import org.apache.flink.table.test.program.TableTestProgram;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+/**
+ * Tests for verifying the semantics of operations when sources produce deletes by key only and
+ * the sink can accept deletes by key only as well.
+ */
+public final class DeletesByKeyPrograms {
+
+    /**
+     * Tests a simple INSERT INTO SELECT scenario where ChangelogNormalize can 
be eliminated since
+     * we don't need UPDATE_BEFORE, and we have key information for all 
changes.
+     */
+    public static final TableTestProgram 
INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY =
+            TableTestProgram.of(
+                            "select-delete-on-key-to-delete-on-key",
+                            "No ChangelogNormalize: validates results when 
querying source with deletes by key"
+                                    + " only, writing to sink supporting 
deletes by key only, which"
+                                    + " is a case where ChangelogNormalize can 
be eliminated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice", 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob", 20),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null, null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "Bob", 30))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption(
+                                            "changelog-mode",
+                                            "I,UA,D") // Insert, UpdateAfter, 
Delete
+                                    .addOption("sink.supports-delete-by-key", 
"true")
+                                    .consumedValues(
+                                            "+I[1, Alice, 10]",
+                                            "+I[2, Bob, 20]",
+                                            "-D[1, null, null]",
+                                            "+U[2, Bob, 30]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM 
source_t")
+                    .build();
+
+    public static final TableTestProgram 
INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION =
+            TableTestProgram.of(
+                            
"select-delete-on-key-to-delete-on-key-with-projection",
+                            "No ChangelogNormalize: validates results when 
querying source with deletes by key"
+                                    + " only, writing to sink supporting 
deletes by key only with a"
+                                    + " projection, which is a case where ChangelogNormalize can be"
+                                    + " eliminated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING NOT NULL",
+                                            "`value` INT NOT NULL")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice", 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob", 20),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null, null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "Bob", 30))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption(
+                                            "changelog-mode",
+                                            "I,UA,D") // Insert, UpdateAfter, 
Delete
+                                    .addOption("sink.supports-delete-by-key", 
"true")
+                                    .consumedValues(
+                                            "+I[1, Alice, 12]",
+                                            "+I[2, Bob, 22]",
+                                            "-D[1, , -1]",
+                                            "+U[2, Bob, 32]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT id, name, `value` + 2 
FROM source_t")
+                    .build();
+
+    public static final TableTestProgram 
INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE =
+            TableTestProgram.of(
+                            "select-delete-on-key-to-full-delete",
+                            "ChangelogNormalize: validates results when 
querying source with deletes by key"
+                                    + " only, writing to sink requiring full deletes, "
+                                    + "which is a case where 
ChangelogNormalize stays")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice", 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob", 20),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null, null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "Bob", 30))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addOption("sink.supports-delete-by-key", 
"false")
+                                    .consumedValues(
+                                            "+I[1, Alice, 10]",
+                                            "+I[2, Bob, 20]",
+                                            "-D[1, Alice, 10]",
+                                            "+U[2, Bob, 30]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM 
source_t")
+                    .build();
+
+    public static final TableTestProgram INSERT_SELECT_FULL_DELETE_FULL_DELETE 
=
+            TableTestProgram.of(
+                            "select-full-delete-to-full-delete",
+                            "No ChangelogNormalize: validates results when 
querying source with full deletes, "
+                                    + "writing to sink requiring full deletes, 
which is a case"
+                                    + " where ChangelogNormalize can be 
eliminated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "false")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice", 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob", 20),
+                                            // Full delete (entire row)
+                                            Row.ofKind(RowKind.DELETE, 1, 
"Alice", 10),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "Bob", 30))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addOption("sink.supports-delete-by-key", 
"false")
+                                    .consumedValues(
+                                            "+I[1, Alice, 10]",
+                                            "+I[2, Bob, 20]",
+                                            "-D[1, Alice, 10]",
+                                            "+U[2, Bob, 30]")
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT id, name, `value` FROM 
source_t")
+                    .build();
+
+    public static final TableTestProgram JOIN_INTO_FULL_DELETES =
+            TableTestProgram.of(
+                            "join-to-full-delete",
+                            "ChangelogNormalize: validates results when 
joining sources with deletes by key"
+                                    + " only, writing to sink requiring full 
deletes, which"
+                                    + " is a case where ChangelogNormalize 
stays")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("left_t")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 20),
+                                            Row.ofKind(RowKind.INSERT, 3, 30),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
3, 40))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("right_t")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "name STRING")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.INSERT, 3, 
"Emily"),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "BOB"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addOption("sink.supports-delete-by-key", 
"false")
+                                    .consumedValues(
+                                            "+I[1, Alice, 10]",
+                                            "+I[2, Bob, 20]",
+                                            "+I[3, Emily, 30]",
+                                            "-D[1, Alice, 10]",
+                                            "+U[3, Emily, 40]",
+                                            "+U[2, BOB, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT l.id, r.name, l.`value` 
FROM left_t l JOIN right_t r ON l.id = r.id")
+                    .build();
+
+    public static final TableTestProgram JOIN_INTO_DELETES_BY_KEY =
+            TableTestProgram.of(
+                            "join-to-delete-on-key",
+                            "No ChangelogNormalize: validates results when 
joining sources with deletes by key"
+                                    + " only, writing to sink supporting 
deletes by key, which"
+                                    + " is a case where ChangelogNormalize can 
be removed")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("left_t")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 10),
+                                            Row.ofKind(RowKind.INSERT, 2, 20),
+                                            Row.ofKind(RowKind.INSERT, 3, 30),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
3, 40))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("right_t")
+                                    .addSchema("id INT PRIMARY KEY NOT 
ENFORCED", "name STRING")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    
.addOption("source.produces-delete-by-key", "true")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.INSERT, 3, 
"Emily"),
+                                            // Delete by key
+                                            Row.ofKind(RowKind.DELETE, 1, 
null),
+                                            // Update after only
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
2, "BOB"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "id INT PRIMARY KEY NOT ENFORCED",
+                                            "name STRING",
+                                            "`value` INT")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .addOption("sink.supports-delete-by-key", 
"true")
+                                    .consumedValues(
+                                            "+I[1, Alice, 10]",
+                                            "+I[2, Bob, 20]",
+                                            "+I[3, Emily, 30]",
+                                            "-D[1, Alice, null]",
+                                            "+U[3, Emily, 40]",
+                                            "+U[2, BOB, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT l.id, r.name, l.`value` 
FROM left_t l JOIN right_t r ON l.id = r.id")
+                    .build();
+
+    private DeletesByKeyPrograms() {}
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
new file mode 100644
index 00000000000..eba93c8bba9
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/DeletesByKeySemanticTests.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import 
org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase;
+import org.apache.flink.table.test.program.TableTestProgram;
+
+import java.util.List;
+
+/** Semantic tests for various {@link StreamExecNode}s and sources producing 
deletes by key only. */
+public class DeletesByKeySemanticTests extends SemanticTestBase {
+
+    @Override
+    public List<TableTestProgram> programs() {
+        return List.of(
+                DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY,
+                DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_FULL_DELETE,
+                DeletesByKeyPrograms.INSERT_SELECT_FULL_DELETE_FULL_DELETE,
+                
DeletesByKeyPrograms.INSERT_SELECT_DELETE_BY_KEY_DELETE_BY_KEY_WITH_PROJECTION,
+                DeletesByKeyPrograms.JOIN_INTO_FULL_DELETES,
+                DeletesByKeyPrograms.JOIN_INTO_DELETES_BY_KEY);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
new file mode 100644
index 00000000000..79f6da967e1
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.stream.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.JavaStreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.util.StringUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Plan tests for the removal of redundant ChangelogNormalize nodes. */
+public class ChangelogNormalizeOptimizationTest extends TableTestBase {
+
+    private final JavaStreamTableTestUtil util = javaStreamTestUtil();
+
+    static List<TestSpec> getTests() {
+        return Arrays.asList(
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, 
SinkTable.UPSERT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, 
SinkTable.UPSERT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, 
SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, 
SinkTable.RETRACT_SINK),
+                TestSpec.select(SourceTable.UPSERT_SOURCE_FULL_DELETES, 
SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, 
SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, 
SinkTable.UPSERT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES, 
SinkTable.RETRACT_SINK),
+                TestSpec.selectWithFilter(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES, 
SinkTable.RETRACT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_FULL_DELETES,
+                        SinkTable.UPSERT_SINK),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.UPSERT_SINK_FULL_DELETES),
+                TestSpec.join(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES,
+                        SinkTable.RETRACT_SINK),
+                TestSpec.select(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA,
+                        SinkTable.UPSERT_SINK_METADATA),
+                TestSpec.selectWithoutMetadata(
+                        SourceTable.UPSERT_SOURCE_PARTIAL_DELETES_METADATA, 
SinkTable.UPSERT_SINK));
+    }
+
+    @AfterEach
+    void tearDown() {
+        Arrays.stream(util.tableEnv().listTables())
+                .forEach(t -> util.tableEnv().executeSql("DROP TABLE " + t));
+    }
+
+    @ParameterizedTest()
+    @MethodSource("getTests")
+    void testChangelogNormalizePlan(TestSpec spec) {
+        for (TableProperties tableProperties : spec.tablesToCreate) {
+            final String additionalColumns =
+                    String.join(",\n", tableProperties.getAdditionalColumns());
+            util.tableEnv()
+                    .executeSql(
+                            String.format(
+                                    "CREATE TABLE %s ( id INT,\n"
+                                            + " col1 INT,\n"
+                                            + " col2 STRING,\n"
+                                            + "%s"
+                                            + " PRIMARY KEY(id) NOT ENFORCED) 
WITH (%s)",
+                                    tableProperties.getTableName(),
+                                    
StringUtils.isNullOrWhitespaceOnly(additionalColumns)
+                                            ? ""
+                                            : additionalColumns + ",\n",
+                                    String.join(",\n", 
tableProperties.getOptions())));
+        }
+        util.verifyRelPlanInsert(
+                spec.query,
+                JavaScalaConversionUtil.toScala(
+                        
Collections.singletonList(ExplainDetail.CHANGELOG_MODE)));
+    }
+
+    interface TableProperties {
+
+        String getTableName();
+
+        List<String> getOptions();
+
+        List<String> getAdditionalColumns();
+    }
+
+    public enum SourceTable implements TableProperties {
+        UPSERT_SOURCE_PARTIAL_DELETES(
+                "upsert_table_partial_deletes",
+                "'connector' = 'values'",
+                "'changelog-mode' = 'UA,D'",
+                "'source.produces-delete-by-key'='true'"),
+        UPSERT_SOURCE_PARTIAL_DELETES_METADATA(
+                "upsert_table_partial_deletes_metadata",
+                List.of("`offset` BIGINT METADATA"),
+                "'connector' = 'values'",
+                "'changelog-mode' = 'UA,D'",
+                "'source.produces-delete-by-key'='true'",
+                "'readable-metadata' = 'offset:BIGINT'"),
+        UPSERT_SOURCE_FULL_DELETES(
+                "upsert_table_full_deletes",
+                "'connector' = 'values'",
+                "'changelog-mode' = 'UA,D'",
+                "'source.produces-delete-by-key'='false'");
+
+        private final String tableName;
+        private final List<String> options;
+        private final List<String> additionalColumns;
+
+        SourceTable(String tableName, String... options) {
+            this(tableName, Collections.emptyList(), options);
+        }
+
+        SourceTable(String tableName, List<String> additionalColumns, 
String... options) {
+            this.tableName = tableName;
+            this.additionalColumns = additionalColumns;
+            this.options = Arrays.asList(options);
+        }
+
+        @Override
+        public String getTableName() {
+            return tableName;
+        }
+
+        @Override
+        public List<String> getOptions() {
+            return options;
+        }
+
+        @Override
+        public List<String> getAdditionalColumns() {
+            return additionalColumns;
+        }
+    }
+
+    public enum SinkTable implements TableProperties {
+        UPSERT_SINK(
+                "upsert_sink_table",
+                "  'connector' = 'values'",
+                "'sink.supports-delete-by-key' = 'true'",
+                "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+        UPSERT_SINK_METADATA(
+                "upsert_sink_table",
+                List.of("`offset` BIGINT METADATA"),
+                "  'connector' = 'values'",
+                "'sink.supports-delete-by-key' = 'true'",
+                "'writable-metadata' = 'offset:BIGINT'",
+                "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+        UPSERT_SINK_FULL_DELETES(
+                "upsert_sink_table_full_deletes",
+                "  'connector' = 'values'",
+                "'sink.supports-delete-by-key' = 'false'",
+                "'sink-changelog-mode-enforced' = 'I,UA,D'"),
+        RETRACT_SINK(
+                "all_change_sink_table",
+                "'connector' = 'values'",
+                "'sink-changelog-mode-enforced' = 'I,UA,UB,D'");
+
+        private final String tableName;
+        private final List<String> options;
+        private final List<String> additionalColumns;
+
+        SinkTable(String tableName, String... options) {
+            this(tableName, Collections.emptyList(), options);
+        }
+
+        SinkTable(String tableName, List<String> additionalColumns, String... 
options) {
+            this.tableName = tableName;
+            this.options = Arrays.asList(options);
+            this.additionalColumns = additionalColumns;
+        }
+
+        @Override
+        public String getTableName() {
+            return tableName;
+        }
+
+        @Override
+        public List<String> getOptions() {
+            return options;
+        }
+
+        @Override
+        public List<String> getAdditionalColumns() {
+            return additionalColumns;
+        }
+    }
+
+    private static class TestSpec {
+
+        private final Set<TableProperties> tablesToCreate;
+        private final String query;
+        private final String description;
+
+        private TestSpec(String description, Set<TableProperties> 
tablesToCreate, String query) {
+            this.tablesToCreate = tablesToCreate;
+            this.query = query;
+            this.description = description;
+        }
+
+        public static TestSpec selectWithoutMetadata(SourceTable sourceTable, 
SinkTable sinkTable) {
+            return new TestSpec(
+                    String.format(
+                            "select_no_metadata_%s_into_%s",
+                            sourceTable.getTableName(), 
sinkTable.getTableName()),
+                    new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+                    String.format(
+                            "INSERT INTO %s SELECT id, col1, col2 FROM %s",
+                            sinkTable.getTableName(), 
sourceTable.getTableName()));
+        }
+
+        public static TestSpec select(SourceTable sourceTable, SinkTable 
sinkTable) {
+            return new TestSpec(
+                    String.format(
+                            "select_%s_into_%s",
+                            sourceTable.getTableName(), 
sinkTable.getTableName()),
+                    new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+                    String.format(
+                            "INSERT INTO %s SELECT * FROM %s",
+                            sinkTable.getTableName(), 
sourceTable.getTableName()));
+        }
+
+        public static TestSpec selectWithFilter(SourceTable sourceTable, 
SinkTable sinkTable) {
+            return new TestSpec(
+                    String.format(
+                            "select_with_filter_%s_into_%s",
+                            sourceTable.getTableName(), 
sinkTable.getTableName()),
+                    new HashSet<>(Arrays.asList(sourceTable, sinkTable)),
+                    String.format(
+                            "INSERT INTO %s SELECT * FROM %s WHERE col1 > 2",
+                            sinkTable.getTableName(), 
sourceTable.getTableName()));
+        }
+
+        public static TestSpec join(
+                SourceTable leftTable, SourceTable rightTable, SinkTable 
sinkTable) {
+            return new TestSpec(
+                    String.format(
+                            "join_%s_%s_into_%s",
+                            leftTable.getTableName(),
+                            rightTable.getTableName(),
+                            sinkTable.getTableName()),
+                    new HashSet<>(Arrays.asList(leftTable, rightTable, 
sinkTable)),
+                    String.format(
+                            "INSERT INTO %s SELECT l.* FROM %s l JOIN %s r ON 
l.id = r.id",
+                            sinkTable.getTableName(),
+                            leftTable.getTableName(),
+                            rightTable.getTableName()));
+        }
+
+        @Override
+        public String toString() {
+            return description;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
index c0c769a698a..f876e815b51 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DataStreamJavaITCase.java
@@ -577,6 +577,7 @@ class DataStreamJavaITCase {
                 tableEnv.fromChangelogStream(
                         changelogStream,
                         Schema.newBuilder().primaryKey("f0").build(),
+                        // produces partial deletes (deletes by key)
                         ChangelogMode.upsert()));
 
         final Table result = tableEnv.sqlQuery("SELECT f0, SUM(f1) FROM t 
GROUP BY f0");
@@ -585,7 +586,8 @@ class DataStreamJavaITCase {
                 tableEnv.toChangelogStream(
                         result,
                         Schema.newBuilder().primaryKey("f0").build(),
-                        ChangelogMode.upsert()),
+                        // expects full deletes, therefore requires ChangelogNormalize
+                        ChangelogMode.upsert(false)),
                 getOutput(inputOrOutput));
     }
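
For readers skimming the hunk above, the difference between the two factory calls is the delete contract. A minimal sketch follows (the class name is invented; that the new boolean overload toggles key-only deletes is inferred from the comments in this test and stated here as an assumption).

    import org.apache.flink.table.connector.ChangelogMode;

    public class UpsertModesSketch {
        public static void main(String[] args) {
            // Default upsert mode: deletes may carry only the key fields.
            ChangelogMode keyOnlyDeletes = ChangelogMode.upsert();
            // New overload: 'false' demands full rows in deletes, which keeps
            // ChangelogNormalize in the plan when the input deletes by key only.
            ChangelogMode fullDeletes = ChangelogMode.upsert(false);

            System.out.println(keyOnlyDeletes.keyOnlyDeletes()); // expected: true
            System.out.println(fullDeletes.keyOnlyDeletes());    // expected: false
        }
    }
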
 
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
new file mode 100644
index 00000000000..c15ac5f8d62
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ChangelogNormalizeOptimizationTest.xml
@@ -0,0 +1,418 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testChangelogNormalizePlan[[10] 
select_with_filter_upsert_table_full_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM 
upsert_table_full_deletes WHERE col1 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalFilter(condition=[>($1, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], 
changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], 
changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[11] 
join_upsert_table_full_deletes_upsert_table_full_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM 
upsert_table_full_deletes l JOIN upsert_table_full_deletes r ON l.id = r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes, project=[id, col1, col2], metadata=[]]], fields=[id, 
col1, col2], changelogMode=[UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+         +- Calc(select=[id], changelogMode=[UA,D])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes, project=[id, col1, col2], metadata=[]]], fields=[id, 
col1, col2], changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[12] 
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM 
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id = 
r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]], 
fields=[id, col1, col2], changelogMode=[UA,PD])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+         +- Calc(select=[id], changelogMode=[UA,PD])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]], 
fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[13] 
join_upsert_table_full_deletes_upsert_table_partial_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM 
upsert_table_full_deletes l JOIN upsert_table_partial_deletes r ON l.id = 
r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, project=[id], metadata=[]]], fields=[id], 
changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[14] 
join_upsert_table_partial_deletes_upsert_table_full_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT l.* FROM 
upsert_table_partial_deletes l JOIN upsert_table_full_deletes r ON l.id = 
r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,PD])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,PD])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,PD])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes, project=[id], metadata=[]]], fields=[id], 
changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[15] 
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_upsert_sink_table_full_deletes]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT l.* FROM 
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id = 
r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
 fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes], 
fields=[id, col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+      :  +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+      :     +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      :        +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]], 
fields=[id, col1, col2], changelogMode=[UA,PD])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+               +- Calc(select=[id], changelogMode=[UA,PD])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, upsert_table_partial_deletes, project=[id, col1, col2], 
metadata=[]]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[16] 
join_upsert_table_partial_deletes_upsert_table_partial_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT l.* FROM 
upsert_table_partial_deletes l JOIN upsert_table_partial_deletes r ON l.id = 
r.id]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalJoin(condition=[=($0, $3)], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2], changelogMode=[NONE])
++- Calc(select=[id, col1, col2], changelogMode=[I,UB,UA,D])
+   +- Join(joinType=[InnerJoin], where=[=(id, id0)], select=[id, col1, col2, 
id0], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA,D])
+      :- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+      :  +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+      :     +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      :        +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, project=[id, col1, col2], metadata=[]]], 
fields=[id, col1, col2], changelogMode=[UA,PD])
+      +- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+         +- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+            +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+               +- Calc(select=[id], changelogMode=[UA,PD])
+                  +- TableSourceScan(table=[[default_catalog, 
default_database, upsert_table_partial_deletes, project=[id, col1, col2], 
metadata=[]]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[17] 
select_upsert_table_partial_deletes_metadata_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM 
upsert_table_partial_deletes_metadata]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2, offset])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2], offset=[$3])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes_metadata, metadata=[offset]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2, offset], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes_metadata, metadata=[offset]]], fields=[id, col1, 
col2, offset], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[18] 
select_no_metadata_upsert_table_partial_deletes_metadata_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT id, col1, col2 FROM 
upsert_table_partial_deletes_metadata]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalProject(id=[$0], col1=[$1], col2=[$2], offset=[$3])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes_metadata, metadata=[offset]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes_metadata, project=[id, col1, col2], metadata=[]]], 
fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[1] 
select_upsert_table_partial_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM 
upsert_table_partial_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[2] 
select_upsert_table_full_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM 
upsert_table_full_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[3] 
select_upsert_table_partial_deletes_into_upsert_sink_table_full_deletes]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT * FROM 
upsert_table_partial_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
 fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes], 
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[4] 
select_upsert_table_full_deletes_into_upsert_sink_table_full_deletes]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table_full_deletes SELECT * FROM 
upsert_table_full_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table_full_deletes],
 fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table_full_deletes], 
fields=[id, col1, col2], changelogMode=[NONE])
++- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[5] 
select_upsert_table_partial_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM 
upsert_table_partial_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]], fields=[id, col1, col2], changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[6] 
select_upsert_table_full_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM 
upsert_table_full_deletes]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]], fields=[id, col1, col2], changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[7] 
select_with_filter_upsert_table_partial_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM 
upsert_table_partial_deletes WHERE col1 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalFilter(condition=[>($1, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], 
changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2], 
changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[8] 
select_with_filter_upsert_table_full_deletes_into_upsert_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO upsert_sink_table SELECT * FROM 
upsert_table_full_deletes WHERE col1 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.upsert_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalFilter(condition=[>($1, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.upsert_sink_table], fields=[id, 
col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], 
changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_full_deletes, filter=[]]], fields=[id, col1, col2], 
changelogMode=[UA,D])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testChangelogNormalizePlan[[9] 
select_with_filter_upsert_table_partial_deletes_into_all_change_sink_table]">
+    <Resource name="sql">
+      <![CDATA[INSERT INTO all_change_sink_table SELECT * FROM 
upsert_table_partial_deletes WHERE col1 > 2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2])
++- LogicalProject(id=[$0], col1=[$1], col2=[$2])
+   +- LogicalFilter(condition=[>($1, 2)])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.all_change_sink_table], 
fields=[id, col1, col2], changelogMode=[NONE])
++- ChangelogNormalize(key=[id], condition=[>(col1, 2)], 
changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_table_partial_deletes, filter=[]]], fields=[id, col1, col2], 
changelogMode=[UA,PD])
+]]>
+    </Resource>
+  </TestCase>
+</Root>
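
    For context on the plans above: changelogMode=[UA,D] marks a source whose DELETE
    rows are complete, while changelogMode=[UA,PD] marks a source whose DELETEs carry
    only the primary key ("partial" deletes). The test cases show ChangelogNormalize
    being dropped when such a source feeds an upsert sink directly, and retained when
    the sink needs full deletes or full before/after changes. A minimal sketch of how
    a partial-delete source is declared with the test 'values' connector, assuming the
    same options used in the Scala tests further down in this diff (the column names
    and types are inferred from the plans, not copied from the actual test):

        import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

        // Sketch only: the schema is an assumption inferred from the plans above;
        // 'values' is the planner's test connector used by these plan tests.
        val env = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
        env.executeSql(
          """
            |CREATE TABLE upsert_table_partial_deletes (
            |  id INT,
            |  col1 INT,
            |  col2 STRING,
            |  PRIMARY KEY (id) NOT ENFORCED
            |) WITH (
            |  'connector' = 'values',
            |  'changelog-mode' = 'UA,D',
            |  'source.produces-delete-by-key' = 'true'
            |)
            |""".stripMargin)
        // Omitting 'source.produces-delete-by-key' gives the full-delete variant,
        // i.e. the table that shows up as changelogMode=[UA,D] in the plans above.
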
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
index 04775282ec1..52db3da571c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.xml
@@ -333,7 +333,7 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS 
EXPR$3], changelogMode=[
    :     +- TableSourceScan(table=[[default_catalog, default_database, 
orders]], fields=[amount, currency, rowtime], changelogMode=[I])
    +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
       +- WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime], 
changelogMode=[UA,D])
-         +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,D])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate, rowtime], changelogMode=[UA,PD])
 ]]>
     </Resource>
   </TestCase>
@@ -486,8 +486,8 @@ Calc(select=[currency, amount, rate, *(amount, rate) AS 
EXPR$3], changelogMode=[
    :     +- TableSourceScan(table=[[default_catalog, default_database, 
orders]], fields=[amount, currency], changelogMode=[I])
    +- Exchange(distribution=[hash[currency]], changelogMode=[I,UA,D])
       +- ChangelogNormalize(key=[currency], changelogMode=[I,UA,D])
-         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,D])
-            +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate], changelogMode=[UA,D])
+         +- Exchange(distribution=[hash[currency]], changelogMode=[UA,PD])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
rates_history]], fields=[currency, rate], changelogMode=[UA,PD])
 ]]>
     </Resource>
   </TestCase>
@@ -658,8 +658,8 @@ LogicalProject(b=[$2], ts=[$0], a=[$1])
 Union(all=[true], union=[b, ts, a], changelogMode=[I,UA,D])
 :- Calc(select=[b, ts, CAST(a AS INTEGER) AS a], changelogMode=[I,UA,D])
 :  +- ChangelogNormalize(key=[a], changelogMode=[I,UA,D])
-:     +- Exchange(distribution=[hash[a]], changelogMode=[UA,D])
-:        +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_src]], fields=[ts, a, b], changelogMode=[UA,D])
+:     +- Exchange(distribution=[hash[a]], changelogMode=[UA,PD])
+:        +- TableSourceScan(table=[[default_catalog, default_database, 
upsert_src]], fields=[ts, a, b], changelogMode=[UA,PD])
 +- Calc(select=[b, t AS ts, a], changelogMode=[I,UA])
    +- GroupAggregate(groupBy=[a], select=[a, MAX(ts) AS t, MAX(b) AS b], 
changelogMode=[I,UA])
       +- Exchange(distribution=[hash[a]], changelogMode=[I])
@@ -744,8 +744,8 @@ LogicalProject(id=[$0], ts=[$4])
       <![CDATA[
 Calc(select=[id, Reinterpret(TO_TIMESTAMP(c)) AS ts], changelogMode=[I,UA,D])
 +- ChangelogNormalize(key=[id], changelogMode=[I,UA,D])
-   +- Exchange(distribution=[hash[id]], changelogMode=[UA,D])
-      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id, c], metadata=[], watermark=[-(TO_TIMESTAMP(c), 1000:INTERVAL 
SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[id, c], 
changelogMode=[UA,D])
+   +- Exchange(distribution=[hash[id]], changelogMode=[UA,PD])
+      +- TableSourceScan(table=[[default_catalog, default_database, src, 
project=[id, c], metadata=[], watermark=[-(TO_TIMESTAMP(c), 1000:INTERVAL 
SECOND)], watermarkEmitStrategy=[on-periodic]]], fields=[id, c], 
changelogMode=[UA,PD])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
index 3dc7bb8a666..2ba440b7fd1 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.scala
@@ -95,7 +95,8 @@ class NonDeterministicDagTest(nonDeterministicUpdateStrategy: 
NonDeterministicUp
                                | primary key (a) not enforced
                                |) with (
                                | 'connector' = 'values',
-                               | 'changelog-mode' = 'I,UA,D'
+                               | 'changelog-mode' = 'I,UA,D',
+                               | 'source.produces-delete-by-key' = 'true'
                                |)""".stripMargin)
     util.tableEnv.executeSql("""
                                |create temporary table upsert_src_with_meta (
@@ -109,6 +110,7 @@ class 
NonDeterministicDagTest(nonDeterministicUpdateStrategy: NonDeterministicUp
                                |) with (
                                | 'connector' = 'values',
                                | 'changelog-mode' = 'I,UA,D',
+                               | 'source.produces-delete-by-key' = 'true',
                                | 'readable-metadata' = 'metadata_1:INT, 
metadata_2:STRING'
                                |)""".stripMargin)
 
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
index eb533cbe1c0..ea905662a42 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala
@@ -492,6 +492,7 @@ class TableScanTest extends TableTestBase {
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'UA,D',
                     |  'enable-watermark-push-down' = 'true',
+                    |  'source.produces-delete-by-key' = 'true',
                     |  'disable-lookup' = 'true'
                     |)
       """.stripMargin)
@@ -508,7 +509,8 @@ class TableScanTest extends TableTestBase {
                     |  PRIMARY KEY (a) NOT ENFORCED
                     |) WITH (
                     |  'connector' = 'values',
-                    |  'changelog-mode' = 'UA,D'
+                    |  'changelog-mode' = 'UA,D',
+                    |  'source.produces-delete-by-key' = 'true'
                     |)
       """.stripMargin)
     util.addTable("""
@@ -593,6 +595,7 @@ class TableScanTest extends TableTestBase {
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'UA,D',
+                    |  'source.produces-delete-by-key' = 'true',
                     |  'disable-lookup' = 'true'
                     |)
       """.stripMargin)
@@ -628,6 +631,7 @@ class TableScanTest extends TableTestBase {
                     |) WITH (
                     |  'connector' = 'values',
                     |  'changelog-mode' = 'UA,D',
+                    |  'source.produces-delete-by-key' = 'true',
                     |  'disable-lookup' = 'true'
                     |)
       """.stripMargin)
@@ -786,7 +790,8 @@ class TableScanTest extends TableTestBase {
                     |  'runtime-source' = 'DataStream',
                     |  'scan.parallelism' = '5',
                     |  'enable-projection-push-down' = 'false',
-                    |  'changelog-mode' = 'I,UA,D'
+                    |  'changelog-mode' = 'I,UA,D',
+                    |  'source.produces-delete-by-key' = 'true'
                     |)
       """.stripMargin)
     util.addTable("""
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 2f4c576bc7c..6dc54d54ee3 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -90,8 +90,10 @@ import org.apache.calcite.sql.{SqlExplainLevel, 
SqlIntervalQualifier}
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.assertj.core.api.Assertions.{assertThat, assertThatExceptionOfType, 
fail}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.TestTemplate
 import org.junit.jupiter.api.extension.{BeforeEachCallback, ExtendWith, 
ExtensionContext, RegisterExtension}
 import org.junit.jupiter.api.io.TempDir
+import org.junit.jupiter.params.ParameterizedTest
 import org.junit.platform.commons.support.AnnotationSupport
 
 import java.io.{File, IOException}
@@ -155,7 +157,14 @@ class TestName extends BeforeEachCallback {
       }
       methodName = s"${context.getTestMethod.get().getName}$displayName"
     } else {
-      methodName = context.getTestMethod.get().getName
+      if (
+        AnnotationSupport.isAnnotated(context.getTestMethod, 
classOf[ParameterizedTest])
+        || AnnotationSupport.isAnnotated(context.getTestMethod, 
classOf[TestTemplate])
+      ) {
+        methodName = 
s"${context.getTestMethod.get().getName}[${context.getDisplayName}]"
+      } else {
+        methodName = context.getTestMethod.get().getName
+      }
     }
   }
 


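    A side note on the TableTestBase change at the end of the diff: for methods
    annotated with @ParameterizedTest or @TestTemplate, the resource name now appends
    the JUnit display name, which is how the XML above ends up with keys such as
    testChangelogNormalizePlan[[12] ...]. A hypothetical sketch of a parameterized
    plan test relying on that naming (the real ChangelogNormalizeOptimizationTest is
    a Java class not shown in this excerpt; the tables referenced below are assumed
    to be registered in a setup method):

        import org.apache.flink.table.api.ExplainDetail
        import org.apache.flink.table.planner.utils.TableTestBase
        import org.junit.jupiter.params.ParameterizedTest
        import org.junit.jupiter.params.provider.ValueSource

        class ChangelogNormalizeNamingSketch extends TableTestBase {
          private val util = streamTestUtil()

          // Assumes upsert_table_partial_deletes / upsert_table_full_deletes and
          // upsert_sink_table were created in a @BeforeEach method (not shown).
          @ParameterizedTest(name = "[{index}] {0}")
          @ValueSource(strings = Array(
            "upsert_table_partial_deletes",
            "upsert_table_full_deletes"))
          def testChangelogNormalizePlan(table: String): Unit = {
            // With the TestName change above, the expected plan is looked up as e.g.
            // "testChangelogNormalizePlan[[1] upsert_table_partial_deletes]".
            util.verifyRelPlan(
              s"INSERT INTO upsert_sink_table SELECT * FROM $table",
              ExplainDetail.CHANGELOG_MODE)
          }
        }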