This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 293f5fe4bb [core] Upgrade data split serializer and manifest
serializer version (#6165)
293f5fe4bb is described below
commit 293f5fe4bb8d290113afe1a41c471212de403c94
Author: YeJunHao <[email protected]>
AuthorDate: Thu Aug 28 11:26:45 2025 +0800
[core] Upgrade data split serializer and manifest serializer version (#6165)
---
.../io/DataFileMetaFirstRowIdLegacySerializer.java | 91 +++++++++++++++++++++
.../paimon/table/sink/CommitMessageSerializer.java | 13 ++-
.../org/apache/paimon/table/source/DataSplit.java | 9 +-
...festCommittableSerializerCompatibilityTest.java | 80 +++++++++++++++++-
...SplitTest.java => DataSplitCompatibleTest.java} | 88 ++++++++++++++++++--
.../src/test/resources/compatibility/datasplit-v7 | Bin 1032 -> 992 bytes
.../compatibility/{datasplit-v7 => datasplit-v8} | Bin 1032 -> 1032 bytes
.../compatibility/manifest-committable-v8 | Bin 3786 -> 3578 bytes
...fest-committable-v8 => manifest-committable-v9} | Bin 3786 -> 3786 bytes
9 files changed, 267 insertions(+), 14 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
new file mode 100644
index 0000000000..59abcc730d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaFirstRowIdLegacySerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.utils.ObjectSerializer;
+
+import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData;
+import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData;
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/**
+ * Legacy {@link ObjectSerializer} for {@link DataFileMeta}, built on {@code DataFileMeta.SCHEMA}.
+ *
+ * <p>{@code toRow} emits twenty positions: positions 0-18 are taken from the meta (file name
+ * through first row id) and position 19 is always {@code null}. Optional values (delete row
+ * count, file source, external path) are stored as {@code null} when absent.
+ *
+ * <p>{@code fromRow} reads positions 0-18, null-checking positions 13-18, and always passes
+ * {@code null} as the final argument of {@code DataFileMeta.create}.
+ *
+ * <p>In this commit it is selected by {@code CommitMessageSerializer} for version 8 payloads and
+ * by {@code DataSplit} for version 7 payloads.
+ */
+public class DataFileMetaFirstRowIdLegacySerializer extends
ObjectSerializer<DataFileMeta> {
+
+ private static final long serialVersionUID = 1L;
+
+ public DataFileMetaFirstRowIdLegacySerializer() {
+ super(DataFileMeta.SCHEMA);
+ }
+
+ @Override
+ public InternalRow toRow(DataFileMeta meta) {
+ return GenericRow.of(
+ BinaryString.fromString(meta.fileName()),
+ meta.fileSize(),
+ meta.rowCount(),
+ serializeBinaryRow(meta.minKey()),
+ serializeBinaryRow(meta.maxKey()),
+ meta.keyStats().toRow(),
+ meta.valueStats().toRow(),
+ meta.minSequenceNumber(),
+ meta.maxSequenceNumber(),
+ meta.schemaId(),
+ meta.level(),
+ toStringArrayData(meta.extraFiles()),
+ meta.creationTime(),
+ meta.deleteRowCount().orElse(null),
+ meta.embeddedIndex(),
+ meta.fileSource().map(FileSource::toByteValue).orElse(null),
+ toStringArrayData(meta.valueStatsCols()),
+ meta.externalPath().map(BinaryString::fromString).orElse(null),
+ meta.firstRowId(),
+ null); // position 19 is always null in this legacy layout
+ }
+
+ @Override
+ public DataFileMeta fromRow(InternalRow row) {
+ return DataFileMeta.create(
+ row.getString(0).toString(),
+ row.getLong(1),
+ row.getLong(2),
+ deserializeBinaryRow(row.getBinary(3)),
+ deserializeBinaryRow(row.getBinary(4)),
+ SimpleStats.fromRow(row.getRow(5, 3)),
+ SimpleStats.fromRow(row.getRow(6, 3)),
+ row.getLong(7),
+ row.getLong(8),
+ row.getLong(9),
+ row.getInt(10),
+ fromStringArrayData(row.getArray(11)),
+ row.getTimestamp(12, 3),
+ row.isNullAt(13) ? null : row.getLong(13),
+ row.isNullAt(14) ? null : row.getBinary(14),
+ row.isNullAt(15) ? null :
FileSource.fromByteValue(row.getByte(15)),
+ row.isNullAt(16) ? null :
fromStringArrayData(row.getArray(16)),
+ row.isNullAt(17) ? null : row.getString(17).toString(),
+ row.isNullAt(18) ? null : row.getLong(18),
+ null); // final argument is never read from the row
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 5c5fc7c0c0..1b652b9644 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -29,6 +29,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
import org.apache.paimon.io.DataFileMeta10LegacySerializer;
import org.apache.paimon.io.DataFileMeta12LegacySerializer;
+import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.io.DataInputDeserializer;
@@ -50,11 +51,12 @@ import static
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
/** {@link VersionedSerializer} for {@link CommitMessage}. */
public class CommitMessageSerializer implements
VersionedSerializer<CommitMessage> {
- private static final int CURRENT_VERSION = 8;
+ private static final int CURRENT_VERSION = 9;
private final DataFileMetaSerializer dataFileSerializer;
private final IndexFileMetaSerializer indexEntrySerializer;
+ private DataFileMetaFirstRowIdLegacySerializer
dataFileMetaFirstRowIdLegacySerializer;
private DataFileMeta12LegacySerializer dataFileMeta12LegacySerializer;
private DataFileMeta10LegacySerializer dataFileMeta10LegacySerializer;
private DataFileMeta09Serializer dataFile09Serializer;
@@ -146,8 +148,15 @@ public class CommitMessageSerializer implements
VersionedSerializer<CommitMessag
private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
int version, DataInputView view) {
- if (version >= 8) {
+
+ if (version == 9) {
return () -> dataFileSerializer.deserializeList(view);
+ } else if (version == 8) {
+ if (dataFileMetaFirstRowIdLegacySerializer == null) {
+ dataFileMetaFirstRowIdLegacySerializer =
+ new DataFileMetaFirstRowIdLegacySerializer();
+ }
+ return () ->
dataFileMetaFirstRowIdLegacySerializer.deserializeList(view);
} else if (version == 6 || version == 7) {
if (dataFileMeta12LegacySerializer == null) {
dataFileMeta12LegacySerializer = new
DataFileMeta12LegacySerializer();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8bdace010d..079477b001 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataFileMeta08Serializer;
import org.apache.paimon.io.DataFileMeta09Serializer;
import org.apache.paimon.io.DataFileMeta10LegacySerializer;
import org.apache.paimon.io.DataFileMeta12LegacySerializer;
+import org.apache.paimon.io.DataFileMetaFirstRowIdLegacySerializer;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataInputViewStreamWrapper;
@@ -61,7 +62,7 @@ public class DataSplit implements Split {
private static final long serialVersionUID = 7L;
private static final long MAGIC = -2394839472490812314L;
- private static final int VERSION = 7;
+ private static final int VERSION = 8;
private long snapshotId = 0;
private BinaryRow partition;
@@ -452,7 +453,11 @@ public class DataSplit implements Split {
} else if (version == 5 || version == 6) {
DataFileMeta12LegacySerializer serializer = new
DataFileMeta12LegacySerializer();
return serializer::deserialize;
- } else if (version >= 7) {
+ } else if (version == 7) {
+ DataFileMetaFirstRowIdLegacySerializer serializer =
+ new DataFileMetaFirstRowIdLegacySerializer();
+ return serializer::deserialize;
+ } else if (version == 8) {
DataFileMetaSerializer serializer = new DataFileMetaSerializer();
return serializer::deserialize;
} else {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index b458c29fe8..927da8e97d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -46,7 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ManifestCommittableSerializerCompatibilityTest {
@Test
- public void testCompatibilityToV4CommitV8() throws IOException {
+ public void testCompatibilityToV4CommitV9() throws IOException {
SimpleStats keyStats =
new SimpleStats(
singleColumn("min_key"),
@@ -118,6 +118,84 @@ public class
ManifestCommittableSerializerCompatibilityTest {
ManifestCommittable deserialized =
serializer.deserialize(serializer.getVersion(), bytes);
assertThat(deserialized).isEqualTo(manifestCommittable);
+ byte[] oldBytes =
+ IOUtils.readFully(
+ ManifestCommittableSerializerCompatibilityTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/manifest-committable-v9"),
+ true);
+ deserialized = serializer.deserialize(4, oldBytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+ }
+
+ @Test
+ public void testCompatibilityToV4CommitV8() throws IOException {
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+ DataFileMeta dataFile =
+ DataFileMeta.create(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs://localhost:9000/path/to/file",
+ 1L,
+ null);
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas = new
LinkedHashMap<>();
+ dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+ dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
+ IndexFileMeta indexFile =
+ new IndexFileMeta(
+ "my_index_type", "my_index_file", 1024 * 100, 1002,
dvMetas, null);
+ List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+ CommitMessageImpl commitMessage =
+ new CommitMessageImpl(
+ singleColumn("my_partition"),
+ 11,
+ 16,
+ new DataIncrement(dataFiles, dataFiles, dataFiles),
+ new CompactIncrement(dataFiles, dataFiles, dataFiles),
+ new IndexIncrement(indexFiles));
+
+ ManifestCommittable manifestCommittable =
+ new ManifestCommittable(
+ 5,
+ 202020L,
+ Collections.singletonMap(5, 555L),
+ Collections.singletonList(commitMessage));
+ manifestCommittable.addProperty("k1", "v1");
+ manifestCommittable.addProperty("k2", "v2");
+
+ ManifestCommittableSerializer serializer = new
ManifestCommittableSerializer();
+ byte[] bytes = serializer.serialize(manifestCommittable);
+
+ ManifestCommittable deserialized =
serializer.deserialize(serializer.getVersion(), bytes);
+ assertThat(deserialized).isEqualTo(manifestCommittable);
+
byte[] oldBytes =
IOUtils.readFully(
ManifestCommittableSerializerCompatibilityTest.class
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
similarity index 90%
rename from
paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
index 4d6d842f75..cfe9086fa0 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/DataSplitCompatibleTest.java
@@ -59,7 +59,7 @@ import static org.apache.paimon.data.BinaryRow.singleColumn;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link DataSplit}. */
-public class SplitTest {
+public class DataSplitCompatibleTest {
@Test
public void testSplitMergedRowCount() {
@@ -273,7 +273,7 @@ public class SplitTest {
byte[] v2Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v1"),
true);
@@ -338,7 +338,7 @@ public class SplitTest {
byte[] v2Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v2"),
true);
@@ -407,7 +407,7 @@ public class SplitTest {
byte[] v2Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v3"),
true);
@@ -476,7 +476,7 @@ public class SplitTest {
byte[] v4Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v4"),
true);
@@ -545,7 +545,7 @@ public class SplitTest {
byte[] v5Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v5"),
true);
@@ -615,7 +615,7 @@ public class SplitTest {
byte[] v6Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v6"),
true);
@@ -659,7 +659,7 @@ public class SplitTest {
Arrays.asList("field1", "field2", "field3"),
"hdfs:///path/to/warehouse",
12L,
- Arrays.asList("a", "b", "c", "f"));
+ null);
List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
@@ -685,7 +685,7 @@ public class SplitTest {
byte[] v6Bytes =
IOUtils.readFully(
- SplitTest.class
+ DataSplitCompatibleTest.class
.getClassLoader()
.getResourceAsStream("compatibility/datasplit-v7"),
true);
@@ -695,6 +695,76 @@ public class SplitTest {
assertThat(actual).isEqualTo(split);
}
+ @Test
+ public void testSerializerCompatibleV8() throws Exception { // compares a freshly built DataSplit with the stored compatibility/datasplit-v8 bytes
+ SimpleStats keyStats =
+ new SimpleStats(
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ fromLongArray(new Long[] {0L}));
+ SimpleStats valueStats =
+ new SimpleStats(
+ singleColumn("min_value"),
+ singleColumn("max_value"),
+ fromLongArray(new Long[] {0L}));
+
+ DataFileMeta dataFile =
+ DataFileMeta.create(
+ "my_file",
+ 1024 * 1024,
+ 1024,
+ singleColumn("min_key"),
+ singleColumn("max_key"),
+ keyStats,
+ valueStats,
+ 15,
+ 200,
+ 5,
+ 3,
+ Arrays.asList("extra1", "extra2"),
+
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+ 11L,
+ new byte[] {1, 2, 4},
+ FileSource.COMPACT,
+ Arrays.asList("field1", "field2", "field3"),
+ "hdfs:///path/to/warehouse",
+ 12L,
+ Arrays.asList("a", "b", "c", "f")); // last argument is non-null here; the datasplit-v7 test passes null in this position
+ List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+ DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22,
33L);
+ List<DeletionFile> deletionFiles =
Collections.singletonList(deletionFile);
+
+ BinaryRow partition = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+ binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+ binaryRowWriter.complete();
+
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(18)
+ .withPartition(partition)
+ .withBucket(20)
+ .withTotalBuckets(32)
+ .withDataFiles(dataFiles)
+ .withDataDeletionFiles(deletionFiles)
+ .withBucketPath("my path")
+ .build();
+
+
assertThat(InstantiationUtil.clone(InstantiationUtil.clone(split))).isEqualTo(split);
+
+ byte[] v6Bytes = // NOTE(review): despite the v6 name, this holds the datasplit-v8 resource
+ IOUtils.readFully(
+ DataSplitCompatibleTest.class
+ .getClassLoader()
+
.getResourceAsStream("compatibility/datasplit-v8"),
+ true);
+
+ DataSplit actual =
+ InstantiationUtil.deserializeObject(v6Bytes,
DataSplit.class.getClassLoader());
+ assertThat(actual).isEqualTo(split);
+ }
+
private DataFileMeta newDataFile(long rowCount) {
return newDataFile(rowCount, null, null);
}
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7
b/paimon-core/src/test/resources/compatibility/datasplit-v7
index 3fc65743a6..16b16ca73b 100644
Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and
b/paimon-core/src/test/resources/compatibility/datasplit-v7 differ
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v7
b/paimon-core/src/test/resources/compatibility/datasplit-v8
similarity index 91%
copy from paimon-core/src/test/resources/compatibility/datasplit-v7
copy to paimon-core/src/test/resources/compatibility/datasplit-v8
index 3fc65743a6..5fc9725731 100644
Binary files a/paimon-core/src/test/resources/compatibility/datasplit-v7 and
b/paimon-core/src/test/resources/compatibility/datasplit-v8 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8
index 922e55c5d7..0f2ca46b92 100644
Binary files
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v8 differ
diff --git
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8
b/paimon-core/src/test/resources/compatibility/manifest-committable-v9
similarity index 98%
copy from paimon-core/src/test/resources/compatibility/manifest-committable-v8
copy to paimon-core/src/test/resources/compatibility/manifest-committable-v9
index 922e55c5d7..336a2aa3f5 100644
Binary files
a/paimon-core/src/test/resources/compatibility/manifest-committable-v8 and
b/paimon-core/src/test/resources/compatibility/manifest-committable-v9 differ