This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new baa53d93d [lake] LakeSourceSplit should be backcompatible (#1703)
baa53d93d is described below
commit baa53d93df418f538afe52ad7968eb4cea298bb5
Author: CaoZhen <[email protected]>
AuthorDate: Wed Sep 17 12:05:39 2025 +0800
[lake] LakeSourceSplit should be backcompatible (#1703)
---
.../fluss/flink/lake/LakeSplitSerializer.java | 10 ++--
.../FlussSourceEnumeratorStateSerializer.java | 7 +--
.../fluss/flink/lake/LakeSplitSerializerTest.java | 67 ++++++++++++++++++++--
3 files changed, 70 insertions(+), 14 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index 4e1cec920..f297f7c32 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -47,6 +47,7 @@ public class LakeSplitSerializer {
}
public void serialize(DataOutputSerializer out, SourceSplitBase split)
throws IOException {
+ out.writeInt(sourceSplitSerializer.getVersion());
if (split instanceof LakeSnapshotSplit) {
LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
out.writeInt(lakeSplit.getSplitIndex());
@@ -91,13 +92,12 @@ public class LakeSplitSerializer {
@Nullable String partition,
DataInputDeserializer input)
throws IOException {
+ int version = input.readInt();
if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
int splitIndex = input.readInt();
byte[] serializeBytes = new byte[input.readInt()];
input.read(serializeBytes);
- LakeSplit lakeSplit =
- sourceSplitSerializer.deserialize(
- sourceSplitSerializer.getVersion(), serializeBytes);
+ LakeSplit lakeSplit = sourceSplitSerializer.deserialize(version, serializeBytes);
return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
} else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
List<LakeSplit> lakeSplits = null;
@@ -107,9 +107,7 @@ public class LakeSplitSerializer {
for (int i = 0; i < lakeSplitSize; i++) {
byte[] serializeBytes = new byte[input.readInt()];
input.read(serializeBytes);
- lakeSplits.add(
- sourceSplitSerializer.deserialize(
- sourceSplitSerializer.getVersion(), serializeBytes));
+ lakeSplits.add(sourceSplitSerializer.deserialize(version, serializeBytes));
}
}
long startingOffset = input.readLong();
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index 512441c9e..c355109a2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -142,12 +142,12 @@ public class FlussSourceEnumeratorStateSerializer
out.writeBoolean(true);
out.writeInt(remainingHybridLakeFlussSplits.size());
SourceSplitSerializer sourceSplitSerializer = new
SourceSplitSerializer(lakeSource);
+ out.writeInt(sourceSplitSerializer.getVersion());
for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
byte[] serializeBytes = sourceSplitSerializer.serialize(split);
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
}
-
} else {
// write that hybrid lake fluss splits is null
out.writeBoolean(false);
@@ -161,13 +161,12 @@ public class FlussSourceEnumeratorStateSerializer
int numSplits = in.readInt();
List<SourceSplitBase> splits = new ArrayList<>(numSplits);
SourceSplitSerializer sourceSplitSerializer = new
SourceSplitSerializer(lakeSource);
+ int version = in.readInt();
for (int i = 0; i < numSplits; i++) {
int splitSizeInBytes = in.readInt();
byte[] splitBytes = new byte[splitSizeInBytes];
in.readFully(splitBytes);
- splits.add(
- sourceSplitSerializer.deserialize(
- sourceSplitSerializer.getVersion(), splitBytes));
+ splits.add(sourceSplitSerializer.deserialize(version, splitBytes));
}
return splits;
} else {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index 45cfb2e3f..cbad27ef3 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -42,8 +42,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
class LakeSplitSerializerTest {
private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
- private static final int SERIALIZER_VERSION = 3;
-
private static final byte[] TEST_DATA = "test-lake-split".getBytes();
private static final int STOPPING_OFFSET = 1024;
@@ -61,8 +59,9 @@ class LakeSplitSerializerTest {
@Test
void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
// Prepare test data
+ int splitIndex = 1;
LakeSnapshotSplit originalSplit =
- new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 1);
+ new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, splitIndex);
DataOutputSerializer output = new
DataOutputSerializer(STOPPING_OFFSET);
serializer.serialize(output, originalSplit);
@@ -80,6 +79,40 @@ class LakeSplitSerializerTest {
assertThat(tableBucket).isEqualTo(result.getTableBucket());
assertThat("2025-08-18").isEqualTo(result.getPartitionName());
assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
+ assertThat(splitIndex).isEqualTo(result.getSplitIndex());
+ }
+
+ @Test
+ void testSerializeAndDeserializeLakeSnapshotSplitBackwardCompatibility()
throws IOException {
+ SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV1 =
+ new TestSimpleVersionedSerializer();
+ SimpleVersionedSerializer<LakeSplit> sourceSplitSerializerV2 =
+ new TestSimpleVersionedSerializerV2();
+ LakeSplitSerializer serializerV1 = new
LakeSplitSerializer(sourceSplitSerializerV1);
+ LakeSplitSerializer serializerV2 = new
LakeSplitSerializer(sourceSplitSerializerV2);
+
+ // Prepare test data
+ int splitIndex = 1;
+ LakeSnapshotSplit originalSplit =
+ new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT,
splitIndex);
+
+ DataOutputSerializer output = new
DataOutputSerializer(STOPPING_OFFSET);
+ serializerV1.serialize(output, originalSplit);
+
+ SourceSplitBase deserializedSplit =
+ serializerV2.deserialize(
+ LAKE_SNAPSHOT_SPLIT_KIND,
+ tableBucket,
+ "2025-08-18",
+ new DataInputDeserializer(output.getCopyOfBuffer()));
+
+ assertThat(deserializedSplit instanceof LakeSnapshotSplit).isTrue();
+ LakeSnapshotSplit result = (LakeSnapshotSplit) deserializedSplit;
+
+ assertThat(tableBucket).isEqualTo(result.getTableBucket());
+ assertThat("2025-08-18").isEqualTo(result.getPartitionName());
+ assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
+ assertThat(splitIndex).isEqualTo(result.getSplitIndex());
}
@Test
@@ -137,6 +170,8 @@ class LakeSplitSerializerTest {
private static class TestSimpleVersionedSerializer
implements SimpleVersionedSerializer<LakeSplit> {
+ private static final int V1 = 1;
+
@Override
public byte[] serialize(LakeSplit split) throws IOException {
return TEST_DATA;
@@ -149,7 +184,31 @@ class LakeSplitSerializerTest {
@Override
public int getVersion() {
- return SERIALIZER_VERSION;
+ return V1;
+ }
+ }
+
+ private static class TestSimpleVersionedSerializerV2
+ implements SimpleVersionedSerializer<LakeSplit> {
+
+ private static final int V2 = 2;
+
+ @Override
+ public byte[] serialize(LakeSplit split) throws IOException {
+ return TEST_DATA;
+ }
+
+ @Override
+ public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
+ if (version < V2) {
+ return LAKE_SPLIT;
+ }
+ return new TestLakeSplit(0, Collections.singletonList("2025-08-19"));
+ }
+
+ @Override
+ public int getVersion() {
+ return V2;
}
}