This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7157ae4327 [core] Introduce a lazy reader for BTreeIndexReader to
filter out files by predicate (#6927)
7157ae4327 is described below
commit 7157ae432774f5a4803f70bfebe76f8b78c54615
Author: Faiz <[email protected]>
AuthorDate: Tue Dec 30 12:57:33 2025 +0800
[core] Introduce a lazy reader for BTreeIndexReader to filter out files by
predicate (#6927)
---
.../globalindex/btree/BTreeFileMetaSelector.java | 36 ++-
.../globalindex/btree/BTreeGlobalIndexer.java | 8 +-
.../paimon/globalindex/btree/BTreeIndexMeta.java | 49 +++-
.../paimon/globalindex/btree/BTreeIndexReader.java | 21 +-
.../paimon/globalindex/btree/BTreeIndexWriter.java | 6 +-
.../globalindex/btree/LazyFilteredBTreeReader.java | 294 +++++++++++++++++++++
...dexerTest.java => AbstractIndexReaderTest.java} | 175 ++----------
.../globalindex/btree/BTreeIndexReaderTest.java | 184 +++++++++++++
.../btree/LazyFilteredBTreeIndexReaderTest.java | 199 ++++++++++++++
9 files changed, 788 insertions(+), 184 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
index 334771e125..ef59e03122 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
@@ -55,7 +55,7 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
@Override
public Optional<List<GlobalIndexIOMeta>> visitIsNotNull(FieldRef fieldRef)
{
- return Optional.of(filter(meta -> true));
+ return Optional.of(filter(meta -> !meta.onlyNulls()));
}
@Override
@@ -87,7 +87,12 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
public Optional<List<GlobalIndexIOMeta>> visitLessThan(FieldRef fieldRef,
Object literal) {
// `<` means file.minKey < literal
return Optional.of(
- filter(meta ->
comparator.compare(deserialize(meta.getFirstKey()), literal) < 0));
+ filter(
+ meta ->
+ !meta.onlyNulls()
+ && comparator.compare(
+
deserialize(meta.getFirstKey()), literal)
+ < 0));
}
@Override
@@ -95,7 +100,12 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
FieldRef fieldRef, Object literal) {
// `>=` means file.maxKey >= literal
return Optional.of(
- filter(meta ->
comparator.compare(deserialize(meta.getLastKey()), literal) >= 0));
+ filter(
+ meta ->
+ !meta.onlyNulls()
+ && comparator.compare(
+
deserialize(meta.getLastKey()), literal)
+ >= 0));
}
@Override
@@ -107,7 +117,12 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
public Optional<List<GlobalIndexIOMeta>> visitLessOrEqual(FieldRef
fieldRef, Object literal) {
// `<=` means file.minKey <= literal
return Optional.of(
- filter(meta ->
comparator.compare(deserialize(meta.getFirstKey()), literal) <= 0));
+ filter(
+ meta ->
+ !meta.onlyNulls()
+ && comparator.compare(
+
deserialize(meta.getFirstKey()), literal)
+ <= 0));
}
@Override
@@ -115,6 +130,9 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
return Optional.of(
filter(
meta -> {
+ if (meta.onlyNulls()) {
+ return false;
+ }
Object minKey = deserialize(meta.getFirstKey());
Object maxKey = deserialize(meta.getLastKey());
return comparator.compare(literal, minKey) >= 0
@@ -126,7 +144,12 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
public Optional<List<GlobalIndexIOMeta>> visitGreaterThan(FieldRef
fieldRef, Object literal) {
// `>` means file.maxKey > literal
return Optional.of(
- filter(meta ->
comparator.compare(deserialize(meta.getLastKey()), literal) > 0));
+ filter(
+ meta ->
+ !meta.onlyNulls()
+ && comparator.compare(
+
deserialize(meta.getLastKey()), literal)
+ > 0));
}
@Override
@@ -134,6 +157,9 @@ public class BTreeFileMetaSelector implements
FunctionVisitor<Optional<List<Glob
return Optional.of(
filter(
meta -> {
+ if (meta.onlyNulls()) {
+ return false;
+ }
Object minKey = deserialize(meta.getFirstKey());
Object maxKey = deserialize(meta.getLastKey());
for (Object literal : literals) {
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
index 2ffba2fc9d..34110bc8a5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
@@ -23,7 +23,6 @@ import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.UnionGlobalIndexReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.io.cache.CacheManager;
@@ -32,7 +31,6 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.LazyField;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
/**
@@ -95,10 +93,6 @@ public class BTreeGlobalIndexer implements GlobalIndexer {
@Override
public GlobalIndexReader createReader(
GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files)
throws IOException {
- List<GlobalIndexReader> readers = new ArrayList<>();
- for (GlobalIndexIOMeta meta : files) {
- readers.add(new BTreeIndexReader(keySerializer, fileReader, meta,
cacheManager.get()));
- }
- return new UnionGlobalIndexReader(readers);
+ return new LazyFilteredBTreeReader(files, keySerializer, fileReader,
cacheManager.get());
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
index c134f1fa1e..83d12b217a 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
@@ -22,23 +22,30 @@ import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
import org.apache.paimon.memory.MemorySliceOutput;
-/** Index Meta of each BTree index file. */
+import javax.annotation.Nullable;
+
+/**
+ * Index Meta of each BTree index file. The first key and last key of this
meta could be null if the
+ * entire btree index file only contains nulls.
+ */
public class BTreeIndexMeta {
- private final byte[] firstKey;
- private final byte[] lastKey;
+ @Nullable private final byte[] firstKey;
+ @Nullable private final byte[] lastKey;
private final boolean hasNulls;
- public BTreeIndexMeta(byte[] firstKey, byte[] lastKey, boolean hasNulls) {
+ public BTreeIndexMeta(@Nullable byte[] firstKey, @Nullable byte[] lastKey,
boolean hasNulls) {
this.firstKey = firstKey;
this.lastKey = lastKey;
this.hasNulls = hasNulls;
}
+ @Nullable
public byte[] getFirstKey() {
return firstKey;
}
+ @Nullable
public byte[] getLastKey() {
return lastKey;
}
@@ -47,12 +54,30 @@ public class BTreeIndexMeta {
return hasNulls;
}
+ public boolean onlyNulls() {
+ return firstKey == null && lastKey == null;
+ }
+
+ private int memorySize() {
+ return (firstKey == null ? 0 : firstKey.length)
+ + (lastKey == null ? 0 : lastKey.length)
+ + 9;
+ }
+
public byte[] serialize() {
- MemorySliceOutput sliceOutput = new MemorySliceOutput(firstKey.length
+ lastKey.length + 8);
- sliceOutput.writeInt(firstKey.length);
- sliceOutput.writeBytes(firstKey);
- sliceOutput.writeInt(lastKey.length);
- sliceOutput.writeBytes(lastKey);
+ MemorySliceOutput sliceOutput = new MemorySliceOutput(memorySize());
+ if (firstKey != null) {
+ sliceOutput.writeInt(firstKey.length);
+ sliceOutput.writeBytes(firstKey);
+ } else {
+ sliceOutput.writeInt(0);
+ }
+ if (lastKey != null) {
+ sliceOutput.writeInt(lastKey.length);
+ sliceOutput.writeBytes(lastKey);
+ } else {
+ sliceOutput.writeInt(0);
+ }
sliceOutput.writeByte(hasNulls ? 1 : 0);
return sliceOutput.toSlice().getHeapMemory();
}
@@ -60,9 +85,11 @@ public class BTreeIndexMeta {
public static BTreeIndexMeta deserialize(byte[] data) {
MemorySliceInput sliceInput = MemorySlice.wrap(data).toInput();
int firstKeyLength = sliceInput.readInt();
- byte[] firstKey = sliceInput.readSlice(firstKeyLength).copyBytes();
+ byte[] firstKey =
+ firstKeyLength == 0 ? null :
sliceInput.readSlice(firstKeyLength).copyBytes();
int lastKeyLength = sliceInput.readInt();
- byte[] lastKey = sliceInput.readSlice(lastKeyLength).copyBytes();
+ byte[] lastKey =
+ lastKeyLength == 0 ? null :
sliceInput.readSlice(lastKeyLength).copyBytes();
boolean hasNulls = sliceInput.readByte() == 1;
return new BTreeIndexMeta(firstKey, lastKey, hasNulls);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
index 65b0d5744c..adee790a7d 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
@@ -67,8 +67,14 @@ public class BTreeIndexReader implements GlobalIndexReader {
this.keySerializer = keySerializer;
this.comparator = keySerializer.createComparator();
BTreeIndexMeta indexMeta =
BTreeIndexMeta.deserialize(globalIndexIOMeta.metadata());
- this.minKey =
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
- this.maxKey =
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+ if (indexMeta.getFirstKey() != null) {
+ this.minKey =
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
+ this.maxKey =
keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+ } else {
+ // this is possible if this btree index file only stores nulls.
+ this.minKey = null;
+ this.maxKey = null;
+ }
this.input = fileReader.getInputStream(globalIndexIOMeta.fileName());
// prepare file footer
@@ -340,6 +346,9 @@ public class BTreeIndexReader implements GlobalIndexReader {
// Traverse all data to avoid returning null values, which is very
advantageous in
// situations where there are many null values
// TODO do not traverse all data if less null values
+ if (minKey == null) {
+ return new RoaringNavigableMap64();
+ }
return rangeQuery(minKey, maxKey, true, true);
}
@@ -357,21 +366,19 @@ public class BTreeIndexReader implements
GlobalIndexReader {
SstFileReader.SstFileIterator fileIter = reader.createIterator();
fileIter.seekTo(keySerializer.serialize(from));
- boolean skipped = false;
RoaringNavigableMap64 result = new RoaringNavigableMap64();
BlockIterator dataIter;
Map.Entry<MemorySlice, MemorySlice> entry;
while ((dataIter = fileIter.readBatch()) != null) {
while (dataIter.hasNext()) {
entry = dataIter.next();
+ Object key = keySerializer.deserialize(entry.getKey());
- if (!fromInclusive && !skipped) {
- // this is correct only if the underlying file do not have
duplicated keys.
- skipped = true;
+ if (!fromInclusive && comparator.compare(key, from) == 0) {
continue;
}
- int difference =
comparator.compare(keySerializer.deserialize(entry.getKey()), to);
+ int difference = comparator.compare(key, to);
if (difference > 0 || !toInclusive && difference == 0) {
return result;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
index a08b378947..fe4c14a449 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
@@ -167,14 +167,14 @@ public class BTreeIndexWriter implements
GlobalIndexParallelWriter {
throw new RuntimeException("Error in closing BTree index writer",
e);
}
- if (firstKey == null) {
+ if (firstKey == null && !nullBitmap.initialized()) {
throw new RuntimeException("Should never write an empty btree
index file.");
}
byte[] metaBytes =
new BTreeIndexMeta(
- keySerializer.serialize(firstKey),
- keySerializer.serialize(lastKey),
+ firstKey == null ? null :
keySerializer.serialize(firstKey),
+ lastKey == null ? null :
keySerializer.serialize(lastKey),
nullBitmap.initialized())
.serialize();
return Collections.singletonList(new ResultEntry(fileName, rowCount,
metaBytes));
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
new file mode 100644
index 0000000000..cbfb452b2d
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeReader.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.UnionGlobalIndexReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.predicate.FieldRef;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+/**
+ * A {@link GlobalIndexReader} over a set of BTree index files that prunes files per predicate.
+ *
+ * <p>For every predicate, {@link BTreeFileMetaSelector} first picks the files whose key-range
+ * metadata may contain matching rows. The predicate is then evaluated only on those files, and the
+ * per-file results are merged by a {@link UnionGlobalIndexReader}. In the ideal case, such as an
+ * Equal predicate, only very few files are actually read.
+ *
+ * <p>Per-file {@link BTreeIndexReader}s are created lazily on first use, cached by file name and
+ * closed together by {@link #close()}. The cache is an unsynchronized {@link HashMap}, so an
+ * instance should not be shared across threads without external synchronization.
+ */
+public class LazyFilteredBTreeReader implements GlobalIndexReader {
+
+    /** Chooses candidate files for a predicate from their key-range metadata. */
+    private final BTreeFileMetaSelector fileSelector;
+
+    /** Readers created so far, keyed by index file name; closed in {@link #close()}. */
+    private final Map<String, GlobalIndexReader> readerCache;
+
+    private final KeySerializer keySerializer;
+    private final GlobalIndexFileReader fileReader;
+    private final CacheManager cacheManager;
+
+    /**
+     * Creates a reader over the given index files. Per-file readers are not created here; they are
+     * opened on demand by the first predicate that selects the file.
+     *
+     * @param files metadata of all BTree index files covered by this reader
+     * @param keySerializer key serializer handed to the file selector and to each file reader
+     * @param fileReader used by each {@link BTreeIndexReader} to open its file
+     * @param cacheManager cache manager handed to each {@link BTreeIndexReader}
+     */
+    public LazyFilteredBTreeReader(
+            List<GlobalIndexIOMeta> files,
+            KeySerializer keySerializer,
+            GlobalIndexFileReader fileReader,
+            CacheManager cacheManager) {
+        this.fileSelector = new BTreeFileMetaSelector(files, keySerializer);
+        this.readerCache = new HashMap<>();
+        this.keySerializer = keySerializer;
+        this.fileReader = fileReader;
+        this.cacheManager = cacheManager;
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+        return visitSelected(
+                fileSelector.visitIsNotNull(fieldRef), reader -> reader.visitIsNotNull(fieldRef));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+        return visitSelected(
+                fileSelector.visitIsNull(fieldRef), reader -> reader.visitIsNull(fieldRef));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitStartsWith(fieldRef, literal),
+                reader -> reader.visitStartsWith(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitEndsWith(fieldRef, literal),
+                reader -> reader.visitEndsWith(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitContains(fieldRef, literal),
+                reader -> reader.visitContains(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitLike(fieldRef, literal),
+                reader -> reader.visitLike(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitLessThan(fieldRef, literal),
+                reader -> reader.visitLessThan(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitGreaterOrEqual(fieldRef, literal),
+                reader -> reader.visitGreaterOrEqual(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitNotEqual(fieldRef, literal),
+                reader -> reader.visitNotEqual(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitLessOrEqual(fieldRef, literal),
+                reader -> reader.visitLessOrEqual(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitEqual(fieldRef, literal),
+                reader -> reader.visitEqual(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return visitSelected(
+                fileSelector.visitGreaterThan(fieldRef, literal),
+                reader -> reader.visitGreaterThan(fieldRef, literal));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
+        return visitSelected(
+                fileSelector.visitIn(fieldRef, literals),
+                reader -> reader.visitIn(fieldRef, literals));
+    }
+
+    @Override
+    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        return visitSelected(
+                fileSelector.visitNotIn(fieldRef, literals),
+                reader -> reader.visitNotIn(fieldRef, literals));
+    }
+
+    /**
+     * Evaluates a predicate only on the files chosen by {@link #fileSelector}.
+     *
+     * @param selectedFiles the selector's answer for the predicate
+     * @param visit applies the same predicate to a reader over the selected files
+     * @return {@link Optional#empty()} unchanged if the selector returned empty, an empty result if
+     *     no file was selected (no reader is opened), otherwise the union reader's result
+     */
+    private Optional<GlobalIndexResult> visitSelected(
+            Optional<List<GlobalIndexIOMeta>> selectedFiles,
+            Function<GlobalIndexReader, Optional<GlobalIndexResult>> visit) {
+        if (!selectedFiles.isPresent()) {
+            return Optional.empty();
+        }
+        List<GlobalIndexIOMeta> selected = selectedFiles.get();
+        if (selected.isEmpty()) {
+            return Optional.of(GlobalIndexResult.createEmpty());
+        }
+        return visit.apply(createUnionReader(selected));
+    }
+
+    /**
+     * Creates a union reader over the given files. The union reader is composed of readers taken
+     * from the reader cache, so do not close it; they are closed by {@link #close()}.
+     */
+    private UnionGlobalIndexReader createUnionReader(List<GlobalIndexIOMeta> files) {
+        List<GlobalIndexReader> readers = new ArrayList<>(files.size());
+        for (GlobalIndexIOMeta meta : files) {
+            readers.add(readerCache.computeIfAbsent(meta.fileName(), name -> openReader(meta)));
+        }
+        return new UnionGlobalIndexReader(readers);
+    }
+
+    /**
+     * Opens a {@link BTreeIndexReader} for one index file.
+     *
+     * @throws UncheckedIOException if the reader cannot be created; this is unchecked because it is
+     *     raised from inside {@link Map#computeIfAbsent}
+     */
+    private GlobalIndexReader openReader(GlobalIndexIOMeta meta) {
+        try {
+            return new BTreeIndexReader(keySerializer, fileReader, meta, cacheManager);
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Can't create BTree index reader for " + meta.fileName(), e);
+        }
+    }
+
+    /**
+     * Closes every cached per-file reader. Each reader is closed even if an earlier one fails. The
+     * first {@link IOException} is rethrown, and later ones are attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+        IOException exception = null;
+        try {
+            for (GlobalIndexReader reader : readerCache.values()) {
+                try {
+                    reader.close();
+                } catch (IOException ioe) {
+                    if (exception == null) {
+                        exception = ioe;
+                    } else {
+                        exception.addSuppressed(ioe);
+                    }
+                }
+            }
+        } finally {
+            // Forget the closed readers so a repeated close() does not close them twice.
+            readerCache.clear();
+        }
+        if (exception != null) {
+            throw exception;
+        }
+    }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
similarity index 56%
rename from
paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
rename to
paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
index 7acf024542..1160aa0c11 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/AbstractIndexReaderTest.java
@@ -28,16 +28,13 @@ import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.FieldRef;
-import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
-import org.apache.paimon.testutils.junit.parameterized.Parameters;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
@@ -59,73 +56,49 @@ import org.apache.paimon.utils.DecimalUtils;
import org.apache.paimon.utils.Pair;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import java.util.Objects;
import java.util.Random;
-import java.util.TreeSet;
+import java.util.UUID;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link BTreeGlobalIndexer}. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class BTreeGlobalIndexerTest {
- private DataType dataType;
- private int dataNum;
- private List<Pair<Object, Long>> data;
- private KeySerializer keySerializer;
- private Comparator<Object> comparator;
- private FileIO fileIO;
- private GlobalIndexFileReader fileReader;
- private GlobalIndexFileWriter fileWriter;
- private BTreeGlobalIndexer globalIndexer;
- private Options options;
-
- @TempDir java.nio.file.Path tempPath;
-
- public BTreeGlobalIndexerTest(List<Object> args) {
+/** Common test class for BTreeIndexReader. */
+public class AbstractIndexReaderTest {
+ protected static final CacheManager CACHE_MANAGER = new
CacheManager(MemorySize.VALUE_8_MB);
+
+ protected DataType dataType;
+ protected int dataNum;
+ protected List<Pair<Object, Long>> data;
+ protected KeySerializer keySerializer;
+ protected Comparator<Object> comparator;
+ protected FileIO fileIO;
+ protected GlobalIndexFileReader fileReader;
+ protected GlobalIndexFileWriter fileWriter;
+ protected BTreeGlobalIndexer globalIndexer;
+ protected Options options;
+
+ @TempDir protected java.nio.file.Path tempPath;
+
+ AbstractIndexReaderTest(List<Object> args) {
this.dataType = (DataType) args.get(0);
this.dataNum = (Integer) args.get(1);
}
- @SuppressWarnings("unused")
- @Parameters(name = "dataType&recordNum-{0}")
- public static List<List<Object>> getVarSeg() {
- return Arrays.asList(
- Arrays.asList(new IntType(), 10000),
- Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
- Arrays.asList(new CharType(100), 10000),
- Arrays.asList(new FloatType(), 10000),
- Arrays.asList(new DecimalType(), 10000),
- Arrays.asList(new DoubleType(), 10000),
- Arrays.asList(new BooleanType(), 10000),
- Arrays.asList(new TinyIntType(), 10000),
- Arrays.asList(new SmallIntType(), 10000),
- Arrays.asList(new BigIntType(), 10000),
- Arrays.asList(new DateType(), 10000),
- Arrays.asList(new TimestampType(), 10000));
- }
-
- @BeforeEach
public void setUp() throws Exception {
fileIO = LocalFileIO.create();
fileWriter =
new GlobalIndexFileWriter() {
@Override
public String newFileName(String prefix) {
- return "test-btree-" + prefix;
+ return "test-btree-" + UUID.randomUUID() + prefix;
}
@Override
@@ -163,107 +136,7 @@ public class BTreeGlobalIndexerTest {
data.sort((p1, p2) -> comparator.compare(p1.getKey(), p2.getKey()));
}
- @TestTemplate
- public void testRangePredicate() throws Exception {
- GlobalIndexIOMeta written = writeData();
- FieldRef ref = new FieldRef(1, "testField", dataType);
-
- try (GlobalIndexReader reader =
- globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
- GlobalIndexResult result;
- Random random = new Random();
-
- for (int i = 0; i < 5; i++) {
- Object literal = data.get(random.nextInt(dataNum)).getKey();
-
- // 1. test <= literal
- result = reader.visitLessOrEqual(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) <= 0));
-
- // 2. test < literal
- result = reader.visitLessThan(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) < 0));
-
- // 3. test >= literal
- result = reader.visitGreaterOrEqual(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) >= 0));
-
- // 4. test > literal
- result = reader.visitGreaterThan(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) > 0));
-
- // 5. test equal
- result = reader.visitEqual(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) == 0));
-
- // 6. test not equal
- result = reader.visitNotEqual(ref, literal).get();
- assertResult(result, filter(obj -> comparator.compare(obj,
literal) != 0));
- }
-
- // 7. test < min
- Object literal7 = data.get(0).getKey();
- result = reader.visitLessThan(ref, literal7).get();
- Assertions.assertTrue(result.results().isEmpty());
-
- // 8. test > max
- Object literal8 = data.get(dataNum - 1).getKey();
- result = reader.visitGreaterThan(ref, literal8).get();
- Assertions.assertTrue(result.results().isEmpty());
- }
- }
-
- @TestTemplate
- public void testIsNull() throws Exception {
- // set nulls
- for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
- data.get(i).setLeft(null);
- }
- GlobalIndexIOMeta written = writeData();
- FieldRef ref = new FieldRef(1, "testField", dataType);
-
- try (GlobalIndexReader reader =
- globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
- GlobalIndexResult result;
-
- result = reader.visitIsNull(ref).get();
- assertResult(result, filter(Objects::isNull));
-
- result = reader.visitIsNotNull(ref).get();
- assertResult(result, filter(Objects::nonNull));
- }
- }
-
- @TestTemplate
- public void testInPredicate() throws Exception {
- GlobalIndexIOMeta written = writeData();
- FieldRef ref = new FieldRef(1, "testField", dataType);
-
- try (GlobalIndexReader reader =
- globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
- GlobalIndexResult result;
- for (int i = 0; i < 10; i++) {
- Random random = new Random(System.currentTimeMillis());
- List<Object> literals =
-
data.stream().map(Pair::getKey).collect(Collectors.toList());
- Collections.shuffle(literals, random);
- literals = literals.subList(0, (int) (dataNum * 0.1));
-
- TreeSet<Object> set = new TreeSet<>(comparator);
- set.addAll(literals);
-
- // 1. test in
- result = reader.visitIn(ref, literals).get();
- assertResult(result, filter(set::contains));
-
- // 2. test not in
- result = reader.visitNotIn(ref, literals).get();
- assertResult(result, filter(obj -> !set.contains(obj)));
- }
- }
- }
-
- private GlobalIndexIOMeta writeData() throws IOException {
+ protected GlobalIndexIOMeta writeData(List<Pair<Object, Long>> data)
throws IOException {
GlobalIndexParallelWriter indexWriter =
globalIndexer.createWriter(fileWriter);
for (Pair<Object, Long> pair : data) {
indexWriter.write(pair.getKey(), pair.getValue());
@@ -279,14 +152,14 @@ public class BTreeGlobalIndexerTest {
resultEntry.meta());
}
- private List<Long> filter(Predicate<Object> filter) {
+    /** Returns the values of the {@code data} pairs whose key satisfies {@code filter}, in list order. */
+ protected List<Long> filter(Predicate<Object> filter) {
return data.stream()
.filter(pair -> filter.test(pair.getKey()))
.map(Pair::getValue)
.collect(Collectors.toList());
}
- private void assertResult(GlobalIndexResult indexResult, List<Long>
expected) {
+ protected void assertResult(GlobalIndexResult indexResult, List<Long>
expected) {
Iterator<Long> iter = indexResult.results().iterator();
List<Long> result = new ArrayList<>();
while (iter.hasNext()) {
diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java
new file mode 100644
index 0000000000..74c5eb1658
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeIndexReaderTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.predicate.FieldRef;
+import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link BTreeIndexReader} to read a single file.
+ *
+ * <p>Each predicate result is compared against the expected values computed by {@code
+ * filter(...)} over the in-memory {@code data} list.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BTreeIndexReaderTest extends AbstractIndexReaderTest {
+
+    public BTreeIndexReaderTest(List<Object> args) {
+        super(args);
+    }
+
+    @SuppressWarnings("unused")
+    @Parameters(name = "dataType&recordNum-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(new IntType(), 10000),
+                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+                Arrays.asList(new CharType(100), 10000),
+                Arrays.asList(new FloatType(), 10000),
+                Arrays.asList(new DecimalType(), 10000),
+                Arrays.asList(new DoubleType(), 10000),
+                Arrays.asList(new BooleanType(), 10000),
+                Arrays.asList(new TinyIntType(), 10000),
+                Arrays.asList(new SmallIntType(), 10000),
+                Arrays.asList(new BigIntType(), 10000),
+                Arrays.asList(new DateType(), 10000),
+                Arrays.asList(new TimestampType(), 10000));
+    }
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /** Checks <, <=, >, >=, =, != against random literals, plus the out-of-range bounds. */
+    @TestTemplate
+    public void testRangePredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, CACHE_MANAGER)) {
+            GlobalIndexResult result;
+            Random random = new Random();
+
+            for (int i = 0; i < 5; i++) {
+                Object literal = data.get(random.nextInt(dataNum)).getKey();
+
+                // 1. test <= literal
+                result = reader.visitLessOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) <= 0));
+
+                // 2. test < literal
+                result = reader.visitLessThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) < 0));
+
+                // 3. test >= literal
+                result = reader.visitGreaterOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) >= 0));
+
+                // 4. test > literal
+                result = reader.visitGreaterThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) > 0));
+
+                // 5. test equal
+                result = reader.visitEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) == 0));
+
+                // 6. test not equal
+                result = reader.visitNotEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) != 0));
+            }
+
+            // NOTE(review): steps 7 and 8 assume setUp() leaves data sorted by key, so that
+            // index 0 / dataNum - 1 hold the min / max key -- confirm in AbstractIndexReaderTest.
+            // 7. test < min
+            Object literal7 = data.get(0).getKey();
+            result = reader.visitLessThan(ref, literal7).get();
+            Assertions.assertTrue(result.results().isEmpty());
+
+            // 8. test > max
+            Object literal8 = data.get(dataNum - 1).getKey();
+            result = reader.visitGreaterThan(ref, literal8).get();
+            Assertions.assertTrue(result.results().isEmpty());
+        }
+    }
+
+    /** Nulls out the last ~10% of keys and checks IS NULL / IS NOT NULL. */
+    @TestTemplate
+    public void testIsNull() throws Exception {
+        // set nulls
+        for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
+            data.get(i).setLeft(null);
+        }
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, CACHE_MANAGER)) {
+            GlobalIndexResult result;
+
+            result = reader.visitIsNull(ref).get();
+            assertResult(result, filter(Objects::isNull));
+
+            result = reader.visitIsNotNull(ref).get();
+            assertResult(result, filter(Objects::nonNull));
+        }
+    }
+
+    /** Checks IN / NOT IN with 10 rounds of randomly chosen literal sets (10% of the keys). */
+    @TestTemplate
+    public void testInPredicate() throws Exception {
+        GlobalIndexIOMeta written = writeData(data);
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        // Create the Random once. Re-seeding it with System.currentTimeMillis() on every
+        // iteration makes iterations that run within the same millisecond use identical
+        // shuffles, which silently reduces the randomized coverage.
+        Random random = new Random(System.currentTimeMillis());
+        try (GlobalIndexReader reader =
+                new BTreeIndexReader(keySerializer, fileReader, written, CACHE_MANAGER)) {
+            GlobalIndexResult result;
+            for (int i = 0; i < 10; i++) {
+                List<Object> literals =
+                        data.stream().map(Pair::getKey).collect(Collectors.toList());
+                Collections.shuffle(literals, random);
+                literals = literals.subList(0, (int) (dataNum * 0.1));
+
+                TreeSet<Object> set = new TreeSet<>(comparator);
+                set.addAll(literals);
+
+                // 1. test in
+                result = reader.visitIn(ref, literals).get();
+                assertResult(result, filter(set::contains));
+
+                // 2. test not in
+                result = reader.visitNotIn(ref, literals).get();
+                assertResult(result, filter(obj -> !set.contains(obj)));
+            }
+        }
+    }
+}
diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java
new file mode 100644
index 0000000000..0dfcb712e6
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/LazyFilteredBTreeIndexReaderTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.predicate.FieldRef;
+import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link LazyFilteredBTreeReader} to read multiple files.
+ *
+ * <p>{@code data} is split into several index files by {@link #writeDataToMultipleFiles()}. Each
+ * predicate result is compared against the expected values computed by {@code filter(...)} over
+ * the in-memory {@code data} list.
+ */
+@ExtendWith(ParameterizedTestExtension.class)
+public class LazyFilteredBTreeIndexReaderTest extends AbstractIndexReaderTest {
+
+    public LazyFilteredBTreeIndexReaderTest(List<Object> args) {
+        super(args);
+    }
+
+    @SuppressWarnings("unused")
+    @Parameters(name = "dataType&recordNum-{0}")
+    public static List<List<Object>> getVarSeg() {
+        return Arrays.asList(
+                Arrays.asList(new IntType(), 10000),
+                Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+                Arrays.asList(new CharType(100), 10000),
+                Arrays.asList(new FloatType(), 10000),
+                Arrays.asList(new DecimalType(), 10000),
+                Arrays.asList(new DoubleType(), 10000),
+                Arrays.asList(new BooleanType(), 10000),
+                Arrays.asList(new TinyIntType(), 10000),
+                Arrays.asList(new SmallIntType(), 10000),
+                Arrays.asList(new BigIntType(), 10000),
+                Arrays.asList(new DateType(), 10000),
+                Arrays.asList(new TimestampType(), 10000));
+    }
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        super.setUp();
+    }
+
+    /** Checks <, <=, >, >=, =, != against random literals, plus the out-of-range bounds. */
+    @TestTemplate
+    public void testRangePredicate() throws Exception {
+        List<GlobalIndexIOMeta> written = writeDataToMultipleFiles();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, written)) {
+            GlobalIndexResult result;
+            Random random = new Random();
+
+            for (int i = 0; i < 5; i++) {
+                Pair<Object, Long> pair = data.get(random.nextInt(dataNum));
+                Object literal = pair.getKey();
+
+                // 1. test <= literal
+                result = reader.visitLessOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) <= 0));
+
+                // 2. test < literal
+                result = reader.visitLessThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) < 0));
+
+                // 3. test >= literal
+                result = reader.visitGreaterOrEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) >= 0));
+
+                // 4. test > literal
+                result = reader.visitGreaterThan(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) > 0));
+
+                // 5. test equal
+                result = reader.visitEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) == 0));
+
+                // 6. test not equal
+                result = reader.visitNotEqual(ref, literal).get();
+                assertResult(result, filter(obj -> comparator.compare(obj, literal) != 0));
+            }
+
+            // NOTE(review): steps 7 and 8 assume setUp() leaves data sorted by key, so that
+            // index 0 / dataNum - 1 hold the min / max key -- confirm in AbstractIndexReaderTest.
+            // 7. test < min
+            Object literal7 = data.get(0).getKey();
+            result = reader.visitLessThan(ref, literal7).get();
+            Assertions.assertTrue(result.results().isEmpty());
+
+            // 8. test > max
+            Object literal8 = data.get(dataNum - 1).getKey();
+            result = reader.visitGreaterThan(ref, literal8).get();
+            Assertions.assertTrue(result.results().isEmpty());
+        }
+    }
+
+    /** Nulls out the last ~15% of keys and checks IS NULL / IS NOT NULL across files. */
+    @TestTemplate
+    public void testIsNull() throws Exception {
+        // Null out the last ~15% of keys. With 10 equally sized files (10% each), at least the
+        // last file then contains only nulls, which exercises the only-nulls file pruning.
+        for (int i = dataNum - 1; i >= dataNum * 0.85; i--) {
+            data.get(i).setLeft(null);
+        }
+        List<GlobalIndexIOMeta> written = writeDataToMultipleFiles();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, written)) {
+            GlobalIndexResult result;
+
+            result = reader.visitIsNull(ref).get();
+            assertResult(result, filter(Objects::isNull));
+
+            result = reader.visitIsNotNull(ref).get();
+            assertResult(result, filter(Objects::nonNull));
+        }
+    }
+
+    /** Checks IN / NOT IN with 10 rounds of randomly chosen literal sets (10% of the keys). */
+    @TestTemplate
+    public void testInPredicate() throws Exception {
+        List<GlobalIndexIOMeta> written = writeDataToMultipleFiles();
+        FieldRef ref = new FieldRef(1, "testField", dataType);
+
+        // Create the Random once. Re-seeding it with System.currentTimeMillis() on every
+        // iteration makes iterations that run within the same millisecond use identical
+        // shuffles, which silently reduces the randomized coverage.
+        Random random = new Random(System.currentTimeMillis());
+        try (GlobalIndexReader reader = globalIndexer.createReader(fileReader, written)) {
+            GlobalIndexResult result;
+            for (int i = 0; i < 10; i++) {
+                List<Object> literals =
+                        data.stream().map(Pair::getKey).collect(Collectors.toList());
+                Collections.shuffle(literals, random);
+                literals = literals.subList(0, (int) (dataNum * 0.1));
+
+                TreeSet<Object> set = new TreeSet<>(comparator);
+                set.addAll(literals);
+
+                // 1. test in
+                result = reader.visitIn(ref, literals).get();
+                assertResult(result, filter(set::contains));
+
+                // 2. test not in
+                result = reader.visitNotIn(ref, literals).get();
+                assertResult(result, filter(obj -> !set.contains(obj)));
+            }
+        }
+    }
+
+    /**
+     * Splits {@code data} into consecutive chunks of {@code dataNum / fileNum} records (at least
+     * one) and writes each chunk as its own index file via {@code writeData(List)}.
+     *
+     * @return the metas of the written files, in data order
+     */
+    private List<GlobalIndexIOMeta> writeDataToMultipleFiles() throws Exception {
+        int fileNum = 10;
+        List<GlobalIndexIOMeta> written = new ArrayList<>(fileNum);
+
+        // Clamp to at least one record per file. When dataNum < fileNum, the plain integer
+        // division is 0, currentStart would never advance, and the loop would never end.
+        int recordPerFile = Math.max(1, dataNum / fileNum);
+        int currentStart = 0;
+        while (currentStart < dataNum) {
+            int nextStart = Math.min(currentStart + recordPerFile, dataNum);
+            written.add(writeData(data.subList(currentStart, nextStart)));
+            currentStart = nextStart;
+        }
+
+        return written;
+    }
+}