This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7157ae4327 [core] Introduce a lazy reader for BTreeIndexReader to filter out files by predicate (#6927)
7157ae4327 is described below

commit 7157ae432774f5a4803f70bfebe76f8b78c54615
Author: Faiz <[email protected]>
AuthorDate: Tue Dec 30 12:57:33 2025 +0800

    [core] Introduce a lazy reader for BTreeIndexReader to filter out files by predicate (#6927)
---
 .../globalindex/btree/BTreeFileMetaSelector.java   |  36 ++-
 .../globalindex/btree/BTreeGlobalIndexer.java      |   8 +-
 .../paimon/globalindex/btree/BTreeIndexMeta.java   |  49 +++-
 .../paimon/globalindex/btree/BTreeIndexReader.java |  21 +-
 .../paimon/globalindex/btree/BTreeIndexWriter.java |   6 +-
 .../globalindex/btree/LazyFilteredBTreeReader.java | 294 +++++++++++++++++++++
 ...dexerTest.java => AbstractIndexReaderTest.java} | 175 ++----------
 .../globalindex/btree/BTreeIndexReaderTest.java    | 184 +++++++++++++
 .../btree/LazyFilteredBTreeIndexReaderTest.java    | 199 ++++++++++++++
 9 files changed, 788 insertions(+), 184 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
index 334771e125..ef59e03122 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
@@ -55,7 +55,7 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
 
     @Override
     public Optional<List<GlobalIndexIOMeta>> visitIsNotNull(FieldRef fieldRef) 
{
-        return Optional.of(filter(meta -> true));
+        return Optional.of(filter(meta -> !meta.onlyNulls()));
     }
 
     @Override
@@ -87,7 +87,12 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
     public Optional<List<GlobalIndexIOMeta>> visitLessThan(FieldRef fieldRef, 
Object literal) {
         // `<` means file.minKey < literal
         return Optional.of(
-                filter(meta -> 
comparator.compare(deserialize(meta.getFirstKey()), literal) < 0));
+                filter(
+                        meta ->
+                                !meta.onlyNulls()
+                                        && comparator.compare(
+                                                        
deserialize(meta.getFirstKey()), literal)
+                                                < 0));
     }
 
     @Override
@@ -95,7 +100,12 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
             FieldRef fieldRef, Object literal) {
         // `>=` means file.maxKey >= literal
         return Optional.of(
-                filter(meta -> 
comparator.compare(deserialize(meta.getLastKey()), literal) >= 0));
+                filter(
+                        meta ->
+                                !meta.onlyNulls()
+                                        && comparator.compare(
+                                                        
deserialize(meta.getLastKey()), literal)
+                                                >= 0));
     }
 
     @Override
@@ -107,7 +117,12 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
     public Optional<List<GlobalIndexIOMeta>> visitLessOrEqual(FieldRef 
fieldRef, Object literal) {
         // `<=` means file.minKey <= literal
         return Optional.of(
-                filter(meta -> 
comparator.compare(deserialize(meta.getFirstKey()), literal) <= 0));
+                filter(
+                        meta ->
+                                !meta.onlyNulls()
+                                        && comparator.compare(
+                                                        
deserialize(meta.getFirstKey()), literal)
+                                                <= 0));
     }
 
     @Override
@@ -115,6 +130,9 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
         return Optional.of(
                 filter(
                         meta -> {
+                            if (meta.onlyNulls()) {
+                                return false;
+                            }
                             Object minKey = deserialize(meta.getFirstKey());
                             Object maxKey = deserialize(meta.getLastKey());
                             return comparator.compare(literal, minKey) >= 0
@@ -126,7 +144,12 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
     public Optional<List<GlobalIndexIOMeta>> visitGreaterThan(FieldRef 
fieldRef, Object literal) {
         // `>` means file.maxKey > literal
         return Optional.of(
-                filter(meta -> 
comparator.compare(deserialize(meta.getLastKey()), literal) > 0));
+                filter(
+                        meta ->
+                                !meta.onlyNulls()
+                                        && comparator.compare(
+                                                        
deserialize(meta.getLastKey()), literal)
+                                                > 0));
     }
 
     @Override
@@ -134,6 +157,9 @@ public class BTreeFileMetaSelector implements 
FunctionVisitor<Optional<List<Glob
         return Optional.of(
                 filter(
                         meta -> {
+                            if (meta.onlyNulls()) {
+                                return false;
+                            }
                             Object minKey = deserialize(meta.getFirstKey());
                             Object maxKey = deserialize(meta.getLastKey());
                             for (Object literal : literals) {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
index 2ffba2fc9d..34110bc8a5 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.globalindex.GlobalIndexIOMeta;
 import org.apache.paimon.globalindex.GlobalIndexReader;
 import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.UnionGlobalIndexReader;
 import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
 import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
@@ -32,7 +31,6 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.LazyField;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -95,10 +93,6 @@ public class BTreeGlobalIndexer implements GlobalIndexer {
     @Override
     public GlobalIndexReader createReader(
             GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) 
throws IOException {
-        List<GlobalIndexReader> readers = new ArrayList<>();
-        for (GlobalIndexIOMeta meta : files) {
-            readers.add(new BTreeIndexReader(keySerializer, fileReader, meta, 
cacheManager.get()));
-        }
-        return new UnionGlobalIndexReader(readers);
+        return new LazyFilteredBTreeReader(files, keySerializer, fileReader, 
cacheManager.get());
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
index c134f1fa1e..83d12b217a 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
@@ -22,23 +22,30 @@ import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.memory.MemorySliceInput;
 import org.apache.paimon.memory.MemorySliceOutput;
 
-/** Index Meta of each BTree index file. */
+import javax.annotation.Nullable;
+
+/**
+ * Index Meta of each BTree index file. The first key and last key of this 
meta could be null if the
+ * entire btree index file only contains nulls.
+ */
 public class BTreeIndexMeta {
 
-    private final byte[] firstKey;
-    private final byte[] lastKey;
+    @Nullable private final byte[] firstKey;
+    @Nullable private final byte[] lastKey;
     private final boolean hasNulls;
 
-    public BTreeIndexMeta(byte[] firstKey, byte[] lastKey, boolean hasNulls) {
+    public BTreeIndexMeta(@Nullable byte[] firstKey, @Nullable byte[] lastKey, 
boolean hasNulls) {
         this.firstKey = firstKey;
         this.lastKey = lastKey;
         this.hasNulls = hasNulls;
     }
 
+    @Nullable
     public byte[] getFirstKey() {
         return firstKey;
     }
 
+    @Nullable
     public byte[] getLastKey() {
         return lastKey;
     }
@@ -47,12 +54,30 @@ public class BTreeIndexMeta {
         return hasNulls;
     }
 
+    public boolean onlyNulls() {
+        return firstKey == null && lastKey == null;
+    }
+
+    private int memorySize() {
+        return (firstKey == null ? 0 : firstKey.length)
+                + (lastKey == null ? 0 : lastKey.length)
+                + 9;
+    }
+
     public byte[] serialize() {
-        MemorySliceOutput sliceOutput = new MemorySliceOutput(firstKey.length 
+ lastKey.length + 8);
-        sliceOutput.writeInt(firstKey.length);
-        sliceOutput.writeBytes(firstKey);
-        sliceOutput.writeInt(lastKey.length);
-        sliceOutput.writeBytes(lastKey);
+        MemorySliceOutput sliceOutput = new MemorySliceOutput(memorySize());
+        if (firstKey != null) {
+            sliceOutput.writeInt(firstKey.length);
+            sliceOutput.writeBytes(firstKey);
+        } else {
+            sliceOutput.writeInt(0);
+        }
+        if (lastKey != null) {
+            sliceOutput.writeInt(lastKey.length);
+            sliceOutput.writeBytes(lastKey);
+        } else {
+            sliceOutput.writeInt(0);
+        }
         sliceOutput.writeByte(hasNulls ? 1 : 0);
         return sliceOutput.toSlice().getHeapMemory();
     }
@@ -60,9 +85,11 @@ public class BTreeIndexMeta {
     public static BTreeIndexMeta deserialize(byte[] data) {
         MemorySliceInput sliceInput = MemorySlice.wrap(data).toInput();
         int firstKeyLength = sliceInput.readInt();
-        byte[] firstKey = sliceInput.readSlice(firstKeyLength).copyBytes();
+        byte[] firstKey =
+                firstKeyLength == 0 ? null : 
sliceInput.readSlice(firstKeyLength).copyBytes();
         int lastKeyLength = sliceInput.readInt();
-        byte[] lastKey = sliceInput.readSlice(lastKeyLength).copyBytes();
+        byte[] lastKey =
+                lastKeyLength == 0 ? null : 
sliceInput.readSlice(lastKeyLength).copyBytes();
         boolean hasNulls = sliceInput.readByte() == 1;
         return new BTreeIndexMeta(firstKey, lastKey, hasNulls);
     }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
index 65b0d5744c..adee790a7d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
@@ -67,8 +67,14 @@ public class BTreeIndexReader implements GlobalIndexReader {
         this.keySerializer = keySerializer;
         this.comparator = keySerializer.createComparator();
         BTreeIndexMeta indexMeta = 
BTreeIndexMeta.deserialize(globalIndexIOMeta.metadata());
-        this.minKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
-        this.maxKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+        if (indexMeta.getFirstKey() != null) {
+            this.minKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
+            this.maxKey = 
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+        } else {
+            // this is possible if this btree index file only stores nulls.
+            this.minKey = null;
+            this.maxKey = null;
+        }
         this.input = fileReader.getInputStream(globalIndexIOMeta.fileName());
 
         // prepare file footer
@@ -340,6 +346,9 @@ public class BTreeIndexReader implements GlobalIndexReader {
         // Traverse all data to avoid returning null values, which is very 
advantageous in
         // situations where there are many null values
         // TODO do not traverse all data if less null values
+        if (minKey == null) {
+            return new RoaringNavigableMap64();
+        }
         return rangeQuery(minKey, maxKey, true, true);
     }
 
@@ -357,21 +366,19 @@ public class BTreeIndexReader implements 
GlobalIndexReader {
         SstFileReader.SstFileIterator fileIter = reader.createIterator();
         fileIter.seekTo(keySerializer.serialize(from));
 
-        boolean skipped = false;
         RoaringNavigableMap64 result = new RoaringNavigableMap64();
         BlockIterator dataIter;
         Map.Entry<MemorySlice, MemorySlice> entry;
         while ((dataIter = fileIter.readBatch()) != null) {
             while (dataIter.hasNext()) {
                 entry = dataIter.next();
+                Object key = keySerializer.deserialize(entry.getKey());
 
-                if (!fromInclusive && !skipped) {
-                    // this is correct only if the underlying file do not have 
duplicated keys.
-                    skipped = true;
+                if (!fromInclusive && comparator.compare(key, from) == 0) {
                     continue;
                 }
 
-                int difference = 
comparator.compare(keySerializer.deserialize(entry.getKey()), to);
+                int difference = comparator.compare(key, to);
                 if (difference > 0 || !toInclusive && difference == 0) {
                     return result;
                 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
index a08b378947..fe4c14a449 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
@@ -167,14 +167,14 @@ public class BTreeIndexWriter implements 
GlobalIndexParallelWriter {
             throw new RuntimeException("Error in closing BTree index writer", 
e);
         }
 
-        if (firstKey == null) {
+        if (firstKey == null && !nullBitmap.initialized()) {
             throw new RuntimeException("Should never write an empty btree 
index file.");
         }
 
         byte[] metaBytes =
                 new BTreeIndexMeta(
-                                keySerializer.serialize(firstKey),
-                                keySerializer.serialize(lastKey),
+                                firstKey == null ? null : 
keySerializer.serialize(firstKey),
+                                lastKey == null ? null : 
keySerializer.serialize(lastKey),
                                 nullBitmap.initialized())
                         .serialize();
         return Collections.singletonList(new ResultEntry(fileName, rowCount, 
metaBytes));
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
new file mode 100644
index 0000000000..cbfb452b2d
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.UnionGlobalIndexReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.predicate.FieldRef;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * An index reader for BTree which dynamically filters the file list by the input predicate, then
+ * merges the results through an {@link org.apache.paimon.globalindex.UnionGlobalIndexReader}. In
+ * the ideal situation, such as visiting an Equal predicate, only very few files are actually read.
+ */
+public class LazyFilteredBTreeReader implements GlobalIndexReader {
+
+    private final BTreeFileMetaSelector fileSelector;
+    private final List<GlobalIndexIOMeta> files;
+    private final Map<String, GlobalIndexReader> readerCache;
+    private final KeySerializer keySerializer;
+    private final CacheManager cacheManager;
+    private final GlobalIndexFileReader fileReader;
+
+    public LazyFilteredBTreeReader(
+            List<GlobalIndexIOMeta> files,
+            KeySerializer keySerializer,
+            GlobalIndexFileReader fileReader,
+            CacheManager cacheManager) {
+        this.fileSelector = new BTreeFileMetaSelector(files, keySerializer);
+        this.readerCache = new HashMap<>();
+        this.cacheManager = cacheManager;
+        this.fileReader = fileReader;
+        this.keySerializer = keySerializer;
+        this.files = files;
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitIsNotNull(fieldRef);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitIsNotNull(fieldRef);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitIsNull(fieldRef);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitIsNull(fieldRef);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, 
Object literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitStartsWith(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitStartsWith(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitEndsWith(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitEndsWith(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitContains(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitContains(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitLike(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitLike(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitLessThan(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitLessThan(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, 
Object literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitGreaterOrEqual(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitGreaterOrEqual(fieldRef, 
literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitNotEqual(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitNotEqual(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, 
Object literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitLessOrEqual(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitLessOrEqual(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object 
literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitEqual(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitEqual(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, 
Object literal) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt =
+                fileSelector.visitGreaterThan(fieldRef, literal);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitGreaterThan(fieldRef, literal);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> 
literals) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitIn(fieldRef, literals);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitIn(fieldRef, literals);
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, 
List<Object> literals) {
+        Optional<List<GlobalIndexIOMeta>> selectedOpt = 
fileSelector.visitNotIn(fieldRef, literals);
+        if (!selectedOpt.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedOpt.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return createUnionReader(selected).visitNotIn(fieldRef, literals);
+    }
+
+    /**
+     * Create a union reader for the given files. The union reader is composed of readers from
+     * the reader cache, so please do not close it.
+     */
+    private UnionGlobalIndexReader createUnionReader(List<GlobalIndexIOMeta> 
files) {
+        List<GlobalIndexReader> readers = new ArrayList<>();
+        for (GlobalIndexIOMeta meta : files) {
+            readers.add(
+                    readerCache.computeIfAbsent(
+                            meta.fileName(),
+                            name -> {
+                                try {
+                                    return new BTreeIndexReader(
+                                            keySerializer, fileReader, meta, 
cacheManager);
+                                } catch (IOException e) {
+                                    throw new RuntimeException(
+                                            "Can't create BTree index reader 
for " + name, e);
+                                }
+                            }));
+        }
+        return new UnionGlobalIndexReader(readers);
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        for (Map.Entry<String, GlobalIndexReader> entry : 
this.readerCache.entrySet()) {
+            try {
+                entry.getValue().close();
+            } catch (IOException ioe) {
+                if (exception == null) {
+                    exception = ioe;
+                } else {
+                    exception.addSuppressed(ioe);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
similarity index 56%
rename from 
paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
rename to 
paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
index 7acf024542..1160aa0c11 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
@@ -28,16 +28,13 @@ import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.globalindex.GlobalIndexIOMeta;
 import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.GlobalIndexReader;
 import org.apache.paimon.globalindex.GlobalIndexResult;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
 import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.FieldRef;
-import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
-import org.apache.paimon.testutils.junit.parameterized.Parameters;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.CharType;
@@ -59,73 +56,49 @@ import org.apache.paimon.utils.DecimalUtils;
 import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Objects;
 import java.util.Random;
-import java.util.TreeSet;
+import java.util.UUID;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link BTreeGlobalIndexer}. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class BTreeGlobalIndexerTest {
-    private DataType dataType;
-    private int dataNum;
-    private List<Pair<Object, Long>> data;
-    private KeySerializer keySerializer;
-    private Comparator<Object> comparator;
-    private FileIO fileIO;
-    private GlobalIndexFileReader fileReader;
-    private GlobalIndexFileWriter fileWriter;
-    private BTreeGlobalIndexer globalIndexer;
-    private Options options;
-
-    @TempDir java.nio.file.Path tempPath;
-
-    public BTreeGlobalIndexerTest(List<Object> args) {
+/** Common test class for BTreeIndexReader. */
+public class AbstractIndexReaderTest {
+    protected static final CacheManager CACHE_MANAGER = new 
CacheManager(MemorySize.VALUE_8_MB);
+
+    protected DataType dataType;
+    protected int dataNum;
+    protected List<Pair<Object, Long>> data;
+    protected KeySerializer keySerializer;
+    protected Comparator<Object> comparator;
+    protected FileIO fileIO;
+    protected GlobalIndexFileReader fileReader;
+    protected GlobalIndexFileWriter fileWriter;
+    protected BTreeGlobalIndexer globalIndexer;
+    protected Options options;
+
+    @TempDir protected java.nio.file.Path tempPath;
+
+    AbstractIndexReaderTest(List<Object> args) {
         this.dataType = (DataType) args.get(0);
         this.dataNum = (Integer) args.get(1);
     }
 
-    @SuppressWarnings("unused")
-    @Parameters(name = "dataType&recordNum-{0}")
-    public static List<List<Object>> getVarSeg() {
-        return Arrays.asList(
-                Arrays.asList(new IntType(), 10000),
-                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
-                Arrays.asList(new CharType(100), 10000),
-                Arrays.asList(new FloatType(), 10000),
-                Arrays.asList(new DecimalType(), 10000),
-                Arrays.asList(new DoubleType(), 10000),
-                Arrays.asList(new BooleanType(), 10000),
-                Arrays.asList(new TinyIntType(), 10000),
-                Arrays.asList(new SmallIntType(), 10000),
-                Arrays.asList(new BigIntType(), 10000),
-                Arrays.asList(new DateType(), 10000),
-                Arrays.asList(new TimestampType(), 10000));
-    }
-
-    @BeforeEach
     public void setUp() throws Exception {
         fileIO = LocalFileIO.create();
         fileWriter =
                 new GlobalIndexFileWriter() {
                     @Override
                     public String newFileName(String prefix) {
-                        return "test-btree-" + prefix;
+                        return "test-btree-" + UUID.randomUUID() + prefix;
                     }
 
                     @Override
@@ -163,107 +136,7 @@ public class BTreeGlobalIndexerTest {
         data.sort((p1, p2) -> comparator.compare(p1.getKey(), p2.getKey()));
     }
 
-    @TestTemplate
-    public void testRangePredicate() throws Exception {
-        GlobalIndexIOMeta written = writeData();
-        FieldRef ref = new FieldRef(1, "testField", dataType);
-
-        try (GlobalIndexReader reader =
-                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
-            GlobalIndexResult result;
-            Random random = new Random();
-
-            for (int i = 0; i < 5; i++) {
-                Object literal = data.get(random.nextInt(dataNum)).getKey();
-
-                // 1. test <= literal
-                result = reader.visitLessOrEqual(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) <= 0));
-
-                // 2. test < literal
-                result = reader.visitLessThan(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) < 0));
-
-                // 3. test >= literal
-                result = reader.visitGreaterOrEqual(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) >= 0));
-
-                // 4. test > literal
-                result = reader.visitGreaterThan(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) > 0));
-
-                // 5. test equal
-                result = reader.visitEqual(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) == 0));
-
-                // 6. test not equal
-                result = reader.visitNotEqual(ref, literal).get();
-                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) != 0));
-            }
-
-            // 7. test < min
-            Object literal7 = data.get(0).getKey();
-            result = reader.visitLessThan(ref, literal7).get();
-            Assertions.assertTrue(result.results().isEmpty());
-
-            // 8. test > max
-            Object literal8 = data.get(dataNum - 1).getKey();
-            result = reader.visitGreaterThan(ref, literal8).get();
-            Assertions.assertTrue(result.results().isEmpty());
-        }
-    }
-
-    @TestTemplate
-    public void testIsNull() throws Exception {
-        // set nulls
-        for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
-            data.get(i).setLeft(null);
-        }
-        GlobalIndexIOMeta written = writeData();
-        FieldRef ref = new FieldRef(1, "testField", dataType);
-
-        try (GlobalIndexReader reader =
-                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
-            GlobalIndexResult result;
-
-            result = reader.visitIsNull(ref).get();
-            assertResult(result, filter(Objects::isNull));
-
-            result = reader.visitIsNotNull(ref).get();
-            assertResult(result, filter(Objects::nonNull));
-        }
-    }
-
-    @TestTemplate
-    public void testInPredicate() throws Exception {
-        GlobalIndexIOMeta written = writeData();
-        FieldRef ref = new FieldRef(1, "testField", dataType);
-
-        try (GlobalIndexReader reader =
-                globalIndexer.createReader(fileReader, 
Collections.singletonList(written))) {
-            GlobalIndexResult result;
-            for (int i = 0; i < 10; i++) {
-                Random random = new Random(System.currentTimeMillis());
-                List<Object> literals =
-                        
data.stream().map(Pair::getKey).collect(Collectors.toList());
-                Collections.shuffle(literals, random);
-                literals = literals.subList(0, (int) (dataNum * 0.1));
-
-                TreeSet<Object> set = new TreeSet<>(comparator);
-                set.addAll(literals);
-
-                // 1. test in
-                result = reader.visitIn(ref, literals).get();
-                assertResult(result, filter(set::contains));
-
-                // 2. test not in
-                result = reader.visitNotIn(ref, literals).get();
-                assertResult(result, filter(obj -> !set.contains(obj)));
-            }
-        }
-    }
-
-    private GlobalIndexIOMeta writeData() throws IOException {
+    protected GlobalIndexIOMeta writeData(List<Pair<Object, Long>> data) 
throws IOException {
         GlobalIndexParallelWriter indexWriter = 
globalIndexer.createWriter(fileWriter);
         for (Pair<Object, Long> pair : data) {
             indexWriter.write(pair.getKey(), pair.getValue());
@@ -279,14 +152,14 @@ public class BTreeGlobalIndexerTest {
                 resultEntry.meta());
     }
 
-    private List<Long> filter(Predicate<Object> filter) {
+    protected List<Long> filter(Predicate<Object> filter) {
         return data.stream()
                 .filter(pair -> filter.test(pair.getKey()))
                 .map(Pair::getValue)
                 .collect(Collectors.toList());
     }
 
-    private void assertResult(GlobalIndexResult indexResult, List<Long> 
expected) {
+    protected void assertResult(GlobalIndexResult indexResult, List<Long> 
expected) {
         Iterator<Long> iter = indexResult.results().iterator();
         List<Long> result = new ArrayList<>();
         while (iter.hasNext()) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java
new file mode 100644
index 0000000000..74c5eb1658
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.predicate.FieldRef;
+import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/** Test for {@link BTreeIndexReader} to read a single file. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BTreeIndexReaderTest extends AbstractIndexReaderTest {
+
+    /** Forwards the parameter list (data type, record count) to the base class constructor. */
+    public BTreeIndexReaderTest(List<Object> args) {
+        super(args);
+    }
+
+    /**
+     * Parameter sets for this test class: one entry per key data type, each paired with a
+     * record count of 10000.
+     */
+    @SuppressWarnings("unused")
+    @Parameters(name = "dataType&recordNum-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(new IntType(), 10000),
+                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+                Arrays.asList(new CharType(100), 10000),
+                Arrays.asList(new FloatType(), 10000),
+                Arrays.asList(new DecimalType(), 10000),
+                Arrays.asList(new DoubleType(), 10000),
+                Arrays.asList(new BooleanType(), 10000),
+                Arrays.asList(new TinyIntType(), 10000),
+                Arrays.asList(new SmallIntType(), 10000),
+                Arrays.asList(new BigIntType(), 10000),
+                Arrays.asList(new DateType(), 10000),
+                Arrays.asList(new TimestampType(), 10000));
+    }
+
+    /** Runs the shared fixture setup of the base class before every test. */
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Verifies range and equality lookups on a single index file.
+     *
+     * <p>For 5 randomly picked existing keys, the results of {@code <=}, {@code <}, {@code >=},
+     * {@code >}, {@code =} and {@code !=} are compared with a brute-force filter over
+     * {@code data}. Finally, "less than the first key" and "greater than the last key" must
+     * both return nothing.
+     */
+    @TestTemplate
+    public void testRangePredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, 
CACHE_MANAGER)) {
+            GlobalIndexResult result;
+            Random random = new Random();
+
+            for (int i = 0; i < 5; i++) {
+                Object literal = data.get(random.nextInt(dataNum)).getKey();
+
+                // 1. test <= literal
+                result = reader.visitLessOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) <= 0));
+
+                // 2. test < literal
+                result = reader.visitLessThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) < 0));
+
+                // 3. test >= literal
+                result = reader.visitGreaterOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) >= 0));
+
+                // 4. test > literal
+                result = reader.visitGreaterThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) > 0));
+
+                // 5. test equal
+                result = reader.visitEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) == 0));
+
+                // 6. test not equal
+                result = reader.visitNotEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, 
literal) != 0));
+            }
+
+            // 7. test < min (first entry of data is used as the smallest key)
+            Object literal7 = data.get(0).getKey();
+            result = reader.visitLessThan(ref, literal7).get();
+            Assertions.assertTrue(result.results().isEmpty());
+
+            // 8. test > max (last entry of data is used as the largest key)
+            Object literal8 = data.get(dataNum - 1).getKey();
+            result = reader.visitGreaterThan(ref, literal8).get();
+            Assertions.assertTrue(result.results().isEmpty());
+        }
+    }
+
+    /**
+     * Verifies {@code IS NULL} and {@code IS NOT NULL} lookups on a single index file whose
+     * data contains null keys.
+     */
+    @TestTemplate
+    public void testIsNull() throws Exception {
+        // set nulls: replace the keys of the last 10% of the entries with null
+        for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
+            data.get(i).setLeft(null);
+        }
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, 
CACHE_MANAGER)) {
+            GlobalIndexResult result;
+
+            result = reader.visitIsNull(ref).get();
+            assertResult(result, filter(Objects::isNull));
+
+            result = reader.visitIsNotNull(ref).get();
+            assertResult(result, filter(Objects::nonNull));
+        }
+    }
+
+    /**
+     * Verifies {@code IN} and {@code NOT IN} lookups on a single index file.
+     *
+     * <p>Each of the 10 rounds shuffles all keys, keeps the first 10% as the literal list and
+     * compares the reader's result with a brute-force filter over {@code data}.
+     */
+    @TestTemplate
+    public void testInPredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+        // One generator for all rounds: re-seeding from the clock inside the loop can give
+        // consecutive rounds the same seed and therefore the same literal list.
+        Random random = new Random();
+
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, CACHE_MANAGER)) {
+            GlobalIndexResult result;
+            for (int i = 0; i < 10; i++) {
+                List<Object> literals =
+                        data.stream().map(Pair::getKey).collect(Collectors.toList());
+                Collections.shuffle(literals, random);
+                literals = literals.subList(0, (int) (dataNum * 0.1));
+
+                // Comparator-based set gives the expected membership for both checks.
+                TreeSet<Object> set = new TreeSet<>(comparator);
+                set.addAll(literals);
+
+                // 1. test in
+                result = reader.visitIn(ref, literals).get();
+                assertResult(result, filter(set::contains));
+
+                // 2. test not in
+                result = reader.visitNotIn(ref, literals).get();
+                assertResult(result, filter(obj -> !set.contains(obj)));
+            }
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java
new file mode 100644
index 0000000000..0dfcb712e6
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.predicate.FieldRef;
+import 
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/** Test for {@link LazyFilteredBTreeReader} to read multiple files. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class LazyFilteredBTreeIndexReaderTest extends AbstractIndexReaderTest {
+
+    /** Forwards the parameter list (data type, record count) to the base class constructor. */
+    LazyFilteredBTreeIndexReaderTest(List<Object> args) {
+        super(args);
+    }
+
+    /**
+     * Parameter sets for this test class: one entry per key data type, each paired with a
+     * record count of 10000.
+     */
+    @SuppressWarnings("unused")
+    @Parameters(name = "dataType&recordNum-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(new IntType(), 10000),
+                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+                Arrays.asList(new CharType(100), 10000),
+                Arrays.asList(new FloatType(), 10000),
+                Arrays.asList(new DecimalType(), 10000),
+                Arrays.asList(new DoubleType(), 10000),
+                Arrays.asList(new BooleanType(), 10000),
+                Arrays.asList(new TinyIntType(), 10000),
+                Arrays.asList(new SmallIntType(), 10000),
+                Arrays.asList(new BigIntType(), 10000),
+                Arrays.asList(new DateType(), 10000),
+                Arrays.asList(new TimestampType(), 10000));
+    }
+
+    /** Runs the shared fixture setup of the base class before every test. */
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /**
+     * Verifies range and equality lookups when the data is spread over several index files.
+     *
+     * <p>For 5 randomly picked existing keys, the results of {@code <=}, {@code <}, {@code >=},
+     * {@code >}, {@code =} and {@code !=} are compared with a brute-force filter over
+     * {@code data}. Finally, "less than the first key" and "greater than the last key" must
+     * both return nothing.
+     */
+    @TestTemplate
+    public void testRangePredicate() throws Exception {
+        List<GlobalIndexIOMeta> written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, written)) {
+            GlobalIndexResult result;
+            Random random = new Random();
+
+            for (int i = 0; i < 5; i++) {
+                // Same accessor as the sibling tests; the imported Pair needs no qualification.
+                Object literal = data.get(random.nextInt(dataNum)).getKey();
+
+                // 1. test <= literal
+                result = reader.visitLessOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) <= 0));
+
+                // 2. test < literal
+                result = reader.visitLessThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) < 0));
+
+                // 3. test >= literal
+                result = reader.visitGreaterOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) >= 0));
+
+                // 4. test > literal
+                result = reader.visitGreaterThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) > 0));
+
+                // 5. test equal
+                result = reader.visitEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) == 0));
+
+                // 6. test not equal
+                result = reader.visitNotEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) != 0));
+            }
+
+            // 7. test < min (first entry of data is used as the smallest key)
+            Object literal7 = data.get(0).getKey();
+            result = reader.visitLessThan(ref, literal7).get();
+            Assertions.assertTrue(result.results().isEmpty());
+
+            // 8. test > max (last entry of data is used as the largest key)
+            Object literal8 = data.get(dataNum - 1).getKey();
+            result = reader.visitGreaterThan(ref, literal8).get();
+            Assertions.assertTrue(result.results().isEmpty());
+        }
+    }
+
+    /**
+     * Verifies {@code IS NULL} and {@code IS NOT NULL} lookups when the data is spread over
+     * several index files and some of it has null keys.
+     */
+    @TestTemplate
+    public void testIsNull() throws Exception {
+        // Null out the keys of the last 15% of the entries. writeData() cuts the data into
+        // slices of dataNum / 10 entries, so the last slice (final 10%) holds only nulls and
+        // the slice before it holds a mix of null and non-null keys.
+        for (int i = dataNum - 1; i >= dataNum * 0.85; i--) {
+            data.get(i).setLeft(null);
+        }
+        List<GlobalIndexIOMeta> written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, 
written)) {
+            GlobalIndexResult result;
+
+            result = reader.visitIsNull(ref).get();
+            assertResult(result, filter(Objects::isNull));
+
+            result = reader.visitIsNotNull(ref).get();
+            assertResult(result, filter(Objects::nonNull));
+        }
+    }
+
+    /**
+     * Verifies {@code IN} and {@code NOT IN} lookups when the data is spread over several
+     * index files.
+     *
+     * <p>Each of the 10 rounds shuffles all keys, keeps the first 10% as the literal list and
+     * compares the reader's result with a brute-force filter over {@code data}.
+     */
+    @TestTemplate
+    public void testInPredicate() throws Exception {
+        List<GlobalIndexIOMeta> written = writeData();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+        // One generator for all rounds: re-seeding from the clock inside the loop can give
+        // consecutive rounds the same seed and therefore the same literal list.
+        Random random = new Random();
+
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, written)) {
+            GlobalIndexResult result;
+            for (int i = 0; i < 10; i++) {
+                List<Object> literals =
+                        data.stream().map(Pair::getKey).collect(Collectors.toList());
+                Collections.shuffle(literals, random);
+                literals = literals.subList(0, (int) (dataNum * 0.1));
+
+                // Comparator-based set gives the expected membership for both checks.
+                TreeSet<Object> set = new TreeSet<>(comparator);
+                set.addAll(literals);
+
+                // 1. test in
+                result = reader.visitIn(ref, literals).get();
+                assertResult(result, filter(set::contains));
+
+                // 2. test not in
+                result = reader.visitNotIn(ref, literals).get();
+                assertResult(result, filter(obj -> !set.contains(obj)));
+            }
+        }
+    }
+
+    /**
+     * Cuts {@code data} into consecutive slices and writes every slice as its own index file.
+     *
+     * <p>The slice size is {@code dataNum / 10}; a remainder, if any, ends up in one extra,
+     * shorter slice.
+     *
+     * @return the metas of all written files, in slice order
+     */
+    private List<GlobalIndexIOMeta> writeData() throws Exception {
+        int fileNum = 10;
+        List<GlobalIndexIOMeta> written = new ArrayList<>(fileNum);
+
+        // At least one record per file: for dataNum < fileNum the integer division yields 0
+        // and the loop below would never advance.
+        int recordPerFile = Math.max(1, dataNum / fileNum);
+        int currentStart = 0;
+        while (currentStart < dataNum) {
+            int nextStart = Math.min(currentStart + recordPerFile, dataNum);
+            written.add(writeData(data.subList(currentStart, nextStart)));
+            currentStart = nextStart;
+        }
+
+        return written;
+    }
+}

Reply via email to