This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dd7fa0632 [core] Support btree global index in paimon-common (#6869)
7dd7fa0632 is described below

commit 7dd7fa063254b3e16ab669dbf7242f8dc52c4497
Author: Faiz <[email protected]>
AuthorDate: Fri Dec 26 21:45:09 2025 +0800

    [core] Support btree global index in paimon-common (#6869)
---
 .../paimon/globalindex/GlobalIndexIOMeta.java      |  25 ++
 .../btree/BTreeFileFooter.java}                    |  46 ++-
 .../globalindex/btree/BTreeFileMetaSelector.java   | 199 +++++++++++
 .../globalindex/btree/BTreeGlobalIndexer.java      | 102 ++++++
 .../BTreeGlobalIndexerFactory.java}                |  22 +-
 .../paimon/globalindex/btree/BTreeIndexMeta.java   |  68 ++++
 .../globalindex/btree/BTreeIndexOptions.java       |  56 +++
 .../paimon/globalindex/btree/BTreeIndexReader.java | 379 +++++++++++++++++++++
 .../paimon/globalindex/btree/BTreeIndexWriter.java | 216 ++++++++++++
 .../paimon/globalindex/btree/KeySerializer.java    | 376 ++++++++++++++++++++
 .../globalindex/io/GlobalIndexFileReader.java      |   3 +
 .../globalindex/io/GlobalIndexFileWriter.java      |   5 +-
 .../paimon/lookup/sort/SortLookupStoreFooter.java  |   2 +-
 .../paimon/lookup/sort/SortLookupStoreWriter.java  |   1 +
 .../java/org/apache/paimon/memory/MemorySlice.java |   4 +
 .../apache/paimon/memory/MemorySliceOutput.java    |   6 +
 .../java/org/apache/paimon/sst/SstFileWriter.java  |   2 -
 .../apache/paimon/utils/RoaringNavigableMap64.java |   4 +
 .../bitmapindex/BitmapGlobalIndexTest.java         |  18 +-
 .../btree/BTreeFileMetaSelectorTest.java           | 160 +++++++++
 .../globalindex/btree/BTreeGlobalIndexerTest.java  | 378 ++++++++++++++++++++
 .../globalindex/GlobalIndexFileReadWrite.java      |   5 +-
 .../index/LuceneVectorGlobalIndexScanTest.java     |   5 +-
 .../lucene/index/LuceneVectorGlobalIndexTest.java  |  17 +-
 24 files changed, 2066 insertions(+), 33 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
index 8d6b5a0a1c..497a9448c5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
@@ -18,6 +18,9 @@
 
 package org.apache.paimon.globalindex;
 
+import java.util.Arrays;
+import java.util.Objects;
+
 /** Index meta for global index. */
 public class GlobalIndexIOMeta {
 
@@ -48,4 +51,26 @@ public class GlobalIndexIOMeta {
     public byte[] metadata() {
         return metadata;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        GlobalIndexIOMeta that = (GlobalIndexIOMeta) o;
+        return Objects.equals(fileName, that.fileName)
+                && fileSize == that.fileSize
+                && rangeEnd == that.rangeEnd
+                && Arrays.equals(metadata, that.metadata);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(fileName, fileSize, rangeEnd);
+        result = 31 * result + Arrays.hashCode(metadata);
+        return result;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
similarity index 68%
copy from 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
index 9f4272096c..1484e12cea 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.globalindex.btree;
 
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
@@ -26,21 +26,24 @@ import org.apache.paimon.sst.BloomFilterHandle;
 
 import javax.annotation.Nullable;
 
-import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
+import static 
org.apache.paimon.globalindex.btree.BTreeIndexWriter.MAGIC_NUMBER;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
-/** Footer for a sorted file. */
-public class SortLookupStoreFooter {
-
-    public static final int ENCODED_LENGTH = 36;
+/** The Footer for BTree file. */
+public class BTreeFileFooter {
+    public static final int ENCODED_LENGTH = 48;
 
     @Nullable private final BloomFilterHandle bloomFilterHandle;
     private final BlockHandle indexBlockHandle;
+    @Nullable private final BlockHandle nullBitmapHandle;
 
-    public SortLookupStoreFooter(
-            @Nullable BloomFilterHandle bloomFilterHandle, BlockHandle 
indexBlockHandle) {
+    public BTreeFileFooter(
+            @Nullable BloomFilterHandle bloomFilterHandle,
+            BlockHandle indexBlockHandle,
+            BlockHandle nullBitmapHandle) {
         this.bloomFilterHandle = bloomFilterHandle;
         this.indexBlockHandle = indexBlockHandle;
+        this.nullBitmapHandle = nullBitmapHandle;
     }
 
     @Nullable
@@ -52,7 +55,12 @@ public class SortLookupStoreFooter {
         return indexBlockHandle;
     }
 
-    public static SortLookupStoreFooter readFooter(MemorySliceInput 
sliceInput) {
+    @Nullable
+    public BlockHandle getNullBitmapHandle() {
+        return nullBitmapHandle;
+    }
+
+    public static BTreeFileFooter readFooter(MemorySliceInput sliceInput) {
         // read bloom filter and index handles
         @Nullable
         BloomFilterHandle bloomFilterHandle =
@@ -65,6 +73,12 @@ public class SortLookupStoreFooter {
         }
         BlockHandle indexBlockHandle = new BlockHandle(sliceInput.readLong(), 
sliceInput.readInt());
 
+        @Nullable
+        BlockHandle nullBitmapHandle = new BlockHandle(sliceInput.readLong(), 
sliceInput.readInt());
+        if (nullBitmapHandle.offset() == 0 && nullBitmapHandle.size() == 0) {
+            nullBitmapHandle = null;
+        }
+
         // skip padding
         sliceInput.setPosition(ENCODED_LENGTH - 4);
 
@@ -72,16 +86,16 @@ public class SortLookupStoreFooter {
         int magicNumber = sliceInput.readInt();
         checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad 
magic number)");
 
-        return new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+        return new BTreeFileFooter(bloomFilterHandle, indexBlockHandle, 
nullBitmapHandle);
     }
 
-    public static MemorySlice writeFooter(SortLookupStoreFooter footer) {
+    public static MemorySlice writeFooter(BTreeFileFooter footer) {
         MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
         writeFooter(footer, output);
         return output.toSlice();
     }
 
-    public static void writeFooter(SortLookupStoreFooter footer, 
MemorySliceOutput sliceOutput) {
+    public static void writeFooter(BTreeFileFooter footer, MemorySliceOutput 
sliceOutput) {
         // write bloom filter and index handles
         if (footer.bloomFilterHandle == null) {
             sliceOutput.writeLong(0);
@@ -96,6 +110,14 @@ public class SortLookupStoreFooter {
         sliceOutput.writeLong(footer.indexBlockHandle.offset());
         sliceOutput.writeInt(footer.indexBlockHandle.size());
 
+        if (footer.nullBitmapHandle == null) {
+            sliceOutput.writeLong(0);
+            sliceOutput.writeInt(0);
+        } else {
+            sliceOutput.writeLong(footer.nullBitmapHandle.offset());
+            sliceOutput.writeInt(footer.nullBitmapHandle.size());
+        }
+
         // write magic number
         sliceOutput.writeInt(MAGIC_NUMBER);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
new file mode 100644
index 0000000000..06846eec3e
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+import org.apache.paimon.predicate.TransformPredicate;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * An {@link FunctionVisitor} to select candidate btree index files. All files 
are expected to
+ * belong to the same field. The current {@code RowRangeGlobalIndexScanner} 
can guarantee that.
+ * Please do not break this premise if you want to implement your own index 
scanner.
+ */
+public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<GlobalIndexIOMeta>>> {
+    private final List<Pair<GlobalIndexIOMeta, BTreeIndexMeta>> files;
+    private final Comparator<Object> comparator;
+    private final KeySerializer keySerializer;
+
+    public BTreeFileMetaSelector(List<GlobalIndexIOMeta> files, KeySerializer 
keySerializer) {
+        this.files =
+                files.stream()
+                        .map(meta -> Pair.of(meta, 
BTreeIndexMeta.deserialize(meta.metadata())))
+                        .collect(Collectors.toList());
+        this.comparator = keySerializer.createComparator();
+        this.keySerializer = keySerializer;
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIsNotNull(FieldRef fieldRef) 
{
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIsNull(FieldRef fieldRef) {
+        return Optional.of(filter(BTreeIndexMeta::hasNulls));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitStartsWith(FieldRef 
fieldRef, Object literal) {
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitEndsWith(FieldRef fieldRef, 
Object literal) {
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitContains(FieldRef fieldRef, 
Object literal) {
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLike(FieldRef fieldRef, 
Object literal) {
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLessThan(FieldRef fieldRef, 
Object literal) {
+        // `<` means file.minKey < literal
+        return Optional.of(
+                filter(meta -> 
comparator.compare(deserialize(meta.getFirstKey()), literal) < 0));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitGreaterOrEqual(
+            FieldRef fieldRef, Object literal) {
+        // `>=` means file.maxKey >= literal
+        return Optional.of(
+                filter(meta -> 
comparator.compare(deserialize(meta.getLastKey()), literal) >= 0));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitNotEqual(FieldRef fieldRef, 
Object literal) {
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLessOrEqual(FieldRef 
fieldRef, Object literal) {
+        // `<=` means file.minKey <= literal
+        return Optional.of(
+                filter(meta -> 
comparator.compare(deserialize(meta.getFirstKey()), literal) <= 0));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitEqual(FieldRef fieldRef, 
Object literal) {
+        return Optional.of(
+                filter(
+                        meta -> {
+                            Object minKey = deserialize(meta.getFirstKey());
+                            Object maxKey = deserialize(meta.getLastKey());
+                            return comparator.compare(literal, minKey) >= 0
+                                    && comparator.compare(literal, maxKey) <= 
0;
+                        }));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitGreaterThan(FieldRef 
fieldRef, Object literal) {
+        // `>` means file.maxKey > literal
+        return Optional.of(
+                filter(meta -> 
comparator.compare(deserialize(meta.getLastKey()), literal) > 0));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIn(FieldRef fieldRef, 
List<Object> literals) {
+        return Optional.of(
+                filter(
+                        meta -> {
+                            Object minKey = deserialize(meta.getFirstKey());
+                            Object maxKey = deserialize(meta.getLastKey());
+                            for (Object literal : literals) {
+                                if (comparator.compare(literal, minKey) >= 0
+                                        && comparator.compare(literal, maxKey) 
<= 0) {
+                                    return true;
+                                }
+                            }
+                            return false;
+                        }));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitNotIn(FieldRef fieldRef, 
List<Object> literals) {
+        // we can't filter any file meta by NOT IN condition
+        return Optional.of(filter(meta -> true));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitAnd(
+            List<Optional<List<GlobalIndexIOMeta>>> children) {
+        HashSet<GlobalIndexIOMeta> result = null;
+        for (Optional<List<GlobalIndexIOMeta>> child : children) {
+            if (!child.isPresent()) {
+                return Optional.empty();
+            }
+            if (result == null) {
+                result = new HashSet<>(child.get());
+            } else {
+                result.retainAll(child.get());
+            }
+            if (result.isEmpty()) {
+                return Optional.empty();
+            }
+        }
+        return result == null ? Optional.empty() : Optional.of(new 
ArrayList<>(result));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitOr(
+            List<Optional<List<GlobalIndexIOMeta>>> children) {
+        HashSet<GlobalIndexIOMeta> result = new HashSet<>();
+        for (Optional<List<GlobalIndexIOMeta>> child : children) {
+            child.ifPresent(result::addAll);
+        }
+        return result.isEmpty() ? Optional.empty() : Optional.of(new 
ArrayList<>(result));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visit(TransformPredicate 
predicate) {
+        return Optional.empty();
+    }
+
+    private Object deserialize(byte[] valueBytes) {
+        return keySerializer.deserialize(MemorySlice.wrap(valueBytes));
+    }
+
+    private List<GlobalIndexIOMeta> filter(Predicate<BTreeIndexMeta> 
predicate) {
+        return files.stream()
+                .filter(pair -> predicate.test(pair.getRight()))
+                .map(Pair::getLeft)
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
new file mode 100644
index 0000000000..04e661b95d
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The {@link GlobalIndexer} for btree index. We do not build a B-tree 
directly in memory, instead,
+ * we form a logical B-tree via multi-level metadata over SST files that store 
the actual data, as
+ * below:
+ *
+ * <pre>
+ *                                             BTree-Index
+ *                                             /         \
+ *                                            /    ...    \
+ *                                           /             \
+ *     +--------------------------------------+           +------------+
+ *     |               SST File               |           |            |
+ *     +--------------------------------------+           |            |
+ *     |              Root Index              |           |            |
+ *     |             /   ...    \             |    ...    |  SST File  |
+ *     |     Leaf Index  ...  Leaf Index      |           |            |
+ *     |     /  ...   \       /  ...   \      |           |            |
+ *     | DataBlock       ...        DataBlock |           |            |
+ *     +--------------------------------------+           +------------+
+ * </pre>
+ *
+ * <p>This approach significantly reduces memory pressure during index reads.
+ */
+public class BTreeGlobalIndexer implements GlobalIndexer {
+
+    private final KeySerializer keySerializer;
+    private final Options options;
+    private final LazyField<CacheManager> cacheManager;
+
+    public BTreeGlobalIndexer(DataField dataField, Options options) {
+        this.keySerializer = KeySerializer.create(dataField.type());
+        this.options = options;
+        // todo: cacheManager can be null to disallow data cache.
+        this.cacheManager =
+                new LazyField<>(
+                        () ->
+                                new CacheManager(
+                                        
options.get(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE),
+                                        options.get(
+                                                BTreeIndexOptions
+                                                        
.BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO)));
+    }
+
+    @Override
+    public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) 
throws IOException {
+        long blockSize = 
options.get(BTreeIndexOptions.BTREE_INDEX_BLOCK_SIZE).getBytes();
+        CompressOptions compressOptions =
+                new CompressOptions(
+                        options.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION),
+                        
options.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION_LEVEL));
+        return new BTreeIndexWriter(
+                fileWriter,
+                keySerializer,
+                (int) blockSize,
+                BlockCompressionFactory.create(compressOptions));
+    }
+
+    @Override
+    public GlobalIndexReader createReader(
+            GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) 
throws IOException {
+        // Single reader only supports read one index file
+        Preconditions.checkState(files.size() == 1);
+        return new BTreeIndexReader(keySerializer, fileReader, files.get(0), 
cacheManager.get());
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
similarity index 55%
copy from 
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
copy to 
paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
index 7c88057fb4..914ed2a467 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
@@ -16,14 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.globalindex.io;
+package org.apache.paimon.globalindex.btree;
 
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
 
-import java.io.IOException;
+/** The {@link GlobalIndexerFactory} for btree index. */
+public class BTreeGlobalIndexerFactory implements GlobalIndexerFactory {
+    public static final String IDENTIFIER = "btree";
 
-/** File reader for global index. */
-public interface GlobalIndexFileReader {
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
 
-    SeekableInputStream getInputStream(String fileName) throws IOException;
+    @Override
+    public GlobalIndexer create(DataField dataField, Options options) {
+        return new BTreeGlobalIndexer(dataField, options);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
new file mode 100644
index 0000000000..da4a1be622
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+/** Index Meta of each BTree index file. */
+public class BTreeIndexMeta {
+    private final byte[] firstKey;
+    private final byte[] lastKey;
+    private final boolean hasNulls;
+
+    public BTreeIndexMeta(byte[] firstKey, byte[] lastKey, boolean hasNulls) {
+        this.firstKey = firstKey;
+        this.lastKey = lastKey;
+        this.hasNulls = hasNulls;
+    }
+
+    public byte[] getFirstKey() {
+        return firstKey;
+    }
+
+    public byte[] getLastKey() {
+        return lastKey;
+    }
+
+    public boolean hasNulls() {
+        return hasNulls;
+    }
+
+    public byte[] serialize() {
+        MemorySliceOutput sliceOutput = new MemorySliceOutput(firstKey.length 
+ lastKey.length + 8);
+        sliceOutput.writeInt(firstKey.length);
+        sliceOutput.writeBytes(firstKey);
+        sliceOutput.writeInt(lastKey.length);
+        sliceOutput.writeBytes(lastKey);
+        sliceOutput.writeByte(hasNulls ? 1 : 0);
+        return sliceOutput.toSlice().getHeapMemory();
+    }
+
+    public static BTreeIndexMeta deserialize(byte[] data) {
+        MemorySliceInput sliceInput = MemorySlice.wrap(data).toInput();
+        int firstKeyLength = sliceInput.readInt();
+        byte[] firstKey = sliceInput.readSlice(firstKeyLength).copyBytes();
+        int lastKeyLength = sliceInput.readInt();
+        byte[] lastKey = sliceInput.readSlice(lastKeyLength).copyBytes();
+        boolean hasNulls = sliceInput.readByte() == 1;
+        return new BTreeIndexMeta(firstKey, lastKey, hasNulls);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
new file mode 100644
index 0000000000..687a3f380b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.MemorySize;
+
+/** Options for BTree index. */
+public class BTreeIndexOptions {
+    public static final ConfigOption<String> BTREE_INDEX_COMPRESSION =
+            ConfigOptions.key("btree-index.compression")
+                    .stringType()
+                    .defaultValue("none")
+                    .withDescription("The compression algorithm to use for 
BTreeIndex");
+
+    public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
+            ConfigOptions.key("btree-index.compression")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("The compression level of the compression 
algorithm");
+
+    public static final ConfigOption<MemorySize> BTREE_INDEX_BLOCK_SIZE =
+            ConfigOptions.key("btree-index.block-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofKibiBytes(64))
+                    .withDescription("The block size to use for BTreeIndex");
+
+    public static final ConfigOption<MemorySize> BTREE_INDEX_CACHE_SIZE =
+            ConfigOptions.key("btree-index.cache-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("The cache size to use for BTreeIndex");
+
+    public static final ConfigOption<Double> 
BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO =
+            ConfigOptions.key("btree-index.high-priority-pool-ratio")
+                    .doubleType()
+                    .defaultValue(0.1)
+                    .withDescription("The high priority pool ratio to use for 
BTreeIndex");
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
new file mode 100644
index 0000000000..59b6fe3c7b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.sst.BlockCache;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.utils.FileBasedBloomFilter;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.CRC32;
+
+/** The {@link GlobalIndexReader} implementation for btree index. */
+public class BTreeIndexReader implements GlobalIndexReader {
+    private final SeekableInputStream input;
+    private final SstFileReader reader;
+    private final KeySerializer keySerializer;
+    private final Comparator<Object> comparator;
+    private final LazyField<RoaringNavigableMap64> nullBitmap;
+    private final Object minKey;
+    private final Object maxKey;
+
+    public BTreeIndexReader(
+            KeySerializer keySerializer,
+            GlobalIndexFileReader fileReader,
+            GlobalIndexIOMeta globalIndexIOMeta,
+            CacheManager cacheManager)
+            throws IOException {
+        this.keySerializer = keySerializer;
+        this.comparator = keySerializer.createComparator();
+        BTreeIndexMeta indexMeta = 
BTreeIndexMeta.deserialize(globalIndexIOMeta.metadata());
+        this.minKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
+        this.maxKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+        this.input = fileReader.getInputStream(globalIndexIOMeta.fileName());
+
+        // prepare file footer
+        long fileSize = globalIndexIOMeta.fileSize();
+        Path filePath = fileReader.filePath(globalIndexIOMeta.fileName());
+        BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
+        BTreeFileFooter footer = readFooter(blockCache, fileSize);
+
+        // prepare nullBitmap and SstFileReader
+        this.nullBitmap =
+                new LazyField<>(() -> readNullBitmap(blockCache, 
footer.getNullBitmapHandle()));
+        FileBasedBloomFilter bloomFilter =
+                FileBasedBloomFilter.create(
+                        input, filePath, cacheManager, 
footer.getBloomFilterHandle());
+        this.reader =
+                new SstFileReader(
+                        createSliceComparator(keySerializer),
+                        blockCache,
+                        footer.getIndexBlockHandle(),
+                        bloomFilter);
+    }
+
+    private BTreeFileFooter readFooter(BlockCache blockCache, long fileSize) {
+        MemorySegment footerEncodings =
+                blockCache.getBlock(
+                        fileSize - BTreeFileFooter.ENCODED_LENGTH,
+                        BTreeFileFooter.ENCODED_LENGTH,
+                        b -> b,
+                        true);
+        return 
BTreeFileFooter.readFooter(MemorySlice.wrap(footerEncodings).toInput());
+    }
+
+    private RoaringNavigableMap64 readNullBitmap(
+            BlockCache cache, @Nullable BlockHandle blockHandle) {
+        RoaringNavigableMap64 nullBitmap = new RoaringNavigableMap64();
+        if (blockHandle == null) {
+            return nullBitmap;
+        }
+
+        CRC32 crc32c = new CRC32();
+        // read bytes and crc value
+        MemorySliceInput sliceInput =
+                MemorySlice.wrap(
+                                cache.getBlock(
+                                        blockHandle.offset(),
+                                        blockHandle.size() + 4,
+                                        b -> b,
+                                        false))
+                        .toInput();
+        byte[] nullBitmapEncodings = 
sliceInput.readSlice(blockHandle.size()).copyBytes();
+
+        // check crc value
+        crc32c.update(nullBitmapEncodings, 0, nullBitmapEncodings.length);
+        int expectedCrcValue = sliceInput.readInt();
+        Preconditions.checkState(
+                (int) crc32c.getValue() == expectedCrcValue,
+                "Crc check failure during decoding null bitmap.");
+
+        try {
+            nullBitmap.deserialize(nullBitmapEncodings);
+        } catch (IOException ioe) {
+            throw new RuntimeException(
+                    "Fail to deserialize null bitmap but crc check passed,"
+                            + " this means the ser/de algorithms not match.",
+                    ioe);
+        }
+
+        return nullBitmap;
+    }
+
+    private Comparator<MemorySlice> createSliceComparator(KeySerializer 
keySerializer) {
+        return (slice1, slice2) ->
+                comparator.compare(
+                        keySerializer.deserialize(slice1), 
keySerializer.deserialize(slice2));
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+        input.close();
+    }
+
+    @Override
+    public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+        // nulls are stored separately in null bitmap.
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return fullData();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+        // nulls are stored separately in null bitmap.
+        return GlobalIndexResult.create(nullBitmap::get);
+    }
+
+    @Override
+    public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object 
literal) {
+        // todo: `startsWith` can also be covered by btree index.
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return fullData();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return fullData();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return fullData();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return fullData();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return rangeQuery(minKey, literal, true, false);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return rangeQuery(literal, maxKey, true, true);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        RoaringNavigableMap64 result = fullData();
+                        result.andNot(rangeQuery(literal, literal, true, 
true));
+                        return result;
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object 
literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return rangeQuery(minKey, literal, true, true);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return rangeQuery(literal, literal, true, true);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object 
literal) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return rangeQuery(literal, maxKey, false, true);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) 
{
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        RoaringNavigableMap64 result = new 
RoaringNavigableMap64();
+                        for (Object literal : literals) {
+                            result.or(rangeQuery(literal, literal, true, 
true));
+                        }
+                        return result;
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> 
literals) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        RoaringNavigableMap64 result = fullData();
+                        result.andNot(this.visitIn(fieldRef, 
literals).results());
+                        return result;
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index 
file.", ioe);
+                    }
+                });
+    }
+
+    private RoaringNavigableMap64 fullData() throws IOException {
+        return rangeQuery(minKey, maxKey, true, true);
+    }
+
+    /**
+     * Range query on underlying SST File.
+     *
+     * @param from lower bound
+     * @param to upper bound
+     * @param fromInclusive whether include lower bound
+     * @param toInclusive whether include upper bound
+     * @return a bitmap containing all qualified row ids
+     */
+    private RoaringNavigableMap64 rangeQuery(
+            Object from, Object to, boolean fromInclusive, boolean 
toInclusive) throws IOException {
+        SstFileReader.SstFileIterator fileIter = reader.createIterator();
+        fileIter.seekTo(keySerializer.serialize(from));
+
+        boolean skipped = false;
+        RoaringNavigableMap64 result = new RoaringNavigableMap64();
+        BlockIterator dataIter;
+        Map.Entry<MemorySlice, MemorySlice> entry;
+        while ((dataIter = fileIter.readBatch()) != null) {
+            while (dataIter.hasNext()) {
+                entry = dataIter.next();
+
+                if (!fromInclusive && !skipped) {
+                    // this is correct only if the underlying file do not have 
duplicated keys.
+                    skipped = true;
+                    continue;
+                }
+
+                int difference = 
comparator.compare(keySerializer.deserialize(entry.getKey()), to);
+                if (difference > 0 || !toInclusive && difference == 0) {
+                    return result;
+                }
+
+                for (long rowId : deserializeRowIds(entry.getValue())) {
+                    result.add(rowId);
+                }
+            }
+        }
+        return result;
+    }
+
+    private long[] deserializeRowIds(MemorySlice slice) {
+        MemorySliceInput sliceInput = slice.toInput();
+        int length = sliceInput.readVarLenInt();
+        Preconditions.checkState(length > 0, "Invalid row id length: 0");
+        long[] ids = new long[length];
+        for (int i = 0; i < length; i++) {
+            ids[i] = sliceInput.readVarLenLong();
+        }
+        return ids;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
new file mode 100644
index 0000000000..69b62290dd
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
+import org.apache.paimon.sst.SstFileWriter;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.zip.CRC32;
+
+/**
+ * The {@link GlobalIndexWriter} implementation for BTree index. Note that 
users must keep written
+ * keys monotonically incremental. All null keys are stored in a separate 
bitmap, which will be
+ * serialized and appended to the file end on close. The layout is as below:
+ *
+ * <pre>
+ *    +-----------------------------------+------+
+ *    |             Footer                |      |
+ *    +-----------------------------------+      |
+ *    |           Index Block             |      +--> Loaded on open
+ *    +-----------------------------------+      |
+ *    |        Bloom Filter Block         |      |
+ *    +-----------------------------------+------+
+ *    |         Null Bitmap Block         |      |
+ *    +-----------------------------------+      |
+ *    |            Data Block             |      |
+ *    +-----------------------------------+      +--> Loaded on requested
+ *    |              ......               |      |
+ *    +-----------------------------------+      |
+ *    |            Data Block             |      |
+ *    +-----------------------------------+------+
+ * </pre>
+ *
+ * <p>For efficiency, we combine entries with the same keys and store a 
compact list of row ids for
+ * each key.
+ */
+public class BTreeIndexWriter implements GlobalIndexWriter {
+    public static final int MAGIC_NUMBER = 198732882;
+
+    private final String fileName;
+    private final PositionOutputStream out;
+
+    private long maxRowId = Long.MIN_VALUE;
+    private long minRowId = Long.MAX_VALUE;
+
+    private final SstFileWriter writer;
+    private final KeySerializer keySerializer;
+    private final Comparator<Object> comparator;
+
+    private Object firstKey = null;
+    private Object lastKey = null;
+    private final List<Long> currentRowIds = new ArrayList<>();
+
+    // for nulls
+    LazyField<RoaringNavigableMap64> nullBitmap = new 
LazyField<>(RoaringNavigableMap64::new);
+
+    public BTreeIndexWriter(
+            GlobalIndexFileWriter indexFileWriter,
+            KeySerializer keySerializer,
+            int blockSize,
+            BlockCompressionFactory compressionFactory)
+            throws IOException {
+        this.fileName = 
indexFileWriter.newFileName(BTreeGlobalIndexerFactory.IDENTIFIER);
+        this.out = indexFileWriter.newOutputStream(this.fileName);
+        this.keySerializer = keySerializer;
+        this.comparator = keySerializer.createComparator();
+        // todo: we may enable bf to accelerate equal and in predicate in the 
future
+        this.writer = new SstFileWriter(out, blockSize, null, 
compressionFactory);
+    }
+
+    @Override
+    public void write(@Nullable Object key) {
+        throw new UnsupportedOperationException(
+                "BTree index writer should explicitly specify row id for each 
key");
+    }
+
+    @Override
+    public void write(@Nullable Object key, long rowId) {
+        if (key == null) {
+            nullBitmap.get().add(rowId);
+            return;
+        }
+
+        if (lastKey != null && comparator.compare(key, lastKey) != 0) {
+            try {
+                flush();
+            } catch (IOException e) {
+                throw new RuntimeException("Error in writing btree index 
files.", e);
+            }
+        }
+        lastKey = key;
+        currentRowIds.add(rowId);
+
+        // update stats
+        if (firstKey == null) {
+            firstKey = key;
+        }
+        minRowId = Math.min(minRowId, rowId);
+        maxRowId = Math.max(maxRowId, rowId);
+    }
+
+    private void flush() throws IOException {
+        if (currentRowIds.isEmpty()) {
+            return;
+        }
+
+        // serialize row id list
+        MemorySliceOutput sliceOutput = new 
MemorySliceOutput(currentRowIds.size() * 9 + 5);
+        sliceOutput.writeVarLenInt(currentRowIds.size());
+        for (long currentRowId : currentRowIds) {
+            sliceOutput.writeVarLenLong(currentRowId);
+        }
+        currentRowIds.clear();
+
+        writer.put(keySerializer.serialize(lastKey), 
sliceOutput.toSlice().copyBytes());
+    }
+
+    @Override
+    public List<ResultEntry> finish() {
+        try {
+            // write remaining row ids
+            flush();
+
+            // flush writer remaining data blocks
+            writer.flush();
+
+            // write null bitmap
+            BlockHandle nullBitmapHandle = writeNullBitmap();
+
+            // write bloom filter (currently is always null, but we could add 
it for equal
+            // and in condition.)
+            BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
+
+            // write index block
+            BlockHandle indexBlockHandle = writer.writeIndexBlock();
+
+            // write footer
+            BTreeFileFooter footer =
+                    new BTreeFileFooter(bloomFilterHandle, indexBlockHandle, 
nullBitmapHandle);
+            MemorySlice footerEncoding = BTreeFileFooter.writeFooter(footer);
+            writer.writeSlice(footerEncoding);
+
+            out.close();
+        } catch (IOException e) {
+            throw new RuntimeException("Error in closing BTree index writer", 
e);
+        }
+
+        if (firstKey == null) {
+            throw new RuntimeException("Should never write an empty btree 
index file.");
+        }
+
+        return Collections.singletonList(
+                ResultEntry.of(
+                        fileName,
+                        new BTreeIndexMeta(
+                                        keySerializer.serialize(firstKey),
+                                        keySerializer.serialize(lastKey),
+                                        nullBitmap.initialized())
+                                .serialize(),
+                        new Range(minRowId, maxRowId)));
+    }
+
+    @Nullable
+    private BlockHandle writeNullBitmap() throws IOException {
+        if (!nullBitmap.initialized()) {
+            return null;
+        }
+
+        CRC32 crc32 = new CRC32();
+        byte[] serializedBitmap = nullBitmap.get().serialize();
+        int length = serializedBitmap.length;
+        crc32.update(serializedBitmap, 0, length);
+
+        // serialized bitmap + crc value
+        MemorySliceOutput sliceOutput = new MemorySliceOutput(length + 4);
+        sliceOutput.writeBytes(serializedBitmap);
+        sliceOutput.writeInt((int) crc32.getValue());
+
+        BlockHandle nullBitmapHandle = new BlockHandle(out.getPos(), length);
+        writer.writeSlice(sliceOutput.toSlice());
+
+        return nullBitmapHandle;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
new file mode 100644
index 0000000000..9971fd2751
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+
+import java.util.Comparator;
+
+/** This interface provides core methods to ser/de and compare btree index 
keys. */
+public interface KeySerializer {
+
+    byte[] serialize(Object key);
+
+    Object deserialize(MemorySlice data);
+
+    Comparator<Object> createComparator();
+
+    static KeySerializer create(DataType type) {
+        return type.accept(
+                new DataTypeDefaultVisitor<KeySerializer>() {
+                    @Override
+                    public KeySerializer defaultMethod(DataType dataType) {
+                        throw new UnsupportedOperationException(
+                                "DataType: " + dataType + " is not supported 
by btree index now.");
+                    }
+
+                    @Override
+                    public KeySerializer visit(CharType charType) {
+                        return new StringSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(VarCharType varCharType) {
+                        return new StringSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(TinyIntType tinyIntType) {
+                        return new TinyIntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(SmallIntType smallIntType) {
+                        return new SmallIntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(IntType intType) {
+                        return new IntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(BigIntType bigIntType) {
+                        return new BigIntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(BooleanType booleanType) {
+                        return new BooleanSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(DecimalType decimalType) {
+                        return new DecimalSerializer(
+                                decimalType.getPrecision(), 
decimalType.getScale());
+                    }
+
+                    @Override
+                    public KeySerializer visit(FloatType floatType) {
+                        return new FloatSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(DoubleType doubleType) {
+                        return new DoubleSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(DateType dateType) {
+                        return new IntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(TimeType timeType) {
+                        return new IntSerializer();
+                    }
+
+                    @Override
+                    public KeySerializer visit(TimestampType timestampType) {
+                        return new 
TimestampSerializer(timestampType.getPrecision());
+                    }
+
+                    @Override
+                    public KeySerializer visit(LocalZonedTimestampType 
localZonedTimestampType) {
+                        return new 
TimestampSerializer(localZonedTimestampType.getPrecision());
+                    }
+                });
+    }
+
+    /** Serializer for int type. */
+    class IntSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            keyOut.writeInt((Integer) key);
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return data.readInt(0);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Integer) o);
+        }
+    }
+
+    /** Serializer for long type. */
+    class BigIntSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            keyOut.writeLong((Long) key);
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return data.readLong(0);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Long) o);
+        }
+    }
+
+    /** Serializer for tiny int type. */
+    class TinyIntSerializer implements KeySerializer {
+
+        @Override
+        public byte[] serialize(Object key) {
+            return new byte[] {(byte) key};
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return data.readByte(0);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Byte) o);
+        }
+    }
+
+    /** Serializer for small int type. */
+    class SmallIntSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(2);
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            keyOut.writeShort((Short) key);
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return data.readShort(0);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Short) o);
+        }
+    }
+
+    /** Serializer for boolean type. */
+    class BooleanSerializer implements KeySerializer {
+
+        @Override
+        public byte[] serialize(Object key) {
+            return new byte[] {(Boolean) key ? (byte) 1 : (byte) 0};
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return data.readByte(0) == (byte) 1 ? Boolean.TRUE : Boolean.FALSE;
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Boolean) o);
+        }
+    }
+
+    /** Serializer for float type. */
+    class FloatSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            keyOut.writeInt(Float.floatToIntBits((Float) key));
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return Float.intBitsToFloat(data.readInt(0));
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Float) o);
+        }
+    }
+
+    /** Serializer for double type. */
+    class DoubleSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            keyOut.writeLong(Double.doubleToLongBits((Double) key));
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return Double.longBitsToDouble(data.readLong(0));
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Double) o);
+        }
+    }
+
+    /** Serializer for decimal type. */
+    class DecimalSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+        private final int precision;
+        private final int scale;
+
+        public DecimalSerializer(int precision, int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+
+        @Override
+        public byte[] serialize(Object key) {
+            if (Decimal.isCompact(precision)) {
+                keyOut.reset();
+                keyOut.writeLong(((Decimal) key).toUnscaledLong());
+                return keyOut.toSlice().copyBytes();
+            }
+            return ((Decimal) key).toUnscaledBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            if (Decimal.isCompact(precision)) {
+                return Decimal.fromUnscaledLong(data.readLong(0), precision, 
scale);
+            }
+            return Decimal.fromUnscaledBytes(data.copyBytes(), precision, 
scale);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Decimal) o);
+        }
+    }
+
+    /** Serializer for STRING type. */
+    class StringSerializer implements KeySerializer {
+
+        @Override
+        public byte[] serialize(Object key) {
+            return ((BinaryString) key).toBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            return BinaryString.fromBytes(data.copyBytes());
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (BinaryString) o);
+        }
+    }
+
+    /** Serializer for timestamp. */
+    class TimestampSerializer implements KeySerializer {
+        private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+        private final int precision;
+
+        public TimestampSerializer(int precision) {
+            this.precision = precision;
+        }
+
+        @Override
+        public byte[] serialize(Object key) {
+            keyOut.reset();
+            if (Timestamp.isCompact(precision)) {
+                keyOut.writeLong(((Timestamp) key).getMillisecond());
+            } else {
+                keyOut.writeLong(((Timestamp) key).getMillisecond());
+                keyOut.writeVarLenInt(((Timestamp) 
key).getNanoOfMillisecond());
+            }
+            return keyOut.toSlice().copyBytes();
+        }
+
+        @Override
+        public Object deserialize(MemorySlice data) {
+            if (Timestamp.isCompact(precision)) {
+                return Timestamp.fromEpochMillis(data.readLong(0));
+            }
+            MemorySliceInput input = data.toInput();
+            long millis = input.readLong();
+            int nanos = input.readVarLenInt();
+            return Timestamp.fromEpochMillis(millis, nanos);
+        }
+
+        @Override
+        public Comparator<Object> createComparator() {
+            return Comparator.comparing(o -> (Timestamp) o);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
index 7c88057fb4..be0a2c18b9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.globalindex.io;
 
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 
 import java.io.IOException;
@@ -26,4 +27,6 @@ import java.io.IOException;
 public interface GlobalIndexFileReader {
 
     SeekableInputStream getInputStream(String fileName) throws IOException;
+
+    Path filePath(String fileName);
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
index 850cbb9451..cbc0dd4ea2 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
@@ -18,13 +18,14 @@
 
 package org.apache.paimon.globalindex.io;
 
+import org.apache.paimon.fs.PositionOutputStream;
+
 import java.io.IOException;
-import java.io.OutputStream;
 
 /** Global index writer io operators. */
 public interface GlobalIndexFileWriter {
 
     String newFileName(String prefix);
 
-    OutputStream newOutputStream(String fileName) throws IOException;
+    PositionOutputStream newOutputStream(String fileName) throws IOException;
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
index 9f4272096c..5239bb133e 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
@@ -26,7 +26,7 @@ import org.apache.paimon.sst.BloomFilterHandle;
 
 import javax.annotation.Nullable;
 
-import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
+import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Footer for a sorted file. */
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index e5fab5fc5c..c652c90e82 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -52,6 +52,7 @@ import java.io.IOException;
  * </pre>
  */
 public class SortLookupStoreWriter implements LookupStoreWriter {
+    public static final int MAGIC_NUMBER = 1481571681;
 
     private final SstFileWriter writer;
     private final PositionOutputStream out;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
index 589e967d40..625ff565da 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
@@ -61,6 +61,10 @@ public final class MemorySlice implements 
Comparable<MemorySlice> {
         return segment.getInt(offset + position);
     }
 
+    public short readShort(int position) {
+        return segment.getShort(offset + position);
+    }
+
     public long readLong(int position) {
         return segment.getLong(offset + position);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
index 90e8e54900..7254f590b5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
@@ -51,6 +51,12 @@ public class MemorySliceOutput {
         size += 4;
     }
 
+    public void writeShort(int value) {
+        ensureSize(size + 2);
+        segment.putShort(size, (short) value);
+        size += 2;
+    }
+
     public void writeVarLenInt(int value) {
         if (value < 0) {
             throw new IllegalArgumentException("negative value: v=" + value);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java 
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 1f1405671b..613ffd80de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -48,8 +48,6 @@ public class SstFileWriter {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SstFileWriter.class.getName());
 
-    public static final int MAGIC_NUMBER = 1481571681;
-
     private final PositionOutputStream out;
     private final int blockSize;
     private final BlockWriter dataBlockWriter;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
index e61486ecc4..f07b7c6afa 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
@@ -65,6 +65,10 @@ public class RoaringNavigableMap64 implements Iterable<Long> 
{
         roaring64NavigableMap.and(other.roaring64NavigableMap);
     }
 
+    public void andNot(RoaringNavigableMap64 other) {
+        roaring64NavigableMap.andNot(other.roaring64NavigableMap);
+    }
+
     public boolean isEmpty() {
         return roaring64NavigableMap.isEmpty();
     }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
index 43671537ec..7edd6173da 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.globalindex.GlobalIndexIOMeta;
 import org.apache.paimon.globalindex.GlobalIndexReader;
@@ -44,7 +46,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
@@ -207,7 +208,8 @@ public class BitmapGlobalIndexTest {
                     }
 
                     @Override
-                    public OutputStream newOutputStream(String fileName) 
throws IOException {
+                    public PositionOutputStream newOutputStream(String 
fileName)
+                            throws IOException {
                         return fileIO.newOutputStream(new 
Path(tempDir.toString(), fileName), true);
                     }
                 };
@@ -218,7 +220,17 @@ public class BitmapGlobalIndexTest {
         long fileSize = fileIO.getFileSize(path);
 
         GlobalIndexFileReader fileReader =
-                prefix -> fileIO.newInputStream(new Path(tempDir.toString(), 
prefix));
+                new GlobalIndexFileReader() {
+                    @Override
+                    public SeekableInputStream getInputStream(String fileName) 
throws IOException {
+                        return fileIO.newInputStream(new 
Path(tempDir.toString(), fileName));
+                    }
+
+                    @Override
+                    public Path filePath(String fileName) {
+                        return new Path(tempDir.toString(), fileName);
+                    }
+                };
 
         GlobalIndexIOMeta globalIndexMeta =
                 new GlobalIndexIOMeta(fileName, fileSize, Long.MAX_VALUE, 
null);
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
new file mode 100644
index 0000000000..3015ffa715
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Test class for {@link BTreeFileMetaSelector}. */
+public class BTreeFileMetaSelectorTest {
+    private List<GlobalIndexIOMeta> files;
+
+    @BeforeEach
+    public void setUp() {
+        MemorySliceOutput sliceOutput = new MemorySliceOutput(4);
+
+        BTreeIndexMeta meta1 =
+                new BTreeIndexMeta(writeInt(1, sliceOutput), writeInt(10, 
sliceOutput), true);
+        BTreeIndexMeta meta2 =
+                new BTreeIndexMeta(writeInt(15, sliceOutput), writeInt(20, 
sliceOutput), false);
+        BTreeIndexMeta meta3 =
+                new BTreeIndexMeta(writeInt(21, sliceOutput), writeInt(30, 
sliceOutput), true);
+        BTreeIndexMeta meta4 =
+                new BTreeIndexMeta(writeInt(1, sliceOutput), writeInt(5, 
sliceOutput), false);
+        BTreeIndexMeta meta5 =
+                new BTreeIndexMeta(writeInt(19, sliceOutput), writeInt(25, 
sliceOutput), true);
+
+        files =
+                Arrays.asList(
+                        new GlobalIndexIOMeta("file1", 1, 1L, 
meta1.serialize()),
+                        new GlobalIndexIOMeta("file2", 1, 1L, 
meta2.serialize()),
+                        new GlobalIndexIOMeta("file3", 1, 1L, 
meta3.serialize()),
+                        new GlobalIndexIOMeta("file4", 1, 1L, 
meta4.serialize()),
+                        new GlobalIndexIOMeta("file5", 1, 1L, 
meta5.serialize()));
+    }
+
+    @Test
+    public void testMetaSelector() {
+        DataType dataType = new IntType();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+        KeySerializer serializer = KeySerializer.create(dataType);
+        BTreeFileMetaSelector selector = new BTreeFileMetaSelector(files, 
serializer);
+
+        Optional<List<GlobalIndexIOMeta>> result;
+
+        // 1. test range queries
+        result = selector.visitLessThan(ref, 8);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file4"));
+
+        result = selector.visitLessOrEqual(ref, 20);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file4", 
"file5"));
+
+        result = selector.visitGreaterThan(ref, 20);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+        result = selector.visitGreaterOrEqual(ref, 5);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file3", 
"file4", "file5"));
+
+        result = selector.visitEqual(ref, 22);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+        result = selector.visitNotEqual(ref, 22);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file3", 
"file4", "file5"));
+
+        // 1.1 test range boundaries
+        result = selector.visitLessThan(ref, 15);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file4"));
+
+        result = selector.visitLessOrEqual(ref, 15);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file4"));
+
+        result = selector.visitGreaterThan(ref, 20);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+        result = selector.visitGreaterOrEqual(ref, 20);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file2", "file3", "file5"));
+
+        result = selector.visitEqual(ref, 30);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file3"));
+
+        // 1.2 test out of boundaries
+        result = selector.visitLessThan(ref, 1);
+        Assertions.assertThat(result).isNotEmpty();
+        Assertions.assertThat(result.get()).isEmpty();
+
+        result = selector.visitGreaterThan(ref, 30);
+        Assertions.assertThat(result).isNotEmpty();
+        Assertions.assertThat(result.get()).isEmpty();
+
+        // 2. test isNull & isNotNull
+        result = selector.visitIsNull(ref);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file3", "file5"));
+
+        result = selector.visitIsNotNull(ref);
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file3", 
"file4", "file5"));
+
+        // 3. test in
+        result = selector.visitIn(ref, Arrays.asList(1, 2, 3, 26, 27, 28));
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file4", "file3"));
+
+        result = selector.visitNotIn(ref, Arrays.asList(1, 7, 19, 30, 31));
+        Assertions.assertThat(result).isNotEmpty();
+        assertFiles(result.get(), Arrays.asList("file1", "file2", "file3", 
"file4", "file5"));
+    }
+
+    private void assertFiles(List<GlobalIndexIOMeta> files, List<String> 
expected) {
+        Assertions.assertThat(
+                        files.stream()
+                                .map(GlobalIndexIOMeta::fileName)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    private byte[] writeInt(int value, MemorySliceOutput sliceOutput) {
+        sliceOutput.reset();
+        sliceOutput.writeInt(value);
+        return sliceOutput.toSlice().copyBytes();
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
new file mode 100644
index 0000000000..8cf5a9d840
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.LocalZoneTimestamp;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.DecimalUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** Test for {@link BTreeGlobalIndexer}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BTreeGlobalIndexerTest {
+    private DataType dataType;
+    private int dataNum;
+    private List<Pair<Object, Long>> data;
+    private KeySerializer keySerializer;
+    private Comparator<Object> comparator;
+    private FileIO fileIO;
+    private GlobalIndexFileReader fileReader;
+    private GlobalIndexFileWriter fileWriter;
+    private BTreeGlobalIndexer globalIndexer;
+    private Options options;
+
+    @TempDir java.nio.file.Path tempPath;
+
+    public BTreeGlobalIndexerTest(List<Object> args) {
+        this.dataType = (DataType) args.get(0);
+        this.dataNum = (Integer) args.get(1);
+    }
+
+    @SuppressWarnings("unused")
+    @Parameters(name = "dataType&recordNum-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(new IntType(), 10000),
+                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+                Arrays.asList(new CharType(100), 10000),
+                Arrays.asList(new FloatType(), 10000),
+                Arrays.asList(new DecimalType(), 10000),
+                Arrays.asList(new DoubleType(), 10000),
+                Arrays.asList(new BooleanType(), 10000),
+                Arrays.asList(new TinyIntType(), 10000),
+                Arrays.asList(new SmallIntType(), 10000),
+                Arrays.asList(new BigIntType(), 10000),
+                Arrays.asList(new DateType(), 10000),
+                Arrays.asList(new TimestampType(), 10000));
+    }
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        fileIO = LocalFileIO.create();
+        fileWriter =
+                new GlobalIndexFileWriter() {
+                    @Override
+                    public String newFileName(String prefix) {
+                        return "test-btree-" + prefix;
+                    }
+
+                    @Override
+                    public PositionOutputStream newOutputStream(String 
fileName)
+                            throws IOException {
+                        return fileIO.newOutputStream(
+                                new Path(new Path(tempPath.toUri()), 
fileName), true);
+                    }
+                };
+        fileReader =
+                new GlobalIndexFileReader() {
+                    @Override
+                    public SeekableInputStream getInputStream(String fileName) 
throws IOException {
+                        return fileIO.newInputStream(
+                                new Path(new Path(tempPath.toUri()), 
fileName));
+                    }
+
+                    @Override
+                    public Path filePath(String fileName) {
+                        return new Path(new Path(tempPath.toUri()), fileName);
+                    }
+                };
+        options = new Options();
+        options.set(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE, 
MemorySize.ofMebiBytes(8));
+        globalIndexer = new BTreeGlobalIndexer(new DataField(1, "testField", 
dataType), options);
+        keySerializer = KeySerializer.create(dataType);
+        comparator = keySerializer.createComparator();
+
+        // generate data and sort by key
+        data = new ArrayList<>(dataNum);
+        DataGenerator dataGenerator = new DataGenerator();
+        for (int i = 0; i < dataNum; i++) {
+            data.add(Pair.of(dataType.accept(dataGenerator), (long) i));
+        }
+        data.sort((p1, p2) -> comparator.compare(p1.getKey(), p2.getKey()));
+    }
+
+    @TestTemplate
+    public void testRangePredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
+            GlobalIndexResult result;
+            Random random = new Random();
+
+            for (int i = 0; i < 5; i++) {
+                Object literal = data.get(random.nextInt(dataNum)).getKey();
+
+                // 1. test <= literal
+                result = reader.visitLessOrEqual(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) <= 0));
+
+                // 2. test < literal
+                result = reader.visitLessThan(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) < 0));
+
+                // 3. test >= literal
+                result = reader.visitGreaterOrEqual(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) >= 0));
+
+                // 4. test > literal
+                result = reader.visitGreaterThan(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) > 0));
+
+                // 5. test equal
+                result = reader.visitEqual(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) == 0));
+
+                // 6. test not equal
+                result = reader.visitNotEqual(ref, literal);
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) != 0));
+            }
+
+            // 7. test < min
+            Object literal7 = data.get(0).getKey();
+            result = reader.visitLessThan(ref, literal7);
+            Assertions.assertTrue(result.results().isEmpty());
+
+            // 8. test > max
+            Object literal8 = data.get(dataNum - 1).getKey();
+            result = reader.visitGreaterThan(ref, literal8);
+            Assertions.assertTrue(result.results().isEmpty());
+        }
+    }
+
+    @TestTemplate
+    public void testIsNull() throws Exception {
+        // set nulls
+        for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
+            data.get(i).setLeft(null);
+        }
+        GlobalIndexIOMeta written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
+            GlobalIndexResult result;
+
+            result = reader.visitIsNull(ref);
+            assertResult(result, filter(Objects::isNull));
+
+            result = reader.visitIsNotNull(ref);
+            assertResult(result, filter(Objects::nonNull));
+        }
+    }
+
+    @TestTemplate
+    public void testInPredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
+            GlobalIndexResult result;
+            for (int i = 0; i < 10; i++) {
+                Random random = new Random(System.currentTimeMillis());
+                List<Object> literals =
+                        
data.stream().map(Pair::getKey).collect(Collectors.toList());
+                Collections.shuffle(literals, random);
+                literals = literals.subList(0, (int) (dataNum * 0.1));
+
+                TreeSet<Object> set = new TreeSet<>(comparator);
+                set.addAll(literals);
+
+                // 1. test in
+                result = reader.visitIn(ref, literals);
+                assertResult(result, filter(set::contains));
+
+                // 2. test not in
+                result = reader.visitNotIn(ref, literals);
+                assertResult(result, filter(obj -> !set.contains(obj)));
+            }
+        }
+    }
+
+    private GlobalIndexIOMeta writeData() throws IOException {
+        GlobalIndexWriter indexWriter = globalIndexer.createWriter(fileWriter);
+        for (Pair<Object, Long> pair : data) {
+            indexWriter.write(pair.getKey(), pair.getValue());
+        }
+        List<GlobalIndexWriter.ResultEntry> results = indexWriter.finish();
+        Assertions.assertEquals(1, results.size());
+
+        GlobalIndexWriter.ResultEntry resultEntry = results.get(0);
+        String fileName = resultEntry.fileName();
+        return new GlobalIndexIOMeta(
+                fileName,
+                fileIO.getFileSize(new Path(new Path(tempPath.toUri()), 
fileName)),
+                resultEntry.rowRange().to - resultEntry.rowRange().from,
+                resultEntry.meta());
+    }
+
+    private List<Long> filter(Predicate<Object> filter) {
+        return data.stream()
+                .filter(pair -> filter.test(pair.getKey()))
+                .map(Pair::getValue)
+                .collect(Collectors.toList());
+    }
+
+    private void assertResult(GlobalIndexResult indexResult, List<Long> 
expected) {
+        Iterator<Long> iter = indexResult.results().iterator();
+        List<Long> result = new ArrayList<>();
+        while (iter.hasNext()) {
+            result.add(iter.next());
+        }
+        org.assertj.core.api.Assertions.assertThat(result)
+                .containsExactlyInAnyOrderElementsOf(expected);
+    }
+
+    /** The Generator to generate test data. */
+    private static class DataGenerator extends DataTypeDefaultVisitor<Object> {
+        Random random = new Random();
+
+        @Override
+        public Object visit(CharType charType) {
+            return BinaryString.fromString("random char " + random.nextInt());
+        }
+
+        @Override
+        public Object visit(VarCharType varCharType) {
+            return BinaryString.fromString("random varchar " + 
random.nextInt());
+        }
+
+        @Override
+        public Object visit(BooleanType booleanType) {
+            return random.nextBoolean();
+        }
+
+        @Override
+        public Object visit(DecimalType decimalType) {
+            return DecimalUtils.castFrom(
+                    random.nextDouble(), decimalType.getPrecision(), 
decimalType.getScale());
+        }
+
+        @Override
+        public Object visit(TinyIntType tinyIntType) {
+            return (byte) random.nextInt();
+        }
+
+        @Override
+        public Object visit(SmallIntType smallIntType) {
+            return (short) random.nextInt();
+        }
+
+        @Override
+        public Object visit(IntType intType) {
+            return random.nextInt();
+        }
+
+        @Override
+        public Object visit(BigIntType bigIntType) {
+            return random.nextLong();
+        }
+
+        @Override
+        public Object visit(FloatType floatType) {
+            return random.nextFloat();
+        }
+
+        @Override
+        public Object visit(DoubleType doubleType) {
+            return random.nextDouble();
+        }
+
+        @Override
+        public Object visit(DateType dateType) {
+            return random.nextInt(Integer.MAX_VALUE);
+        }
+
+        @Override
+        public Object visit(TimeType timeType) {
+            return random.nextInt(Integer.MAX_VALUE);
+        }
+
+        @Override
+        public Object visit(TimestampType timestampType) {
+            return Timestamp.fromMicros(random.nextLong());
+        }
+
+        @Override
+        public Object visit(LocalZonedTimestampType localZonedTimestampType) {
+            return LocalZoneTimestamp.fromMicros(random.nextLong());
+        }
+
+        @Override
+        protected Object defaultMethod(DataType dataType) {
+            throw new UnsupportedOperationException(
+                    "Btree index do not support " + dataType + " type.");
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
index c48a729950..80c6a41ffc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
@@ -20,13 +20,13 @@ package org.apache.paimon.globalindex;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
 import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
 import org.apache.paimon.index.IndexPathFactory;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.UUID;
 
 /** Helper class for managing global index files. */
@@ -44,6 +44,7 @@ public class GlobalIndexFileReadWrite implements 
GlobalIndexFileReader, GlobalIn
         return prefix + "-" + "global-index-" + UUID.randomUUID() + ".index";
     }
 
+    @Override
     public Path filePath(String fileName) {
         return indexPathFactory.toPath(fileName);
     }
@@ -52,7 +53,7 @@ public class GlobalIndexFileReadWrite implements 
GlobalIndexFileReader, GlobalIn
         return fileIO.getFileSize(filePath(fileName));
     }
 
-    public OutputStream newOutputStream(String fileName) throws IOException {
+    public PositionOutputStream newOutputStream(String fileName) throws 
IOException {
         return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
     }
 
diff --git 
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
 
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
index 93d9f4ffd8..a608df5baa 100644
--- 
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
+++ 
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.globalindex.GlobalIndexWriter;
 import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
@@ -53,7 +54,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -157,7 +157,8 @@ public class LuceneVectorGlobalIndexScanTest {
                     }
 
                     @Override
-                    public OutputStream newOutputStream(String fileName) 
throws IOException {
+                    public PositionOutputStream newOutputStream(String 
fileName)
+                            throws IOException {
                         return fileIO.newOutputStream(new Path(indexDir, 
fileName), false);
                     }
                 };
diff --git 
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
 
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
index 20241facdd..d4ccc0eb2e 100644
--- 
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
+++ 
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.lucene.index;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.globalindex.GlobalIndexIOMeta;
 import org.apache.paimon.globalindex.GlobalIndexResult;
@@ -40,7 +42,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -82,14 +83,24 @@ public class LuceneVectorGlobalIndexTest {
             }
 
             @Override
-            public OutputStream newOutputStream(String fileName) throws 
IOException {
+            public PositionOutputStream newOutputStream(String fileName) 
throws IOException {
                 return fileIO.newOutputStream(new Path(path, fileName), false);
             }
         };
     }
 
     private GlobalIndexFileReader createFileReader(Path path) {
-        return fileName -> fileIO.newInputStream(new Path(path, fileName));
+        return new GlobalIndexFileReader() {
+            @Override
+            public SeekableInputStream getInputStream(String fileName) throws 
IOException {
+                return fileIO.newInputStream(new Path(path, fileName));
+            }
+
+            @Override
+            public Path filePath(String fileName) {
+                return new Path(path, fileName);
+            }
+        };
     }
 
     @Test

Reply via email to