This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3c9e0c4a9b [core] Add IndexedSplit for global index (#6789)
3c9e0c4a9b is described below
commit 3c9e0c4a9b363042106e440becc87b47004ddbf3
Author: YeJunHao <[email protected]>
AuthorDate: Wed Dec 10 17:11:56 2025 +0800
[core] Add IndexedSplit for global index (#6789)
---
.../apache/paimon/globalindex/IndexedSplit.java | 171 +++++++++++
.../paimon/globalindex/IndexedSplitTest.java | 333 +++++++++++++++++++++
.../org/apache/paimon/io/DataFileTestUtils.java | 4 +-
3 files changed, 506 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
new file mode 100644
index 0000000000..e14a18fcb9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Indexed split for global index.
+ *
+ * <p>Bundles a {@link DataSplit} with the row ranges selected from it and an optional array of
+ * scores (individual entries may be {@code null}).
+ *
+ * <p>Binary layout written by {@link #serialize(DataOutputView)} and read back by {@link
+ * #deserialize(DataInputView)}:
+ *
+ * <ol>
+ *   <li>magic number ({@code long}) and format version ({@code int});
+ *   <li>the wrapped {@link DataSplit}, in its own serialized form;
+ *   <li>number of row ranges ({@code int}), then {@code from} and {@code to} ({@code long} each)
+ *       for every range;
+ *   <li>a {@code boolean} telling whether scores are present; if so the array length ({@code
+ *       int}) and, per entry, a presence byte ({@code 1} or {@code 0}) followed by the {@code
+ *       float} value only when the byte is {@code 1}.
+ * </ol>
+ *
+ * <p>Java serialization goes through the same layout via {@code writeObject} / {@code
+ * readObject}.
+ */
+public class IndexedSplit implements Split {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Leading marker of the binary form; a mismatch is reported as corruption. */
+    private static final long MAGIC = -938472394838495695L;
+
+    /** Only format version that {@link #deserialize(DataInputView)} accepts. */
+    private static final int VERSION = 1;
+
+    // Not final: readObject(ObjectInputStream) assigns these fields after construction.
+    private DataSplit split;
+    private List<Range> rowRanges;
+    @Nullable private Float[] scores;
+
+    /**
+     * Creates an indexed split.
+     *
+     * @param split the underlying data split
+     * @param rowRanges row ranges; both bounds of a range are counted by {@link #rowCount()}
+     * @param scores optional scores, may be {@code null} and may contain {@code null} entries
+     */
+    public IndexedSplit(DataSplit split, List<Range> rowRanges, @Nullable Float[] scores) {
+        this.split = split;
+        this.rowRanges = rowRanges;
+        this.scores = scores;
+    }
+
+    /** Returns the wrapped data split. */
+    public DataSplit dataSplit() {
+        return split;
+    }
+
+    /** Returns the row ranges of this split. */
+    public List<Range> rowRanges() {
+        return rowRanges;
+    }
+
+    /** Returns the scores, or {@code null} when this split carries none. */
+    @Nullable
+    public Float[] scores() {
+        return scores;
+    }
+
+    /** Returns the total number of rows covered by all row ranges (bounds are inclusive). */
+    @Override
+    public long rowCount() {
+        return rowRanges.stream().mapToLong(r -> r.to - r.from + 1).sum();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IndexedSplit that = (IndexedSplit) o;
+
+        return split.equals(that.split)
+                && rowRanges.equals(that.rowRanges)
+                && Arrays.equals(scores, that.scores);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedSplit{"
+                + "split="
+                + split
+                + ", rowRanges="
+                + rowRanges
+                + ", scores="
+                + Arrays.toString(scores)
+                + '}';
+    }
+
+    @Override
+    public int hashCode() {
+        // Arrays.hashCode keeps the hash content-based, matching Arrays.equals in equals().
+        return Objects.hash(split, rowRanges, Arrays.hashCode(scores));
+    }
+
+    /**
+     * Writes this split to {@code out} using the layout described in the class documentation.
+     *
+     * @param out the target view
+     * @throws IOException if writing to {@code out} fails
+     */
+    public void serialize(DataOutputView out) throws IOException {
+        out.writeLong(MAGIC);
+        out.writeInt(VERSION);
+        split.serialize(out);
+
+        out.writeInt(rowRanges.size());
+        for (Range range : rowRanges) {
+            out.writeLong(range.from);
+            out.writeLong(range.to);
+        }
+
+        if (scores == null) {
+            out.writeBoolean(false);
+            return;
+        }
+        out.writeBoolean(true);
+        out.writeInt(scores.length);
+        for (Float score : scores) {
+            // A presence byte precedes each value so that null entries survive the round trip.
+            if (score == null) {
+                out.writeByte(0);
+            } else {
+                out.writeByte(1);
+                out.writeFloat(score);
+            }
+        }
+    }
+
+    /**
+     * Reads a split previously written by {@link #serialize(DataOutputView)}.
+     *
+     * @param in the source view
+     * @return the decoded split
+     * @throws IOException if the magic number does not match, the version is not supported, a
+     *     length prefix is negative, or reading from {@code in} fails
+     */
+    public static IndexedSplit deserialize(DataInputView in) throws IOException {
+        long magic = in.readLong();
+        if (magic != MAGIC) {
+            throw new IOException("Corrupted IndexedSplit: wrong magic number " + magic);
+        }
+        int version = in.readInt();
+        if (version != VERSION) {
+            throw new IOException("Unsupported IndexedSplit version: " + version);
+        }
+        DataSplit split = DataSplit.deserialize(in);
+
+        int rangeSize = in.readInt();
+        if (rangeSize < 0) {
+            // Report corruption as IOException instead of letting ArrayList throw an
+            // unchecked IllegalArgumentException.
+            throw new IOException(
+                    "Corrupted IndexedSplit: negative row range count " + rangeSize);
+        }
+        List<Range> rowRanges = new java.util.ArrayList<>(rangeSize);
+        for (int i = 0; i < rangeSize; i++) {
+            long from = in.readLong();
+            long to = in.readLong();
+            rowRanges.add(new Range(from, to));
+        }
+
+        Float[] scores = null;
+        boolean hasScores = in.readBoolean();
+        if (hasScores) {
+            int scoresLength = in.readInt();
+            if (scoresLength < 0) {
+                // Same reasoning as above: avoid an unchecked NegativeArraySizeException.
+                throw new IOException(
+                        "Corrupted IndexedSplit: negative scores length " + scoresLength);
+            }
+            scores = new Float[scoresLength];
+            for (int i = 0; i < scoresLength; i++) {
+                // Entries whose presence byte is not 1 stay null.
+                if (in.readByte() == 1) {
+                    scores[i] = in.readFloat();
+                }
+            }
+        }
+        return new IndexedSplit(split, rowRanges, scores);
+    }
+
+    /** Java serialization hook: writes the same bytes as {@link #serialize(DataOutputView)}. */
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        serialize(new DataOutputViewStreamWrapper(out));
+    }
+
+    /** Java serialization hook: decodes via {@link #deserialize(DataInputView)} and copies. */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        IndexedSplit other = deserialize(new DataInputViewStreamWrapper(in));
+
+        this.split = other.split;
+        this.rowRanges = other.rowRanges;
+        this.scores = other.scores;
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java
new file mode 100644
index 0000000000..f24f7cd12a
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IndexedSplit}. */
+public class IndexedSplitTest {
+
+    @Test
+    public void testSerializeAndDeserializeWithoutScores() throws IOException {
+        DataSplit dataSplit =
+                newDataSplit(
+                        1L,
+                        0,
+                        DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L),
+                        DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L));
+        List<Range> rowRanges = Arrays.asList(new Range(0, 100), new Range(200, 300));
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, null);
+
+        assertThat(roundTrip(split)).isEqualTo(split);
+    }
+
+    @Test
+    public void testSerializeAndDeserializeWithScores() throws IOException {
+        DataSplit dataSplit =
+                newDataSplit(2L, 1, DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L));
+        List<Range> rowRanges = Arrays.asList(new Range(0, 50), new Range(100, 150));
+        Float[] scores = new Float[] {0.8f, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        assertThat(roundTrip(split)).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithoutScores() throws IOException, ClassNotFoundException {
+        DataSplit dataSplit =
+                newDataSplit(3L, 2, DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L));
+        List<Range> rowRanges = Collections.singletonList(new Range(10, 20));
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, null);
+
+        assertThat(javaRoundTrip(split)).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithScores() throws IOException, ClassNotFoundException {
+        DataSplit dataSplit =
+                newDataSplit(
+                        4L,
+                        3,
+                        DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L),
+                        DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L));
+        List<Range> rowRanges =
+                Arrays.asList(new Range(5, 10), new Range(20, 30), new Range(100, 200));
+        Float[] scores = new Float[] {0.5f, 0.7f, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        assertThat(javaRoundTrip(split)).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithScoresPartialNull()
+            throws IOException, ClassNotFoundException {
+        DataSplit dataSplit =
+                newDataSplit(
+                        4L,
+                        3,
+                        DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L),
+                        DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L));
+        List<Range> rowRanges =
+                Arrays.asList(new Range(5, 10), new Range(20, 30), new Range(100, 200));
+        // The middle entry is null on purpose: null scores must survive the round trip.
+        Float[] scores = new Float[] {0.5f, null, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        assertThat(javaRoundTrip(split)).isEqualTo(split);
+    }
+
+    @Test
+    public void testRowCount() {
+        DataSplit dataSplit =
+                newDataSplit(1L, 0, DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L));
+
+        // Single range
+        IndexedSplit split1 =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(0, 99)), null);
+        assertThat(split1.rowCount()).isEqualTo(100);
+
+        // Multiple ranges: 10 + 10 + 10
+        List<Range> ranges = Arrays.asList(new Range(0, 9), new Range(20, 29), new Range(50, 59));
+        IndexedSplit split2 = new IndexedSplit(dataSplit, ranges, null);
+        assertThat(split2.rowCount()).isEqualTo(30);
+
+        // Single-element range
+        IndexedSplit split3 =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(5, 5)), null);
+        assertThat(split3.rowCount()).isEqualTo(1);
+    }
+
+    @Test
+    public void testEmptyRanges() throws IOException {
+        DataSplit dataSplit =
+                newDataSplit(1L, 0, DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L));
+
+        IndexedSplit split = new IndexedSplit(dataSplit, Collections.emptyList(), null);
+
+        IndexedSplit deserialized = roundTrip(split);
+
+        assertThat(deserialized.rowRanges()).isEmpty();
+        assertThat(deserialized.rowCount()).isEqualTo(0);
+    }
+
+    @Test
+    public void testCorruptedMagicNumber() throws IOException {
+        DataSplit dataSplit =
+                newDataSplit(1L, 0, DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L));
+
+        IndexedSplit split =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(0, 10)), null);
+
+        byte[] data = toBytes(split);
+        // The magic number occupies the first 8 bytes. Flipping the first byte always changes
+        // it, whereas overwriting with a fixed value would be a no-op if it already matched.
+        data[0] = (byte) ~data[0];
+
+        DataInputViewStreamWrapper inputView =
+                new DataInputViewStreamWrapper(new ByteArrayInputStream(data));
+
+        assertThatThrownBy(() -> IndexedSplit.deserialize(inputView))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Corrupted IndexedSplit: wrong magic number");
+    }
+
+    @Test
+    public void testLargeRanges() throws IOException {
+        DataSplit dataSplit =
+                newDataSplit(1L, 0, DataFileTestUtils.newFile("file1", 0, 1, 1000000, 1000L));
+
+        List<Range> rowRanges =
+                Arrays.asList(
+                        new Range(0, 1000000),
+                        new Range(2000000, 3000000),
+                        new Range(5000000, 10000000));
+        Float[] scores = new Float[] {0.1f, 0.5f, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        IndexedSplit deserialized = roundTrip(split);
+
+        assertThat(deserialized.rowRanges()).hasSize(3);
+        assertThat(deserialized.rowRanges().get(0)).isEqualTo(new Range(0, 1000000));
+        assertThat(deserialized.rowRanges().get(1)).isEqualTo(new Range(2000000, 3000000));
+        assertThat(deserialized.rowRanges().get(2)).isEqualTo(new Range(5000000, 10000000));
+        assertThat(deserialized.scores()).hasSize(3);
+        // (1000000-0+1) + (3000000-2000000+1) + (10000000-5000000+1)
+        assertThat(deserialized.rowCount()).isEqualTo(7000003);
+    }
+
+    /**
+     * Builds a {@link DataSplit} on the empty partition with bucket path {@code "bucket-" +
+     * bucket} holding the given files.
+     */
+    private static DataSplit newDataSplit(long snapshotId, int bucket, DataFileMeta... files) {
+        return DataSplit.builder()
+                .withSnapshot(snapshotId)
+                .withPartition(BinaryRow.EMPTY_ROW)
+                .withBucket(bucket)
+                .withBucketPath("bucket-" + bucket)
+                .withDataFiles(Arrays.asList(files))
+                .build();
+    }
+
+    /** Serializes {@code split} with {@link IndexedSplit#serialize} into a byte array. */
+    private static byte[] toBytes(IndexedSplit split) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        split.serialize(new DataOutputViewStreamWrapper(baos));
+        return baos.toByteArray();
+    }
+
+    /** Round-trips {@code split} through the custom serialize / deserialize pair. */
+    private static IndexedSplit roundTrip(IndexedSplit split) throws IOException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(toBytes(split));
+        return IndexedSplit.deserialize(new DataInputViewStreamWrapper(bais));
+    }
+
+    /** Round-trips {@code split} through Java serialization. */
+    private static IndexedSplit javaRoundTrip(IndexedSplit split)
+            throws IOException, ClassNotFoundException {
+        byte[] serialized = InstantiationUtil.serializeObject(split);
+        return InstantiationUtil.deserializeObject(
+                serialized, IndexedSplitTest.class.getClassLoader());
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index bdf0c7c78e..0961928517 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -97,8 +97,8 @@ public class DataFileTestUtils {
maxKey - minKey + 1,
row(minKey),
row(maxKey),
- null,
- null,
+ EMPTY_STATS,
+ EMPTY_STATS,
0,
maxSequence,
0,