This is an automated email from the ASF dual-hosted git repository.

rkhachatryan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f422510a8f3150ac67d40d30cca67065a8c54e5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Mar 13 13:10:28 2026 +0100

    [hotfix][table/runtime] Fix 
RowDataKeySerializerSnapshot.resolveSchemaCompatibility()
---
 .../linked/RowDataKeySerializerSnapshot.java       |  6 ++-
 .../linked/RowDataKeySerializerTest.java           | 57 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 2 deletions(-)

diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
index f4a690dbf5b..99bb6a03f2f 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerSnapshot.java
@@ -115,8 +115,10 @@ public class RowDataKeySerializerSnapshot implements 
TypeSerializerSnapshot<RowD
         RowDataKeySerializerSnapshot old = (RowDataKeySerializerSnapshot) 
oldSerializerSnapshot;
 
         TypeSerializerSchemaCompatibility<RowData> compatibility =
-                
old.restoredRowDataSerializerSnapshot.resolveSchemaCompatibility(
-                        old.serializer.serializer.snapshotConfiguration());
+                serializer
+                        .serializer
+                        .snapshotConfiguration()
+                        
.resolveSchemaCompatibility(old.restoredRowDataSerializerSnapshot);
 
         return mapToOuterCompatibility(
                 compatibility,
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
index f78faf9f343..b59e13a0ded 100644
--- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/sequencedmultisetstate/linked/RowDataKeySerializerTest.java
@@ -20,7 +20,12 @@ package 
org.apache.flink.table.runtime.sequencedmultisetstate.linked;
 
 import org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
+import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
+import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -29,9 +34,16 @@ import 
org.apache.flink.table.runtime.generated.RecordEqualiser;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.StreamRecordUtils;
 import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.VarCharType;
 
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.util.Objects;
 
+import static org.assertj.core.api.Assertions.assertThat;
+
 /** Test for {@link RowDataKeySerializer}. */
 public class RowDataKeySerializerTest extends SerializerTestBase<RowDataKey> {
 
@@ -62,6 +74,51 @@ public class RowDataKeySerializerTest extends 
SerializerTestBase<RowDataKey> {
         return new RowDataKey[] {new RowDataKey(StreamRecordUtils.row(123), 
equaliser, equaliser)};
     }
 
+    @Test
+    void testResolveSchemaCompatibilityWithDifferentSchema() throws Exception {
+        // Create a snapshot from the "old" serializer (IntType only) and 
serialize it
+        RowDataKeySerializer oldSerializer =
+                new RowDataKeySerializer(
+                        new RowDataSerializer(new IntType()),
+                        equaliser,
+                        equaliser,
+                        EQUALISER,
+                        HASH_FUNCTION);
+
+        byte[] serializedSnapshot;
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(
+                    new DataOutputViewStreamWrapper(out), 
oldSerializer.snapshotConfiguration());
+            serializedSnapshot = out.toByteArray();
+        }
+
+        // Restore the "old" snapshot
+        TypeSerializerSnapshot<RowDataKey> restoredOldSnapshot;
+        try (ByteArrayInputStream in = new 
ByteArrayInputStream(serializedSnapshot)) {
+            restoredOldSnapshot =
+                    
TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
+                            new DataInputViewStreamWrapper(in),
+                            Thread.currentThread().getContextClassLoader());
+        }
+
+        // Create a "new" serializer with a different schema (IntType + 
VarCharType)
+        RowDataKeySerializer newSerializer =
+                new RowDataKeySerializer(
+                        new RowDataSerializer(new IntType(), new 
VarCharType()),
+                        equaliser,
+                        equaliser,
+                        EQUALISER,
+                        HASH_FUNCTION);
+
+        // The new snapshot should detect incompatibility with the old snapshot
+        TypeSerializerSchemaCompatibility<RowDataKey> compatibility =
+                newSerializer
+                        .snapshotConfiguration()
+                        .resolveSchemaCompatibility(restoredOldSnapshot);
+
+        assertThat(compatibility.isIncompatible()).isTrue();
+    }
+
     static final GeneratedRecordEqualiser EQUALISER =
             new GeneratedRecordEqualiser("", "", new Object[0]) {
 

Reply via email to