This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
commit ac6cc0b8bf12d35dc505e16b766f95d17de0c3f9 Author: Gustavo de Morais <[email protected]> AuthorDate: Tue Mar 24 11:43:07 2026 +0100 [FLINK-38209][table-planner] Use UniqueKeys instead of Upsertkeys for inputSpec and state management for MultiJoin --- .../nodes/exec/stream/StreamExecMultiJoin.java | 31 +++++----- .../physical/stream/StreamPhysicalMultiJoin.java | 19 +++--- .../exec/serde/ExecNodeMultiJoinJsonSerdeTest.java | 6 +- .../nodes/exec/stream/MultiJoinSemanticTests.java | 3 +- .../nodes/exec/stream/MultiJoinTestPrograms.java | 66 ++++++++++++++++++++- ...our-way-complex-updating-join-with-restore.json | 2 +- .../savepoint/_metadata | Bin 35162 -> 35375 bytes ...r-way-join-no-common-join-key-with-restore.json | 4 +- .../savepoint/_metadata | Bin 41248 -> 38180 bytes .../plan/three-way-inner-join-with-restore.json | 2 +- .../savepoint/_metadata | Bin 21326 -> 21331 bytes ...way-join-with-time-attributes-with-restore.json | 2 +- .../savepoint/_metadata | Bin 24946 -> 23917 bytes .../three-way-left-outer-join-with-restore.json | 2 +- .../savepoint/_metadata | Bin 21759 -> 21588 bytes 15 files changed, 100 insertions(+), 37 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java index b6ca3571e1a..31e04a693c7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMultiJoin.java @@ -82,7 +82,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> private static final String FIELD_NAME_JOIN_TYPES = "joinTypes"; private static final String FIELD_NAME_JOIN_CONDITIONS = "joinConditions"; private static final String FIELD_NAME_JOIN_ATTRIBUTE_MAP = "joinAttributeMap"; - private static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys"; + private static final String FIELD_NAME_INPUT_UNIQUE_KEYS = "inputUniqueKeys"; private static final String FIELD_NAME_MULTI_JOIN_CONDITION = "multiJoinCondition"; @JsonProperty(FIELD_NAME_JOIN_TYPES) @@ -100,12 +100,13 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> @JsonInclude(JsonInclude.Include.NON_EMPTY) private final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap; - @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + // Why List<List<int[]>> as a type + // Each unique key can be also a composite key with multiple fields, thus -> int[]. + // Theoretically, each input can have multiple unique keys, thus -> List<int[]> + // Since we have multiple inputs -> List<List<int[]>> + @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS) @JsonInclude(JsonInclude.Include.NON_EMPTY) - // List of upsert keys for each input, where each inner list corresponds to an input - // The reason it's a List<List<int[]>> is that SQL allows only one primary key but - // multiple upsert (unique) keys per input - private final List<List<int[]>> inputUpsertKeys; + private final List<List<int[]>> inputUniqueKeys; @JsonProperty(FIELD_NAME_STATE) @JsonInclude(JsonInclude.Include.NON_NULL) @@ -117,7 +118,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> final List<? extends @Nullable RexNode> joinConditions, @Nullable final RexNode multiJoinCondition, final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap, - final List<List<int[]>> inputUpsertKeys, + final List<List<int[]>> inputUniqueKeys, final Map<Integer, Long> stateTtlFromHint, final List<InputProperty> inputProperties, final RowType outputType, @@ -130,7 +131,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> joinConditions, multiJoinCondition, joinAttributeMap, - inputUpsertKeys, + inputUniqueKeys, StateMetadata.getMultiInputOperatorDefaultMeta( stateTtlFromHint, tableConfig, generateStateNames(inputProperties.size())), inputProperties, @@ -150,18 +151,18 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> final RexNode multiJoinCondition, @JsonProperty(FIELD_NAME_JOIN_ATTRIBUTE_MAP) final Map<Integer, List<ConditionAttributeRef>> joinAttributeMap, - @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) final List<List<int[]>> inputUpsertKeys, + @JsonProperty(FIELD_NAME_INPUT_UNIQUE_KEYS) final List<List<int[]>> inputUniqueKeys, @Nullable @JsonProperty(FIELD_NAME_STATE) final List<StateMetadata> stateMetadataList, @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) final List<InputProperty> inputProperties, @JsonProperty(FIELD_NAME_OUTPUT_TYPE) final RowType outputType, @JsonProperty(FIELD_NAME_DESCRIPTION) final String description) { super(id, context, persistedConfig, inputProperties, outputType, description); - validateInputs(inputProperties, joinTypes, joinConditions, inputUpsertKeys); + validateInputs(inputProperties, joinTypes, joinConditions, inputUniqueKeys); this.joinTypes = checkNotNull(joinTypes); this.joinConditions = checkNotNull(joinConditions); this.multiJoinCondition = multiJoinCondition; - this.inputUpsertKeys = checkNotNull(inputUpsertKeys); this.joinAttributeMap = Objects.requireNonNullElseGet(joinAttributeMap, Map::of); + this.inputUniqueKeys = checkNotNull(inputUniqueKeys); this.stateMetadataList = stateMetadataList; } @@ -169,7 +170,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> final List<InputProperty> inputProperties, final List<FlinkJoinType> joinTypes, final List<? extends @Nullable RexNode> joinConditions, - final List<List<int[]>> inputUpsertKeys) { + final List<List<int[]>> inputUniqueKeys) { checkArgument( inputProperties.size() >= 2, "Multi-input join operator needs at least 2 inputs."); checkArgument( @@ -179,8 +180,8 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> joinConditions.size() == inputProperties.size(), "Size of joinConditions must match the number of inputs."); checkArgument( - inputUpsertKeys.size() == inputProperties.size(), - "Size of inputUpsertKeys must match the number of inputs."); + inputUniqueKeys.size() == inputProperties.size(), + "Size of inputUniqueKeys must match the number of inputs."); } private static String[] generateStateNames(int numInputs) { @@ -220,7 +221,7 @@ public class StreamExecMultiJoin extends ExecNodeBase<RowData> planner.getFlinkContext().getClassLoader(), inputTypeInfos.get(i), keyExtractor.getJoinKeyIndices(i), - inputUpsertKeys.get(i))); + inputUniqueKeys.get(i))); } final GeneratedJoinCondition[] generatedJoinConditions = diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java index 0d5ee9c8b76..fb25cab268f 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMultiJoin.java @@ -167,7 +167,7 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh @Override public ExecNode<?> translateToExecNode() { final RexNode multiJoinCondition = createMultiJoinCondition(); - final List<List<int[]>> inputUpsertKeys = getUpsertKeysForInputs(); + final List<List<int[]>> inputUniqueKeys = getUniqueKeysForInputs(); final List<FlinkJoinType> execJoinTypes = getExecJoinTypes(); final List<InputProperty> inputProperties = createInputProperties(); @@ -177,7 +177,7 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh joinConditions, multiJoinCondition, joinAttributeMap, - inputUpsertKeys, + inputUniqueKeys, Collections.emptyMap(), // TODO Enable hint-based state ttl. See ticket // TODO https://issues.apache.org/jira/browse/FLINK-37936 inputProperties, @@ -194,26 +194,27 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh return RexUtil.composeConjunction(getCluster().getRexBuilder(), conjunctions, true); } - private List<List<int[]>> getUpsertKeysForInputs() { + private List<List<int[]>> getUniqueKeysForInputs() { return inputs.stream() .map( input -> { - final Set<ImmutableBitSet> upsertKeys = getUpsertKeys(input); + final Set<ImmutableBitSet> uniqueKeys = getUniqueKeys(input); - if (upsertKeys == null) { + if (uniqueKeys == null) { return Collections.<int[]>emptyList(); } - return upsertKeys.stream() + + return uniqueKeys.stream() .map(ImmutableBitSet::toArray) .collect(Collectors.toList()); }) .collect(Collectors.toList()); } - private @Nullable Set<ImmutableBitSet> getUpsertKeys(RelNode input) { + private @Nullable Set<ImmutableBitSet> getUniqueKeys(RelNode input) { final FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(input.getCluster().getMetadataQuery()); - return fmq.getUpsertKeys(input); + return fmq.getUniqueKeys(input); } private List<FlinkJoinType> getExecJoinTypes() { @@ -255,7 +256,7 @@ public class StreamPhysicalMultiJoin extends AbstractRelNode implements StreamPh */ public boolean inputUniqueKeyContainsCommonJoinKey(int inputId) { final RelNode input = getInputs().get(inputId); - final Set<ImmutableBitSet> inputUniqueKeys = getUpsertKeys(input); + final Set<ImmutableBitSet> inputUniqueKeys = getUniqueKeys(input); if (inputUniqueKeys == null || inputUniqueKeys.isEmpty()) { return false; } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java index 38454ac9fed..14e2489a06d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeMultiJoinJsonSerdeTest.java @@ -101,7 +101,7 @@ class ExecNodeMultiJoinJsonSerdeTest { // MultiJoin specific fields JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinTypes"); JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinAttributeMap"); - JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputUpsertKeys"); + JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "inputUniqueKeys"); JsonSerdeTestUtil.assertThatJsonContains(jsonNode, "nodes", "0", "joinConditions"); // Verify specific field values @@ -112,8 +112,8 @@ class ExecNodeMultiJoinJsonSerdeTest { assertThat(node.get("joinTypes")) .containsExactly(new TextNode("INNER"), new TextNode("INNER")); assertThat(node.get("joinAttributeMap").isObject()).isTrue(); - assertThat(node.get("inputUpsertKeys").isArray()).isTrue(); - assertThat(node.get("inputUpsertKeys")).hasSize(2); + assertThat(node.get("inputUniqueKeys").isArray()).isTrue(); + assertThat(node.get("inputUniqueKeys")).hasSize(2); assertThat(node.get("joinConditions").isArray()).isTrue(); assertThat(node.get("joinConditions")).hasSize(2); assertThat(node.get("inputProperties").isArray()).isTrue(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java index ef069feba05..42db5806102 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinSemanticTests.java @@ -39,6 +39,7 @@ public class MultiJoinSemanticTests extends SemanticTestBase { MultiJoinTestPrograms.MULTI_JOIN_FOUR_WAY_NO_COMMON_JOIN_KEY, MultiJoinTestPrograms.MULTI_JOIN_THREE_WAY_LEFT_OUTER_JOIN_WITH_CTE, MultiJoinTestPrograms.MULTI_JOIN_LEFT_OUTER_WITH_NULL_KEYS, - MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS); + MultiJoinTestPrograms.MULTI_JOIN_NULL_SAFE_JOIN_WITH_NULL_KEYS, + MultiJoinTestPrograms.MULTI_JOIN_MIXED_CHANGELOG_MODES); } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java index 38db1af79e1..ffc47c62a20 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/MultiJoinTestPrograms.java @@ -545,7 +545,7 @@ public class MultiJoinTestPrograms { "order_id STRING", "payment_id STRING", "location STRING") - .addOption("changelog-mode", "I,UA,UB,D") + .addOption("sink-changelog-mode-enforced", "I,UA,UB,D") .consumedBeforeRestore( "+I[1, Gus, order0, 1, London]", "+I[1, Gus, order1, 1, London]", @@ -650,7 +650,7 @@ public class MultiJoinTestPrograms { "order_id STRING", "payment_id STRING", "location STRING") - .addOption("changelog-mode", "I,UA,UB,D") + .addOption("sink-changelog-mode-enforced", "I,UA,UB,D") .consumedValues( "+I[1, Gus, order0, 1, London]", "+I[1, Gus, order1, 1, London]", @@ -758,7 +758,7 @@ public class MultiJoinTestPrograms { "order_id STRING", "payment_id STRING", "location STRING") - .addOption("changelog-mode", "I,UA,UB,D") + .addOption("sink-changelog-mode-enforced", "I,UA,UB,D") .consumedBeforeRestore( "+I[1, Gus, order0, payment1, London]", "+I[1, Gus, order1, payment1, London]", @@ -1042,4 +1042,64 @@ public class MultiJoinTestPrograms { + "FROM UsersNullSafe u " + "INNER JOIN OrdersNullSafe o ON u.user_id IS NOT DISTINCT FROM o.user_id") .build(); + + public static final TableTestProgram MULTI_JOIN_MIXED_CHANGELOG_MODES = + TableTestProgram.of( + "three-way-mixed-changelog-modes", + "three way join with mixed changelog modes and primary key configurations") + .setupConfig(OptimizerConfigOptions.TABLE_OPTIMIZER_MULTI_JOIN_ENABLED, true) + .setupTableSource( + SourceTestStep.newBuilder("AppendTable") + .addSchema("id STRING PRIMARY KEY NOT ENFORCED, val STRING") + .addOption("changelog-mode", "I") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "append1"), + Row.ofKind(RowKind.INSERT, "2", "append2"), + Row.ofKind(RowKind.INSERT, "3", "append3")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("RetractTable") + .addSchema("ref_id STRING, data STRING") + .addOption("changelog-mode", "I,UA,UB,D") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "retract1"), + Row.ofKind(RowKind.INSERT, "2", "retract2"), + Row.ofKind(RowKind.INSERT, "3", "retract3"), + Row.ofKind(RowKind.DELETE, "3", "retract3"), + Row.ofKind(RowKind.INSERT, "1", "retract1_new")) + .build()) + .setupTableSource( + SourceTestStep.newBuilder("UpsertTable") + .addSchema( + "key_id STRING PRIMARY KEY NOT ENFORCED, status STRING") + .addOption("changelog-mode", "I,UA,D") + .producedValues( + Row.ofKind(RowKind.INSERT, "1", "active"), + Row.ofKind(RowKind.INSERT, "2", "pending"), + Row.ofKind(RowKind.UPDATE_AFTER, "2", "active"), + Row.ofKind(RowKind.INSERT, "3", "inactive"), + Row.ofKind(RowKind.DELETE, "3", "inactive")) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink") + .addSchema( + "id STRING", + "val STRING", + "data STRING", + "status STRING") + .addOption("sink-changelog-mode-enforced", "I,UA,UB,D") + .consumedValues( + "+I[1, append1, retract1, active]", + "+I[2, append2, retract2, active]", + "+I[1, append1, retract1_new, active]", + "+I[3, append3, null, null]") + .testMaterializedData() + .build()) + .runSql( + "INSERT INTO sink " + + "SELECT a.id, a.val, r.data, u.status " + + "FROM AppendTable a " + + "LEFT JOIN RetractTable r ON a.id = r.ref_id " + + "LEFT JOIN UpsertTable u ON a.id = u.key_id") + .build(); } diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json index 0a9fd86b289..8a415d1da8c 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/plan/four-way-complex-updating-join-with-restore.json @@ -362,7 +362,7 @@ "rightFieldIndex" : 1 } ] }, - "inputUpsertKeys" : [ [ [ 1 ] ], [ ], [ ], [ ] ], + "inputUniqueKeys" : [ [ [ 1 ] ], [ [ 0 ] ], [ [ 1 ] ], [ ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata index b176b457713..061d0b08163 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-complex-updating-join-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json index ecfa2f8e9f9..4b020388a61 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/plan/four-way-join-no-common-join-key-with-restore.json @@ -228,7 +228,7 @@ "rightFieldIndex" : 0 } ] }, - "inputUpsertKeys" : [ [ [ 1 ] ], [ ], [ ] ], + "inputUniqueKeys" : [ [ [ 1 ] ], [ [ 0 ] ], [ [ 1 ] ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", @@ -344,7 +344,7 @@ "rightFieldIndex" : 1 } ] }, - "inputUpsertKeys" : [ [ ], [ ] ], + "inputUniqueKeys" : [ [ ], [ ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata index 5282c8c245b..f53211043d6 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/four-way-join-no-common-join-key-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json index 1a5a9ee06c8..3b650218933 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/plan/three-way-inner-join-with-restore.json @@ -188,7 +188,7 @@ "rightFieldIndex" : 0 } ] }, - "inputUpsertKeys" : [ [ ], [ ], [ ] ], + "inputUniqueKeys" : [ [ ], [ ], [ ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata index fd072e08e9c..377ec1862c7 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-inner-join-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json index 03324337ecc..3fabcaceda1 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/plan/three-way-join-with-time-attributes-with-restore.json @@ -348,7 +348,7 @@ "rightFieldIndex" : 2 } ] }, - "inputUpsertKeys" : [ [ [ 0 ] ], [ ], [ ] ], + "inputUniqueKeys" : [ [ [ 0 ] ], [ [ 0 ] ], [ ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata index c21172f3c7d..f11caa837fe 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-join-with-time-attributes-with-restore/savepoint/_metadata differ diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json index 72c40250564..5105a4410d9 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/plan/three-way-left-outer-join-with-restore.json @@ -153,7 +153,7 @@ "rightFieldIndex" : 0 } ] }, - "inputUpsertKeys" : [ [ ], [ ], [ ] ], + "inputUniqueKeys" : [ [ ], [ ], [ ] ], "state" : [ { "index" : 0, "ttl" : "0 ms", diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata index e3228781a06..f0288325eee 100644 Binary files a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata and b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-multi-join_1/three-way-left-outer-join-with-restore/savepoint/_metadata differ
