This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bff1fc2782e04598275ab05177b89df38f891c94
Author: lincoln.lil <lincoln.8...@gmail.com>
AuthorDate: Thu Sep 8 21:50:10 2022 +0800

    [FLINK-28569][table-planner] Fix SinkUpsertMaterializer to be aware of the input upsertKey, when it is not empty, to prevent wrong results
    
    This closes #20791
---
 .../plan/nodes/exec/batch/BatchExecSink.java       |   3 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  25 +++-
 .../plan/nodes/exec/stream/StreamExecSink.java     |  12 +-
 .../table/planner/plan/utils/UpsertKeyUtil.java    |  66 +++++++++
 .../nodes/physical/stream/StreamPhysicalSink.scala |  10 +-
 .../nodes/exec/stream/TableSinkJsonPlanTest.java   |  36 +++++
 .../planner/plan/utils/UpsertKeyUtilTest.java      |  52 ++++++++
 .../utils/JavaUserDefinedScalarFunctions.java      |   4 +
 .../testChangelogSource.out                        |   1 +
 .../testUpsertSource.out                           |   1 +
 .../testDeduplication.out                          |   1 +
 .../ExpandJsonPlanTest_jsonplan/testExpand.out     |   1 +
 ...tDistinctAggCalls[isMiniBatchEnabled=false].out |   1 +
 ...stDistinctAggCalls[isMiniBatchEnabled=true].out |   1 +
 ...gCallsWithGroupBy[isMiniBatchEnabled=false].out |   1 +
 ...ggCallsWithGroupBy[isMiniBatchEnabled=true].out |   1 +
 ...erDefinedAggCalls[isMiniBatchEnabled=false].out |   1 +
 ...serDefinedAggCalls[isMiniBatchEnabled=true].out |   1 +
 .../testEventTimeTumbleWindow.out                  |   1 +
 .../testProcTimeTumbleWindow.out                   |   1 +
 .../testIncrementalAggregate.out                   |   1 +
 ...lAggregateWithSumCountDistinctAndRetraction.out |   1 +
 .../testInnerJoinWithEqualPk.out                   |   1 +
 ...WithNonDeterministicFuncSinkWithDifferentPk.out | 147 +++++++++++++++++++++
 .../runtime/stream/sql/TableSinkITCase.scala       |  62 +++++++++
 .../operators/sink/SinkUpsertMaterializer.java     | 125 +++++++++++++-----
 .../operators/sink/SinkUpsertMaterializerTest.java | 102 +++++++++++---
 27 files changed, 601 insertions(+), 58 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index a6202871816..c9068656a51 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -71,6 +71,7 @@ public class BatchExecSink extends CommonExecSink implements BatchExecNode<Objec
                 inputTransform,
                 tableSink,
                 -1,
-                false);
+                false,
+                null);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 71f8184df2c..7f978565060 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -61,6 +61,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.typeutils.RowTypeUtils;
 import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
@@ -141,7 +142,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             Transformation<RowData> inputTransform,
             DynamicTableSink tableSink,
             int rowtimeFieldIndex,
-            boolean upsertMaterialize) {
+            boolean upsertMaterialize,
+            int[] inputUpsertKey) {
         final ResolvedSchema schema = tableSinkSpec.getContextResolvedTable().getResolvedSchema();
         final SinkRuntimeProvider runtimeProvider =
                 tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));
@@ -193,7 +195,8 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
                             sinkParallelism,
                             config,
                             classLoader,
-                            physicalRowType);
+                            physicalRowType,
+                            inputUpsertKey);
         }
 
         return (Transformation<Object>)
@@ -402,16 +405,28 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
             int sinkParallelism,
             ExecNodeConfig config,
             ClassLoader classLoader,
-            RowType physicalRowType) {
-        GeneratedRecordEqualiser equaliser =
+            RowType physicalRowType,
+            int[] inputUpsertKey) {
+        final GeneratedRecordEqualiser rowEqualiser =
                 new EqualiserCodeGenerator(physicalRowType, classLoader)
                         .generateRecordEqualiser("SinkMaterializeEqualiser");
+        final GeneratedRecordEqualiser upsertKeyEqualiser =
+                inputUpsertKey == null
+                        ? null
+                        : new EqualiserCodeGenerator(
+                                        RowTypeUtils.projectRowType(
+                                                physicalRowType, inputUpsertKey),
+                                        classLoader)
+                                .generateRecordEqualiser("SinkMaterializeUpsertKeyEqualiser");
+
         SinkUpsertMaterializer operator =
                 new SinkUpsertMaterializer(
                         StateConfigUtil.createTtlConfig(
                                 config.get(ExecutionConfigOptions.IDLE_STATE_RETENTION).toMillis()),
                         InternalSerializers.create(physicalRowType),
-                        equaliser);
+                        rowEqualiser,
+                        upsertKeyEqualiser,
+                        inputUpsertKey);
         final String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
         final List<String> pkFieldNames =
                 Arrays.stream(primaryKeys)
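
A side note on the two equalisers generated above: the upsert-key equaliser is code-generated over the physical row type projected onto the key positions, not over the full row. The following standalone sketch is not part of the patch; its field types are made up, and RowTypeUtils.projectRowType is assumed to perform an equivalent index-based projection (it additionally carries over field names):

    import org.apache.flink.table.types.logical.BigIntType;
    import org.apache.flink.table.types.logical.IntType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.table.types.logical.VarCharType;

    import java.util.stream.IntStream;

    // Standalone sketch: project ROW<f0 BIGINT, f1 INT, f2 VARCHAR(10)> onto key {0, 2}.
    public class ProjectRowTypeSketch {
        public static void main(String[] args) {
            RowType physicalRowType =
                    RowType.of(new BigIntType(), new IntType(), new VarCharType(10));
            int[] inputUpsertKey = {0, 2}; // assumed upsert-key positions

            // keep only the key fields, in key order
            LogicalType[] keyTypes =
                    IntStream.of(inputUpsertKey)
                            .mapToObj(physicalRowType::getTypeAt)
                            .toArray(LogicalType[]::new);
            RowType upsertKeyType = RowType.of(keyTypes);

            System.out.println(upsertKeyType.asSummaryString()); // ROW<`f0` BIGINT, `f1` VARCHAR(10)>
        }
    }
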
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
index 1bd7ad0b22b..6932c1a951b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecSink.java
@@ -74,6 +74,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
 
     public static final String FIELD_NAME_INPUT_CHANGELOG_MODE = "inputChangelogMode";
     public static final String FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE = "requireUpsertMaterialize";
+    public static final String FIELD_NAME_INPUT_UPSERT_KEY = "inputUpsertKey";
 
     @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODE)
     private final ChangelogMode inputChangelogMode;
@@ -82,6 +83,10 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
     @JsonInclude(JsonInclude.Include.NON_DEFAULT)
     private final boolean upsertMaterialize;
 
+    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY)
+    @JsonInclude(JsonInclude.Include.NON_DEFAULT)
+    private final int[] inputUpsertKey;
+
     public StreamExecSink(
             ReadableConfig tableConfig,
             DynamicTableSinkSpec tableSinkSpec,
@@ -89,6 +94,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
             InputProperty inputProperty,
             LogicalType outputType,
             boolean upsertMaterialize,
+            int[] inputUpsertKey,
             String description) {
         this(
                 ExecNodeContext.newNodeId(),
@@ -99,6 +105,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                 Collections.singletonList(inputProperty),
                 outputType,
                 upsertMaterialize,
+                inputUpsertKey,
                 description);
     }
 
@@ -112,6 +119,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) LogicalType outputType,
             @JsonProperty(FIELD_NAME_REQUIRE_UPSERT_MATERIALIZE) boolean upsertMaterialize,
+            @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEY) int[] inputUpsertKey,
             @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
         super(
                 id,
@@ -125,6 +133,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                 description);
         this.inputChangelogMode = inputChangelogMode;
         this.upsertMaterialize = upsertMaterialize;
+        this.inputUpsertKey = inputUpsertKey;
     }
 
     @SuppressWarnings("unchecked")
@@ -171,6 +180,7 @@ public class StreamExecSink extends CommonExecSink implements StreamExecNode<Obj
                 inputTransform,
                 tableSink,
                 rowtimeFieldIndex,
-                upsertMaterialize);
+                upsertMaterialize,
+                inputUpsertKey);
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
new file mode 100644
index 00000000000..3e054f6793f
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.calcite.util.ImmutableBitSet;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Set;
+
+/**
+ * Utility for upsertKey, which is represented as a Set of {@link
+ * org.apache.calcite.util.ImmutableBitSet}.
+ */
+public class UpsertKeyUtil {
+
+    /**
+     * Returns the smallest key of the given upsert keys. An upsert key represented by an
+     * {@link ImmutableBitSet} is considered smaller if it has a smaller cardinality, or, when
+     * the cardinalities are equal, a smaller leading element. E.g., '{0,1}' is smaller than
+     * '{0,1,2}', and '{0,1}' is smaller than '{0,2}'.
+     *
+     * @param upsertKeys input upsert keys
+     * @return the smallest key
+     */
+    @Nonnull
+    public static int[] getSmallestKey(@Nullable Set<ImmutableBitSet> upsertKeys) {
+        if (null == upsertKeys || upsertKeys.isEmpty()) {
+            return new int[0];
+        }
+        return upsertKeys.stream()
+                .map(ImmutableBitSet::toArray)
+                .reduce(
+                        (k1, k2) -> {
+                            if (k1.length < k2.length) {
+                                return k1;
+                            }
+                            if (k1.length == k2.length) {
+                                // lexicographic tie-break for equal cardinality
+                                for (int index = 0; index < k1.length; index++) {
+                                    if (k1[index] < k2[index]) {
+                                        return k1;
+                                    }
+                                    if (k1[index] > k2[index]) {
+                                        return k2;
+                                    }
+                                }
+                            }
+                            return k2;
+                        })
+                .get();
+    }
+}
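
As a quick usage sketch (the UpsertKeyUtilTest added later in this patch exercises the same behavior), the utility deterministically picks the smallest of several candidate keys:

    import org.apache.calcite.util.ImmutableBitSet;
    import org.apache.flink.table.planner.plan.utils.UpsertKeyUtil;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Standalone sketch: choose the smallest candidate upsert key.
    public class SmallestKeySketch {
        public static void main(String[] args) {
            Set<ImmutableBitSet> upsertKeys = new HashSet<>();
            upsertKeys.add(ImmutableBitSet.of(0, 1, 2)); // larger cardinality
            upsertKeys.add(ImmutableBitSet.of(0, 2));    // equal size, larger trailing element
            upsertKeys.add(ImmutableBitSet.of(0, 1));    // the smallest key
            System.out.println(Arrays.toString(UpsertKeyUtil.getSmallestKey(upsertKeys))); // [0, 1]
        }
    }
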
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index c64c41721ca..bffb097d6d7 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -21,11 +21,12 @@ import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
+import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink
 import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink
-import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils
+import org.apache.flink.table.planner.plan.utils.{ChangelogPlanUtils, UpsertKeyUtil}
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapTableConfig
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -81,6 +82,12 @@ class StreamPhysicalSink(
     val tableSinkSpec =
       new DynamicTableSinkSpec(contextResolvedTable, util.Arrays.asList(abilitySpecs: _*))
     tableSinkSpec.setTableSink(tableSink)
+    // No need to call getUpsertKeysInKeyGroupRange here because there is no exchange before
+    // the sink; the exchange is only added in the exec sink node.
+    val inputUpsertKeys = FlinkRelMetadataQuery
+      .reuseOrCreate(cluster.getMetadataQuery)
+      .getUpsertKeys(inputRel)
+
     new StreamExecSink(
       unwrapTableConfig(this),
       tableSinkSpec,
@@ -88,6 +95,7 @@ class StreamPhysicalSink(
       InputProperty.DEFAULT,
       FlinkTypeFactory.toLogicalRowType(getRowType),
       upsertMaterialize,
+      UpsertKeyUtil.getSmallestKey(inputUpsertKeys),
       getRelDetailedDescription)
   }
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java
index e161bddd8f6..a467493499d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
 import org.apache.flink.table.planner.utils.StreamTableTestUtil;
 import org.apache.flink.table.planner.utils.TableTestBase;
 
@@ -91,4 +92,39 @@ public class TableSinkJsonPlanTest extends TableTestBase {
         tEnv.executeSql(sinkTableDdl);
         util.verifyJsonPlan("insert into MySink select * from MyTable");
     }
+
+    @Test
+    public void testCdcWithNonDeterministicFuncSinkWithDifferentPk() {
+        tEnv.createTemporaryFunction(
+                "ndFunc", new JavaUserDefinedScalarFunctions.NonDeterministicUdf());
+
+        String cdcDdl =
+                "CREATE TABLE users (\n"
+                        + "  user_id STRING,\n"
+                        + "  user_name STRING,\n"
+                        + "  email STRING,\n"
+                        + "  balance DECIMAL(18,2),\n"
+                        + "  primary key (user_id) not enforced\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'changelog-mode' = 'I,UA,UB,D'\n"
+                        + ")";
+
+        String sinkTableDdl =
+                "CREATE TABLE sink (\n"
+                        + "  user_id STRING,\n"
+                        + "  user_name STRING,\n"
+                        + "  email STRING,\n"
+                        + "  balance DECIMAL(18,2),\n"
+                        + "  PRIMARY KEY(email) NOT ENFORCED\n"
+                        + ") WITH(\n"
+                        + "  'connector' = 'values',\n"
+                        + "  'sink-insert-only' = 'false'\n"
+                        + ")";
+        tEnv.executeSql(cdcDdl);
+        tEnv.executeSql(sinkTableDdl);
+
+        util.verifyJsonPlan(
+                "insert into sink select user_id, ndFunc(user_name), email, balance from users");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java
new file mode 100644
index 00000000000..4ddfadfc1f9
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtilTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.utils;
+
+import org.apache.calcite.util.ImmutableBitSet;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link UpsertKeyUtil}. */
+public class UpsertKeyUtilTest {
+    private final int[] emptyKey = new int[0];
+
+    @Test
+    public void testSmallestKey() {
+        assertThat(UpsertKeyUtil.getSmallestKey(null)).isEqualTo(emptyKey);
+        assertThat(UpsertKeyUtil.getSmallestKey(new HashSet<>())).isEqualTo(emptyKey);
+
+        ImmutableBitSet smallestKey = ImmutableBitSet.of(0, 1);
+        ImmutableBitSet middleKey = ImmutableBitSet.of(0, 2);
+        ImmutableBitSet longKey = ImmutableBitSet.of(0, 1, 2);
+
+        Set<ImmutableBitSet> upsertKeys = new HashSet<>();
+        upsertKeys.add(smallestKey);
+        upsertKeys.add(middleKey);
+        assertThat(UpsertKeyUtil.getSmallestKey(upsertKeys)).isEqualTo(smallestKey.toArray());
+
+        upsertKeys.clear();
+        upsertKeys.add(smallestKey);
+        upsertKeys.add(longKey);
+        assertThat(UpsertKeyUtil.getSmallestKey(upsertKeys)).isEqualTo(smallestKey.toArray());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
index 73c52d16e4d..5db0abf9cc8 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -131,6 +131,10 @@ public class JavaUserDefinedScalarFunctions {
             return v + random.nextInt();
         }
 
+        public String eval(String v) {
+            return v + "-" + random.nextInt();
+        }
+
         @Override
         public boolean isDeterministic() {
             return false;
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out
index 1f4ca9d8dd7..46c9b032843 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testChangelogSource.out
@@ -133,6 +133,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
+    "inputUpsertKey" : [ 0, 1 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out
index 81bd0cc7972..e29a2507619 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ChangelogSourceJsonPlanTest_jsonplan/testUpsertSource.out
@@ -121,6 +121,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` BIGINT NOT NULL, `b` INT NOT NULL>",
+    "inputUpsertKey" : [ 0, 1 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, b])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out
index f2c4a58820d..0c219aae929 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/DeduplicationJsonPlanTest_jsonplan/testDeduplication.out
@@ -272,6 +272,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`order_id` BIGINT, `user` VARCHAR(2147483647), 
`product` VARCHAR(2147483647), `order_time` TIMESTAMP(3)>",
+    "inputUpsertKey" : [ 2 ],
     "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[order_id, user, product, order_time])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
index 3982472c711..8262813cced 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/ExpandJsonPlanTest_jsonplan/testExpand.out
@@ -365,6 +365,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL, `$f2` 
VARCHAR(2147483647)>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, $f1, $f2])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
index 52aad805789..1af9ee2c41c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=false].out
@@ -297,6 +297,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
index bfef96f23e1..12b2452fbef 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testDistinctAggCalls[isMiniBatchEnabled=true].out
@@ -545,6 +545,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`d` BIGINT, `cnt_a1` BIGINT, `cnt_a2` BIGINT, `sum_a` 
BIGINT, `sum_b` INT, `avg_b` DOUBLE, `cnt_c` BIGINT>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[d, cnt_a1, cnt_a2, sum_a, sum_b, avg_b, cnt_c])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
index 4ca7313f0a2..f564a72dbad 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=false].out
@@ -240,6 +240,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, cnt_a, max_b, min_c])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
index b31d031ffd0..bfbee492d41 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testSimpleAggCallsWithGroupBy[isMiniBatchEnabled=true].out
@@ -305,6 +305,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `cnt_a` BIGINT, `max_b` BIGINT, `min_c` 
VARCHAR(2147483647)>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, cnt_a, max_b, min_c])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
index 1ce190c4f7c..41573678da6 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=false].out
@@ -233,6 +233,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, a1, a2, a3, c1])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
index 201e9547f11..36478bc1dd5 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupAggregateJsonPlanTest_jsonplan/testUserDefinedAggCalls[isMiniBatchEnabled=true].out
@@ -249,6 +249,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `a1` BIGINT, `a2` BIGINT, `a3` BIGINT, 
`c1` BIGINT>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, a1, a2, a3, c1])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
index d30a69a2d5f..ab47bb954c1 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out
@@ -466,6 +466,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `window_start` TIMESTAMP(3) NOT NULL, 
`window_end` TIMESTAMP(3) NOT NULL, `EXPR$3` BIGINT NOT NULL, `EXPR$4` INT, 
`EXPR$5` BIGINT NOT NULL, `EXPR$6` VARCHAR(2147483647)>",
+    "inputUpsertKey" : [ 0, 1 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, window_start, window_end, EXPR$3, EXPR$4, EXPR$5, EXPR$6])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
index bc5fd3fa7c0..52af443ff67 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/GroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out
@@ -451,6 +451,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT, `window_end` TIMESTAMP(3) NOT NULL, 
`EXPR$2` BIGINT NOT NULL>",
+    "inputUpsertKey" : [ 0, 1 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, window_end, EXPR$2])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
index a2bcf705ab2..2f0585073c6 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregate.out
@@ -327,6 +327,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a` BIGINT, `$f1` BIGINT NOT NULL>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a, $f1])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
index c4a0f219d8e..517dc8bc130 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IncrementalAggregateJsonPlanTest_jsonplan/testIncrementalAggregateWithSumCountDistinctAndRetraction.out
@@ -485,6 +485,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`b` BIGINT NOT NULL, `$f1` INT NOT NULL, `$f2` BIGINT 
NOT NULL, `$f3` BIGINT NOT NULL>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[b, $f1, $f2, $f3])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
index 141b22ea160..a230c7d8695 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/JoinJsonPlanTest_jsonplan/testInnerJoinWithEqualPk.out
@@ -242,6 +242,7 @@
       "priority" : 0
     } ],
     "outputType" : "ROW<`a1` INT, `b1` INT>",
+    "inputUpsertKey" : [ 0 ],
     "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[a1, b1])"
   } ],
   "edges" : [ {
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out
new file mode 100644
index 00000000000..40bedb2ea92
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TableSinkJsonPlanTest_jsonplan/testCdcWithNonDeterministicFuncSinkWithDifferentPk.out
@@ -0,0 +1,147 @@
+{
+  "flinkVersion" : "",
+  "nodes" : [ {
+    "id" : 1,
+    "type" : "stream-exec-table-source-scan_1",
+    "scanTableSource" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`users`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "user_id",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "user_name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "email",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "balance",
+              "dataType" : "DECIMAL(18, 2)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_-147132882",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "user_id" ]
+            }
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "connector" : "values",
+            "changelog-mode" : "I,UA,UB,D"
+          }
+        }
+      }
+    },
+    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `user_name` 
VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
+    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, users]], fields=[user_id, user_name, email, balance])",
+    "inputProperties" : [ ]
+  }, {
+    "id" : 2,
+    "type" : "stream-exec-calc_1",
+    "projection" : [ {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 0,
+      "type" : "VARCHAR(2147483647) NOT NULL"
+    }, {
+      "kind" : "CALL",
+      "catalogName" : "`default_catalog`.`default_database`.`ndFunc`",
+      "operands" : [ {
+        "kind" : "INPUT_REF",
+        "inputIndex" : 1,
+        "type" : "VARCHAR(2147483647)"
+      } ],
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 2,
+      "type" : "VARCHAR(2147483647)"
+    }, {
+      "kind" : "INPUT_REF",
+      "inputIndex" : 3,
+      "type" : "DECIMAL(18, 2)"
+    } ],
+    "condition" : null,
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` 
VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
+    "description" : "Calc(select=[user_id, ndFunc(user_name) AS EXPR$1, email, 
balance])"
+  }, {
+    "id" : 3,
+    "type" : "stream-exec-sink_1",
+    "configuration" : {
+      "table.exec.sink.keyed-shuffle" : "AUTO",
+      "table.exec.sink.not-null-enforcer" : "ERROR",
+      "table.exec.sink.type-length-enforcer" : "IGNORE",
+      "table.exec.sink.upsert-materialize" : "AUTO"
+    },
+    "dynamicTableSink" : {
+      "table" : {
+        "identifier" : "`default_catalog`.`default_database`.`sink`",
+        "resolvedTable" : {
+          "schema" : {
+            "columns" : [ {
+              "name" : "user_id",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "user_name",
+              "dataType" : "VARCHAR(2147483647)"
+            }, {
+              "name" : "email",
+              "dataType" : "VARCHAR(2147483647) NOT NULL"
+            }, {
+              "name" : "balance",
+              "dataType" : "DECIMAL(18, 2)"
+            } ],
+            "watermarkSpecs" : [ ],
+            "primaryKey" : {
+              "name" : "PK_96619451",
+              "type" : "PRIMARY_KEY",
+              "columns" : [ "email" ]
+            }
+          },
+          "partitionKeys" : [ ],
+          "options" : {
+            "sink-insert-only" : "false",
+            "connector" : "values"
+          }
+        }
+      }
+    },
+    "inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", 
"DELETE" ],
+    "inputProperties" : [ {
+      "requiredDistribution" : {
+        "type" : "UNKNOWN"
+      },
+      "damBehavior" : "PIPELINED",
+      "priority" : 0
+    } ],
+    "outputType" : "ROW<`user_id` VARCHAR(2147483647) NOT NULL, `EXPR$1` 
VARCHAR(2147483647), `email` VARCHAR(2147483647), `balance` DECIMAL(18, 2)>",
+    "requireUpsertMaterialize" : true,
+    "inputUpsertKey" : [ 0 ],
+    "description" : "Sink(table=[default_catalog.default_database.sink], 
fields=[user_id, EXPR$1, email, balance], upsertMaterialize=[true])"
+  } ],
+  "edges" : [ {
+    "source" : 1,
+    "target" : 2,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  }, {
+    "source" : 2,
+    "target" : 3,
+    "shuffle" : {
+      "type" : "FORWARD"
+    },
+    "shuffleMode" : "PIPELINED"
+  } ]
+}
\ No newline at end of file
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
index 75b0d43706e..5dc7bc70e53 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TableSinkITCase.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
+import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf
 import org.apache.flink.table.planner.factories.TestValuesTableFactory
 import org.apache.flink.table.planner.runtime.utils._
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
@@ -73,6 +74,21 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
          |  'data-id' = '$peopleDataId'
          |)
          |""".stripMargin)
+
+    val userDataId = TestValuesTableFactory.registerData(TestData.userChangelog)
+    tEnv.executeSql(s"""
+                       |CREATE TABLE users (
+                       |  user_id STRING,
+                       |  user_name STRING,
+                       |  email STRING,
+                       |  balance DECIMAL(18,2),
+                       |  primary key (user_id) not enforced
+                       |) WITH (
+                       | 'connector' = 'values',
+                       | 'data-id' = '$userDataId',
+                       | 'changelog-mode' = 'I,UA,UB,D'
+                       |)
+                       |""".stripMargin)
   }
 
   @Test
@@ -157,6 +173,52 @@ class TableSinkITCase(mode: StateBackendMode) extends StreamingWithStateTestBase
     assertEquals(expected.sorted, result.sorted)
   }
 
+  @Test
+  def testChangelogSourceWithNonDeterministicFuncSinkWithDifferentPk(): Unit = {
+    tEnv.createTemporaryFunction("ndFunc", new TestNonDeterministicUdf)
+    tEnv.executeSql("""
+                      |CREATE TABLE sink_with_pk (
+                      |  user_id STRING,
+                      |  user_name STRING,
+                      |  email STRING,
+                      |  balance DECIMAL(18,2),
+                      |  PRIMARY KEY(email) NOT ENFORCED
+                      |) WITH(
+                      |  'connector' = 'values',
+                      |  'sink-insert-only' = 'false'
+                      |)
+                      |""".stripMargin)
+
+    tEnv
+      .executeSql(s"""
+                     |insert into sink_with_pk
+                     |select user_id, SPLIT_INDEX(ndFunc(user_name), '-', 0), email, balance
+                     |from users
+                     |""".stripMargin)
+      .await()
+
+    val result = TestValuesTableFactory.getResults("sink_with_pk")
+    val expected = List(
+      "+I[user1, Tom, tom...@gmail.com, 8.10]",
+      "+I[user3, Bailey, bai...@qq.com, 9.99]",
+      "+I[user4, Tina, t...@gmail.com, 11.30]")
+    assertEquals(expected.sorted, result.sorted)
+
+    val rawResult = TestValuesTableFactory.getRawResults("sink_with_pk")
+    val expectedRaw = List(
+      "+I[user1, Tom, t...@gmail.com, 10.02]",
+      "+I[user2, Jack, j...@hotmail.com, 71.20]",
+      "-D[user1, Tom, t...@gmail.com, 10.02]",
+      "+I[user1, Tom, tom...@gmail.com, 8.10]",
+      "+I[user3, Bailey, bai...@gmail.com, 9.99]",
+      "-D[user2, Jack, j...@hotmail.com, 71.20]",
+      "+I[user4, Tina, t...@gmail.com, 11.30]",
+      "-D[user3, Bailey, bai...@gmail.com, 9.99]",
+      "+I[user3, Bailey, bai...@qq.com, 9.99]"
+    )
+    assertEquals(expectedRaw, rawResult.toList)
+  }
+
   @Test
   def testInsertPartColumn(): Unit = {
     tEnv.executeSql("""
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
index 0d18cb4c540..8ba4b792e2b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializer.java
@@ -27,14 +27,19 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.operators.TableStreamOperator;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -66,29 +71,59 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
                     + "You can increase the state ttl to avoid this.";
 
     private final StateTtlConfig ttlConfig;
+    private final GeneratedRecordEqualiser generatedRecordEqualiser;
+    private final GeneratedRecordEqualiser generatedUpsertKeyEqualiser;
     private final TypeSerializer<RowData> serializer;
-    private final GeneratedRecordEqualiser generatedEqualiser;
+    private final int[] inputUpsertKey;
+    private final boolean hasUpsertKey;
 
+    // The equaliser behaves differently depending on hasUpsertKey:
+    // if true: the equaliser only compares the upsertKey (a projected row data)
+    // if false: the equaliser compares the complete row
     private transient RecordEqualiser equaliser;
+
     // Buffer of emitted insertions on which deletions will be applied first.
     // The row kind might be +I or +U and will be ignored when applying the deletion.
     private transient ValueState<List<RowData>> state;
     private transient TimestampedCollector<RowData> collector;
 
+    // Reused ProjectedRowData instances for comparing upsertKeys when hasUpsertKey is true.
+    private transient ProjectedRowData upsertKeyProjectedRow1;
+    private transient ProjectedRowData upsertKeyProjectedRow2;
+
     public SinkUpsertMaterializer(
             StateTtlConfig ttlConfig,
             TypeSerializer<RowData> serializer,
-            GeneratedRecordEqualiser generatedEqualiser) {
+            GeneratedRecordEqualiser generatedRecordEqualiser,
+            @Nullable GeneratedRecordEqualiser generatedUpsertKeyEqualiser,
+            @Nullable int[] inputUpsertKey) {
         this.ttlConfig = ttlConfig;
         this.serializer = serializer;
-        this.generatedEqualiser = generatedEqualiser;
+        this.generatedRecordEqualiser = generatedRecordEqualiser;
+        this.generatedUpsertKeyEqualiser = generatedUpsertKeyEqualiser;
+        this.inputUpsertKey = inputUpsertKey;
+        this.hasUpsertKey = null != inputUpsertKey && inputUpsertKey.length > 0;
+        if (hasUpsertKey) {
+            Preconditions.checkNotNull(
+                    generatedUpsertKeyEqualiser,
+                    "GeneratedUpsertKeyEqualiser cannot be null when inputUpsertKey is not empty!");
+        }
     }
 
     @Override
     public void open() throws Exception {
         super.open();
-        this.equaliser =
-                generatedEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader());
+        if (hasUpsertKey) {
+            this.equaliser =
+                    generatedUpsertKeyEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+            upsertKeyProjectedRow1 = ProjectedRowData.from(inputUpsertKey);
+            upsertKeyProjectedRow2 = ProjectedRowData.from(inputUpsertKey);
+        } else {
+            this.equaliser =
+                    generatedRecordEqualiser.newInstance(
+                            getRuntimeContext().getUserCodeClassLoader());
+        }
         ValueStateDescriptor<List<RowData>> descriptor =
                 new ValueStateDescriptor<>("values", new ListSerializer<>(serializer));
         if (ttlConfig.isEnabled()) {
@@ -109,31 +144,55 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
         switch (row.getRowKind()) {
             case INSERT:
             case UPDATE_AFTER:
-                row.setRowKind(values.isEmpty() ? INSERT : UPDATE_AFTER);
-                values.add(row);
-                collector.collect(row);
+                addRow(values, row);
                 break;
 
             case UPDATE_BEFORE:
             case DELETE:
-                final int lastIndex = values.size() - 1;
-                final int index = removeFirst(values, row);
-                if (index == -1) {
-                    LOG.info(STATE_CLEARED_WARN_MSG);
-                    return;
-                }
-                if (values.isEmpty()) {
-                    // Delete this row
-                    row.setRowKind(DELETE);
-                    collector.collect(row);
-                } else if (index == lastIndex) {
-                    // Last row has been removed, update to the second last one
-                    final RowData latestRow = values.get(values.size() - 1);
-                    latestRow.setRowKind(UPDATE_AFTER);
-                    collector.collect(latestRow);
-                }
+                retractRow(values, row);
                 break;
         }
+    }
+
+    private void addRow(List<RowData> values, RowData add) throws IOException {
+        RowKind outRowKind = values.isEmpty() ? INSERT : UPDATE_AFTER;
+        if (hasUpsertKey) {
+            int index = findFirst(values, add);
+            if (index == -1) {
+                values.add(add);
+            } else {
+                values.set(index, add);
+            }
+        } else {
+            values.add(add);
+        }
+        add.setRowKind(outRowKind);
+        collector.collect(add);
+
+        // Always need to sync with state
+        state.update(values);
+    }
+
+    private void retractRow(List<RowData> values, RowData retract) throws IOException {
+        final int lastIndex = values.size() - 1;
+        final int index = findFirst(values, retract);
+        if (index == -1) {
+            LOG.info(STATE_CLEARED_WARN_MSG);
+            return;
+        } else {
+            // Remove first found row
+            values.remove(index);
+        }
+        if (values.isEmpty()) {
+            // Delete this row
+            retract.setRowKind(DELETE);
+            collector.collect(retract);
+        } else if (index == lastIndex) {
+            // Last row has been removed, update to the second last one
+            final RowData latestRow = values.get(values.size() - 1);
+            latestRow.setRowKind(UPDATE_AFTER);
+            collector.collect(latestRow);
+        }
 
         if (values.isEmpty()) {
             state.clear();
@@ -142,19 +201,25 @@ public class SinkUpsertMaterializer extends TableStreamOperator<RowData>
         }
     }
 
-    private int removeFirst(List<RowData> values, RowData remove) {
+    private int findFirst(List<RowData> values, RowData target) {
         final Iterator<RowData> iterator = values.iterator();
         int i = 0;
         while (iterator.hasNext()) {
-            final RowData row = iterator.next();
-            // Ignore kind during comparison
-            remove.setRowKind(row.getRowKind());
-            if (equaliser.equals(row, remove)) {
-                iterator.remove();
+            if (equalsIgnoreRowKind(target, iterator.next())) {
                 return i;
             }
             i++;
         }
         return -1;
     }
+
+    private boolean equalsIgnoreRowKind(RowData newRow, RowData oldRow) {
+        newRow.setRowKind(oldRow.getRowKind());
+        if (hasUpsertKey) {
+            return equaliser.equals(
+                    upsertKeyProjectedRow1.replaceRow(newRow),
+                    upsertKeyProjectedRow2.replaceRow(oldRow));
+        }
+        return equaliser.equals(newRow, oldRow);
+    }
 }
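
To make the new key-based matching concrete, here is a small standalone sketch (assumptions: the upsert key sits at position 0, and a plain field comparison stands in for the generated upsert-key equaliser). Two rows that differ only in a non-key column, e.g. one produced by a non-deterministic function, still match on their projected key, which is exactly what lets retractRow find the row to remove:

    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.table.data.utils.ProjectedRowData;

    // Standalone sketch of the comparison in equalsIgnoreRowKind when hasUpsertKey is true.
    public class UpsertKeyEqualitySketch {
        public static void main(String[] args) {
            int[] inputUpsertKey = {0}; // assumed upsert-key position
            ProjectedRowData key1 = ProjectedRowData.from(inputUpsertKey);
            ProjectedRowData key2 = ProjectedRowData.from(inputUpsertKey);

            RowData oldRow = GenericRowData.of(1L, StringData.fromString("a1"));
            RowData newRow = GenericRowData.of(1L, StringData.fromString("a2"));

            // A full-row equaliser would treat these rows as different ("a1" vs "a2");
            // projecting both onto the upsert key lets the retraction find its match.
            boolean keysEqual =
                    key1.replaceRow(newRow).getLong(0) == key2.replaceRow(oldRow).getLong(0);
            System.out.println(keysEqual); // true
        }
    }
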
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
index cdc6299a839..671b2ec07d6 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.StateConfigUtil;
+import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.VarCharType;
@@ -43,16 +44,19 @@ import java.util.List;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.deleteRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.insertRecord;
 import static org.apache.flink.table.runtime.util.StreamRecordUtils.rowOfKind;
+import static org.apache.flink.table.runtime.util.StreamRecordUtils.updateAfterRecord;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link SinkUpsertMaterializer}. */
 public class SinkUpsertMaterializerTest {
 
     private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000);
-    private final LogicalType[] types = new LogicalType[] {new IntType(), new VarCharType()};
+    private final LogicalType[] types =
+            new LogicalType[] {new BigIntType(), new IntType(), new VarCharType()};
     private final RowDataSerializer serializer = new RowDataSerializer(types);
     private final RowDataKeySelector keySelector =
-            HandwrittenSelectorUtil.getRowDataSelector(new int[0], types);
+            HandwrittenSelectorUtil.getRowDataSelector(new int[] {1}, types);
+
     private final GeneratedRecordEqualiser equaliser =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 
@@ -62,10 +66,20 @@ public class SinkUpsertMaterializerTest {
                 }
             };
 
+    private final GeneratedRecordEqualiser upsertKeyEqualiser =
+            new GeneratedRecordEqualiser("", "", new Object[0]) {
+
+                @Override
+                public RecordEqualiser newInstance(ClassLoader classLoader) {
+                    return new TestUpsertKeyEqualiser();
+                }
+            };
+
     @Test
     public void test() throws Exception {
         SinkUpsertMaterializer materializer =
-                new SinkUpsertMaterializer(ttlConfig, serializer, equaliser);
+                new SinkUpsertMaterializer(
+                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, null);
         KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
                 new KeyedOneInputStreamOperatorTestHarness<>(
                         materializer, keySelector, keySelector.getProducedType());
@@ -74,30 +88,69 @@ public class SinkUpsertMaterializerTest {
 
         testHarness.setStateTtlProcessingTime(1);
 
-        testHarness.processElement(insertRecord(1, "a1"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, "a1"));
+        testHarness.processElement(insertRecord(1L, 1, "a1"));
+        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
 
-        testHarness.processElement(insertRecord(1, "a2"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a2"));
+        testHarness.processElement(insertRecord(2L, 1, "a2"));
+        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 2L, 1, "a2"));
 
-        testHarness.processElement(insertRecord(1, "a3"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a3"));
+        testHarness.processElement(insertRecord(3L, 1, "a3"));
+        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
 
-        testHarness.processElement(deleteRecord(1, "a2"));
+        testHarness.processElement(deleteRecord(2L, 1, "a2"));
         shouldEmitNothing(testHarness);
 
-        testHarness.processElement(deleteRecord(1, "a3"));
-        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1, "a1"));
+        testHarness.processElement(deleteRecord(3L, 1, "a3"));
+        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a1"));
 
-        testHarness.processElement(deleteRecord(1, "a1"));
-        shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1, "a1"));
+        testHarness.processElement(deleteRecord(1L, 1, "a1"));
+        shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 1L, 1, "a1"));
 
-        testHarness.processElement(insertRecord(1, "a4"));
-        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1, "a4"));
+        testHarness.processElement(insertRecord(4L, 1, "a4"));
+        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
 
         testHarness.setStateTtlProcessingTime(1002);
 
-        testHarness.processElement(deleteRecord(1, "a4"));
+        testHarness.processElement(deleteRecord(4L, 1, "a4"));
+        shouldEmitNothing(testHarness);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testInputHasUpsertKeyWithNonDeterministicColumn() throws Exception {
+        SinkUpsertMaterializer materializer =
+                new SinkUpsertMaterializer(
+                        ttlConfig, serializer, equaliser, upsertKeyEqualiser, new int[] {0});
+        KeyedOneInputStreamOperatorTestHarness<RowData, RowData, RowData> testHarness =
+                new KeyedOneInputStreamOperatorTestHarness<>(
+                        materializer, keySelector, keySelector.getProducedType());
+
+        testHarness.open();
+
+        testHarness.setStateTtlProcessingTime(1);
+
+        testHarness.processElement(insertRecord(1L, 1, "a1"));
+        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 1L, 1, "a1"));
+
+        testHarness.processElement(updateAfterRecord(1L, 1, "a11"));
+        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 1L, 1, "a11"));
+
+        testHarness.processElement(insertRecord(3L, 1, "a3"));
+        shouldEmit(testHarness, rowOfKind(RowKind.UPDATE_AFTER, 3L, 1, "a3"));
+
+        testHarness.processElement(deleteRecord(1L, 1, "a111"));
+        shouldEmitNothing(testHarness);
+
+        testHarness.processElement(deleteRecord(3L, 1, "a33"));
+        shouldEmit(testHarness, rowOfKind(RowKind.DELETE, 3L, 1, "a33"));
+
+        testHarness.processElement(insertRecord(4L, 1, "a4"));
+        shouldEmit(testHarness, rowOfKind(RowKind.INSERT, 4L, 1, "a4"));
+
+        testHarness.setStateTtlProcessingTime(1002);
+
+        testHarness.processElement(deleteRecord(4L, 1, "a4"));
         shouldEmitNothing(testHarness);
 
         testHarness.close();
@@ -118,7 +171,8 @@ public class SinkUpsertMaterializerTest {
         Object o;
         while ((o = harness.getOutput().poll()) != null) {
             RowData value = (RowData) ((StreamRecord<?>) o).getValue();
-            GenericRowData newRow = GenericRowData.of(value.getInt(0), value.getString(1));
+            GenericRowData newRow =
+                    GenericRowData.of(value.getLong(0), value.getInt(1), value.getString(2));
             newRow.setRowKind(value.getRowKind());
             rows.add(newRow);
         }
@@ -129,8 +183,16 @@ public class SinkUpsertMaterializerTest {
         @Override
         public boolean equals(RowData row1, RowData row2) {
             return row1.getRowKind() == row2.getRowKind()
-                    && row1.getInt(0) == row2.getInt(0)
-                    && row1.getString(1).equals(row2.getString(1));
+                    && row1.getLong(0) == row2.getLong(0)
+                    && row1.getInt(1) == row2.getInt(1)
+                    && row1.getString(2).equals(row2.getString(2));
+        }
+    }
+
+    private static class TestUpsertKeyEqualiser implements RecordEqualiser {
+        @Override
+        public boolean equals(RowData row1, RowData row2) {
+            return row1.getRowKind() == row2.getRowKind() && row1.getLong(0) == row2.getLong(0);
         }
     }
 }
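
For intuition, a tiny self-contained demo of the design choice exercised by
testInputHasUpsertKeyWithNonDeterministicColumn above (hypothetical class and
sample values): with full-row equality the DELETE carrying "a111" would never
match the stored "a11" and the stale row would survive downstream; comparing
on the upsert key (column 0) alone removes it.

    import java.util.ArrayList;
    import java.util.List;

    public class UpsertKeyDeleteDemo {
        public static void main(String[] args) {
            List<Object[]> rows = new ArrayList<>();
            rows.add(new Object[] {1L, 1, "a11"});          // stored row, id = 1
            Object[] delete = new Object[] {1L, 1, "a111"}; // retraction, payload changed

            // Full-row comparison would miss: "a111".equals("a11") is false.
            // Key-only comparison (column 0) finds and removes the row.
            rows.removeIf(r -> r[0].equals(delete[0]));

            System.out.println(rows.isEmpty()); // prints: true
        }
    }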
