This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac6cc0b8bf12d35dc505e16b766f95d17de0c3f9
Author: Gustavo de Morais <[email protected]>
AuthorDate: Tue Mar 24 11:43:07 2026 +0100

    [FLINK-38209][table-planner] Use UniqueKeys instead of Upsertkeys for inputSpec and state management for MultiJoin
---
 .../nodes/exec/stream/StreamExecMultiJoin.java     |  31 +++++-----
 .../physical/stream/StreamPhysicalMultiJoin.java   |  19 +++---
 .../exec/serde/ExecNodeMultiJoinJsonSerdeTest.java |   6 +-
 .../nodes/exec/stream/MultiJoinSemanticTests.java  |   3 +-
 .../nodes/exec/stream/MultiJoinTestPrograms.java   |  66 ++++++++++++++++++++-
 ...our-way-complex-updating-join-with-restore.json |   2 +-
 .../savepoint/_metadata                            | Bin 35162 -> 35375 bytes
 ...r-way-join-no-common-join-key-with-restore.json |   4 +-
 .../savepoint/_metadata                            | Bin 41248 -> 38180 bytes
 .../plan/three-way-inner-join-with-restore.json    |   2 +-
 .../savepoint/_metadata                            | Bin 21326 -> 21331 bytes
 ...way-join-with-time-attributes-with-restore.json |   2 +-
 .../savepoint/_metadata                            | Bin 24946 -> 23917 bytes
 .../three-way-left-outer-join-with-restore.json    |   2 +-
 .../savepoint/_metadata                            | Bin 21759 -> 21588 bytes
 15 files changed, 100 insertions(+), 37 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java
index b6ca3571e1a..31e04a693c7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java
@@ -82,7 +82,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData>
     private static final String FIELD_NAME_JOIN_TYPES = "joinTypes";
     private static final String FIELD_NAME_JOIN_CONDITIONS = "joinConditions";
     private static final String FIELD_NAME_JOIN_ATTRIBUTE_MAP = 
"joinAttributeMap";
-    private static final String FIELD_NAME_INPUT_UPSERT_KEYS = 
"inputUpsertKeys";
+    private static final String FIELD_NAME_INPUT_UNIQUE_KEYS = 
"inputUniqueKeys";
     private static final String FIELD_NAME_MULTI_JOIN_CONDITION = 
"multiJoinCondition";
 
     @JsonProperty(FIELD_NAME_JOIN_TYPES)
@@ -100,12 +100,13 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     private final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap;
 
-    @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS)
+    // Why List<List<int[]>> as a type
+    // Each unique key can be also a composite key with multiple fields, thus -> int[].
+    // Theoretically, each input can have multiple unique keys, thus -> List<int[]>
+    // Since we have multiple inputs -> List<List<int[]>>
+    @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS)
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
-    // List of upsert keys for each input, where each inner list corresponds to an input
-    // The reason it's a List<List<int[]>> is that SQL allows only one primary key but
-    // multiple upsert (unique) keys per input
-    private final List<List<int[]>> inputUpsertKeys;
+    private final List<List<int[]>> inputUniqueKeys;
 
     @JsonProperty(FIELD_NAME_STATE)
     @JsonInclude(JsonInclude.Include.NON_NULL)
@@ -117,7 +118,7 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
             final List<? extends @Nullable RexNode> joinConditions,
             @Nullable final RexNode multiJoinCondition,
             final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap,
-            final List<List<int[]>> inputUpsertKeys,
+            final List<List<int[]>> inputUniqueKeys,
             final Map<Integer, Long> stateTtlFromHint,
             final List<InputProperty> inputProperties,
             final RowType outputType,
@@ -130,7 +131,7 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
                 joinConditions,
                 multiJoinCondition,
                 joinAttributeMap,
-                inputUpsertKeys,
+                inputUniqueKeys,
                 StateMetadata.getMultiInputOperatorDefaultMeta(
                         stateTtlFromHint, tableConfig, 
generateStateNames(inputProperties.size())),
                 inputProperties,
@@ -150,18 +151,18 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
                     final RexNode multiJoinCondition,
             @JsonProperty(FIELD_NAME_JOIN_ATTRIBUTE_MAP)
                     final Map<Integer, List<ConditionAttributeRef>> 
joinAttributeMap,
-            @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) final 
List<List<int[]>> inputUpsertKeys,
+            @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS) final 
List<List<int[]>> inputUniqueKeys,
             @Nullable @JsonProperty(FIELD_NAME_STATE) final 
List<StateMetadata> stateMetadataList,
             @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) final 
List<InputProperty> inputProperties,
             @JsonProperty(FIELD_NAME_OUTPUT_TYPE) final RowType outputType,
             @JsonProperty(FIELD_NAME_DESCRIPTION) final String description) {
         super(id, context, persistedConfig, inputProperties, outputType, 
description);
-        validateInputs(inputProperties, joinTypes, joinConditions, 
inputUpsertKeys);
+        validateInputs(inputProperties, joinTypes, joinConditions, 
inputUniqueKeys);
         this.joinTypes = checkNotNull(joinTypes);
         this.joinConditions = checkNotNull(joinConditions);
         this.multiJoinCondition = multiJoinCondition;
-        this.inputUpsertKeys = checkNotNull(inputUpsertKeys);
         this.joinAttributeMap = 
Objects.requireNonNullElseGet(joinAttributeMap, Map::of);
+        this.inputUniqueKeys = checkNotNull(inputUniqueKeys);
         this.stateMetadataList = stateMetadataList;
     }
 
@@ -169,7 +170,7 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
             final List<InputProperty> inputProperties,
             final List<FlinkJoinType> joinTypes,
             final List<? extends @Nullable RexNode> joinConditions,
-            final List<List<int[]>> inputUpsertKeys) {
+            final List<List<int[]>> inputUniqueKeys) {
         checkArgument(
                 inputProperties.size() >= 2, "Multi-input join operator needs 
at least 2 inputs.");
         checkArgument(
@@ -179,8 +180,8 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
                 joinConditions.size() == inputProperties.size(),
                 "Size of joinConditions must match the number of inputs.");
         checkArgument(
-                inputUpsertKeys.size() == inputProperties.size(),
-                "Size of inputUpsertKeys must match the number of inputs.");
+                inputUniqueKeys.size() == inputProperties.size(),
+                "Size of inputUniqueKeys must match the number of inputs.");
     }
 
     private static String[] generateStateNames(int numInputs) {
@@ -220,7 +221,7 @@ public class StreamExecMultiJoin extends 
ExecNodeBase<RowData>
                             planner.getFlinkContext().getClassLoader(),
                             inputTypeInfos.get(i),
                             keyExtractor.getJoinKeyIndices(i),
-                            inputUpsertKeys.get(i)));
+                            inputUniqueKeys.get(i)));
         }
 
         final GeneratedJoinCondition[] generatedJoinConditions =
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
index 0d5ee9c8b76..fb25cab268f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java
@@ -167,7 +167,7 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
     @Override
     public ExecNode<?> translateToExecNode() {
         final RexNode multiJoinCondition = createMultiJoinCondition();
-        final List<List<int[]>> inputUpsertKeys = getUpsertKeysForInputs();
+        final List<List<int[]>> inputUniqueKeys = getUniqueKeysForInputs();
         final List<FlinkJoinType> execJoinTypes = getExecJoinTypes();
         final List<InputProperty> inputProperties = createInputProperties();
 
@@ -177,7 +177,7 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
                 joinConditions,
                 multiJoinCondition,
                 joinAttributeMap,
-                inputUpsertKeys,
+                inputUniqueKeys,
                 Collections.emptyMap(), // TODO Enable hint-based state ttl. 
See ticket
                 // TODO https://issues.apache.org/jira/browse/FLINK-37936
                 inputProperties,
@@ -194,26 +194,27 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
         return RexUtil.composeConjunction(getCluster().getRexBuilder(), 
conjunctions, true);
     }
 
-    private List<List<int[]>> getUpsertKeysForInputs() {
+    private List<List<int[]>> getUniqueKeysForInputs() {
         return inputs.stream()
                 .map(
                         input -> {
-                            final Set<ImmutableBitSet> upsertKeys = 
getUpsertKeys(input);
+                            final Set<ImmutableBitSet> uniqueKeys = 
getUniqueKeys(input);
 
-                            if (upsertKeys == null) {
+                            if (uniqueKeys == null) {
                                 return Collections.<int[]>emptyList();
                             }
-                            return upsertKeys.stream()
+
+                            return uniqueKeys.stream()
                                     .map(ImmutableBitSet::toArray)
                                     .collect(Collectors.toList());
                         })
                 .collect(Collectors.toList());
     }
 
-    private @Nullable Set<ImmutableBitSet> getUpsertKeys(RelNode input) {
+    private @Nullable Set<ImmutableBitSet> getUniqueKeys(RelNode input) {
         final FlinkRelMetadataQuery fmq =
                 
FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery());
-        return fmq.getUpsertKeys(input);
+        return fmq.getUniqueKeys(input);
     }
 
     private List<FlinkJoinType> getExecJoinTypes() {
@@ -255,7 +256,7 @@ public class StreamPhysicalMultiJoin extends 
AbstractRelNode implements StreamPh
      */
     public boolean inputUniqueKeyContainsCommonJoinKey(int inputId) {
         final RelNode input = getInputs().get(inputId);
-        final Set<ImmutableBitSet> inputUniqueKeys = getUpsertKeys(input);
+        final Set<ImmutableBitSet> inputUniqueKeys = getUniqueKeys(input);
         if (inputUniqueKeys == null || inputUniqueKeys.isEmpty()) {
             return false;
         }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
index 38454ac9fed..14e2489a06d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java
@@ -101,7 +101,7 @@ class ExecNodeMultiJoinJsonSerdeTest {
         // MultiJoin specific fields
         JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinTypes");
         JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinAttributeMap");
-        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"inputUpsertKeys");
+        JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"inputUniqueKeys");
         JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", 
"joinConditions");
 
         // Verify specific field values
@@ -112,8 +112,8 @@ class ExecNodeMultiJoinJsonSerdeTest {
         assertThat(node.get("joinTypes"))
                 .containsExactly(new TextNode("INNER"), new TextNode("INNER"));
         assertThat(node.get("joinAttributeMap").isObject()).isTrue();
-        assertThat(node.get("inputUpsertKeys").isArray()).isTrue();
-        assertThat(node.get("inputUpsertKeys")).hasSize(2);
+        assertThat(node.get("inputUniqueKeys").isArray()).isTrue();
+        assertThat(node.get("inputUniqueKeys")).hasSize(2);
         assertThat(node.get("joinConditions").isArray()).isTrue();
         assertThat(node.get("joinConditions")).hasSize(2);
         assertThat(node.get("inputProperties").isArray()).isTrue();
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
index ef069feba05..42db5806102 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java
@@ -39,6 +39,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase {
                 MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY,
                 
MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE,
                 MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS,
-                
MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS);
+                MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS,
+                MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
index 38db1af79e1..ffc47c62a20 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java
@@ -545,7 +545,7 @@ public class MultiJoinTestPrograms {
                                             "order_id STRING",
                                             "payment_id STRING",
                                             "location STRING")
-                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .addOption("sink-changelog-mode-enforced", 
"I,UA,UB,D")
                                     .consumedBeforeRestore(
                                             "+I[1, Gus, order0, 1, London]",
                                             "+I[1, Gus, order1, 1, London]",
@@ -650,7 +650,7 @@ public class MultiJoinTestPrograms {
                                             "order_id STRING",
                                             "payment_id STRING",
                                             "location STRING")
-                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .addOption("sink-changelog-mode-enforced", 
"I,UA,UB,D")
                                     .consumedValues(
                                             "+I[1, Gus, order0, 1, London]",
                                             "+I[1, Gus, order1, 1, London]",
@@ -758,7 +758,7 @@ public class MultiJoinTestPrograms {
                                             "order_id STRING",
                                             "payment_id STRING",
                                             "location STRING")
-                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .addOption("sink-changelog-mode-enforced", 
"I,UA,UB,D")
                                     .consumedBeforeRestore(
                                             "+I[1, Gus, order0, payment1, 
London]",
                                             "+I[1, Gus, order1, payment1, 
London]",
@@ -1042,4 +1042,64 @@ public class MultiJoinTestPrograms {
                                     + "FROM UsersNullSafe u "
                                     + "INNER JOIN OrdersNullSafe o ON 
u.user_id IS NOT DISTINCT FROM o.user_id")
                     .build();
+
+    public static final TableTestProgram MULTI_JOIN_MIXED_CHANGELOG_MODES =
+            TableTestProgram.of(
+                            "three-way-mixed-changelog-modes",
+                            "three way join with mixed changelog modes and 
primary key configurations")
+                    
.setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("AppendTable")
+                                    .addSchema("id STRING PRIMARY KEY NOT 
ENFORCED, val STRING")
+                                    .addOption("changelog-mode", "I")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "1", 
"append1"),
+                                            Row.ofKind(RowKind.INSERT, "2", 
"append2"),
+                                            Row.ofKind(RowKind.INSERT, "3", 
"append3"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("RetractTable")
+                                    .addSchema("ref_id STRING, data STRING")
+                                    .addOption("changelog-mode", "I,UA,UB,D")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "1", 
"retract1"),
+                                            Row.ofKind(RowKind.INSERT, "2", 
"retract2"),
+                                            Row.ofKind(RowKind.INSERT, "3", 
"retract3"),
+                                            Row.ofKind(RowKind.DELETE, "3", 
"retract3"),
+                                            Row.ofKind(RowKind.INSERT, "1", 
"retract1_new"))
+                                    .build())
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("UpsertTable")
+                                    .addSchema(
+                                            "key_id STRING PRIMARY KEY NOT 
ENFORCED, status STRING")
+                                    .addOption("changelog-mode", "I,UA,D")
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "1", 
"active"),
+                                            Row.ofKind(RowKind.INSERT, "2", 
"pending"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"2", "active"),
+                                            Row.ofKind(RowKind.INSERT, "3", 
"inactive"),
+                                            Row.ofKind(RowKind.DELETE, "3", 
"inactive"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "id STRING",
+                                            "val STRING",
+                                            "data STRING",
+                                            "status STRING")
+                                    .addOption("sink-changelog-mode-enforced", 
"I,UA,UB,D")
+                                    .consumedValues(
+                                            "+I[1, append1, retract1, active]",
+                                            "+I[2, append2, retract2, active]",
+                                            "+I[1, append1, retract1_new, 
active]",
+                                            "+I[3, append3, null, null]")
+                                    .testMaterializedData()
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink "
+                                    + "SELECT a.id, a.val, r.data, u.status "
+                                    + "FROM AppendTable a "
+                                    + "LEFT JOIN RetractTable r ON a.id = 
r.ref_id "
+                                    + "LEFT JOIN UpsertTable u ON a.id = 
u.key_id")
+                    .build();
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
index 0a9fd86b289..8a415d1da8c 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json
@@ -362,7 +362,7 @@
         "rightFieldIndex" : 1
       } ]
     },
-    "inputUpsertKeys" : [ [ [ 1 ] ], [ ], [ ], [ ] ],
+    "inputUniqueKeys" : [ [ [ 1 ] ], [ [ 0 ] ], [ [ 1 ] ], [ ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata
index b176b457713..061d0b08163 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
index ecfa2f8e9f9..4b020388a61 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json
@@ -228,7 +228,7 @@
         "rightFieldIndex" : 0
       } ]
     },
-    "inputUpsertKeys" : [ [ [ 1 ] ], [ ], [ ] ],
+    "inputUniqueKeys" : [ [ [ 1 ] ], [ [ 0 ] ], [ [ 1 ] ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
@@ -344,7 +344,7 @@
         "rightFieldIndex" : 1
       } ]
     },
-    "inputUpsertKeys" : [ [ ], [ ] ],
+    "inputUniqueKeys" : [ [ ], [ ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata
index 5282c8c245b..f53211043d6 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
index 1a5a9ee06c8..3b650218933 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json
@@ -188,7 +188,7 @@
         "rightFieldIndex" : 0
       } ]
     },
-    "inputUpsertKeys" : [ [ ], [ ], [ ] ],
+    "inputUniqueKeys" : [ [ ], [ ], [ ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata
index fd072e08e9c..377ec1862c7 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
index 03324337ecc..3fabcaceda1 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json
@@ -348,7 +348,7 @@
         "rightFieldIndex" : 2
       } ]
     },
-    "inputUpsertKeys" : [ [ [ 0 ] ], [ ], [ ] ],
+    "inputUniqueKeys" : [ [ [ 0 ] ], [ [ 0 ] ], [ ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata
index c21172f3c7d..f11caa837fe 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata
 differ
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
index 72c40250564..5105a4410d9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
+++ 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json
@@ -153,7 +153,7 @@
         "rightFieldIndex" : 0
       } ]
     },
-    "inputUpsertKeys" : [ [ ], [ ], [ ] ],
+    "inputUniqueKeys" : [ [ ], [ ], [ ] ],
     "state" : [ {
       "index" : 0,
       "ttl" : "0 ms",
diff --git 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata
 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata
index e3228781a06..f0288325eee 100644
Binary files 
a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata
 and 
b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata
 differ

Reply via email to