This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c9e0c4a9b [core] Add IndexedSplit for global index (#6789)
3c9e0c4a9b is described below

commit 3c9e0c4a9b363042106e440becc87b47004ddbf3
Author: YeJunHao <[email protected]>
AuthorDate: Wed Dec 10 17:11:56 2025 +0800

    [core] Add IndexedSplit for global index (#6789)
---
 .../apache/paimon/globalindex/IndexedSplit.java    | 171 +++++++++++
 .../paimon/globalindex/IndexedSplitTest.java       | 333 +++++++++++++++++++++
 .../org/apache/paimon/io/DataFileTestUtils.java    |   4 +-
 3 files changed, 506 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
new file mode 100644
index 0000000000..e14a18fcb9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/IndexedSplit.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+/** Indexed split for global index. */
+public class IndexedSplit implements Split {
+
+    private static final long serialVersionUID = 1L;
+    private static final long MAGIC = -938472394838495695L;
+    private static final int VERSION = 1;
+
+    private DataSplit split;
+    private List<Range> rowRanges;
+    @Nullable private Float[] scores;
+
+    public IndexedSplit(DataSplit split, List<Range> rowRanges, @Nullable Float[] scores) {
+        this.split = split;
+        this.rowRanges = rowRanges;
+        this.scores = scores;
+    }
+
+    public DataSplit dataSplit() {
+        return split;
+    }
+
+    public List<Range> rowRanges() {
+        return rowRanges;
+    }
+
+    @Nullable
+    public Float[] scores() {
+        return scores;
+    }
+
+    @Override
+    public long rowCount() {
+        return rowRanges.stream().mapToLong(r -> r.to - r.from + 1).sum();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        IndexedSplit that = (IndexedSplit) o;
+
+        return split.equals(that.split)
+                && rowRanges.equals(that.rowRanges)
+                && Arrays.equals(scores, that.scores);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedSplit{"
+                + "split="
+                + split
+                + ", rowRanges="
+                + rowRanges
+                + ", scores="
+                + Arrays.toString(scores)
+                + '}';
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(split, rowRanges, Arrays.hashCode(scores));
+    }
+
+    public void serialize(DataOutputView out) throws IOException {
+        out.writeLong(MAGIC);
+        out.writeInt(VERSION);
+        split.serialize(out);
+        out.writeInt(rowRanges.size());
+        for (Range range : rowRanges) {
+            out.writeLong(range.from);
+            out.writeLong(range.to);
+        }
+        if (scores != null) {
+            out.writeBoolean(true);
+            out.writeInt(scores.length);
+            for (Float score : scores) {
+                out.writeByte(score == null ? 0 : 1);
+                if (score != null) {
+                    out.writeFloat(score);
+                }
+            }
+        } else {
+            out.writeBoolean(false);
+        }
+    }
+
+    public static IndexedSplit deserialize(DataInputView in) throws IOException {
+        long magic = in.readLong();
+        if (magic != MAGIC) {
+            throw new IOException("Corrupted IndexedSplit: wrong magic number " + magic);
+        }
+        int version = in.readInt();
+        if (version != VERSION) {
+            throw new IOException("Unsupported IndexedSplit version: " + version);
+        }
+        DataSplit split = DataSplit.deserialize(in);
+        int rangeSize = in.readInt();
+        List<Range> rowRanges = new java.util.ArrayList<>(rangeSize);
+        for (int i = 0; i < rangeSize; i++) {
+            long from = in.readLong();
+            long to = in.readLong();
+            rowRanges.add(new Range(from, to));
+        }
+        Float[] scores = null;
+        boolean hasScores = in.readBoolean();
+        if (hasScores) {
+            int scoresLength = in.readInt();
+            scores = new Float[scoresLength];
+            for (int i = 0; i < scoresLength; i++) {
+                if (in.readByte() == 1) {
+                    scores[i] = in.readFloat();
+                }
+            }
+        }
+        return new IndexedSplit(split, rowRanges, scores);
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        serialize(new DataOutputViewStreamWrapper(out));
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        IndexedSplit other = deserialize(new DataInputViewStreamWrapper(in));
+
+        this.split = other.split;
+        this.rowRanges = other.rowRanges;
+        this.scores = other.scores;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java
new file mode 100644
index 0000000000..f24f7cd12a
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/IndexedSplitTest.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileTestUtils;
+import org.apache.paimon.io.DataInputViewStreamWrapper;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IndexedSplit}. */
+public class IndexedSplitTest {
+
+    @Test
+    public void testSerializeAndDeserializeWithoutScores() throws IOException {
+        // Create test DataSplit
+        DataFileMeta file1 = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+        DataFileMeta file2 = DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Arrays.asList(file1, file2))
+                        .build();
+
+        // Create test ranges
+        List<Range> rowRanges = Arrays.asList(new Range(0, 100), new Range(200, 300));
+
+        // Create IndexedSplit without scores
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, null);
+
+        // Test custom serialization
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
+        split.serialize(outputView);
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
+        IndexedSplit deserialized = IndexedSplit.deserialize(inputView);
+
+        assertThat(deserialized).isEqualTo(split);
+    }
+
+    @Test
+    public void testSerializeAndDeserializeWithScores() throws IOException {
+        // Create test DataSplit
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(2L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(1)
+                        .withBucketPath("bucket-1")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        // Create test ranges and scores
+        List<Range> rowRanges = Arrays.asList(new Range(0, 50), new Range(100, 150));
+        Float[] scores = new Float[] {0.8f, 0.9f};
+
+        // Create IndexedSplit with scores
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        // Test custom serialization
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
+        split.serialize(outputView);
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
+        IndexedSplit deserialized = IndexedSplit.deserialize(inputView);
+
+        assertThat(deserialized).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithoutScores() throws IOException, ClassNotFoundException {
+        // Create test DataSplit
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(3L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(2)
+                        .withBucketPath("bucket-2")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        List<Range> rowRanges = Collections.singletonList(new Range(10, 20));
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, null);
+
+        // Test Java serialization
+        byte[] serialized = InstantiationUtil.serializeObject(split);
+        IndexedSplit deserialized =
+                InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+        // Verify
+        assertThat(deserialized).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithScores() throws IOException, ClassNotFoundException {
+        // Create test DataSplit
+        DataFileMeta file1 = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+        DataFileMeta file2 = DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(4L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(3)
+                        .withBucketPath("bucket-3")
+                        .withDataFiles(Arrays.asList(file1, file2))
+                        .build();
+
+        List<Range> rowRanges =
+                Arrays.asList(new Range(5, 10), new Range(20, 30), new Range(100, 200));
+        Float[] scores = new Float[] {0.5f, 0.7f, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        // Test Java serialization
+        byte[] serialized = InstantiationUtil.serializeObject(split);
+        IndexedSplit deserialized =
+                InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+        // Verify
+        assertThat(deserialized).isEqualTo(split);
+    }
+
+    @Test
+    public void testJavaSerializationWithScoresPartialNull()
+            throws IOException, ClassNotFoundException {
+        // Create test DataSplit
+        DataFileMeta file1 = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+        DataFileMeta file2 = DataFileTestUtils.newFile("file2", 0, 101, 200, 2000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(4L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(3)
+                        .withBucketPath("bucket-3")
+                        .withDataFiles(Arrays.asList(file1, file2))
+                        .build();
+
+        List<Range> rowRanges =
+                Arrays.asList(new Range(5, 10), new Range(20, 30), new Range(100, 200));
+        Float[] scores = new Float[] {0.5f, null, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        // Test Java serialization
+        byte[] serialized = InstantiationUtil.serializeObject(split);
+        IndexedSplit deserialized =
+                InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+        // Verify
+        assertThat(deserialized).isEqualTo(split);
+    }
+
+    @Test
+    public void testRowCount() {
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        // Test with single range
+        IndexedSplit split1 =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(0, 99)), null);
+        assertThat(split1.rowCount()).isEqualTo(100);
+
+        // Test with multiple ranges
+        List<Range> ranges = Arrays.asList(new Range(0, 9), new Range(20, 29), new Range(50, 59));
+        IndexedSplit split2 = new IndexedSplit(dataSplit, ranges, null);
+        assertThat(split2.rowCount()).isEqualTo(30); // 10 + 10 + 10
+
+        // Test with single-element range
+        IndexedSplit split3 =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(5, 5)), null);
+        assertThat(split3.rowCount()).isEqualTo(1);
+    }
+
+    @Test
+    public void testEmptyRanges() throws IOException {
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        // Create IndexedSplit with empty ranges
+        IndexedSplit split = new IndexedSplit(dataSplit, Collections.emptyList(), null);
+
+        // Test serialization
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
+        split.serialize(outputView);
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
+        IndexedSplit deserialized = IndexedSplit.deserialize(inputView);
+
+        // Verify
+        assertThat(deserialized.rowRanges()).isEmpty();
+        assertThat(deserialized.rowCount()).isEqualTo(0);
+    }
+
+    @Test
+    public void testCorruptedMagicNumber() throws IOException {
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 100, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        IndexedSplit split =
+                new IndexedSplit(dataSplit, Collections.singletonList(new Range(0, 10)), null);
+
+        // Serialize and corrupt the magic number
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
+        split.serialize(outputView);
+
+        byte[] data = baos.toByteArray();
+        // Corrupt the magic number (first 8 bytes)
+        data[0] = (byte) 0xFF;
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(data);
+        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
+
+        // Verify that deserialization throws exception
+        assertThatThrownBy(() -> IndexedSplit.deserialize(inputView))
+                .isInstanceOf(IOException.class)
+                .hasMessageContaining("Corrupted IndexedSplit: wrong magic number");
+    }
+
+    @Test
+    public void testLargeRanges() throws IOException {
+        DataFileMeta file = DataFileTestUtils.newFile("file1", 0, 1, 1000000, 1000L);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withSnapshot(1L)
+                        .withPartition(BinaryRow.EMPTY_ROW)
+                        .withBucket(0)
+                        .withBucketPath("bucket-0")
+                        .withDataFiles(Collections.singletonList(file))
+                        .build();
+
+        // Create large ranges
+        List<Range> rowRanges =
+                Arrays.asList(
+                        new Range(0, 1000000),
+                        new Range(2000000, 3000000),
+                        new Range(5000000, 10000000));
+        Float[] scores = new Float[] {0.1f, 0.5f, 0.9f};
+
+        IndexedSplit split = new IndexedSplit(dataSplit, rowRanges, scores);
+
+        // Test serialization
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper outputView = new DataOutputViewStreamWrapper(baos);
+        split.serialize(outputView);
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(bais);
+        IndexedSplit deserialized = IndexedSplit.deserialize(inputView);
+
+        // Verify
+        assertThat(deserialized.rowRanges()).hasSize(3);
+        assertThat(deserialized.rowRanges().get(0)).isEqualTo(new Range(0, 1000000));
+        assertThat(deserialized.rowRanges().get(1)).isEqualTo(new Range(2000000, 3000000));
+        assertThat(deserialized.rowRanges().get(2)).isEqualTo(new Range(5000000, 10000000));
+        assertThat(deserialized.scores()).hasSize(3);
+        assertThat(deserialized.rowCount())
+                .isEqualTo(7000003); // (1000000-0+1) + (3000000-2000000+1) + (10000000-5000000+1)
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
index bdf0c7c78e..0961928517 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java
@@ -97,8 +97,8 @@ public class DataFileTestUtils {
                 maxKey - minKey + 1,
                 row(minKey),
                 row(maxKey),
-                null,
-                null,
+                EMPTY_STATS,
+                EMPTY_STATS,
                 0,
                 maxSequence,
                 0,

Reply via email to