This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new baa53d93d [lake] LakeSourceSplit should be backcompatible (#1703)
baa53d93d is described below

commit baa53d93df418f538afe52ad7968eb4cea298bb5
Author: CaoZhen <[email protected]>
AuthorDate: Wed Sep 17 12:05:39 2025 +0800

    [lake] LakeSourceSplit should be backcompatible (#1703)
---
 .../fluss/flink/lake/LakeSplitSerializer.java      | 10 ++--
 .../FlussSourceEnumeratorStateSerializer.java      |  7 +--
 .../fluss/flink/lake/LakeSplitSerializerTest.java  | 67 ++++++++++++++++++++--
 3 files changed, 70 insertions(+), 14 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index 4e1cec920..f297f7c32 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -47,6 +47,7 @@ public class LakeSplitSerializer {
     }
 
     public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
+        out.writeInt(sourceSplitSerializer.getVersion());
         if (split instanceof LakeSnapshotSplit) {
             LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
             out.writeInt(lakeSplit.getSplitIndex());
@@ -91,13 +92,12 @@ public class LakeSplitSerializer {
             @Nullable String partition,
             DataInputDeserializer input)
             throws IOException {
+        int version = input.readInt();
         if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
             int splitIndex = input.readInt();
             byte[] serializeBytes = new byte[input.readInt()];
             input.read(serializeBytes);
-            LakeSplit lakeSplit =
-                    sourceSplitSerializer.deserialize(
-                            sourceSplitSerializer.getVersion(), serializeBytes);
+            LakeSplit lakeSplit = sourceSplitSerializer.deserialize(version, serializeBytes);
             return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
         } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
             List<LakeSplit> lakeSplits = null;
@@ -107,9 +107,7 @@ public class LakeSplitSerializer {
                 for (int i = 0; i < lakeSplitSize; i++) {
                     byte[] serializeBytes = new byte[input.readInt()];
                     input.read(serializeBytes);
-                    lakeSplits.add(
-                            sourceSplitSerializer.deserialize(
-                                    sourceSplitSerializer.getVersion(), serializeBytes));
+                    lakeSplits.add(sourceSplitSerializer.deserialize(version, serializeBytes));
                 }
             }
             long startingOffset = input.readLong();
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index 512441c9e..c355109a2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -142,12 +142,12 @@ public class FlussSourceEnumeratorStateSerializer
             out.writeBoolean(true);
             out.writeInt(remainingHybridLakeFlussSplits.size());
             SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+            out.writeInt(sourceSplitSerializer.getVersion());
             for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
                 byte[] serializeBytes = sourceSplitSerializer.serialize(split);
                 out.writeInt(serializeBytes.length);
                 out.write(serializeBytes);
             }
-
         } else {
             // write that hybrid lake fluss splits is null
             out.writeBoolean(false);
@@ -161,13 +161,12 @@ public class FlussSourceEnumeratorStateSerializer
             int numSplits = in.readInt();
             List<SourceSplitBase> splits = new ArrayList<>(numSplits);
             SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+            int version = in.readInt();
             for (int i = 0; i < numSplits; i++) {
                 int splitSizeInBytes = in.readInt();
                 byte[] splitBytes = new byte[splitSizeInBytes];
                 in.readFully(splitBytes);
-                splits.add(
-                        sourceSplitSerializer.deserialize(
-                                sourceSplitSerializer.getVersion(), splitBytes));
+                splits.add(sourceSplitSerializer.deserialize(version, splitBytes));
             }
             return splits;
         } else {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index 45cfb2e3f..cbad27ef3 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -42,8 +42,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 class LakeSplitSerializerTest {
     private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
 
-    private static final int SERIALIZER_VERSION = 3;
-
     private static final byte[] TEST_DATA = "test-lake-split".getBytes();
 
     private static final int STOPPING_OFFSET = 1024;
@@ -61,8 +59,9 @@ class LakeSplitSerializerTest {
     @Test
     void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
         // Prepare test data
+        int splitIndex = 1;
         LakeSnapshotSplit originalSplit =
-                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 1);
+                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, splitIndex);
 
         DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
         serializer.serialize(output, originalSplit);
@@ -80,6 +79,40 @@ class LakeSplitSerializerTest {
         assertThat(tableBucket).isEqualTo(result.getTableBucket());
         assertThat("2025-08-18").isEqualTo(result.getPartitionName());
         assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
+        assertThat(splitIndex).isEqualTo(result.getSplitIndex());
+    }
+
+    @Test
+    void testSerializeAndDeserializeLakeSnapshotSplitBackwardCompatibility() throws IOException {
+        SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV1 =
+                new TestSimpleVersionedSerializer();
+        SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV2 =
+                new TestSimpleVersionedSerializerV2();
+        LakeSplitSerializer serializerV1 = new LakeSplitSerializer(sourceSplitSerializerV1);
+        LakeSplitSerializer serializerV2 = new LakeSplitSerializer(sourceSplitSerializerV2);
+
+        // Prepare test data
+        int splitIndex = 1;
+        LakeSnapshotSplit originalSplit =
+                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, splitIndex);
+
+        DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
+        serializerV1.serialize(output, originalSplit);
+
+        SourceSplitBase deserializedSplit =
+                serializerV2.deserialize(
+                        LAKE_SNAPSHOT_SPLIT_KIND,
+                        tableBucket,
+                        "2025-08-18",
+                        new DataInputDeserializer(output.getCopyOfBuffer()));
+
+        assertThat(deserializedSplit instanceof LakeSnapshotSplit).isTrue();
+        LakeSnapshotSplit result = (LakeSnapshotSplit) deserializedSplit;
+
+        assertThat(tableBucket).isEqualTo(result.getTableBucket());
+        assertThat("2025-08-18").isEqualTo(result.getPartitionName());
+        assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
+        assertThat(splitIndex).isEqualTo(result.getSplitIndex());
     }
 
     @Test
@@ -137,6 +170,8 @@ class LakeSplitSerializerTest {
     private static class TestSimpleVersionedSerializer
             implements SimpleVersionedSerializer<LakeSplit> {
 
+        private static final int V1 = 1;
+
         @Override
         public byte[] serialize(LakeSplit split) throws IOException {
             return TEST_DATA;
@@ -149,7 +184,31 @@ class LakeSplitSerializerTest {
 
         @Override
         public int getVersion() {
-            return SERIALIZER_VERSION;
+            return V1;
+        }
+    }
+
+    private static class TestSimpleVersionedSerializerV2
+            implements SimpleVersionedSerializer<LakeSplit> {
+
+        private static final int V2 = 2;
+
+        @Override
+        public byte[] serialize(LakeSplit split) throws IOException {
+            return TEST_DATA;
+        }
+
+        @Override
+        public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
+            if (version < V2) {
+                return LAKE_SPLIT;
+            }
+            return new TestLakeSplit(0, Collections.singletonList("2025-08-19"));
+        }
+
+        @Override
+        public int getVersion() {
+            return V2;
         }
     }
 

Reply via email to