This is an automated email from the ASF dual-hosted git repository. rkhachatryan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f08eb4345004b1422d100fccdee9667804122ad2 Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Mar 13 13:08:50 2026 +0100 [FLINK-39740][table/runtime] Fix highSqn update in LinkedMultiSetState.add --- .../linked/LinkedMultiSetState.java | 2 +- .../SequencedMultiSetStateTest.java | 28 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java index 840fe007ebf..801bab4a973 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/LinkedMultiSetState.java @@ -185,8 +185,8 @@ public class LinkedMultiSetState implements SequencedMultiSetState<RowData> { isNewRowKey ? new Node(row, newSqn, highSqn, null, null, timestamp) : sqnToNodeState.get(oldSqn).withRow(row, timestamp)); - highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); if (isNewRowKey) { + highestSqnAndSizeState.update(MetaSqnInfo.of(newSqn, newSize)); rowToSqnState.put(key, RowSqnInfo.ofSingle(newSqn)); if (!isNewContextKey) { sqnToNodeState.put(highSqn, sqnToNodeState.get(highSqn).withNext(newSqn)); diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java index b68638115cf..38f3c826d21 100644 --- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/SequencedMultiSetStateTest.java @@ -229,6 +229,34 @@ public class SequencedMultiSetStateTest { }); } + /** Test that replacing a non-tail row preserves the ability to add new rows afterwards. */ + @TestTemplate + public void testAddAfterReplacingNonTail() throws Exception { + runTest( + state -> { + state.add(row("k1", "v1"), 1L); + state.add(row("k2", "v2"), 2L); + state.add(row("k3", "v3"), 3L); + + // replace k1 (not the tail) - should not corrupt highSqn + state.add(row("k1", "v1-updated"), 4L); + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L)); + + // adding a new key after the replace should work correctly + state.add(row("k4", "v4"), 5L); + assertStateContents( + state, + Tuple2.of(row("k1", "v1-updated"), 4L), + Tuple2.of(row("k2", "v2"), 2L), + Tuple2.of(row("k3", "v3"), 3L), + Tuple2.of(row("k4", "v4"), 5L)); + }); + } + @TestTemplate public void testAddAfterRemovingTail() throws Exception { runTest(
