This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5543195eb06 [FLINK-38511][table] Join that consumes cdc source without 
delete may be converted to delta join (#27111)
5543195eb06 is described below

commit 5543195eb06599727d55a85efe89dda45e421597
Author: Xuyang <[email protected]>
AuthorDate: Mon Oct 27 09:38:52 2025 +0800

    [FLINK-38511][table] Join that consumes cdc source without delete may be 
converted to delta join (#27111)
---
 .../nodes/exec/stream/StreamExecDeltaJoin.java     |  33 +-
 .../physical/stream/StreamPhysicalDeltaJoin.java   |  64 +-
 .../physical/stream/DeltaJoinRewriteRule.java      |   7 +-
 .../table/planner/plan/utils/DeltaJoinUtil.java    |  71 +-
 .../planner/factories/TestValuesTableFactory.java  |  13 +-
 .../planner/plan/stream/sql/DeltaJoinTest.xml      | 254 ++++++-
 .../planner/plan/stream/sql/DeltaJoinTest.scala    | 162 ++++-
 .../runtime/stream/sql/DeltaJoinITCase.scala       | 602 ++++++++++------
 .../join/deltajoin/StreamingDeltaJoinOperator.java |   5 +-
 .../deltajoin/StreamingDeltaJoinOperatorTest.java  | 782 +++++++++++++++------
 10 files changed, 1461 insertions(+), 532 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
index 962cc550541..07ae35a6e82 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java
@@ -80,6 +80,8 @@ import org.apache.calcite.tools.RelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -117,6 +119,9 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
             "lookupRightTableJoinSpec";
     private static final String FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC = 
"lookupLeftTableJoinSpec";
 
+    private static final String FIELD_NAME_LEFT_UPSERT_KEY = "leftUpsertKey";
+    private static final String FIELD_NAME_RIGHT_UPSERT_KEY = "rightUpsertKey";
+
     private static final String FIELD_NAME_JOIN_TYPE = "joinType";
 
     public static final String FIELD_NAME_ASYNC_OPTIONS = "asyncOptions";
@@ -135,6 +140,11 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS)
     private final int[] leftJoinKeys;
 
+    @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final int[] leftUpsertKeys;
+
     // left (streaming) side join right (lookup) side
     @JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
     private final DeltaJoinSpec lookupRightTableJoinSpec;
@@ -144,6 +154,11 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
     @JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS)
     private final int[] rightJoinKeys;
 
+    @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final int[] rightUpsertKeys;
+
     // right (streaming) side join left (lookup) side
     @JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
     private final DeltaJoinSpec lookupLeftTableJoinSpec;
@@ -153,9 +168,11 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
             FlinkJoinType flinkJoinType,
             // delta join args related with the left side
             int[] leftJoinKeys,
+            @Nullable int[] leftUpsertKeys,
             DeltaJoinSpec lookupRightTableJoinSpec,
             // delta join args related with the right side
             int[] rightJoinKeys,
+            @Nullable int[] rightUpsertKeys,
             DeltaJoinSpec lookupLeftTableJoinSpec,
             InputProperty leftInputProperty,
             InputProperty rightInputProperty,
@@ -168,8 +185,10 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
                 ExecNodeContext.newPersistedConfig(StreamExecDeltaJoin.class, 
tableConfig),
                 flinkJoinType,
                 leftJoinKeys,
+                leftUpsertKeys,
                 lookupRightTableJoinSpec,
                 rightJoinKeys,
+                rightUpsertKeys,
                 lookupLeftTableJoinSpec,
                 Lists.newArrayList(leftInputProperty, rightInputProperty),
                 outputType,
@@ -184,9 +203,11 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
             @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig 
persistedConfig,
             @JsonProperty(FIELD_NAME_JOIN_TYPE) FlinkJoinType flinkJoinType,
             @JsonProperty(FIELD_NAME_LEFT_JOIN_KEYS) int[] leftJoinKeys,
+            @JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY) @Nullable int[] 
leftUpsertKeys,
             @JsonProperty(FIELD_NAME_LOOKUP_RIGHT_TABLE_JOIN_SPEC)
                     DeltaJoinSpec lookupRightTableJoinSpec,
             @JsonProperty(FIELD_NAME_RIGHT_JOIN_KEYS) int[] rightJoinKeys,
+            @JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY) @Nullable int[] 
rightUpsertKeys,
             @JsonProperty(FIELD_NAME_LOOKUP_LEFT_TABLE_JOIN_SPEC)
                     DeltaJoinSpec lookupLeftTableJoinSpec,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> 
inputProperties,
@@ -197,8 +218,10 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
 
         this.flinkJoinType = flinkJoinType;
         this.leftJoinKeys = leftJoinKeys;
+        this.leftUpsertKeys = leftUpsertKeys;
         this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
         this.rightJoinKeys = rightJoinKeys;
+        this.rightUpsertKeys = rightUpsertKeys;
         this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
         this.asyncLookupOptions = asyncLookupOptions;
     }
@@ -237,17 +260,15 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
         RowDataKeySelector leftJoinKeySelector =
                 KeySelectorUtil.getRowDataSelector(
                         classLoader, leftJoinKeys, 
InternalTypeInfo.of(leftStreamType));
-        // currently, delta join only supports consuming INSERT-ONLY stream
         RowDataKeySelector leftUpsertKeySelector =
-                getUpsertKeySelector(new int[0], leftStreamType, classLoader);
+                getUpsertKeySelector(leftUpsertKeys, leftStreamType, 
classLoader);
 
         // right side selector
         RowDataKeySelector rightJoinKeySelector =
                 KeySelectorUtil.getRowDataSelector(
                         classLoader, rightJoinKeys, 
InternalTypeInfo.of(rightStreamType));
-        // currently, delta join only supports consuming INSERT-ONLY stream
         RowDataKeySelector rightUpsertKeySelector =
-                getUpsertKeySelector(new int[0], rightStreamType, classLoader);
+                getUpsertKeySelector(rightUpsertKeys, rightStreamType, 
classLoader);
 
         StreamOperatorFactory<RowData> operatorFactory =
                 createAsyncLookupDeltaJoin(
@@ -485,9 +506,9 @@ public class StreamExecDeltaJoin extends 
ExecNodeBase<RowData>
     }
 
     private RowDataKeySelector getUpsertKeySelector(
-            int[] upsertKey, RowType rowType, ClassLoader classLoader) {
+            @Nullable int[] upsertKey, RowType rowType, ClassLoader 
classLoader) {
         final int[] rightUpsertKeys;
-        if (upsertKey.length > 0) {
+        if (upsertKey != null && upsertKey.length > 0) {
             rightUpsertKeys = upsertKey;
         } else {
             rightUpsertKeys = IntStream.range(0, 
rowType.getFields().size()).toArray();
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
index 7c0f11cdc61..217ad55a570 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalDeltaJoin.java
@@ -22,39 +22,35 @@ import 
org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecDeltaJoin;
 import org.apache.flink.table.planner.plan.utils.FunctionCallUtil;
+import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil;
+import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.BiRel;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.JoinInfo;
-import org.apache.calcite.rel.hint.Hintable;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
 import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig;
 
 /** Stream physical RelNode for delta join. */
-public class StreamPhysicalDeltaJoin extends BiRel implements 
StreamPhysicalRel, Hintable {
-
-    private final FlinkJoinType joinType;
-
-    private final RexNode originalJoinCondition;
-
-    private final com.google.common.collect.ImmutableList<RelHint> hints;
+public class StreamPhysicalDeltaJoin extends Join implements StreamPhysicalRel 
{
 
     private final RelDataType rowType;
 
@@ -70,15 +66,20 @@ public class StreamPhysicalDeltaJoin extends BiRel 
implements StreamPhysicalRel,
             List<RelHint> hints,
             RelNode left,
             RelNode right,
-            FlinkJoinType joinType,
+            JoinRelType joinType,
             RexNode originalJoinCondition,
             DeltaJoinSpec lookupRightTableJoinSpec,
             DeltaJoinSpec lookupLeftTableJoinSpec,
             RelDataType rowType) {
-        super(cluster, traitSet, left, right);
-        this.hints = com.google.common.collect.ImmutableList.copyOf(hints);
-        this.joinType = joinType;
-        this.originalJoinCondition = originalJoinCondition;
+        super(
+                cluster,
+                traitSet,
+                hints,
+                left,
+                right,
+                originalJoinCondition,
+                Collections.emptySet(),
+                joinType);
         this.lookupRightTableJoinSpec = lookupRightTableJoinSpec;
         this.lookupLeftTableJoinSpec = lookupLeftTableJoinSpec;
         this.rowType = rowType;
@@ -97,15 +98,20 @@ public class StreamPhysicalDeltaJoin extends BiRel 
implements StreamPhysicalRel,
                         // scenarios to enhance throughput as much as possible.
                         true,
                         AsyncDataStream.OutputMode.ORDERED);
+        FlinkRelMetadataQuery fmq =
+                
FlinkRelMetadataQuery.reuseOrCreate(this.getCluster().getMetadataQuery());
 
-        JoinInfo joinInfo = JoinInfo.of(left, right, originalJoinCondition);
+        int[] leftUpsertKey = 
UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(left)).orElse(null);
+        int[] rightUpsertKey = 
UpsertKeyUtil.smallestKey(fmq.getUpsertKeys(right)).orElse(null);
 
         return new StreamExecDeltaJoin(
                 config,
-                joinType,
+                JoinTypeUtil.getFlinkJoinType(joinType),
                 joinInfo.leftKeys.toIntArray(),
+                leftUpsertKey,
                 lookupRightTableJoinSpec,
                 joinInfo.rightKeys.toIntArray(),
+                rightUpsertKey,
                 lookupLeftTableJoinSpec,
                 InputProperty.DEFAULT,
                 InputProperty.DEFAULT,
@@ -120,16 +126,21 @@ public class StreamPhysicalDeltaJoin extends BiRel 
implements StreamPhysicalRel,
     }
 
     @Override
-    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
-        assert inputs.size() == 2;
+    public Join copy(
+            RelTraitSet traitSet,
+            RexNode conditionExpr,
+            RelNode left,
+            RelNode right,
+            JoinRelType joinType,
+            boolean semiJoinDone) {
         return new StreamPhysicalDeltaJoin(
                 getCluster(),
                 traitSet,
                 hints,
-                inputs.get(0),
-                inputs.get(1),
+                left,
+                right,
                 joinType,
-                originalJoinCondition,
+                conditionExpr,
                 lookupRightTableJoinSpec,
                 lookupLeftTableJoinSpec,
                 rowType);
@@ -147,12 +158,13 @@ public class StreamPhysicalDeltaJoin extends BiRel 
implements StreamPhysicalRel,
 
     @Override
     public RelWriter explainTerms(RelWriter pw) {
-        return super.explainTerms(pw)
-                .item("joinType", joinType.toString())
+        return pw.input("left", left)
+                .input("right", right)
+                .item("joinType", 
JoinTypeUtil.getFlinkJoinType(joinType).toString())
                 .item(
                         "where",
                         getExpressionString(
-                                originalJoinCondition,
+                                condition,
                                 
JavaScalaConversionUtil.toScala(this.getRowType().getFieldNames())
                                         .toList(),
                                 
JavaScalaConversionUtil.toScala(Optional.empty()),
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
index e4c90fea9a2..14529f69ab8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/DeltaJoinRewriteRule.java
@@ -25,7 +25,6 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
 import org.apache.flink.table.planner.plan.utils.DeltaJoinUtil;
-import org.apache.flink.table.planner.plan.utils.JoinTypeUtil;
 
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelRule;
@@ -44,7 +43,9 @@ import static 
org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConf
  *   <li>The join is INNER join.
  *   <li>There is at least one join key pair in the join.
  *   <li>The downstream nodes of this join can accept duplicate changes.
- *   <li>All join inputs are insert only streams.
+ *   <li>All join inputs are with changelog "I" or "I, UA".
+ *   <li>If this join outputs update records, the non-equiv conditions must be 
applied on upsert
+ *       keys of this join.
  *   <li>All upstream nodes of this join are in {@code
  *       DeltaJoinUtil#ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES}
  *   <li>The join keys include at least one complete index in each source 
table of the join input.
@@ -92,7 +93,7 @@ public class DeltaJoinRewriteRule extends 
RelRule<DeltaJoinRewriteRule.Config> {
                 join.getHints(),
                 join.getLeft(),
                 join.getRight(),
-                JoinTypeUtil.getFlinkJoinType(join.getJoinType()),
+                join.getJoinType(),
                 join.getCondition(),
                 lookupRightTableJoinSpec,
                 lookupLeftTableJoinSpec,
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
index e0920a4d1c1..a2e356e714f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java
@@ -25,9 +25,12 @@ import 
org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.functions.AsyncTableFunction;
 import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DeltaJoinSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.TemporalTableSourceSpec;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDeltaJoin;
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalDropUpdateBefore;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalIntermediateTableScan;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalJoin;
@@ -53,10 +56,12 @@ import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.mapping.IntPair;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,7 +80,10 @@ public class DeltaJoinUtil {
      * <p>More physical nodes can be added to support more patterns for delta 
join.
      */
     private static final Set<Class<?>> ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES 
=
-            Sets.newHashSet(StreamPhysicalTableSourceScan.class, 
StreamPhysicalExchange.class);
+            Sets.newHashSet(
+                    StreamPhysicalTableSourceScan.class,
+                    StreamPhysicalExchange.class,
+                    StreamPhysicalDropUpdateBefore.class);
 
     private DeltaJoinUtil() {}
 
@@ -95,8 +103,8 @@ public class DeltaJoinUtil {
             return false;
         }
 
-        // currently, only join with append-only inputs is supported
-        if (!areAllInputsInsertOnly(join)) {
+        // currently, only join that consumes +I and +U is supported
+        if (!areAllInputsInsertOrUpdateAfter(join)) {
             return false;
         }
 
@@ -240,7 +248,29 @@ public class DeltaJoinUtil {
     private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) 
{
         JoinInfo joinInfo = join.analyzeCondition();
         // there must be one pair of join key
-        return !joinInfo.pairs().isEmpty();
+        if (joinInfo.pairs().isEmpty()) {
+            return false;
+        }
+
+        // if this join outputs cdc records and has non-equiv condition, the 
reference columns in
+        // the non-equiv condition must come from the same set of upsert keys
+        ChangelogMode changelogMode = getChangelogMode(join);
+        if (changelogMode.containsOnly(RowKind.INSERT)) {
+            return true;
+        }
+        JoinSpec joinSpec = join.joinSpec();
+        Optional<RexNode> nonEquiCond = joinSpec.getNonEquiCondition();
+        if (nonEquiCond.isEmpty()) {
+            return true;
+        }
+        ImmutableBitSet fieldRefIndices =
+                ImmutableBitSet.of(
+                        RexNodeExtractor.extractRefInputFields(
+                                Collections.singletonList(nonEquiCond.get())));
+        FlinkRelMetadataQuery fmq =
+                
FlinkRelMetadataQuery.reuseOrCreate(join.getCluster().getMetadataQuery());
+        Set<ImmutableBitSet> upsertKeys = fmq.getUpsertKeys(join);
+        return upsertKeys.stream().anyMatch(uk -> 
uk.contains(fieldRefIndices));
     }
 
     private static boolean areAllJoinTableScansSupported(StreamPhysicalJoin 
join) {
@@ -301,8 +331,9 @@ public class DeltaJoinUtil {
         node = unwrapNode(node, true);
         // support to get table across more nodes if we support more nodes in
         // `ALL_SUPPORTED_DELTA_JOIN_UPSTREAM_NODES`
-        if (node instanceof StreamPhysicalExchange) {
-            return getTableScan(((StreamPhysicalExchange) node).getInput());
+        if (node instanceof StreamPhysicalExchange
+                || node instanceof StreamPhysicalDropUpdateBefore) {
+            return getTableScan(node.getInput(0));
         }
 
         Preconditions.checkState(node instanceof TableScan);
@@ -340,25 +371,29 @@ public class DeltaJoinUtil {
         return DuplicateChanges.ALLOW.equals(duplicateChanges);
     }
 
-    private static boolean areAllInputsInsertOnly(StreamPhysicalJoin join) {
+    private static boolean areAllInputsInsertOrUpdateAfter(StreamPhysicalJoin 
join) {
         for (RelNode input : join.getInputs()) {
-            if (!isInsertOnly(unwrapNode(input, false))) {
+            if (!onlyProduceInsertOrUpdateAfter(unwrapNode(input, false))) {
                 return false;
             }
         }
         return true;
     }
 
-    private static boolean isInsertOnly(StreamPhysicalRel node) {
-        ChangelogMode changelogMode =
-                
JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node))
-                        .orElseThrow(
-                                () ->
-                                        new IllegalStateException(
-                                                String.format(
-                                                        "Unable to derive 
changelog mode from node %s. This is a bug.",
-                                                        node)));
-        return changelogMode.containsOnly(RowKind.INSERT);
+    private static boolean onlyProduceInsertOrUpdateAfter(StreamPhysicalRel 
node) {
+        ChangelogMode changelogMode = getChangelogMode(node);
+        Set<RowKind> allKinds = changelogMode.getContainedKinds();
+        return !allKinds.contains(RowKind.UPDATE_BEFORE) && 
!allKinds.contains(RowKind.DELETE);
+    }
+
+    private static ChangelogMode getChangelogMode(StreamPhysicalRel node) {
+        return 
JavaScalaConversionUtil.toJava(ChangelogPlanUtils.getChangelogMode(node))
+                .orElseThrow(
+                        () ->
+                                new IllegalStateException(
+                                        String.format(
+                                                "Unable to derive changelog 
mode from node %s. This is a bug.",
+                                                node)));
     }
 
     private static StreamPhysicalRel unwrapNode(RelNode node, boolean 
transposeToChildBlock) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 9f05cafb49f..4e12f5f72d8 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -1977,8 +1977,9 @@ public final class TestValuesTableFactory
                 Row pk = extractPk(row);
                 RowKind originalRowKind = row.getKind();
                 if (originalRowKind == RowKind.INSERT || originalRowKind == 
RowKind.UPDATE_AFTER) {
-                    row.setKind(RowKind.INSERT);
-                    pkMap.put(pk, row);
+                    Row copiedRow = copyRow(row);
+                    copiedRow.setKind(RowKind.INSERT);
+                    pkMap.put(pk, copiedRow);
                 } else {
                     pkMap.remove(pk);
                 }
@@ -1986,6 +1987,14 @@ public final class TestValuesTableFactory
             return new ArrayList<>(pkMap.values());
         }
 
+        private Row copyRow(Row oldRow) {
+            Row newRow = new Row(oldRow.getKind(), oldRow.getArity());
+            for (int i = 0; i < newRow.getArity(); i++) {
+                newRow.setField(i, oldRow.getField(i));
+            }
+            return newRow;
+        }
+
         private Row extractPk(Row row) {
             Object[] pk = new Object[primaryKeyIndices.length];
             for (int i = 0; i < primaryKeyIndices.length; i++) {
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
index c37d0b83d1c..55249bedc21 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml
@@ -37,30 +37,6 @@ Sink(table=[default_catalog.default_database.append_snk], 
fields=[a0, a1, a2, a3
    :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
    +- Exchange(distribution=[hash[b1, b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testCdcSource">
-    <Resource name="sql">
-      <![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 = 
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
-+- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
-   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
-      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_src]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a1, a2]])
-   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
-   +- Exchange(distribution=[hash[b1, b2]])
-      +- TableSourceScan(table=[[default_catalog, default_database, cdc_src]], 
fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
@@ -298,6 +274,84 @@ Sink(table=[default_catalog.default_database.snk], 
fields=[a0, a1, a2, a3, b0, b
    :     +- TableSourceScan(table=[[default_catalog, default_database, src1, 
filter=[]]], fields=[a0, a1, a2, a3])
    +- Exchange(distribution=[hash[b1, b2]])
       +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase 
name="testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete">
+    <Resource name="sql">
+      <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join 
no_delete_src2 on a1 = b1 and a2 = b2 where a3 > b0]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalFilter(condition=[>($3, $4)])
+      +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+         :- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src1]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a3, b0))], 
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+   :  +- DropUpdateBefore(changelogMode=[I,UA])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+      +- DropUpdateBefore(changelogMode=[I,UA])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testFilterOnUpsertKeysAfterJoinWithCdcSourceWithoutDelete">
+    <Resource name="sql">
+      <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join 
no_delete_src2 on a1 = b1 and a2 = b2 where a0 > b1]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalFilter(condition=[>($0, $6)])
+      +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+         :- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src1]])
+         +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2), >(a0, 
b1))], select=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[I,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+   :  +- DropUpdateBefore(changelogMode=[I,UA])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+      +- DropUpdateBefore(changelogMode=[I,UA])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinAppendOnlySourceAndSourceWithoutDelete">
+    <Resource name="sql">
+      <![CDATA[insert into tmp_snk select * from src1 join no_delete_src2 on 
a1 = b1 and a2 = b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1, 
a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.tmp_snk], fields=[a0, a1, a2, a3, 
b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3], changelogMode=[I])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
@@ -600,6 +654,86 @@ Sink(table=[default_catalog.default_database.snk], 
fields=[a0, a1, a2, a3, b0, b
 
 Sink(table=[default_catalog.default_database.snk2], fields=[a0, a1, a2, a3, 
b0, b2, b1])
 +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPKContainJoinKeyAndOnlyOneSourceNoDelete">
+    <Resource name="sql">
+      <![CDATA[insert into snk_for_cdc_src select * from all_changelog_src 
join no_delete_src2 on a1 = b1 and a2 = b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
all_changelog_src]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA,D])
+   :  +- DropUpdateBefore(changelogMode=[I,UA,D])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
all_changelog_src]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA,D])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+      +- DropUpdateBefore(changelogMode=[I,UA])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPKContainsJoinKeyAndSourceNoUBAndD">
+    <Resource name="sql">
+      <![CDATA[insert into snk_for_cdc_src select * from 
no_delete_and_update_before_src1 join no_delete_and_update_before_src2 on a1 = 
b1 and a2 = b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_and_update_before_src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_and_update_before_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[HasUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UA,D])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA,D])
+   :  +- ChangelogNormalize(key=[a0, a1, a2], changelogMode=[I,UA,D])
+   :     +- Exchange(distribution=[hash[a0, a1, a2]], changelogMode=[I,UA])
+   :        +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_and_update_before_src1]], fields=[a0, a1, a2, a3], 
changelogMode=[I,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA,D])
+      +- ChangelogNormalize(key=[b1, b2], changelogMode=[I,UA,D])
+         +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+            +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_and_update_before_src2]], fields=[b0, b2, b1], changelogMode=[I,UA])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testPKContainsJoinKeyAndTwoSourcesNoDelete">
+    <Resource name="sql">
+      <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join 
no_delete_src2 on a1 = b1 and a2 = b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- DeltaJoin(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], 
select=[a0, a1, a2, a3, b0, b2, b1], changelogMode=[I,UA])
+   :- Exchange(distribution=[hash[a1, a2]], changelogMode=[I,UA])
+   :  +- DropUpdateBefore(changelogMode=[I,UA])
+   :     +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b1, b2]], changelogMode=[I,UA])
+      +- DropUpdateBefore(changelogMode=[I,UA])
+         +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
@@ -651,6 +785,54 @@ Sink(table=[default_catalog.default_database.snk], 
targetColumns=[[0],[1],[2],[4
       :  +- TableSourceScan(table=[[default_catalog, default_database, src1, 
project=[a0, a1, a2], metadata=[]]], fields=[a0, a1, a2])
       +- Exchange(distribution=[hash[b1, b2]])
          +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRetractSink">
+    <Resource name="sql">
+      <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1 
= src2.b1 and src1.a2 = src2.b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0, 
a1, a2, a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testSourceWithAllRowKinds">
+    <Resource name="sql">
+      <![CDATA[insert into snk select * from src1 join cdc_src on src1.a1 = 
cdc_src.b1 and src1.a2 = cdc_src.b2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
++- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
+   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, cdc_src]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Sink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3, b0, 
b2, b1])
++- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
+   :- Exchange(distribution=[hash[a1, a2]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
+   +- Exchange(distribution=[hash[b1, b2]])
+      +- TableSourceScan(table=[[default_catalog, default_database, cdc_src]], 
fields=[b0, b2, b1])
 ]]>
     </Resource>
   </TestCase>
@@ -708,27 +890,27 @@ Sink(table=[default_catalog.default_database.tmp_snk], 
fields=[a0, a1, a2, a3, b
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRetractSink">
+  <TestCase name="testPKNotContainJoinKeyAndTwoSourcesNoDelete">
     <Resource name="sql">
-      <![CDATA[insert into retract_snk select * from src1 join src2 on src1.a1 
= src2.b1 and src1.a2 = src2.b2]]>
+      <![CDATA[insert into snk_for_cdc_src select * from no_delete_src1 join 
no_delete_src2 on a0 = b0 and a1 = b1 and a2 = b2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalSink(table=[default_catalog.default_database.retract_snk], fields=[a0, 
a1, a2, a3, b0, b2, b1])
+LogicalSink(table=[default_catalog.default_database.snk_for_cdc_src], 
fields=[a0, a1, a2, a3, b0, b2, b1])
 +- LogicalProject(a0=[$0], a1=[$1], a2=[$2], a3=[$3], b0=[$4], b2=[$5], 
b1=[$6])
-   +- LogicalJoin(condition=[AND(=($1, $6), =($2, $5))], joinType=[inner])
-      :- LogicalTableScan(table=[[default_catalog, default_database, src1]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, src2]])
+   +- LogicalJoin(condition=[AND(=($0, $4), =($1, $6), =($2, $5))], 
joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src1]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, 
no_delete_src2]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-Sink(table=[default_catalog.default_database.retract_snk], fields=[a0, a1, a2, 
a3, b0, b2, b1])
-+- Join(joinType=[InnerJoin], where=[AND(=(a1, b1), =(a2, b2))], select=[a0, 
a1, a2, a3, b0, b2, b1], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
-   :- Exchange(distribution=[hash[a1, a2]])
-   :  +- TableSourceScan(table=[[default_catalog, default_database, src1]], 
fields=[a0, a1, a2, a3])
-   +- Exchange(distribution=[hash[b1, b2]])
-      +- TableSourceScan(table=[[default_catalog, default_database, src2]], 
fields=[b0, b2, b1])
+Sink(table=[default_catalog.default_database.snk_for_cdc_src], fields=[a0, a1, 
a2, a3, b0, b2, b1], changelogMode=[NONE])
++- Join(joinType=[InnerJoin], where=[AND(=(a0, b0), =(a1, b1), =(a2, b2))], 
select=[a0, a1, a2, a3, b0, b2, b1], leftInputSpec=[JoinKeyContainsUniqueKey], 
rightInputSpec=[JoinKeyContainsUniqueKey], changelogMode=[I,UB,UA])
+   :- Exchange(distribution=[hash[a0, a1, a2]], changelogMode=[I,UB,UA])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src1]], fields=[a0, a1, a2, a3], changelogMode=[I,UB,UA])
+   +- Exchange(distribution=[hash[b0, b1, b2]], changelogMode=[I,UB,UA])
+      +- TableSourceScan(table=[[default_catalog, default_database, 
no_delete_src2]], fields=[b0, b2, b1], changelogMode=[I,UB,UA])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
index 9ed9ddaee8d..7cb04376bba 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.JMap
 import org.apache.flink.table.planner.utils.{TableTestBase, 
TestingTableEnvironment}
 
 import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.assertj.core.util.Maps
 import org.junit.jupiter.api.{BeforeEach, Test}
 
 import java.util.{Collections, HashMap => JHashMap}
@@ -76,6 +77,33 @@ class DeltaJoinTest extends TableTestBase {
         .build()
     )
 
+    addTable(
+      "no_delete_src1",
+      Schema
+        .newBuilder()
+        .column("a0", DataTypes.INT.notNull)
+        .column("a1", DataTypes.DOUBLE.notNull)
+        .column("a2", DataTypes.STRING.notNull)
+        .column("a3", DataTypes.INT)
+        .primaryKey("a0", "a1", "a2")
+        .index("a1", "a2")
+        .build(),
+      Maps.newHashMap("changelog-mode", "I,UA,UB")
+    )
+
+    addTable(
+      "no_delete_src2",
+      Schema
+        .newBuilder()
+        .column("b0", DataTypes.INT)
+        .column("b2", DataTypes.STRING.notNull)
+        .column("b1", DataTypes.DOUBLE.notNull)
+        .primaryKey("b1", "b2")
+        .index("b2")
+        .build(),
+      Maps.newHashMap("changelog-mode", "I,UA,UB")
+    )
+
     addTable(
       "snk",
       Schema
@@ -90,6 +118,21 @@ class DeltaJoinTest extends TableTestBase {
         .primaryKey("l0", "r0")
         .build()
     )
+
+    addTable(
+      "snk_for_cdc_src",
+      Schema
+        .newBuilder()
+        .column("l0", DataTypes.INT.notNull)
+        .column("l1", DataTypes.DOUBLE.notNull)
+        .column("l2", DataTypes.STRING.notNull)
+        .column("l3", DataTypes.INT)
+        .column("r0", DataTypes.INT)
+        .column("r2", DataTypes.STRING.notNull)
+        .column("r1", DataTypes.DOUBLE.notNull)
+        .primaryKey("l0", "l1", "l2", "r1", "r2")
+        .build()
+    )
   }
 
   @Test
@@ -188,6 +231,30 @@ class DeltaJoinTest extends TableTestBase {
         "where a3 > b0")
   }
 
+  @Test
+  def testFilterOnNonUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src select * from no_delete_src1 " +
+        "join no_delete_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2 " +
+        "where a3 > b0",
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
+  @Test
+  def testFilterOnUpsertKeysAfterJoinWithCdcSourceWithoutDelete(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src select * from no_delete_src1 " +
+        "join no_delete_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2 " +
+        "where a0 > b1",
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
   @Test
   def testMultiRootsWithReusingJoinView(): Unit = {
     util.tableConfig.set(
@@ -372,7 +439,7 @@ class DeltaJoinTest extends TableTestBase {
   }
 
   @Test
-  def testCdcSource(): Unit = {
+  def testSourceWithAllRowKinds(): Unit = {
     util.tableConfig.set(
       ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
       UpsertMaterialize.NONE)
@@ -387,6 +454,99 @@ class DeltaJoinTest extends TableTestBase {
         "and src1.a2 = cdc_src.b2")
   }
 
+  @Test
+  def testPKContainsJoinKeyAndTwoSourcesNoDelete(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src " +
+        "select * from no_delete_src1 join no_delete_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2",
+      ExplainDetail.CHANGELOG_MODE)
+  }
+
+  @Test
+  def testPKNotContainJoinKeyAndTwoSourcesNoDelete(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+      UpsertMaterialize.NONE)
+
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src " +
+        "select * from no_delete_src1 join no_delete_src2 " +
+        "on a0 = b0 " +
+        "and a1 = b1 " +
+        "and a2 = b2",
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
+  @Test
+  def testPKContainJoinKeyAndOnlyOneSourceNoDelete(): Unit = {
+    util.tableEnv.executeSql("""
+                               |create table all_changelog_src with (
+                               | 'changelog-mode' = 'I,UA,UB,D'
+                               |) like no_delete_src1
+                               |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src " +
+        "select * from all_changelog_src join no_delete_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2",
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
+  @Test
+  def testPKContainsJoinKeyAndSourceNoUBAndD(): Unit = {
+    // FLINK-38489 Currently, ChangelogNormalize will always generate 
changelog mode with D,
+    // and Join with D cannot be optimized into Delta Join
+    util.tableEnv.executeSql("""
+                               |create table no_delete_and_update_before_src1 
with (
+                               | 'changelog-mode' = 'I,UA'
+                               |) like no_delete_src1
+                               |""".stripMargin)
+
+    util.tableEnv.executeSql("""
+                               |create table no_delete_and_update_before_src2 
with (
+                               | 'changelog-mode' = 'I,UA'
+                               |) like no_delete_src2
+                               |""".stripMargin)
+
+    util.verifyRelPlanInsert(
+      "insert into snk_for_cdc_src " +
+        "select * from no_delete_and_update_before_src1 " +
+        "join no_delete_and_update_before_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2",
+      ExplainDetail.CHANGELOG_MODE
+    )
+  }
+
+  @Test
+  def testJoinAppendOnlySourceAndSourceWithoutDelete(): Unit = {
+    util.tableConfig.set(
+      ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE,
+      UpsertMaterialize.NONE)
+
+    util.tableEnv.executeSql("""
+                               |create table tmp_snk (
+                               | primary key (r1, r2) not enforced
+                               |) like snk_for_cdc_src (
+                               |  EXCLUDING CONSTRAINTS
+                               |)
+                               |""".stripMargin)
+
+    // the join could not be converted into the delta join
+    // because the upsert key of the join is `null`
+    util.verifyRelPlanInsert(
+      "insert into tmp_snk " +
+        "select * from src1 join no_delete_src2 " +
+        "on a1 = b1 " +
+        "and a2 = b2",
+      ExplainDetail.CHANGELOG_MODE)
+  }
+
   @Test
   def testSourceWithSourceAbilities(): Unit = {
     util.tableEnv.executeSql(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
index 063fe973c67..0c5d881d660 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/DeltaJoinITCase.scala
@@ -30,13 +30,14 @@ import 
org.apache.flink.table.planner.runtime.utils.{FailingCollectionSource, St
 import 
org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension,
 Parameters}
 import org.apache.flink.types.Row
 
-import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, TestTemplate}
 import org.junit.jupiter.api.extension.ExtendWith
 
 import javax.annotation.Nullable
 
 import java.time.LocalDateTime
+import java.util.Objects.requireNonNull
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
@@ -81,7 +82,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
       "+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]",
       "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:22]"
     )
-    testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1", 
expected, 6)
+
+    testUpsertResult(
+      newTestSpecBuilder()
+        .withLeftIndex(List("a1"))
+        .withRightIndex(List("b1"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a1 = b1")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(6)
+        .build())
   }
 
   @TestTemplate
@@ -106,34 +118,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
       "+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 2022-02-02T02:02:02]",
       "+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 2021-01-01T01:01:01]"
     )
-    testUpsertResult(List("a1"), List("b1"), data1, data2, "a1 = b1 and a2 = 
b2", expected, 6)
-  }
-
-  @TestTemplate
-  def testJoinKeyNotContainsIndex(): Unit = {
-    val data1 = List(
-      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
-      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
-      // mismatch
-      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 
3, 3, 3, 3, 3))
-    )
 
-    val data2 = List(
-      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
-      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
-      // mismatch
-      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
-    )
-
-    // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
-    // "+U" to "+I"
-    val expected = List("+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 
2022-02-02T02:02:22]")
-
-    // could not optimize into delta join because join keys do not contain 
indexes strictly
-    assertThatThrownBy(
-      () =>
-        testUpsertResult(List("a0", "a1"), List("b0", "b1"), data1, data2, "a1 
= b1", expected, 6))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
+    testUpsertResult(
+      newTestSpecBuilder()
+        .withLeftIndex(List("a1"))
+        .withRightIndex(List("b1"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a1 = b1 and a2 = b2")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(6)
+        .build())
   }
 
   @TestTemplate
@@ -157,14 +153,18 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     val expected = List(
       "+I[1.0, 1, 2022-02-02T02:02:02, 1, 1.0, 2022-02-02T02:02:22]"
     )
+
     testUpsertResult(
-      List("a1"),
-      List("b1"),
-      data1,
-      data2,
-      "a1 = b1",
-      expected,
-      if (enableCache) 4 else 6)
+      newTestSpecBuilder()
+        .withLeftIndex(List("a1"))
+        .withRightIndex(List("b1"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a1 = b1")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6)
+        .build())
   }
 
   @TestTemplate
@@ -192,13 +192,17 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
       "+I[1.0, 2, 2021-01-01T01:01:01, 2, 1.0, 2021-01-01T01:01:01]"
     )
     testUpsertResult(
-      List("a1"),
-      List("b1"),
-      data1,
-      data2,
-      "a1 = b1 and a2 = b2",
-      expected,
-      if (enableCache) 4 else 6)
+      newTestSpecBuilder()
+        .withLeftIndex(List("a1"))
+        .withRightIndex(List("b1"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a1 = b1 and a2 = b2")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(if (enableCache) 4 else 6)
+        .build()
+    )
   }
 
   @TestTemplate
@@ -222,99 +226,147 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0, 
2022-02-02T02:02:22]")
 
     testUpsertResult(
-      List("a0"),
-      List("b0"),
-      data1,
-      data2,
-      "a0 = b0 and a1 = b1 and a2 > b2",
-      expected,
-      6)
+      newTestSpecBuilder()
+        .withLeftIndex(List("a0"))
+        .withRightIndex(List("b0"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a0 = b0 and a1 = b1 and a2 > b2")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(6)
+        .build())
   }
 
   @TestTemplate
-  def testWithNonEquiCondition2(): Unit = {
+  def testCdcSourceWithoutDelete(): Unit = {
     val data1 = List(
+      // pk1
       changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
-      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2023, 
3, 3, 3, 3, 3)),
+      changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
+      changelogRow("+U", Double.box(11.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
+      // pk2
+      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      changelogRow("-U", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      changelogRow("+U", Double.box(22.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      // mismatch
+      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2023, 
3, 3, 3, 3, 3))
+    )
+
+    val data2 = List(
+      // pk1
+      changelogRow("+I", Int.box(1), Double.box(12.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 12)),
+      changelogRow("-U", Int.box(1), Double.box(12.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 12)),
+      changelogRow("+U", Int.box(1), Double.box(13.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 13)),
+      // pk2
+      changelogRow("+I", Int.box(2), Double.box(22.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      changelogRow("-U", Int.box(2), Double.box(22.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      changelogRow("+U", Int.box(2), Double.box(23.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 23)),
+      // mismatch
+      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
+    )
+
+    // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
+    // "+U" to "+I"
+    val expected = List(
+      "+I[11.0, 1, 2021-01-01T01:01:11, 1, 13.0, 2021-01-01T01:01:13]",
+      "+I[22.0, 2, 2022-02-02T02:02:22, 2, 23.0, 2022-02-02T02:02:23]"
+    )
+
+    testUpsertResult(
+      newTestSpecBuilder()
+        .withLeftIndex(List("a0"))
+        .withRightIndex(List("b0"))
+        .withLeftPk(List("a0"))
+        .withRightPk(List("b0"))
+        .withSinkPk(List("l0", "r0"))
+        .withLeftChangelogMode("I,UA,UB")
+        .withRightChangelogMode("I,UA,UB")
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a0 = b0")
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(if (enableCache) 6 else 10)
+        .build())
+  }
+
+  @TestTemplate
+  def testFilterFieldsAfterJoin(): Unit = {
+    val data1 = List(
+      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
+      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
       // mismatch
       changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
     )
 
     val data2 = List(
       changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
-      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      changelogRow("+I", Int.box(2), Double.box(3.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 33)),
       // mismatch
       changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
     )
 
     // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
     // "+U" to "+I"
-    val expected = List("+I[2.0, 2, 2023-03-03T03:03:03, 2, 2.0, 
2022-02-02T02:02:22]")
+    val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 3.0, 
2022-02-02T02:02:33]")
 
-    // could not optimize into delta join because there is calc between join 
and source
-    assertThatThrownBy(
-      () =>
-        testUpsertResult(
-          List("a0"),
-          List("b0"),
-          data1,
-          data2,
-          "a0 = b0 and a1 = b1 and a2 > TO_TIMESTAMP('2021-01-01 01:01:11')",
-          expected,
-          6))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
-
-    // could not optimize into delta join because there is calc between join 
and source
-    assertThatThrownBy(
-      () =>
-        testUpsertResult(
-          List("a0"),
-          List("b0"),
-          data1,
-          data2,
-          "a0 = b0 and b1 > 1.0",
-          expected,
-          12))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
+    testUpsertResult(
+      newTestSpecBuilder()
+        .withLeftIndex(List("a0"))
+        .withRightIndex(List("b0"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a0 = b0")
+        .withFilterAfterJoin("a1 <> b1")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(6)
+        .build())
   }
 
   @TestTemplate
-  def testFilterFieldsBeforeJoin(): Unit = {
+  def testFilterFieldsAfterJoinWithCdcSourceWithoutDelete(): Unit = {
     val data1 = List(
+      // pk1
       changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
-      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      changelogRow("-U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
+      changelogRow("+U", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 2)),
+      // pk2
+      changelogRow("+I", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      changelogRow("-U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
+      changelogRow("+U", Double.box(2.0), Int.box(3), LocalDateTime.of(2022, 
2, 2, 2, 2, 3)),
       // mismatch
       changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
     )
 
     val data2 = List(
+      // pk1
       changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
-      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
+      changelogRow("+I", Int.box(3), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
       // mismatch
       changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
     )
 
     // TestValuesRuntimeFunctions#KeyedUpsertingSinkFunction will change the 
RowKind from
     // "+U" to "+I"
-    val expected = List("+I[1.0, 1, 2021-01-01T01:01:01, 1, 1.0, 
2021-01-01T01:01:11]")
-
-    // could not optimize into delta join because there is calc between join 
and source
-    assertThatThrownBy(
-      () =>
-        testUpsertResult(
-          List("a0"),
-          List("b0"),
-          data1,
-          data2,
-          "a0 = b0 and a1 = b1 and a2 = TO_TIMESTAMP('2021-01-01 01:01:01')",
-          expected,
-          6))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
-
-    // could not optimize into delta join because there is calc between join 
and source
-    assertThatThrownBy(
-      () => testUpsertResult(List(), List(), data1, data2, "a0 = b0 and b1 = 
1.0", expected, 12))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
+    val expected = List("+I[2.0, 3, 2022-02-02T02:02:03, 3, 2.0, 
2022-02-02T02:02:22]")
+
+    testUpsertResult(
+      newTestSpecBuilder()
+        .withLeftIndex(List("a0"))
+        .withRightIndex(List("b0"))
+        .withLeftPk(List("a0", "a1"))
+        .withRightPk(List("b0", "b1"))
+        .withSinkPk(List("l0", "r0", "l1", "r1"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withLeftChangelogMode("I,UA,UB")
+        .withRightChangelogMode("I,UA,UB")
+        .withJoinCondition("a0 = b0")
+        .withFilterAfterJoin("a1 < b0")
+        .withExpectedData(expected)
+        .withExpectedLookupFunctionInvokeCount(if (enableCache) 5 else 8)
+        .build())
   }
 
   @TestTemplate
@@ -363,85 +415,6 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     assertThat(AsyncTestValueLookupFunction.invokeCount.get()).isEqualTo(6)
   }
 
-  @TestTemplate
-  def testProjectFieldsBeforeJoin(): Unit = {
-    val data1 = List(
-      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
-      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
-      // mismatch
-      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
-    )
-
-    val data2 = List(
-      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
-      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
-      // mismatch
-      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
-    )
-
-    prepareTable(List("a0"), List("b0"), data1, data2)
-    tEnv.executeSql(s"""
-                       |create table projectedSink(
-                       |  l0 int,
-                       |  r0 int,
-                       |  l1 double,
-                       |  l2 timestamp(3),
-                       |  primary key(l0, r0) not enforced
-                       |) with (
-                       |  'connector' = 'values',
-                       |  'bounded' = 'false',
-                       |  'sink-insert-only' = 'false'
-                       |)
-                       |""".stripMargin)
-
-    // could not optimize into delta join
-    // because there is ProjectPushDownSpec between join and source
-    assertThatThrownBy(() => tEnv.executeSql("""
-                                               |insert into projectedSink
-                                               |  select
-                                               |    testLeft.a0,
-                                               |    testRight.b0,
-                                               |    testLeft.a1,
-                                               |    testLeft.a2
-                                               |    from testLeft
-                                               |  join testRight
-                                               |    on a0 = b0
-                                               |""".stripMargin))
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
-  }
-
-  @TestTemplate
-  def testProjectFieldsBeforeJoin2(): Unit = {
-    val data1 = List(
-      changelogRow("+I", Double.box(1.0), Int.box(1), LocalDateTime.of(2021, 
1, 1, 1, 1, 1)),
-      changelogRow("+I", Double.box(2.0), Int.box(2), LocalDateTime.of(2022, 
2, 2, 2, 2, 2)),
-      // mismatch
-      changelogRow("+I", Double.box(3.0), Int.box(3), LocalDateTime.of(2033, 
3, 3, 3, 3, 3))
-    )
-
-    val data2 = List(
-      changelogRow("+I", Int.box(1), Double.box(1.0), LocalDateTime.of(2021, 
1, 1, 1, 1, 11)),
-      changelogRow("+I", Int.box(2), Double.box(2.0), LocalDateTime.of(2022, 
2, 2, 2, 2, 22)),
-      // mismatch
-      changelogRow("+I", Int.box(99), Double.box(99.0), LocalDateTime.of(2099, 
2, 2, 2, 2, 2))
-    )
-
-    prepareTable(List("a0"), List("b0"), data1, data2)
-
-    // could not optimize into delta join because there is calc between join 
and source
-    assertThatThrownBy(
-      () =>
-        tEnv
-          .executeSql(
-            s"insert into testSnk(l0, l1, l2, r0, r1, r2) " +
-              "select * from ( " +
-              "  select a0, a1, a2 from testLeft" +
-              ") join testRight " +
-              "on a1 = b1")
-          .await())
-      .hasMessageContaining("The current sql doesn't support to do delta join 
optimization.")
-  }
-
   @TestTemplate
   def testFailOverAndRestore(): Unit = {
     // enable checkpoint, we are using failing source to force have a complete 
checkpoint
@@ -468,14 +441,16 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     val expected = List("+I[2.0, 2, 2022-02-02T02:02:02, 2, 2.0, 
2022-02-02T02:02:22]")
 
     testUpsertResult(
-      List("a0"),
-      List("b0"),
-      data1,
-      data2,
-      "a0 = b0 and a1 = b1",
-      expected,
-      null,
-      testFailingSource = true)
+      newTestSpecBuilder()
+        .withLeftIndex(List("a0"))
+        .withRightIndex(List("b0"))
+        .withLeftData(data1)
+        .withRightData(data2)
+        .withJoinCondition("a0 = b0 and a1 = b1")
+        .withSinkPk(List("l0", "r0"))
+        .withExpectedData(expected)
+        .withTestFailingSource(true)
+        .build())
   }
 
   /** TODO add index in DDL. */
@@ -510,26 +485,35 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
     catalog.createTable(tablePath, newResolvedTable, false)
   }
 
-  private def testUpsertResult(
-      leftIndex: List[String],
-      rightIndex: List[String],
-      leftData: List[Row],
-      rightData: List[Row],
-      joinKeyStr: String,
-      expected: List[String],
-      @Nullable expectedLookupFunctionInvokeCount: Integer,
-      testFailingSource: Boolean = false): Unit = {
-    prepareTable(leftIndex, rightIndex, leftData, rightData, testFailingSource)
+  private def testUpsertResult(testSpec: TestSpec): Unit = {
+    prepareTable(
+      testSpec.leftIndex,
+      testSpec.rightIndex,
+      testSpec.leftPk.orNull,
+      testSpec.rightPk.orNull,
+      testSpec.sinkPk,
+      testSpec.leftData,
+      testSpec.rightData,
+      testSpec.testFailingSource,
+      testSpec.leftChangelogMode,
+      testSpec.rightChangelogMode
+    )
 
+    val sql =
+      s"""
+         | insert into testSnk
+         | select * from testLeft join testRight on ${testSpec.joinCondition}
+         | ${if (testSpec.filterAfterJoin.isEmpty) "" else s"where 
${testSpec.filterAfterJoin.get}"}
+         |""".stripMargin
     tEnv
-      .executeSql(s"insert into testSnk select * from testLeft join testRight 
on $joinKeyStr")
+      .executeSql(sql)
       .await(60, TimeUnit.SECONDS)
     val result = TestValuesTableFactory.getResultsAsStrings("testSnk")
 
-    assertThat(result.sorted).isEqualTo(expected.sorted)
-    if (expectedLookupFunctionInvokeCount != null) {
+    assertThat(result.sorted).isEqualTo(testSpec.expected.sorted)
+    if (testSpec.expectedLookupFunctionInvokeCount.isDefined) {
       assertThat(AsyncTestValueLookupFunction.invokeCount.get())
-        .isEqualTo(expectedLookupFunctionInvokeCount)
+        .isEqualTo(testSpec.expectedLookupFunctionInvokeCount.get)
     }
   }
 
@@ -537,40 +521,67 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
       leftIndex: List[String],
       rightIndex: List[String],
       leftData: List[Row],
+      rightData: List[Row]): Unit = {
+    prepareTable(
+      leftIndex,
+      rightIndex,
+      null,
+      null,
+      List("l0", "r0"),
+      leftData,
+      rightData,
+      testFailingSource = false,
+      "I",
+      "I")
+  }
+
+  private def prepareTable(
+      leftIndex: List[String],
+      rightIndex: List[String],
+      @Nullable leftPk: List[String],
+      @Nullable rightPk: List[String],
+      sinkPk: List[String],
+      leftData: List[Row],
       rightData: List[Row],
-      testFailingSource: Boolean = false): Unit = {
+      testFailingSource: Boolean,
+      leftChangelogMode: String,
+      rightChangelogMode: String): Unit = {
     tEnv.executeSql("drop table if exists testLeft")
-    tEnv.executeSql(s"""
-                       |create table testLeft(
-                       |  a1 double,
-                       |  a0 int,
-                       |  a2 timestamp(3)
-                       |) with (
-                       |  'connector' = 'values',
-                       |  'bounded' = 'false',
-                       |  'changelog-mode' = 'I',
-                       |  'data-id' = 
'${TestValuesTableFactory.registerData(leftData)}',
-                       |  'async' = 'true',
-                       |  'failing-source' = '$testFailingSource'
-                       |)
-                       |""".stripMargin)
+    tEnv.executeSql(
+      s"""
+         |create table testLeft(
+         |  a1 double,
+         |  a0 int,
+         |  a2 timestamp(3)
+         |  ${if (leftPk == null) "" else s", primary key 
(${leftPk.mkString(",")}) not enforced"}
+         |) with (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'changelog-mode' = '$leftChangelogMode',
+         |  'data-id' = '${TestValuesTableFactory.registerData(leftData)}',
+         |  'async' = 'true',
+         |  'failing-source' = '$testFailingSource'
+         |)
+         |""".stripMargin)
     addIndex("testLeft", leftIndex)
 
     tEnv.executeSql("drop table if exists testRight")
-    tEnv.executeSql(s"""
-                       |create table testRight(
-                       |  b0 int,
-                       |  b1 double,
-                       |  b2 timestamp(3)
-                       |) with (
-                       |  'connector' = 'values',
-                       |  'bounded' = 'false',
-                       |  'changelog-mode' = 'I',
-                       |  'data-id' = 
'${TestValuesTableFactory.registerData(rightData)}',
-                       |  'async' = 'true',
-                       |  'failing-source' = '$testFailingSource'
-                       |)
-                       |""".stripMargin)
+    tEnv.executeSql(
+      s"""
+         |create table testRight(
+         |  b0 int,
+         |  b1 double,
+         |  b2 timestamp(3)
+         |  ${if (rightPk == null) "" else s", primary key 
(${rightPk.mkString(",")}) not enforced"}
+         |) with (
+         |  'connector' = 'values',
+         |  'bounded' = 'false',
+         |  'changelog-mode' = '$rightChangelogMode',
+         |  'data-id' = '${TestValuesTableFactory.registerData(rightData)}',
+         |  'async' = 'true',
+         |  'failing-source' = '$testFailingSource'
+         |)
+         |""".stripMargin)
     addIndex("testRight", rightIndex)
 
     tEnv.executeSql("drop table if exists testSnk")
@@ -582,7 +593,7 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
                        |  r0 int,
                        |  r1 double,
                        |  r2 timestamp(3),
-                       |  primary key(l0, r0) not enforced
+                       |  primary key(${sinkPk.mkString(",")}) not enforced
                        |) with (
                        |  'connector' = 'values',
                        |  'bounded' = 'false',
@@ -591,6 +602,133 @@ class DeltaJoinITCase(enableCache: Boolean) extends 
StreamingTestBase {
                        |""".stripMargin)
   }
 
+  private def newTestSpecBuilder(): TestSpecBuilder = {
+    new TestSpecBuilder
+  }
+
+  private case class TestSpec(
+      leftIndex: List[String],
+      rightIndex: List[String],
+      leftPk: Option[List[String]],
+      rightPk: Option[List[String]],
+      sinkPk: List[String],
+      leftData: List[Row],
+      rightData: List[Row],
+      joinCondition: String,
+      filterAfterJoin: Option[String],
+      expected: List[String],
+      expectedLookupFunctionInvokeCount: Option[Int],
+      testFailingSource: Boolean,
+      leftChangelogMode: String,
+      rightChangelogMode: String
+  )
+
+  private class TestSpecBuilder {
+    private var leftIndex: Option[List[String]] = None
+    private var rightIndex: Option[List[String]] = None
+    private var leftPk: Option[List[String]] = None
+    private var rightPk: Option[List[String]] = None
+    private var sinkPk: Option[List[String]] = None
+    private var joinCondition: Option[String] = None
+    private var filterAfterJoin: Option[String] = None
+    private var leftData: Option[List[Row]] = None
+    private var rightData: Option[List[Row]] = None
+    private var expectedData: Option[List[String]] = None
+    private var expectedLookupFunctionInvokeCount: Option[Int] = None
+    private var testFailingSource: Boolean = false
+    private var leftChangelogMode: String = "I"
+    private var rightChangelogMode: String = "I"
+
+    def withLeftIndex(index: List[String]): TestSpecBuilder = {
+      leftIndex = Some(requireNonNull(index))
+      this
+    }
+
+    def withRightIndex(index: List[String]): TestSpecBuilder = {
+      rightIndex = Some(requireNonNull(index))
+      this
+    }
+
+    def withLeftPk(pk: List[String]): TestSpecBuilder = {
+      leftPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withRightPk(pk: List[String]): TestSpecBuilder = {
+      rightPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withSinkPk(pk: List[String]): TestSpecBuilder = {
+      sinkPk = Some(requireNonNull(pk))
+      this
+    }
+
+    def withLeftData(data: List[Row]): TestSpecBuilder = {
+      leftData = Some(requireNonNull(data))
+      this
+    }
+
+    def withRightData(data: List[Row]): TestSpecBuilder = {
+      rightData = Some(requireNonNull(data))
+      this
+    }
+
+    def withJoinCondition(condition: String): TestSpecBuilder = {
+      joinCondition = Some(requireNonNull(condition))
+      this
+    }
+
+    def withFilterAfterJoin(filter: String): TestSpecBuilder = {
+      filterAfterJoin = Some(requireNonNull(filter))
+      this
+    }
+
+    def withExpectedData(expected: List[String]): TestSpecBuilder = {
+      this.expectedData = Some(requireNonNull(expected))
+      this
+    }
+
+    def withExpectedLookupFunctionInvokeCount(count: Int): TestSpecBuilder = {
+      expectedLookupFunctionInvokeCount = Some(requireNonNull(count))
+      this
+    }
+
+    def withTestFailingSource(flag: Boolean): TestSpecBuilder = {
+      testFailingSource = requireNonNull(flag)
+      this
+    }
+
+    def withLeftChangelogMode(mode: String): TestSpecBuilder = {
+      leftChangelogMode = requireNonNull(mode)
+      this
+    }
+
+    def withRightChangelogMode(mode: String): TestSpecBuilder = {
+      rightChangelogMode = requireNonNull(mode)
+      this
+    }
+
+    def build(): TestSpec = {
+      TestSpec(
+        requireNonNull(leftIndex.orNull),
+        requireNonNull(rightIndex.orNull),
+        leftPk,
+        rightPk,
+        requireNonNull(sinkPk.orNull),
+        requireNonNull(leftData.orNull),
+        requireNonNull(rightData.orNull),
+        requireNonNull(joinCondition.orNull),
+        filterAfterJoin,
+        requireNonNull(expectedData.orNull),
+        expectedLookupFunctionInvokeCount,
+        testFailingSource,
+        leftChangelogMode,
+        rightChangelogMode
+      )
+    }
+
+  }
 }
 
 object DeltaJoinITCase {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
index 45fc04b239c..2505550104b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperator.java
@@ -357,8 +357,9 @@ public class StreamingDeltaJoinOperator
 
     private void processElement(StreamRecord<RowData> element, int inputIndex) 
throws Exception {
         Preconditions.checkArgument(
-                RowKind.INSERT == element.getValue().getRowKind(),
-                "Currently, delta join only supports to consume append only 
stream.");
+                RowKind.INSERT == element.getValue().getRowKind()
+                        || RowKind.UPDATE_AFTER == 
element.getValue().getRowKind(),
+                "Currently, delta join only supports to consume insert record 
or update after record.");
         tryProcess();
         StreamRecord<RowData> record;
         boolean isLeft = isLeft(inputIndex);
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
index 17b286e9819..de52c8f2be2 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/deltajoin/StreamingDeltaJoinOperatorTest.java
@@ -35,7 +35,7 @@ import 
org.apache.flink.table.data.conversion.DataStructureConverter;
 import org.apache.flink.table.data.conversion.DataStructureConverters;
 import org.apache.flink.table.runtime.collector.TableFunctionCollector;
 import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
-import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
+import org.apache.flink.table.runtime.generated.GeneratedFunction;
 import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord;
@@ -67,7 +67,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -83,6 +82,7 @@ import java.util.stream.Stream;
 
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.binaryrow;
 import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
+import static 
org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -94,77 +94,9 @@ public class StreamingDeltaJoinOperatorTest {
     private static final int CACHE_SIZE = 10;
 
     // the data snapshot of the left/right table when joining
-    private static final LinkedList<RowData> leftTableCurrentData = new 
LinkedList<>();
-    private static final LinkedList<RowData> rightTableCurrentData = new 
LinkedList<>();
-
-    /**
-     * Mock sql like the following.
-     *
-     * <pre>
-     *      CREATE TABLE leftSrc(
-     *          left_value INT,
-     *          left_jk1 BOOLEAN,
-     *          left_jk2_lk STRING,
-     *          INDEX(left_jk2_lk)
-     *      )
-     * </pre>
-     *
-     * <pre>
-     *      CREATE TABLE rightSrc(
-     *          right_jk2 STRING,
-     *          right_value INT,
-     *          right_jk1_lk BOOLEAN,
-     *          INDEX(right_jk1_lk)
-     *      )
-     * </pre>
-     *
-     * <pre>
-     *     select * from leftSrc join rightSrc
-     *      on leftSrc.left_jk1 = rightSrc.right_jk1_lk
-     *      and leftSrc.left_jk2_lk = rightSrc.right_jk2
-     * </pre>
-     *
-     * <p>For right lookup table(left stream side delta join right table), the 
join key is
-     * [right_jk1_lk, right_jk2], and it will be split into lookup key 
[right_jk1_lk] and {@code
-     * DeltaJoinSpec#remainingCondition} [right_jk2].
-     *
-     * <p>For left lookup table(right stream side delta join left table), the 
join key is [left_jk1,
-     * left_jk2_lk], and it will be split into lookup key [left_jk2_lk] and 
{@code
-     * DeltaJoinSpec#remainingCondition} [left_jk1].
-     */
-
-    // left join key: <left_jk1, left_jk2_lk>
-    // left lookup key: <left_jk2_lk>
-    private static final RowType leftRowType =
-            RowType.of(
-                    new LogicalType[] {new IntType(), new BooleanType(), 
VarCharType.STRING_TYPE},
-                    new String[] {"left_value", "left_jk1", "left_jk2_lk"});
-
-    private static final InternalTypeInfo<RowData> leftTypeInfo = 
InternalTypeInfo.of(leftRowType);
-
-    private static final int[] leftJoinKeyIndices = new int[] {1, 2};
-
-    // right join key: <right_jk1_lk, right_jk2>
-    // right lookup key: <right_jk1_lk>
-    private static final RowType rightRowType =
-            RowType.of(
-                    new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), 
new BooleanType()},
-                    new String[] {"right_jk2", "right_value", "right_jk1_lk"});
-
-    private static final InternalTypeInfo<RowData> rightTypeInfo =
-            InternalTypeInfo.of(rightRowType);
-
-    private static final int[] rightJoinKeyIndices = new int[] {2, 0};
-
-    private static final RowDataKeySelector leftJoinKeySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(
-                    leftJoinKeyIndices, leftRowType.getChildren().toArray(new 
LogicalType[0]));
-    private static final RowDataKeySelector rightJoinKeySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(
-                    rightJoinKeyIndices, 
rightRowType.getChildren().toArray(new LogicalType[0]));
-
-    private static final int[] outputFieldIndices =
-            IntStream.range(0, leftTypeInfo.getArity() + 
rightTypeInfo.getArity()).toArray();
+    // <upsert key, data>
+    private static final HashMap<RowData, RowData> leftTableCurrentData = new 
HashMap<>();
+    private static final HashMap<RowData, RowData> rightTableCurrentData = new 
HashMap<>();
 
     @Parameters(name = "EnableCache = {0}")
     public static List<Boolean> parameters() {
@@ -181,46 +113,6 @@ public class StreamingDeltaJoinOperatorTest {
 
     @BeforeEach
     public void beforeEach() throws Exception {
-        testHarness = createDeltaJoinOperatorTestHarness();
-        testHarness.setup();
-        testHarness.open();
-        StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
-        // set external failure cause consumer to prevent hang
-        testHarness
-                .getEnvironment()
-                .setExternalFailureCauseConsumer(
-                        error -> {
-                            latestException = Optional.of(error);
-                            // DO NOT throw exception up again to avoid hang
-                        });
-        operator.setAsyncExecutionController(
-                new 
MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));
-        prepareOperatorRuntimeInfo(operator);
-
-        assertor =
-                new RowDataHarnessAssertor(
-                        getOutputType().getChildren().toArray(new 
LogicalType[0]),
-                        // sort the result by the output upsert key
-                        (o1, o2) -> {
-                            for (int keyIndex : outputFieldIndices) {
-                                LogicalType type = 
getOutputType().getChildren().get(keyIndex);
-                                RowData.FieldGetter getter =
-                                        RowData.createFieldGetter(type, 
keyIndex);
-
-                                int compareResult =
-                                        
Objects.requireNonNull(getter.getFieldOrNull(o1))
-                                                .toString()
-                                                .compareTo(
-                                                        Objects.requireNonNull(
-                                                                        
getter.getFieldOrNull(o2))
-                                                                .toString());
-
-                                if (compareResult != 0) {
-                                    return compareResult;
-                                }
-                            }
-                            return o1.toString().compareTo(o2.toString());
-                        });
         MyAsyncFunction.leftInvokeCount.set(0);
         MyAsyncFunction.rightInvokeCount.set(0);
         MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = true;
@@ -228,7 +120,9 @@ public class StreamingDeltaJoinOperatorTest {
 
     @AfterEach
     public void afterEach() throws Exception {
-        testHarness.close();
+        if (assertor != null) {
+            testHarness.close();
+        }
         leftTableCurrentData.clear();
         rightTableCurrentData.clear();
         latestException = Optional.empty();
@@ -236,7 +130,11 @@ public class StreamingDeltaJoinOperatorTest {
     }
 
     @TestTemplate
-    void testJoinBothAppendOnlyTables() throws Exception {
+    void testJoinBothLogTables() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         StreamRecord<RowData> leftRecord1 = insertRecord(100, true, "jklk1");
         StreamRecord<RowData> leftRecord2 = insertRecord(100, false, "jklk2");
         testHarness.processElement1(leftRecord1);
@@ -293,6 +191,8 @@ public class StreamingDeltaJoinOperatorTest {
 
         DeltaJoinCache cache = unwrapCache(testHarness);
         if (enableCache) {
+            RowType leftRowType = testSpec.getLeftInputRowType();
+            RowType rightRowType = testSpec.getRightInputRowType();
             Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
                     newHashMap(
                             binaryrow(true, "jklk1"),
@@ -339,8 +239,102 @@ public class StreamingDeltaJoinOperatorTest {
         }
     }
 
+    @TestTemplate
+    void testJoinBothPkTables() throws Exception {
+        PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
+        StreamRecord<RowData> leftRecordK1V1 = insertRecord(100, true, "Tom");
+        StreamRecord<RowData> leftRecordK2V1 = insertRecord(101, false, "Tom");
+        // mismatch
+        StreamRecord<RowData> leftRecordK3V1 = insertRecord(1999, false, 
"Jim");
+        testHarness.processElement1(leftRecordK1V1);
+        testHarness.processElement1(leftRecordK2V1);
+        testHarness.processElement1(leftRecordK3V1);
+
+        waitAllDataProcessed();
+        final ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+        assertor.assertOutputEqualsSorted(
+                "result mismatch", expectedOutput, testHarness.getOutput());
+
+        StreamRecord<RowData> rightRecordK1V1 = insertRecord("Tom", 200, true);
+        StreamRecord<RowData> rightRecordK2V1 = insertRecord("Tom", 201, 
false);
+        // mismatch
+        StreamRecord<RowData> rightRecordK3V1 = insertRecord("Sam", 2999, 
false);
+        testHarness.processElement2(rightRecordK1V1);
+        testHarness.processElement2(rightRecordK2V1);
+        testHarness.processElement2(rightRecordK3V1);
+
+        waitAllDataProcessed();
+        expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 200, true));
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true));
+        expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false));
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+        assertor.assertOutputEqualsSorted(
+                "result mismatch", expectedOutput, testHarness.getOutput());
+
+        StreamRecord<RowData> leftRecordK1V2 = updateAfterRecord(1000, true, 
"Tom");
+        testHarness.processElement1(leftRecordK1V2);
+
+        waitAllDataProcessed();
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 200, 
true));
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201, 
false));
+        assertor.assertOutputEqualsSorted(
+                "result mismatch", expectedOutput, testHarness.getOutput());
+
+        StreamRecord<RowData> rightRecordK1V2 = updateAfterRecord("Tom", 2000, 
true);
+        StreamRecord<RowData> rightRecordK2V2 = updateAfterRecord("Tom", 2001, 
false);
+        testHarness.processElement2(rightRecordK1V2);
+        testHarness.processElement2(rightRecordK2V2);
+
+        waitAllDataProcessed();
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000, 
true));
+        expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000, 
true));
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2001, 
false));
+        expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2001, 
false));
+        assertor.assertOutputEqualsSorted(
+                "result mismatch", expectedOutput, testHarness.getOutput());
+
+        DeltaJoinCache cache = unwrapCache(testHarness);
+        if (enableCache) {
+            Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
+                    newHashMap(
+                            binaryrow("Tom"),
+                            newHashMap(
+                                    binaryrow(true, "Tom"),
+                                    leftRecordK1V2.getValue(),
+                                    binaryrow(false, "Tom"),
+                                    leftRecordK2V1.getValue()),
+                            binaryrow("Sam"),
+                            Collections.emptyMap());
+
+            Map<RowData, Map<RowData, Object>> expectedRightCacheData =
+                    newHashMap(
+                            binaryrow("Tom"),
+                            newHashMap(
+                                    binaryrow("Tom", true),
+                                    rightRecordK1V2.getValue(),
+                                    binaryrow("Tom", false),
+                                    rightRecordK2V2.getValue()),
+                            binaryrow("Jim"),
+                            Collections.emptyMap());
+            verifyCacheData(cache, expectedLeftCacheData, 
expectedRightCacheData, 5, 3, 4, 2);
+            assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2);
+            assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2);
+        } else {
+            verifyCacheData(cache, Collections.emptyMap(), 
Collections.emptyMap(), 0, 0, 0, 0);
+            assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(4);
+            assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5);
+        }
+    }
+
     @TestTemplate
     void testBlockingWithSameJoinKey() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         // block the async function
         MyAsyncFunction.block();
 
@@ -377,6 +371,9 @@ public class StreamingDeltaJoinOperatorTest {
         assertThat(recordsBuffer.getActiveBuffer().size()).isEqualTo(3);
         assertThat(recordsBuffer.getBlockingBuffer().size()).isEqualTo(2);
 
+        RowDataKeySelector leftJoinKeySelector = 
testSpec.getLeftJoinKeySelector();
+        RowDataKeySelector rightJoinKeySelector = 
testSpec.getRightJoinKeySelector();
+
         RowData joinKey1 = leftJoinKeySelector.getKey(insertRecord(100, true, 
"jklk1").getValue());
         RowData joinKey2 = leftJoinKeySelector.getKey(insertRecord(100, false, 
"jklk2").getValue());
         RowData joinKey3 =
@@ -411,6 +408,8 @@ public class StreamingDeltaJoinOperatorTest {
 
         DeltaJoinCache cache = unwrapCache(testHarness);
         if (enableCache) {
+            RowType leftRowType = testSpec.getLeftInputRowType();
+            RowType rightRowType = testSpec.getRightInputRowType();
             Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
                     newHashMap(
                             binaryrow(true, "jklk1"),
@@ -457,29 +456,33 @@ public class StreamingDeltaJoinOperatorTest {
      * source and delta-join).
      */
     @TestTemplate
-    void testTableDataVisibleBeforeJoin() throws Exception {
+    void testLogTableDataVisibleBeforeJoin() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false;
 
         // prepare the data first to mock all following requests were in 
flight between source and
         // delta-join
         final StreamRecord<RowData> leftRecord1 = insertRecord(100, true, 
"jklk1");
-        insertLeftTable(leftRecord1);
+        insertLeftTable(testSpec, leftRecord1);
 
         final StreamRecord<RowData> leftRecord2 = insertRecord(200, true, 
"jklk1");
-        insertLeftTable(leftRecord2);
+        insertLeftTable(testSpec, leftRecord2);
 
         final StreamRecord<RowData> rightRecord1 = insertRecord("jklk1", 300, 
true);
-        insertRightTable(rightRecord1);
+        insertRightTable(testSpec, rightRecord1);
 
         // mismatch
         final StreamRecord<RowData> rightRecord2 = insertRecord("jklk2", 500, 
false);
-        insertRightTable(rightRecord2);
+        insertRightTable(testSpec, rightRecord2);
 
         final StreamRecord<RowData> leftRecord3 = insertRecord(800, true, 
"jklk1");
-        insertLeftTable(leftRecord3);
+        insertLeftTable(testSpec, leftRecord3);
 
         final StreamRecord<RowData> rightRecord3 = insertRecord("jklk1", 1000, 
true);
-        insertRightTable(rightRecord3);
+        insertRightTable(testSpec, rightRecord3);
 
         testHarness.processElement1(leftRecord1);
         testHarness.processElement1(leftRecord2);
@@ -525,6 +528,8 @@ public class StreamingDeltaJoinOperatorTest {
 
         DeltaJoinCache cache = unwrapCache(testHarness);
         if (enableCache) {
+            RowType leftRowType = testSpec.getLeftInputRowType();
+            RowType rightRowType = testSpec.getRightInputRowType();
             Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
                     newHashMap(
                             binaryrow(true, "jklk1"),
@@ -557,8 +562,134 @@ public class StreamingDeltaJoinOperatorTest {
         }
     }
 
+    /**
+     * This test is used to test the scenario where the right stream side 
joined out a record from
+     * the left table that has not been sent to the delta-join operator (maybe 
is in flight between
+     * source and delta-join).
+     */
+    @TestTemplate
+    void testPkTableDataVisibleBeforeJoin() throws Exception {
+        PkPkTableJoinTestSpec testSpec = PkPkTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
+        MyAsyncExecutionControllerDelegate.insertTableDataAfterEmit = false;
+
+        // prepare the data first to mock all following requests were in 
flight between source and
+        // delta-join
+        final StreamRecord<RowData> leftRecordK1V1 = insertRecord(100, true, 
"Tom");
+        insertLeftTable(testSpec, leftRecordK1V1);
+        final StreamRecord<RowData> leftRecordK1V2 = updateAfterRecord(1000, 
true, "Tom");
+        insertLeftTable(testSpec, leftRecordK1V2);
+
+        final StreamRecord<RowData> leftRecordK2V1 = insertRecord(101, false, 
"Tom");
+        insertLeftTable(testSpec, leftRecordK2V1);
+
+        // mismatch
+        final StreamRecord<RowData> leftRecordK3V1 = insertRecord(101, false, 
"Jim");
+        insertLeftTable(testSpec, leftRecordK3V1);
+        final StreamRecord<RowData> leftRecordK3V2 = updateAfterRecord(1001, 
false, "Jim");
+        insertLeftTable(testSpec, leftRecordK3V2);
+
+        final StreamRecord<RowData> rightRecordK1V1 = insertRecord("Tom", 200, 
true);
+        insertRightTable(testSpec, rightRecordK1V1);
+        final StreamRecord<RowData> rightRecordK1V2 = updateAfterRecord("Tom", 
2000, true);
+        insertRightTable(testSpec, rightRecordK1V2);
+        final StreamRecord<RowData> rightRecordK1V3 = updateAfterRecord("Tom", 
20000, true);
+        insertRightTable(testSpec, rightRecordK1V3);
+
+        final StreamRecord<RowData> rightRecordK2V1 = insertRecord("Tom", 201, 
false);
+        insertRightTable(testSpec, rightRecordK2V1);
+
+        // mismatch
+        final StreamRecord<RowData> rightRecordK3V1 = insertRecord("Sam", 999, 
false);
+        insertRightTable(testSpec, rightRecordK3V1);
+
+        final ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+
+        testHarness.processElement1(leftRecordK1V1);
+        expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 20000, true));
+        expectedOutput.add(insertRecord(100, true, "Tom", "Tom", 201, false));
+
+        testHarness.processElement1(leftRecordK1V2);
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000, 
true));
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 201, 
false));
+
+        testHarness.processElement1(leftRecordK2V1);
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 20000, 
true));
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+
+        testHarness.processElement1(leftRecordK3V1);
+        testHarness.processElement1(leftRecordK3V2);
+
+        testHarness.processElement2(rightRecordK1V1);
+        expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 200, true));
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 200, true));
+
+        testHarness.processElement2(rightRecordK1V2);
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 2000, 
true));
+        expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 2000, 
true));
+
+        testHarness.processElement2(rightRecordK1V3);
+        expectedOutput.add(updateAfterRecord(1000, true, "Tom", "Tom", 20000, 
true));
+        expectedOutput.add(updateAfterRecord(101, false, "Tom", "Tom", 20000, 
true));
+
+        testHarness.processElement2(rightRecordK2V1);
+        expectedOutput.add(insertRecord(1000, true, "Tom", "Tom", 201, false));
+        expectedOutput.add(insertRecord(101, false, "Tom", "Tom", 201, false));
+
+        testHarness.processElement2(rightRecordK3V1);
+
+        waitAllDataProcessed();
+
+        assertor.assertOutputEqualsSorted(
+                "result mismatch", expectedOutput, testHarness.getOutput());
+
+        TableAsyncExecutionController<RowData, RowData, RowData> aec = 
unwrapAEC(testHarness);
+        assertThat(aec.getBlockingSize()).isEqualTo(0);
+        assertThat(aec.getInFlightSize()).isEqualTo(0);
+        assertThat(aec.getFinishSize()).isEqualTo(0);
+
+        DeltaJoinCache cache = unwrapCache(testHarness);
+        if (enableCache) {
+            Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
+                    newHashMap(
+                            binaryrow("Tom"),
+                            newHashMap(
+                                    binaryrow(true, "Tom"),
+                                    leftRecordK1V2.getValue(),
+                                    binaryrow(false, "Tom"),
+                                    leftRecordK2V1.getValue()),
+                            binaryrow("Sam"),
+                            Collections.emptyMap());
+
+            Map<RowData, Map<RowData, Object>> expectedRightCacheData =
+                    newHashMap(
+                            binaryrow("Tom"),
+                            newHashMap(
+                                    binaryrow("Tom", true),
+                                    rightRecordK1V3.getValue(),
+                                    binaryrow("Tom", false),
+                                    rightRecordK2V1.getValue()),
+                            binaryrow("Jim"),
+                            Collections.emptyMap());
+
+            verifyCacheData(cache, expectedLeftCacheData, 
expectedRightCacheData, 5, 3, 5, 3);
+            assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(2);
+            assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(2);
+        } else {
+            verifyCacheData(cache, Collections.emptyMap(), 
Collections.emptyMap(), 0, 0, 0, 0);
+            assertThat(MyAsyncFunction.leftInvokeCount.get()).isEqualTo(5);
+            assertThat(MyAsyncFunction.rightInvokeCount.get()).isEqualTo(5);
+        }
+    }
+
     @TestTemplate
     void testCheckpointAndRestore() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         // block the async function
         MyAsyncFunction.block();
 
@@ -599,19 +730,19 @@ public class StreamingDeltaJoinOperatorTest {
 
         MyAsyncFunction.block();
         // restoring
-        testHarness = createDeltaJoinOperatorTestHarness();
+        testHarness = createDeltaJoinOperatorTestHarness(testSpec);
 
         testHarness.setup();
 
         StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
         operator.setAsyncExecutionController(
-                new 
MyAsyncExecutionControllerDelegate(operator.getAsyncExecutionController()));
+                new MyAsyncExecutionControllerDelegate(
+                        testSpec, operator.getAsyncExecutionController()));
 
         latestException = Optional.empty();
         testHarness.initializeState(snapshot);
 
         testHarness.open();
-        prepareOperatorRuntimeInfo(operator);
 
         aec = unwrapAEC(testHarness);
         assertThat(aec.getBlockingSize()).isEqualTo(2);
@@ -641,6 +772,8 @@ public class StreamingDeltaJoinOperatorTest {
 
         DeltaJoinCache cache = unwrapCache(testHarness);
         if (enableCache) {
+            RowType leftRowType = testSpec.getLeftInputRowType();
+            RowType rightRowType = testSpec.getRightInputRowType();
             Map<RowData, Map<RowData, Object>> expectedLeftCacheData =
                     newHashMap(
                             binaryrow(true, "jklk1"),
@@ -671,6 +804,10 @@ public class StreamingDeltaJoinOperatorTest {
 
     @TestTemplate
     void testClearLegacyStateWhenCheckpointing() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         // block the async function
         MyAsyncFunction.block();
 
@@ -722,6 +859,10 @@ public class StreamingDeltaJoinOperatorTest {
 
     @TestTemplate
     void testMeetExceptionWhenLookup() throws Exception {
+        LogLogTableJoinTestSpec testSpec = LogLogTableJoinTestSpec.INSTANCE;
+        initTestHarness(testSpec);
+        initAssertor(testSpec);
+
         Throwable expectedException = new IllegalStateException("Mock to 
fail");
         MyAsyncFunction.setExpectedThrownException(expectedException);
 
@@ -739,6 +880,52 @@ public class StreamingDeltaJoinOperatorTest {
                 .isEqualTo(expectedException);
     }
 
+    private void initTestHarness(AbstractTestSpec testSpec) throws Exception {
+        testHarness = createDeltaJoinOperatorTestHarness(testSpec);
+        testHarness.setup();
+        testHarness.open();
+        StreamingDeltaJoinOperator operator = unwrapOperator(testHarness);
+        // set external failure cause consumer to prevent hang
+        testHarness
+                .getEnvironment()
+                .setExternalFailureCauseConsumer(
+                        error -> {
+                            latestException = Optional.of(error);
+                            // DO NOT throw exception up again to avoid hang
+                        });
+        operator.setAsyncExecutionController(
+                new MyAsyncExecutionControllerDelegate(
+                        testSpec, operator.getAsyncExecutionController()));
+    }
+
+    private void initAssertor(AbstractTestSpec testSpec) {
+        RowType outputRowType = testSpec.getOutputRowType();
+        assertor =
+                new RowDataHarnessAssertor(
+                        outputRowType.getChildren().toArray(new 
LogicalType[0]),
+                        // sort the result by the output upsert key
+                        (o1, o2) -> {
+                            for (int keyIndex : 
testSpec.getOutputFieldIndices()) {
+                                LogicalType type = 
outputRowType.getChildren().get(keyIndex);
+                                RowData.FieldGetter getter =
+                                        RowData.createFieldGetter(type, 
keyIndex);
+
+                                int compareResult =
+                                        
Objects.requireNonNull(getter.getFieldOrNull(o1))
+                                                .toString()
+                                                .compareTo(
+                                                        Objects.requireNonNull(
+                                                                        
getter.getFieldOrNull(o2))
+                                                                .toString());
+
+                                if (compareResult != 0) {
+                                    return compareResult;
+                                }
+                            }
+                            return o1.toString().compareTo(o2.toString());
+                        });
+    }
+
     private void verifyCacheData(
             DeltaJoinCache actualCache,
             Map<RowData, Map<RowData, Object>> expectedLeftCacheData,
@@ -820,58 +1007,68 @@ public class StreamingDeltaJoinOperatorTest {
     }
 
     private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData>
-            createDeltaJoinOperatorTestHarness() throws Exception {
+            createDeltaJoinOperatorTestHarness(AbstractTestSpec testSpec) 
throws Exception {
         TaskMailbox mailbox = new TaskMailboxImpl();
         MailboxProcessor mailboxProcessor =
                 new MailboxProcessor(controller -> {}, mailbox, 
StreamTaskActionExecutor.IMMEDIATE);
 
         DataStructureConverter<RowData, Object> leftFetcherConverter =
                 (DataStructureConverter)
-                        
DataStructureConverters.getConverter(leftTypeInfo.getDataType());
-
-        RowDataKeySelector leftUpsertKeySelector = 
getUpsertKeySelector(leftRowType, null);
-        RowDataKeySelector rightUpsertKeySelector = 
getUpsertKeySelector(rightRowType, null);
+                        DataStructureConverters.getConverter(
+                                testSpec.getLeftTypeInfo().getDataType());
 
         AsyncDeltaJoinRunner leftAsyncFunction =
                 new AsyncDeltaJoinRunner(
-                        new GeneratedFunctionWrapper<>(new MyAsyncFunction()),
+                        new GeneratedFunction<>("", "", new Object[0]) {
+                            @Override
+                            public MyAsyncFunction newInstance(ClassLoader 
classLoader) {
+                                return new MyAsyncFunction(testSpec, false);
+                            }
+                        },
                         leftFetcherConverter,
                         new GeneratedResultFutureWrapper<>(new 
TestingFetcherResultFuture()),
-                        leftTypeInfo.toRowSerializer(),
-                        leftJoinKeySelector,
-                        leftUpsertKeySelector,
-                        rightJoinKeySelector,
-                        rightUpsertKeySelector,
+                        testSpec.getLeftTypeInfo().toRowSerializer(),
+                        testSpec.getLeftJoinKeySelector(),
+                        testSpec.getLeftUpsertKeySelector(),
+                        testSpec.getRightJoinKeySelector(),
+                        testSpec.getRightUpsertKeySelector(),
                         AEC_CAPACITY,
                         false,
                         enableCache);
 
         DataStructureConverter<RowData, Object> rightFetcherConverter =
                 (DataStructureConverter)
-                        
DataStructureConverters.getConverter(rightTypeInfo.getDataType());
+                        DataStructureConverters.getConverter(
+                                testSpec.getRightTypeInfo().getDataType());
 
         AsyncDeltaJoinRunner rightAsyncFunction =
                 new AsyncDeltaJoinRunner(
-                        new GeneratedFunctionWrapper<>(new MyAsyncFunction()),
+                        new GeneratedFunction<>("", "", new Object[0]) {
+                            @Override
+                            public MyAsyncFunction newInstance(ClassLoader 
classLoader) {
+                                return new MyAsyncFunction(testSpec, true);
+                            }
+                        },
                         rightFetcherConverter,
                         new GeneratedResultFutureWrapper<>(new 
TestingFetcherResultFuture()),
-                        rightTypeInfo.toRowSerializer(),
-                        leftJoinKeySelector,
-                        leftUpsertKeySelector,
-                        rightJoinKeySelector,
-                        rightUpsertKeySelector,
+                        testSpec.getRightTypeInfo().toRowSerializer(),
+                        testSpec.getLeftJoinKeySelector(),
+                        testSpec.getLeftUpsertKeySelector(),
+                        testSpec.getRightJoinKeySelector(),
+                        testSpec.getRightUpsertKeySelector(),
                         AEC_CAPACITY,
                         true,
                         enableCache);
 
-        InternalTypeInfo<RowData> joinKeyTypeInfo = 
leftJoinKeySelector.getProducedType();
+        InternalTypeInfo<RowData> joinKeyTypeInfo =
+                testSpec.getLeftJoinKeySelector().getProducedType();
 
         StreamingDeltaJoinOperator operator =
                 new StreamingDeltaJoinOperator(
                         rightAsyncFunction,
                         leftAsyncFunction,
-                        leftJoinKeySelector,
-                        rightJoinKeySelector,
+                        testSpec.getLeftJoinKeySelector(),
+                        testSpec.getRightJoinKeySelector(),
                         -1L,
                         AEC_CAPACITY,
                         new TestProcessingTimeService(),
@@ -879,41 +1076,19 @@ public class StreamingDeltaJoinOperatorTest {
                                 mailbox, 0, 
StreamTaskActionExecutor.IMMEDIATE, mailboxProcessor),
                         CACHE_SIZE,
                         CACHE_SIZE,
-                        leftRowType,
-                        rightRowType);
+                        testSpec.getLeftInputRowType(),
+                        testSpec.getRightInputRowType());
 
         return new KeyedTwoInputStreamOperatorTestHarness<>(
                 operator,
-                leftJoinKeySelector,
-                rightJoinKeySelector,
+                testSpec.getLeftJoinKeySelector(),
+                testSpec.getRightJoinKeySelector(),
                 joinKeyTypeInfo,
                 1,
                 1,
                 0,
-                leftTypeInfo.toSerializer(),
-                rightTypeInfo.toSerializer());
-    }
-
-    private RowDataKeySelector getUpsertKeySelector(RowType rowType, @Nullable 
int[] upsertKey) {
-        if (upsertKey == null) {
-            upsertKey = IntStream.range(0, rowType.getFieldCount()).toArray();
-        }
-        return HandwrittenSelectorUtil.getRowDataSelector(
-                upsertKey, rowType.getChildren().toArray(new LogicalType[0]));
-    }
-
-    private void prepareOperatorRuntimeInfo(StreamingDeltaJoinOperator 
operator) {
-        unwrapAsyncFunction(operator, true).tagInvokingSideDuringRuntime(true);
-        unwrapAsyncFunction(operator, 
false).tagInvokingSideDuringRuntime(false);
-    }
-
-    private MyAsyncFunction unwrapAsyncFunction(
-            StreamingDeltaJoinOperator operator, boolean unwrapLeft) {
-        if (unwrapLeft) {
-            return (MyAsyncFunction) 
operator.getLeftTriggeredUserFunction().getFetcher();
-        } else {
-            return (MyAsyncFunction) 
operator.getRightTriggeredUserFunction().getFetcher();
-        }
+                testSpec.getLeftTypeInfo().toSerializer(),
+                testSpec.getRightTypeInfo().toSerializer());
     }
 
     private TableAsyncExecutionController<RowData, RowData, RowData> unwrapAEC(
@@ -941,36 +1116,27 @@ public class StreamingDeltaJoinOperatorTest {
         return cacheInLeftRunner;
     }
 
-    private RowType getOutputType() {
-        return RowType.of(
-                Stream.concat(
-                                leftRowType.getChildren().stream(),
-                                rightRowType.getChildren().stream())
-                        .toArray(LogicalType[]::new),
-                Stream.concat(
-                                leftRowType.getFieldNames().stream(),
-                                rightRowType.getFieldNames().stream())
-                        .toArray(String[]::new));
+    private void insertLeftTable(AbstractTestSpec testSpec, 
StreamRecord<RowData> record) {
+        insertTableData(testSpec, record, true);
     }
 
-    private void insertLeftTable(StreamRecord<RowData> record) {
-        insertTableData(record, true);
+    private void insertRightTable(AbstractTestSpec testSpec, 
StreamRecord<RowData> record) {
+        insertTableData(testSpec, record, false);
     }
 
-    private void insertRightTable(StreamRecord<RowData> record) {
-        insertTableData(record, false);
-    }
-
-    private static void insertTableData(StreamRecord<RowData> record, boolean 
insertLeftTable) {
+    private static void insertTableData(
+            AbstractTestSpec testSpec, StreamRecord<RowData> record, boolean 
insertLeftTable) {
         RowData rowData = record.getValue();
         try {
             if (insertLeftTable) {
                 synchronized (leftTableCurrentData) {
-                    leftTableCurrentData.add(rowData);
+                    RowData upsertKey = 
testSpec.getLeftUpsertKeySelector().getKey(rowData);
+                    leftTableCurrentData.put(upsertKey, rowData);
                 }
             } else {
                 synchronized (rightTableCurrentData) {
-                    rightTableCurrentData.add(rowData);
+                    RowData upsertKey = 
testSpec.getRightUpsertKeySelector().getKey(rowData);
+                    rightTableCurrentData.put(upsertKey, rowData);
                 }
             }
         } catch (Exception e) {
@@ -1018,11 +1184,12 @@ public class StreamingDeltaJoinOperatorTest {
 
         private static Optional<Throwable> expectedThrownException = 
Optional.empty();
 
-        // ===== runtime info =====
-        private Boolean treatRightAsLookupTable;
+        private final AbstractTestSpec testSpec;
+        private final boolean treatRightAsLookupTable;
 
-        public void tagInvokingSideDuringRuntime(boolean isLeftInvoking) {
-            this.treatRightAsLookupTable = isLeftInvoking;
+        private MyAsyncFunction(AbstractTestSpec testSpec, boolean 
treatRightAsLookupTable) {
+            this.testSpec = testSpec;
+            this.treatRightAsLookupTable = treatRightAsLookupTable;
         }
 
         public static void block() throws Exception {
@@ -1054,29 +1221,29 @@ public class StreamingDeltaJoinOperatorTest {
                                 lock.await();
                             }
 
-                            LinkedList<RowData> lookupTableData;
+                            HashMap<RowData, RowData> lookupTableData;
                             RowDataKeySelector streamSideJoinKeySelector;
                             RowDataKeySelector lookupSideJoinKeySelector;
-                            if 
(Objects.requireNonNull(treatRightAsLookupTable)) {
+                            if (treatRightAsLookupTable) {
                                 synchronized (rightTableCurrentData) {
-                                    lookupTableData = new 
LinkedList<>(rightTableCurrentData);
+                                    lookupTableData = new 
HashMap<>(rightTableCurrentData);
                                 }
 
-                                streamSideJoinKeySelector = 
leftJoinKeySelector.copy();
-                                lookupSideJoinKeySelector = 
rightJoinKeySelector.copy();
+                                streamSideJoinKeySelector = 
testSpec.getLeftJoinKeySelector();
+                                lookupSideJoinKeySelector = 
testSpec.getRightJoinKeySelector();
                                 leftInvokeCount.incrementAndGet();
                             } else {
                                 synchronized (leftTableCurrentData) {
-                                    lookupTableData = new 
LinkedList<>(leftTableCurrentData);
+                                    lookupTableData = new 
HashMap<>(leftTableCurrentData);
                                 }
 
-                                streamSideJoinKeySelector = 
rightJoinKeySelector.copy();
-                                lookupSideJoinKeySelector = 
leftJoinKeySelector.copy();
+                                streamSideJoinKeySelector = 
testSpec.getRightJoinKeySelector();
+                                lookupSideJoinKeySelector = 
testSpec.getLeftJoinKeySelector();
                                 rightInvokeCount.incrementAndGet();
                             }
 
                             List<Object> results = new ArrayList<>();
-                            for (RowData row : lookupTableData) {
+                            for (RowData row : lookupTableData.values()) {
                                 if (streamSideJoinKeySelector
                                         .getKey(input)
                                         
.equals(lookupSideJoinKeySelector.getKey(row))) {
@@ -1139,6 +1306,7 @@ public class StreamingDeltaJoinOperatorTest {
         private static boolean insertTableDataAfterEmit = true;
 
         public MyAsyncExecutionControllerDelegate(
+                AbstractTestSpec testSpec,
                 TableAsyncExecutionController<RowData, RowData, RowData> 
innerAec) {
             super(
                     innerAec.getAsyncInvoke(),
@@ -1153,6 +1321,7 @@ public class StreamingDeltaJoinOperatorTest {
                             int inputIndex = 
inputIndexAwareEntry.getInputIndex();
                             //noinspection unchecked
                             insertTableData(
+                                    testSpec,
                                     (StreamRecord<RowData>) 
inputIndexAwareEntry.getInputElement(),
                                     inputIndex == 0);
                         }
@@ -1178,4 +1347,205 @@ public class StreamingDeltaJoinOperatorTest {
             getResultFuture().complete((Collection) result);
         }
     }
+
+    private abstract static class AbstractTestSpec {
+
+        abstract RowType getLeftInputRowType();
+
+        final InternalTypeInfo<RowData> getLeftTypeInfo() {
+            return InternalTypeInfo.of(getLeftInputRowType());
+        }
+
+        abstract Optional<int[]> getLeftUpsertKey();
+
+        final RowDataKeySelector getLeftUpsertKeySelector() {
+            return getUpsertKeySelector(getLeftInputRowType(), 
getLeftUpsertKey().orElse(null));
+        }
+
+        abstract RowType getRightInputRowType();
+
+        final InternalTypeInfo<RowData> getRightTypeInfo() {
+            return InternalTypeInfo.of(getRightInputRowType());
+        }
+
+        abstract Optional<int[]> getRightUpsertKey();
+
+        final RowDataKeySelector getRightUpsertKeySelector() {
+            return getUpsertKeySelector(getRightInputRowType(), 
getRightUpsertKey().orElse(null));
+        }
+
+        abstract int[] getLeftJoinKeyIndices();
+
+        final RowDataKeySelector getLeftJoinKeySelector() {
+            return HandwrittenSelectorUtil.getRowDataSelector(
+                    getLeftJoinKeyIndices(),
+                    getLeftInputRowType().getChildren().toArray(new 
LogicalType[0]));
+        }
+
+        abstract int[] getRightJoinKeyIndices();
+
+        final RowDataKeySelector getRightJoinKeySelector() {
+            return HandwrittenSelectorUtil.getRowDataSelector(
+                    getRightJoinKeyIndices(),
+                    getRightInputRowType().getChildren().toArray(new 
LogicalType[0]));
+        }
+
+        final RowType getOutputRowType() {
+            return RowType.of(
+                    Stream.concat(
+                                    
getLeftInputRowType().getChildren().stream(),
+                                    
getRightInputRowType().getChildren().stream())
+                            .toArray(LogicalType[]::new),
+                    Stream.concat(
+                                    
getLeftInputRowType().getFieldNames().stream(),
+                                    
getRightInputRowType().getFieldNames().stream())
+                            .toArray(String[]::new));
+        }
+
+        final int[] getOutputFieldIndices() {
+            return IntStream.range(0, 
getOutputRowType().getFieldCount()).toArray();
+        }
+
+        private RowDataKeySelector getUpsertKeySelector(
+                RowType rowType, @Nullable int[] upsertKey) {
+            if (upsertKey == null) {
+                upsertKey = IntStream.range(0, 
rowType.getFieldCount()).toArray();
+            }
+            return HandwrittenSelectorUtil.getRowDataSelector(
+                    upsertKey, rowType.getChildren().toArray(new 
LogicalType[0]));
+        }
+    }
+
+    /**
+     * Mock sql like the following.
+     *
+     * <pre>
+     *      CREATE TABLE leftSrc(
+     *          left_value INT,
+     *          left_jk1 BOOLEAN,
+     *          left_jk2_index STRING,
+     *          INDEX(left_jk2_index)
+     *      )
+     * </pre>
+     *
+     * <pre>
+     *      CREATE TABLE rightSrc(
+     *          right_jk2 STRING,
+     *          right_value INT,
+     *          right_jk1_index BOOLEAN,
+     *          INDEX(right_jk1_index)
+     *      )
+     * </pre>
+     *
+     * <pre>
+     *     select * from leftSrc join rightSrc
+     *      on leftSrc.left_jk1 = rightSrc.right_jk1_index
+     *      and leftSrc.left_jk2_index = rightSrc.right_jk2
+     * </pre>
+     */
+    private static class LogLogTableJoinTestSpec extends AbstractTestSpec {
+
+        private static final LogLogTableJoinTestSpec INSTANCE = new 
LogLogTableJoinTestSpec();
+
+        @Override
+        RowType getLeftInputRowType() {
+            return RowType.of(
+                    new LogicalType[] {new IntType(), new BooleanType(), 
VarCharType.STRING_TYPE},
+                    new String[] {"left_value", "left_jk1", "left_jk2_index"});
+        }
+
+        @Override
+        RowType getRightInputRowType() {
+            return RowType.of(
+                    new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), 
new BooleanType()},
+                    new String[] {"right_jk2", "right_value", 
"right_jk1_index"});
+        }
+
+        @Override
+        Optional<int[]> getLeftUpsertKey() {
+            return Optional.empty();
+        }
+
+        @Override
+        Optional<int[]> getRightUpsertKey() {
+            return Optional.empty();
+        }
+
+        @Override
+        int[] getLeftJoinKeyIndices() {
+            return new int[] {1, 2};
+        }
+
+        @Override
+        int[] getRightJoinKeyIndices() {
+            return new int[] {2, 0};
+        }
+    }
+
+    /**
+     * Mock sql like the following.
+     *
+     * <pre>
+     *      CREATE TABLE leftSrc(
+     *          left_value INT,
+     *          left_pk1 BOOLEAN,
+     *          left_pk2_jk_index STRING,
+     *          PRIMARY KEY (left_pk1, left_pk2_jk_index) NOT ENFORCED
+     *          INDEX(left_pk2_jk_index)
+     *      )
+     * </pre>
+     *
+     * <pre>
+     *      CREATE TABLE rightSrc(
+     *          right_pk2_jk_index STRING,
+     *          right_value INT,
+     *          right_pk1 BOOLEAN,
+     *          PRIMARY KEY (right_pk2_jk_index, right_pk1) NOT ENFORCED
+     *          INDEX(right_pk2_jk_index)
+     *      )
+     * </pre>
+     *
+     * <pre>
+     *     select * from leftSrc join rightSrc
+     *      on leftSrc.left_pk2_jk_index = rightSrc.right_pk2_jk_index
+     * </pre>
+     */
+    private static class PkPkTableJoinTestSpec extends AbstractTestSpec {
+
+        private static final PkPkTableJoinTestSpec INSTANCE = new 
PkPkTableJoinTestSpec();
+
+        @Override
+        RowType getLeftInputRowType() {
+            return RowType.of(
+                    new LogicalType[] {new IntType(), new BooleanType(), 
VarCharType.STRING_TYPE},
+                    new String[] {"left_value", "left_pk1", 
"left_pk2_jk_index"});
+        }
+
+        @Override
+        RowType getRightInputRowType() {
+            return RowType.of(
+                    new LogicalType[] {VarCharType.STRING_TYPE, new IntType(), 
new BooleanType()},
+                    new String[] {"right_pk2_jk_index", "right_value", 
"right_pk1"});
+        }
+
+        @Override
+        Optional<int[]> getLeftUpsertKey() {
+            return Optional.of(new int[] {1, 2});
+        }
+
+        @Override
+        Optional<int[]> getRightUpsertKey() {
+            return Optional.of(new int[] {0, 2});
+        }
+
+        @Override
+        int[] getLeftJoinKeyIndices() {
+            return new int[] {2};
+        }
+
+        @Override
+        int[] getRightJoinKeyIndices() {
+            return new int[] {0};
+        }
+    }
 }

Reply via email to