This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7dd7fa0632 [core] Support btree global index in paimon-common (#6869)
7dd7fa0632 is described below
commit 7dd7fa063254b3e16ab669dbf7242f8dc52c4497
Author: Faiz <[email protected]>
AuthorDate: Fri Dec 26 21:45:09 2025 +0800
[core] Support btree global index in paimon-common (#6869)
---
.../paimon/globalindex/GlobalIndexIOMeta.java | 25 ++
.../btree/BTreeFileFooter.java} | 46 ++-
.../globalindex/btree/BTreeFileMetaSelector.java | 199 +++++++++++
.../globalindex/btree/BTreeGlobalIndexer.java | 102 ++++++
.../BTreeGlobalIndexerFactory.java} | 22 +-
.../paimon/globalindex/btree/BTreeIndexMeta.java | 68 ++++
.../globalindex/btree/BTreeIndexOptions.java | 56 +++
.../paimon/globalindex/btree/BTreeIndexReader.java | 379 +++++++++++++++++++++
.../paimon/globalindex/btree/BTreeIndexWriter.java | 216 ++++++++++++
.../paimon/globalindex/btree/KeySerializer.java | 376 ++++++++++++++++++++
.../globalindex/io/GlobalIndexFileReader.java | 3 +
.../globalindex/io/GlobalIndexFileWriter.java | 5 +-
.../paimon/lookup/sort/SortLookupStoreFooter.java | 2 +-
.../paimon/lookup/sort/SortLookupStoreWriter.java | 1 +
.../java/org/apache/paimon/memory/MemorySlice.java | 4 +
.../apache/paimon/memory/MemorySliceOutput.java | 6 +
.../java/org/apache/paimon/sst/SstFileWriter.java | 2 -
.../apache/paimon/utils/RoaringNavigableMap64.java | 4 +
.../bitmapindex/BitmapGlobalIndexTest.java | 18 +-
.../btree/BTreeFileMetaSelectorTest.java | 160 +++++++++
.../globalindex/btree/BTreeGlobalIndexerTest.java | 378 ++++++++++++++++++++
.../globalindex/GlobalIndexFileReadWrite.java | 5 +-
.../index/LuceneVectorGlobalIndexScanTest.java | 5 +-
.../lucene/index/LuceneVectorGlobalIndexTest.java | 17 +-
24 files changed, 2066 insertions(+), 33 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
index 8d6b5a0a1c..497a9448c5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
@@ -18,6 +18,9 @@
package org.apache.paimon.globalindex;
+import java.util.Arrays;
+import java.util.Objects;
+
/** Index meta for global index. */
public class GlobalIndexIOMeta {
@@ -48,4 +51,26 @@ public class GlobalIndexIOMeta {
public byte[] metadata() {
return metadata;
}
+
+    /**
+     * Two metas are equal when they have the same runtime class and agree on file name, file size,
+     * range end and the content (not identity) of the metadata bytes.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+        if (other == null || other.getClass() != getClass()) {
+            return false;
+        }
+        GlobalIndexIOMeta meta = (GlobalIndexIOMeta) other;
+        boolean sameScalars = fileSize == meta.fileSize && rangeEnd == meta.rangeEnd;
+        return sameScalars
+                && Objects.equals(fileName, meta.fileName)
+                && Arrays.equals(metadata, meta.metadata);
+    }
+
+    /** Hash code consistent with {@link #equals(Object)}; the metadata bytes are hashed by content. */
+    @Override
+    public int hashCode() {
+        int hash = Objects.hash(fileName, fileSize, rangeEnd);
+        return 31 * hash + Arrays.hashCode(metadata);
+    }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
similarity index 68%
copy from
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
index 9f4272096c..1484e12cea 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileFooter.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup.sort;
+package org.apache.paimon.globalindex.btree;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceInput;
@@ -26,21 +26,24 @@ import org.apache.paimon.sst.BloomFilterHandle;
import javax.annotation.Nullable;
-import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
+import static
org.apache.paimon.globalindex.btree.BTreeIndexWriter.MAGIC_NUMBER;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-/** Footer for a sorted file. */
-public class SortLookupStoreFooter {
-
- public static final int ENCODED_LENGTH = 36;
+/** The Footer for BTree file. */
+public class BTreeFileFooter {
+ public static final int ENCODED_LENGTH = 48;
@Nullable private final BloomFilterHandle bloomFilterHandle;
private final BlockHandle indexBlockHandle;
+ @Nullable private final BlockHandle nullBitmapHandle;
- public SortLookupStoreFooter(
- @Nullable BloomFilterHandle bloomFilterHandle, BlockHandle
indexBlockHandle) {
+    /**
+     * Creates a footer. {@code bloomFilterHandle} and {@code nullBitmapHandle} may be {@code null};
+     * a {@code null} {@code nullBitmapHandle} is written as a zero offset and zero size by {@code
+     * writeFooter} and read back as {@code null} by {@code readFooter}.
+     */
+    public BTreeFileFooter(
+            @Nullable BloomFilterHandle bloomFilterHandle,
+            BlockHandle indexBlockHandle,
+            @Nullable BlockHandle nullBitmapHandle) {
         this.bloomFilterHandle = bloomFilterHandle;
         this.indexBlockHandle = indexBlockHandle;
+        this.nullBitmapHandle = nullBitmapHandle;
     }
@Nullable
@@ -52,7 +55,12 @@ public class SortLookupStoreFooter {
return indexBlockHandle;
}
- public static SortLookupStoreFooter readFooter(MemorySliceInput
sliceInput) {
+    /**
+     * Returns the handle of the null bitmap block, or {@code null} if the footer records none (a
+     * zero offset together with a zero size).
+     */
+    @Nullable
+    public BlockHandle getNullBitmapHandle() {
+        return this.nullBitmapHandle;
+    }
+
+ public static BTreeFileFooter readFooter(MemorySliceInput sliceInput) {
// read bloom filter and index handles
@Nullable
BloomFilterHandle bloomFilterHandle =
@@ -65,6 +73,12 @@ public class SortLookupStoreFooter {
}
BlockHandle indexBlockHandle = new BlockHandle(sliceInput.readLong(),
sliceInput.readInt());
+ @Nullable
+ BlockHandle nullBitmapHandle = new BlockHandle(sliceInput.readLong(),
sliceInput.readInt());
+ if (nullBitmapHandle.offset() == 0 && nullBitmapHandle.size() == 0) {
+ nullBitmapHandle = null;
+ }
+
// skip padding
sliceInput.setPosition(ENCODED_LENGTH - 4);
@@ -72,16 +86,16 @@ public class SortLookupStoreFooter {
int magicNumber = sliceInput.readInt();
checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad
magic number)");
- return new SortLookupStoreFooter(bloomFilterHandle, indexBlockHandle);
+ return new BTreeFileFooter(bloomFilterHandle, indexBlockHandle,
nullBitmapHandle);
}
- public static MemorySlice writeFooter(SortLookupStoreFooter footer) {
+ /** Serializes {@code footer} into a newly allocated slice (initial capacity {@link #ENCODED_LENGTH}). */
+ public static MemorySlice writeFooter(BTreeFileFooter footer) {
MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
writeFooter(footer, output);
return output.toSlice();
}
- public static void writeFooter(SortLookupStoreFooter footer,
MemorySliceOutput sliceOutput) {
+ public static void writeFooter(BTreeFileFooter footer, MemorySliceOutput
sliceOutput) {
// write bloom filter and index handles
if (footer.bloomFilterHandle == null) {
sliceOutput.writeLong(0);
@@ -96,6 +110,14 @@ public class SortLookupStoreFooter {
sliceOutput.writeLong(footer.indexBlockHandle.offset());
sliceOutput.writeInt(footer.indexBlockHandle.size());
+ if (footer.nullBitmapHandle == null) {
+ sliceOutput.writeLong(0);
+ sliceOutput.writeInt(0);
+ } else {
+ sliceOutput.writeLong(footer.nullBitmapHandle.offset());
+ sliceOutput.writeInt(footer.nullBitmapHandle.size());
+ }
+
// write magic number
sliceOutput.writeInt(MAGIC_NUMBER);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
new file mode 100644
index 0000000000..06846eec3e
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelector.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.FunctionVisitor;
+import org.apache.paimon.predicate.TransformPredicate;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link FunctionVisitor} that selects the btree index files which may contain rows matching a
+ * predicate, based on the first key, last key and null flag stored in each file's {@link
+ * BTreeIndexMeta}. Predicates the key range cannot prune (e.g. {@code NOT IN}, {@code LIKE}) keep
+ * every file.
+ *
+ * <p>All files are expected to belong to the same field. The current {@code
+ * RowRangeGlobalIndexScanner} can guarantee that. Please do not break this premise if you want to
+ * implement your own index scanner.
+ */
+public class BTreeFileMetaSelector implements FunctionVisitor<Optional<List<GlobalIndexIOMeta>>> {
+
+    private final List<Pair<GlobalIndexIOMeta, BTreeIndexMeta>> indexFiles;
+    private final Comparator<Object> comparator;
+    private final KeySerializer keySerializer;
+
+    public BTreeFileMetaSelector(List<GlobalIndexIOMeta> files, KeySerializer keySerializer) {
+        // Decode every file's meta once up front; predicates only look at the decoded metas.
+        List<Pair<GlobalIndexIOMeta, BTreeIndexMeta>> decoded = new ArrayList<>(files.size());
+        for (GlobalIndexIOMeta file : files) {
+            decoded.add(Pair.of(file, BTreeIndexMeta.deserialize(file.metadata())));
+        }
+        this.indexFiles = decoded;
+        this.comparator = keySerializer.createComparator();
+        this.keySerializer = keySerializer;
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Predicates the key range cannot prune: every file stays a candidate.
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIsNotNull(FieldRef fieldRef) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitStartsWith(FieldRef fieldRef, Object literal) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitEndsWith(FieldRef fieldRef, Object literal) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitContains(FieldRef fieldRef, Object literal) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLike(FieldRef fieldRef, Object literal) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitNotEqual(FieldRef fieldRef, Object literal) {
+        return everyFile();
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        return everyFile();
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Null check: only files that recorded a null value can match.
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIsNull(FieldRef fieldRef) {
+        return select(BTreeIndexMeta::hasNulls);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Range predicates, evaluated against the file's [first key, last key] range.
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLessThan(FieldRef fieldRef, Object literal) {
+        // some key < literal can only exist when the first key < literal
+        return select(meta -> compareWithLiteral(meta.getFirstKey(), literal) < 0);
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        // some key <= literal can only exist when the first key <= literal
+        return select(meta -> compareWithLiteral(meta.getFirstKey(), literal) <= 0);
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitGreaterThan(FieldRef fieldRef, Object literal) {
+        // some key > literal can only exist when the last key > literal
+        return select(meta -> compareWithLiteral(meta.getLastKey(), literal) > 0);
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitGreaterOrEqual(
+            FieldRef fieldRef, Object literal) {
+        // some key >= literal can only exist when the last key >= literal
+        return select(meta -> compareWithLiteral(meta.getLastKey(), literal) >= 0);
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitEqual(FieldRef fieldRef, Object literal) {
+        return select(
+                meta -> {
+                    Object lower = decodeKey(meta.getFirstKey());
+                    Object upper = decodeKey(meta.getLastKey());
+                    return withinRange(literal, lower, upper);
+                });
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitIn(FieldRef fieldRef, List<Object> literals) {
+        return select(
+                meta -> {
+                    Object lower = decodeKey(meta.getFirstKey());
+                    Object upper = decodeKey(meta.getLastKey());
+                    return literals.stream().anyMatch(value -> withinRange(value, lower, upper));
+                });
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Compound predicates.
+    // ------------------------------------------------------------------------------------------
+
+    /**
+     * Intersects the children's selections. Returns empty as soon as a child is absent or the
+     * running intersection becomes empty, and also when there are no children.
+     */
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitAnd(
+            List<Optional<List<GlobalIndexIOMeta>>> children) {
+        HashSet<GlobalIndexIOMeta> intersection = null;
+        for (Optional<List<GlobalIndexIOMeta>> child : children) {
+            if (!child.isPresent()) {
+                return Optional.empty();
+            }
+            List<GlobalIndexIOMeta> candidates = child.get();
+            if (intersection == null) {
+                intersection = new HashSet<>(candidates);
+            } else {
+                intersection.retainAll(candidates);
+            }
+            if (intersection.isEmpty()) {
+                return Optional.empty();
+            }
+        }
+        if (intersection == null) {
+            return Optional.empty();
+        }
+        return Optional.of(new ArrayList<>(intersection));
+    }
+
+    /** Unions the present children's selections; an empty union is reported as empty. */
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visitOr(
+            List<Optional<List<GlobalIndexIOMeta>>> children) {
+        // NOTE(review): absent children are skipped here, whereas visitAnd returns empty as soon
+        // as a child is absent; confirm this asymmetry matches the caller's contract.
+        HashSet<GlobalIndexIOMeta> union = new HashSet<>();
+        for (Optional<List<GlobalIndexIOMeta>> child : children) {
+            if (child.isPresent()) {
+                union.addAll(child.get());
+            }
+        }
+        if (union.isEmpty()) {
+            return Optional.empty();
+        }
+        return Optional.of(new ArrayList<>(union));
+    }
+
+    @Override
+    public Optional<List<GlobalIndexIOMeta>> visit(TransformPredicate predicate) {
+        return Optional.empty();
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // Helpers.
+    // ------------------------------------------------------------------------------------------
+
+    /** Decodes a serialized key taken from a {@link BTreeIndexMeta}. */
+    private Object decodeKey(byte[] keyBytes) {
+        return keySerializer.deserialize(MemorySlice.wrap(keyBytes));
+    }
+
+    /** Compares the decoded key against {@code literal} (key on the left). */
+    private int compareWithLiteral(byte[] keyBytes, Object literal) {
+        return comparator.compare(decodeKey(keyBytes), literal);
+    }
+
+    /** Whether {@code value} lies within the inclusive range [{@code lower}, {@code upper}]. */
+    private boolean withinRange(Object value, Object lower, Object upper) {
+        return comparator.compare(value, lower) >= 0 && comparator.compare(value, upper) <= 0;
+    }
+
+    /** Selects every file. */
+    private Optional<List<GlobalIndexIOMeta>> everyFile() {
+        return select(meta -> true);
+    }
+
+    /** Selects, in input order, the files whose meta satisfies {@code condition}. */
+    private Optional<List<GlobalIndexIOMeta>> select(Predicate<BTreeIndexMeta> condition) {
+        List<GlobalIndexIOMeta> selected =
+                indexFiles.stream()
+                        .filter(pair -> condition.test(pair.getRight()))
+                        .map(Pair::getLeft)
+                        .collect(Collectors.toList());
+        return Optional.of(selected);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
new file mode 100644
index 0000000000..04e661b95d
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The {@link GlobalIndexer} for btree index. We do not build a B-tree directly in memory; instead,
+ * we form a logical B-tree via multi-level metadata over SST files that store the actual data, as
+ * below:
+ *
+ * <pre>
+ *                          BTree-Index
+ *                        /             \
+ *                      /       ...       \
+ *                    /                     \
+ *   +--------------------------------------+      +------------+
+ *   |               SST File               |      |            |
+ *   +--------------------------------------+      |            |
+ *   |              Root Index              |      |            |
+ *   |             /    ...   \             | ...  |  SST File  |
+ *   |      Leaf Index  ...  Leaf Index     |      |            |
+ *   |      /  ...  \        /  ...  \      |      |            |
+ *   |  DataBlock       ...      DataBlock  |      |            |
+ *   +--------------------------------------+      +------------+
+ * </pre>
+ *
+ * <p>This approach significantly reduces memory pressure during index reads.
+ */
+public class BTreeGlobalIndexer implements GlobalIndexer {
+
+    private final KeySerializer keySerializer;
+    private final Options options;
+    private final LazyField<CacheManager> cacheManager;
+
+    public BTreeGlobalIndexer(DataField dataField, Options options) {
+        this.keySerializer = KeySerializer.create(dataField.type());
+        this.options = options;
+        // Built lazily through LazyField; only createReader() requests it.
+        // TODO: allow cacheManager to be null to disable data caching.
+        this.cacheManager =
+                new LazyField<>(
+                        () ->
+                                new CacheManager(
+                                        options.get(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE),
+                                        options.get(
+                                                BTreeIndexOptions
+                                                        .BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO)));
+    }
+
+    /**
+     * Creates a writer for one btree index file using the configured block size and compression.
+     *
+     * @throws IllegalArgumentException if the configured block size does not fit in an {@code int}
+     */
+    @Override
+    public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) throws IOException {
+        long blockSize = options.get(BTreeIndexOptions.BTREE_INDEX_BLOCK_SIZE).getBytes();
+        // BTreeIndexWriter takes an int block size; a plain cast would silently truncate.
+        Preconditions.checkArgument(
+                blockSize <= Integer.MAX_VALUE,
+                "%s must not exceed %s bytes, but was %s bytes.",
+                BTreeIndexOptions.BTREE_INDEX_BLOCK_SIZE.key(),
+                Integer.MAX_VALUE,
+                blockSize);
+        CompressOptions compressOptions =
+                new CompressOptions(
+                        options.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION),
+                        options.get(BTreeIndexOptions.BTREE_INDEX_COMPRESSION_LEVEL));
+        return new BTreeIndexWriter(
+                fileWriter,
+                keySerializer,
+                (int) blockSize,
+                BlockCompressionFactory.create(compressOptions));
+    }
+
+    /**
+     * Creates a reader over exactly one index file.
+     *
+     * @throws IllegalStateException if {@code files} does not contain exactly one file
+     */
+    @Override
+    public GlobalIndexReader createReader(
+            GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files) throws IOException {
+        // A single reader only supports reading one index file.
+        Preconditions.checkState(
+                files.size() == 1,
+                "BTree index reader supports exactly one index file, but got %s.",
+                files.size());
+        return new BTreeIndexReader(keySerializer, fileReader, files.get(0), cacheManager.get());
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
similarity index 55%
copy from
paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
copy to
paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
index 7c88057fb4..914ed2a467 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerFactory.java
@@ -16,14 +16,24 @@
* limitations under the License.
*/
-package org.apache.paimon.globalindex.io;
+package org.apache.paimon.globalindex.btree;
-import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.GlobalIndexerFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataField;
-import java.io.IOException;
+/** The {@link GlobalIndexerFactory} for btree index. */
+public class BTreeGlobalIndexerFactory implements GlobalIndexerFactory {
+ /** Identifier of the btree global index type. */
+ public static final String IDENTIFIER = "btree";
-/** File reader for global index. */
-public interface GlobalIndexFileReader {
+ /** Returns {@link #IDENTIFIER}. */
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
- SeekableInputStream getInputStream(String fileName) throws IOException;
+ /** Creates a {@link BTreeGlobalIndexer} for the given field and options. */
+ @Override
+ public GlobalIndexer create(DataField dataField, Options options) {
+ return new BTreeGlobalIndexer(dataField, options);
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
new file mode 100644
index 0000000000..da4a1be622
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexMeta.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+/**
+ * Index meta of each BTree index file: the serialized first and last key of the file and whether
+ * the file contains null values.
+ */
+public class BTreeIndexMeta {
+
+    /** Serialized overhead besides the keys: two int length prefixes and one null-flag byte. */
+    private static final int FIXED_SERIALIZED_SIZE = 2 * Integer.BYTES + 1;
+
+    private final byte[] firstKey;
+    private final byte[] lastKey;
+    private final boolean hasNulls;
+
+    public BTreeIndexMeta(byte[] firstKey, byte[] lastKey, boolean hasNulls) {
+        this.firstKey = firstKey;
+        this.lastKey = lastKey;
+        this.hasNulls = hasNulls;
+    }
+
+    /** Returns the serialized first key of the file. */
+    public byte[] getFirstKey() {
+        return firstKey;
+    }
+
+    /** Returns the serialized last key of the file. */
+    public byte[] getLastKey() {
+        return lastKey;
+    }
+
+    /** Returns whether the file contains null values. */
+    public boolean hasNulls() {
+        return hasNulls;
+    }
+
+    /**
+     * Serializes this meta as {@code [int firstKeyLength][firstKey][int lastKeyLength][lastKey]
+     * [byte hasNulls]}.
+     *
+     * <p>The buffer is sized exactly (the previous estimate was one byte short, forcing a grow) and
+     * only the written range is copied out, so the result never carries unused trailing capacity of
+     * the output buffer.
+     */
+    public byte[] serialize() {
+        MemorySliceOutput sliceOutput =
+                new MemorySliceOutput(firstKey.length + lastKey.length + FIXED_SERIALIZED_SIZE);
+        sliceOutput.writeInt(firstKey.length);
+        sliceOutput.writeBytes(firstKey);
+        sliceOutput.writeInt(lastKey.length);
+        sliceOutput.writeBytes(lastKey);
+        sliceOutput.writeByte(hasNulls ? 1 : 0);
+        return sliceOutput.toSlice().copyBytes();
+    }
+
+    /** Deserializes a meta in the layout written by {@link #serialize()}. */
+    public static BTreeIndexMeta deserialize(byte[] data) {
+        MemorySliceInput sliceInput = MemorySlice.wrap(data).toInput();
+        int firstKeyLength = sliceInput.readInt();
+        byte[] firstKey = sliceInput.readSlice(firstKeyLength).copyBytes();
+        int lastKeyLength = sliceInput.readInt();
+        byte[] lastKey = sliceInput.readSlice(lastKeyLength).copyBytes();
+        boolean hasNulls = sliceInput.readByte() == 1;
+        return new BTreeIndexMeta(firstKey, lastKey, hasNulls);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
new file mode 100644
index 0000000000..687a3f380b
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+import org.apache.paimon.options.MemorySize;
+
+/** Options for BTree index. */
+public class BTreeIndexOptions {
+
+    public static final ConfigOption<String> BTREE_INDEX_COMPRESSION =
+            ConfigOptions.key("btree-index.compression")
+                    .stringType()
+                    .defaultValue("none")
+                    .withDescription("The compression algorithm to use for BTreeIndex");
+
+    // Must use a key distinct from BTREE_INDEX_COMPRESSION: that key holds the algorithm name,
+    // which cannot be read as an integer level, and sharing it prevents setting the level alone.
+    public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
+            ConfigOptions.key("btree-index.compression-level")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("The compression level of the compression algorithm");
+
+    public static final ConfigOption<MemorySize> BTREE_INDEX_BLOCK_SIZE =
+            ConfigOptions.key("btree-index.block-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofKibiBytes(64))
+                    .withDescription("The block size to use for BTreeIndex");
+
+    public static final ConfigOption<MemorySize> BTREE_INDEX_CACHE_SIZE =
+            ConfigOptions.key("btree-index.cache-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription("The cache size to use for BTreeIndex");
+
+    public static final ConfigOption<Double> BTREE_INDEX_HIGH_PRIORITY_POOL_RATIO =
+            ConfigOptions.key("btree-index.high-priority-pool-ratio")
+                    .doubleType()
+                    .defaultValue(0.1)
+                    .withDescription("The high priority pool ratio to use for BTreeIndex");
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
new file mode 100644
index 0000000000..59b6fe3c7b
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.sst.BlockCache;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BlockIterator;
+import org.apache.paimon.sst.SstFileReader;
+import org.apache.paimon.utils.FileBasedBloomFilter;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.CRC32;
+
+/** The {@link GlobalIndexReader} implementation for btree index. */
+public class BTreeIndexReader implements GlobalIndexReader {
+
+    private final SeekableInputStream input;
+    private final SstFileReader reader;
+    private final KeySerializer keySerializer;
+    private final Comparator<Object> comparator;
+    // decoded on first use; empty when the file carries no null bitmap block
+    private final LazyField<RoaringNavigableMap64> nullBitmap;
+    // first and last (non-null) keys recorded in the index meta
+    private final Object minKey;
+    private final Object maxKey;
+
+    /**
+     * Opens the btree index file described by {@code globalIndexIOMeta} and reads its footer. If
+     * initialization fails after the input stream was opened, the stream is closed before the
+     * failure is rethrown.
+     *
+     * @throws IOException if the index file cannot be opened
+     */
+    public BTreeIndexReader(
+            KeySerializer keySerializer,
+            GlobalIndexFileReader fileReader,
+            GlobalIndexIOMeta globalIndexIOMeta,
+            CacheManager cacheManager)
+            throws IOException {
+        this.keySerializer = keySerializer;
+        this.comparator = keySerializer.createComparator();
+        BTreeIndexMeta indexMeta = BTreeIndexMeta.deserialize(globalIndexIOMeta.metadata());
+        this.minKey = keySerializer.deserialize(MemorySlice.wrap(indexMeta.getFirstKey()));
+        this.maxKey = keySerializer.deserialize(MemorySlice.wrap(indexMeta.getLastKey()));
+        this.input = fileReader.getInputStream(globalIndexIOMeta.fileName());
+
+        try {
+            // prepare file footer
+            long fileSize = globalIndexIOMeta.fileSize();
+            Path filePath = fileReader.filePath(globalIndexIOMeta.fileName());
+            BlockCache blockCache = new BlockCache(filePath, input, cacheManager);
+            BTreeFileFooter footer = readFooter(blockCache, fileSize);
+
+            // prepare nullBitmap and SstFileReader
+            this.nullBitmap =
+                    new LazyField<>(
+                            () -> readNullBitmap(blockCache, footer.getNullBitmapHandle()));
+            FileBasedBloomFilter bloomFilter =
+                    FileBasedBloomFilter.create(
+                            input, filePath, cacheManager, footer.getBloomFilterHandle());
+            this.reader =
+                    new SstFileReader(
+                            createSliceComparator(keySerializer),
+                            blockCache,
+                            footer.getIndexBlockHandle(),
+                            bloomFilter);
+        } catch (Throwable t) {
+            // do not leak the stream opened above; precise rethrow keeps the declared exceptions
+            try {
+                input.close();
+            } catch (IOException closeError) {
+                t.addSuppressed(closeError);
+            }
+            throw t;
+        }
+    }
+
+    /** Reads the fixed-length footer stored in the last bytes of the file. */
+    private BTreeFileFooter readFooter(BlockCache blockCache, long fileSize) {
+        MemorySegment footerEncodings =
+                blockCache.getBlock(
+                        fileSize - BTreeFileFooter.ENCODED_LENGTH,
+                        BTreeFileFooter.ENCODED_LENGTH,
+                        b -> b,
+                        true);
+        return BTreeFileFooter.readFooter(MemorySlice.wrap(footerEncodings).toInput());
+    }
+
+    /**
+     * Decodes the null bitmap block: {@code blockHandle.size()} bitmap bytes followed by a 4-byte
+     * CRC32 of those bytes. Returns an empty bitmap when {@code blockHandle} is null.
+     *
+     * @throws IllegalStateException if the CRC does not match
+     */
+    private RoaringNavigableMap64 readNullBitmap(
+            BlockCache cache, @Nullable BlockHandle blockHandle) {
+        RoaringNavigableMap64 nullBitmap = new RoaringNavigableMap64();
+        if (blockHandle == null) {
+            return nullBitmap;
+        }
+
+        CRC32 crc32 = new CRC32();
+        // read bytes and crc value
+        MemorySliceInput sliceInput =
+                MemorySlice.wrap(
+                                cache.getBlock(
+                                        blockHandle.offset(),
+                                        blockHandle.size() + 4,
+                                        b -> b,
+                                        false))
+                        .toInput();
+        byte[] nullBitmapEncodings = sliceInput.readSlice(blockHandle.size()).copyBytes();
+
+        // check crc value
+        crc32.update(nullBitmapEncodings, 0, nullBitmapEncodings.length);
+        int expectedCrcValue = sliceInput.readInt();
+        Preconditions.checkState(
+                (int) crc32.getValue() == expectedCrcValue,
+                "Crc check failure during decoding null bitmap.");
+
+        try {
+            nullBitmap.deserialize(nullBitmapEncodings);
+        } catch (IOException ioe) {
+            throw new RuntimeException(
+                    "Fail to deserialize null bitmap but crc check passed,"
+                            + " this means the ser/de algorithms not match.",
+                    ioe);
+        }
+
+        return nullBitmap;
+    }
+
+    /** Orders serialized keys by deserializing them and applying the key comparator. */
+    private Comparator<MemorySlice> createSliceComparator(KeySerializer keySerializer) {
+        return (slice1, slice2) ->
+                comparator.compare(
+                        keySerializer.deserialize(slice1), keySerializer.deserialize(slice2));
+    }
+
+    /** Closes the SST reader and then the input stream; the stream is closed even on failure. */
+    @Override
+    public void close() throws IOException {
+        try {
+            reader.close();
+        } finally {
+            input.close();
+        }
+    }
+
+    @Override
+    public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+        // nulls are stored separately in null bitmap, so every SST entry is non-null.
+        return lazyResult(this::fullData);
+    }
+
+    @Override
+    public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+        // nulls are stored separately in null bitmap.
+        return GlobalIndexResult.create(nullBitmap::get);
+    }
+
+    @Override
+    public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
+        // todo: `startsWith` can also be covered by btree index.
+        // Until then every non-null row is returned as a candidate.
+        return lazyResult(this::fullData);
+    }
+
+    @Override
+    public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+        // key order cannot prune suffix matches; every non-null row is a candidate
+        return lazyResult(this::fullData);
+    }
+
+    @Override
+    public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+        // key order cannot prune substring matches; every non-null row is a candidate
+        return lazyResult(this::fullData);
+    }
+
+    @Override
+    public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
+        // key order cannot prune LIKE patterns; every non-null row is a candidate
+        return lazyResult(this::fullData);
+    }
+
+    @Override
+    public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+        return lazyResult(() -> rangeQuery(minKey, literal, true, false));
+    }
+
+    @Override
+    public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+        return lazyResult(() -> rangeQuery(literal, maxKey, true, true));
+    }
+
+    @Override
+    public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+        return lazyResult(
+                () -> {
+                    RoaringNavigableMap64 result = fullData();
+                    result.andNot(rangeQuery(literal, literal, true, true));
+                    return result;
+                });
+    }
+
+    @Override
+    public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
+        return lazyResult(() -> rangeQuery(minKey, literal, true, true));
+    }
+
+    @Override
+    public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+        return lazyResult(() -> rangeQuery(literal, literal, true, true));
+    }
+
+    @Override
+    public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
+        return lazyResult(() -> rangeQuery(literal, maxKey, false, true));
+    }
+
+    @Override
+    public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
+        return lazyResult(() -> inQuery(literals));
+    }
+
+    @Override
+    public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
+        return lazyResult(
+                () -> {
+                    RoaringNavigableMap64 result = fullData();
+                    result.andNot(inQuery(literals));
+                    return result;
+                });
+    }
+
+    /** Union of the row ids of every literal in {@code literals}. */
+    private RoaringNavigableMap64 inQuery(List<Object> literals) throws IOException {
+        RoaringNavigableMap64 result = new RoaringNavigableMap64();
+        for (Object literal : literals) {
+            result.or(rangeQuery(literal, literal, true, true));
+        }
+        return result;
+    }
+
+    /** All row ids stored in the SST, i.e. every row whose key is not null. */
+    private RoaringNavigableMap64 fullData() throws IOException {
+        return rangeQuery(minKey, maxKey, true, true);
+    }
+
+    /**
+     * Range query on underlying SST File.
+     *
+     * @param from lower bound
+     * @param to upper bound
+     * @param fromInclusive whether include lower bound
+     * @param toInclusive whether include upper bound
+     * @return a bitmap containing all qualified row ids
+     */
+    private RoaringNavigableMap64 rangeQuery(
+            Object from, Object to, boolean fromInclusive, boolean toInclusive) throws IOException {
+        SstFileReader.SstFileIterator fileIter = reader.createIterator();
+        fileIter.seekTo(keySerializer.serialize(from));
+
+        RoaringNavigableMap64 result = new RoaringNavigableMap64();
+        BlockIterator dataIter;
+        while ((dataIter = fileIter.readBatch()) != null) {
+            while (dataIter.hasNext()) {
+                Map.Entry<MemorySlice, MemorySlice> entry = dataIter.next();
+                Object key = keySerializer.deserialize(entry.getKey());
+
+                // Check the lower bound explicitly rather than blindly skipping the first entry
+                // after seekTo: that entry only equals `from` when the literal exists in the file.
+                // NOTE(review): assumes seekTo positions at the first key >= from.
+                int lowerDifference = comparator.compare(key, from);
+                if (lowerDifference < 0 || (!fromInclusive && lowerDifference == 0)) {
+                    continue;
+                }
+
+                int upperDifference = comparator.compare(key, to);
+                if (upperDifference > 0 || (!toInclusive && upperDifference == 0)) {
+                    return result;
+                }
+
+                for (long rowId : deserializeRowIds(entry.getValue())) {
+                    result.add(rowId);
+                }
+            }
+        }
+        return result;
+    }
+
+    /** Decodes an SST value: a var-len count followed by that many var-len row ids. */
+    private long[] deserializeRowIds(MemorySlice slice) {
+        MemorySliceInput sliceInput = slice.toInput();
+        int length = sliceInput.readVarLenInt();
+        Preconditions.checkState(length > 0, "Invalid row id length: 0");
+        long[] ids = new long[length];
+        for (int i = 0; i < length; i++) {
+            ids[i] = sliceInput.readVarLenLong();
+        }
+        return ids;
+    }
+
+    /** Wraps a query lazily, rethrowing its {@link IOException} as an unchecked exception. */
+    private static GlobalIndexResult lazyResult(IOQuery query) {
+        return GlobalIndexResult.create(
+                () -> {
+                    try {
+                        return query.run();
+                    } catch (IOException ioe) {
+                        throw new RuntimeException("fail to read btree index file.", ioe);
+                    }
+                });
+    }
+
+    /** A bitmap-producing query that may fail with an {@link IOException}. */
+    @FunctionalInterface
+    private interface IOQuery {
+        RoaringNavigableMap64 run() throws IOException;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
new file mode 100644
index 0000000000..69b62290dd
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.sst.BlockHandle;
+import org.apache.paimon.sst.BloomFilterHandle;
+import org.apache.paimon.sst.SstFileWriter;
+import org.apache.paimon.utils.LazyField;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.zip.CRC32;
+
+/**
+ * The {@link GlobalIndexWriter} implementation for BTree index. Note that
users must keep written
+ * keys monotonically incremental. All null keys are stored in a separate
bitmap, which will be
+ * serialized and appended to the file end on close. The layout is as below:
+ *
+ * <pre>
+ * +-----------------------------------+------+
+ * | Footer | |
+ * +-----------------------------------+ |
+ * | Index Block | +--> Loaded on open
+ * +-----------------------------------+ |
+ * | Bloom Filter Block | |
+ * +-----------------------------------+------+
+ * | Null Bitmap Block | |
+ * +-----------------------------------+ |
+ * | Data Block | |
+ * +-----------------------------------+ +--> Loaded on requested
+ * | ...... | |
+ * +-----------------------------------+ |
+ * | Data Block | |
+ * +-----------------------------------+------+
+ * </pre>
+ *
+ * <p>For efficiency, we combine entries with the same keys and store a
compact list of row ids for
+ * each key.
+ */
+public class BTreeIndexWriter implements GlobalIndexWriter {
+ public static final int MAGIC_NUMBER = 198732882;
+
+ private final String fileName;
+ private final PositionOutputStream out;
+
+ private long maxRowId = Long.MIN_VALUE;
+ private long minRowId = Long.MAX_VALUE;
+
+ private final SstFileWriter writer;
+ private final KeySerializer keySerializer;
+ private final Comparator<Object> comparator;
+
+ private Object firstKey = null;
+ private Object lastKey = null;
+ private final List<Long> currentRowIds = new ArrayList<>();
+
+ // for nulls
+ LazyField<RoaringNavigableMap64> nullBitmap = new
LazyField<>(RoaringNavigableMap64::new);
+
+ public BTreeIndexWriter(
+ GlobalIndexFileWriter indexFileWriter,
+ KeySerializer keySerializer,
+ int blockSize,
+ BlockCompressionFactory compressionFactory)
+ throws IOException {
+ this.fileName =
indexFileWriter.newFileName(BTreeGlobalIndexerFactory.IDENTIFIER);
+ this.out = indexFileWriter.newOutputStream(this.fileName);
+ this.keySerializer = keySerializer;
+ this.comparator = keySerializer.createComparator();
+ // todo: we may enable bf to accelerate equal and in predicate in the
future
+ this.writer = new SstFileWriter(out, blockSize, null,
compressionFactory);
+ }
+
+ @Override
+ public void write(@Nullable Object key) {
+ throw new UnsupportedOperationException(
+ "BTree index writer should explicitly specify row id for each
key");
+ }
+
+ @Override
+ public void write(@Nullable Object key, long rowId) {
+ if (key == null) {
+ nullBitmap.get().add(rowId);
+ return;
+ }
+
+ if (lastKey != null && comparator.compare(key, lastKey) != 0) {
+ try {
+ flush();
+ } catch (IOException e) {
+ throw new RuntimeException("Error in writing btree index
files.", e);
+ }
+ }
+ lastKey = key;
+ currentRowIds.add(rowId);
+
+ // update stats
+ if (firstKey == null) {
+ firstKey = key;
+ }
+ minRowId = Math.min(minRowId, rowId);
+ maxRowId = Math.max(maxRowId, rowId);
+ }
+
+ private void flush() throws IOException {
+ if (currentRowIds.isEmpty()) {
+ return;
+ }
+
+ // serialize row id list
+ MemorySliceOutput sliceOutput = new
MemorySliceOutput(currentRowIds.size() * 9 + 5);
+ sliceOutput.writeVarLenInt(currentRowIds.size());
+ for (long currentRowId : currentRowIds) {
+ sliceOutput.writeVarLenLong(currentRowId);
+ }
+ currentRowIds.clear();
+
+ writer.put(keySerializer.serialize(lastKey),
sliceOutput.toSlice().copyBytes());
+ }
+
+ @Override
+ public List<ResultEntry> finish() {
+ try {
+ // write remaining row ids
+ flush();
+
+ // flush writer remaining data blocks
+ writer.flush();
+
+ // write null bitmap
+ BlockHandle nullBitmapHandle = writeNullBitmap();
+
+ // write bloom filter (currently is always null, but we could add
it for equal
+ // and in condition.)
+ BloomFilterHandle bloomFilterHandle = writer.writeBloomFilter();
+
+ // write index block
+ BlockHandle indexBlockHandle = writer.writeIndexBlock();
+
+ // write footer
+ BTreeFileFooter footer =
+ new BTreeFileFooter(bloomFilterHandle, indexBlockHandle,
nullBitmapHandle);
+ MemorySlice footerEncoding = BTreeFileFooter.writeFooter(footer);
+ writer.writeSlice(footerEncoding);
+
+ out.close();
+ } catch (IOException e) {
+ throw new RuntimeException("Error in closing BTree index writer",
e);
+ }
+
+ if (firstKey == null) {
+ throw new RuntimeException("Should never write an empty btree
index file.");
+ }
+
+ return Collections.singletonList(
+ ResultEntry.of(
+ fileName,
+ new BTreeIndexMeta(
+ keySerializer.serialize(firstKey),
+ keySerializer.serialize(lastKey),
+ nullBitmap.initialized())
+ .serialize(),
+ new Range(minRowId, maxRowId)));
+ }
+
+ @Nullable
+ private BlockHandle writeNullBitmap() throws IOException {
+ if (!nullBitmap.initialized()) {
+ return null;
+ }
+
+ CRC32 crc32 = new CRC32();
+ byte[] serializedBitmap = nullBitmap.get().serialize();
+ int length = serializedBitmap.length;
+ crc32.update(serializedBitmap, 0, length);
+
+ // serialized bitmap + crc value
+ MemorySliceOutput sliceOutput = new MemorySliceOutput(length + 4);
+ sliceOutput.writeBytes(serializedBitmap);
+ sliceOutput.writeInt((int) crc32.getValue());
+
+ BlockHandle nullBitmapHandle = new BlockHandle(out.getPos(), length);
+ writer.writeSlice(sliceOutput.toSlice());
+
+ return nullBitmapHandle;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
new file mode 100644
index 0000000000..9971fd2751
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/KeySerializer.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+
+import java.util.Comparator;
+
+/** This interface provides core methods to ser/de and compare btree index
keys. */
+public interface KeySerializer {
+
+ byte[] serialize(Object key);
+
+ Object deserialize(MemorySlice data);
+
+ Comparator<Object> createComparator();
+
+ static KeySerializer create(DataType type) {
+ return type.accept(
+ new DataTypeDefaultVisitor<KeySerializer>() {
+ @Override
+ public KeySerializer defaultMethod(DataType dataType) {
+ throw new UnsupportedOperationException(
+ "DataType: " + dataType + " is not supported
by btree index now.");
+ }
+
+ @Override
+ public KeySerializer visit(CharType charType) {
+ return new StringSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(VarCharType varCharType) {
+ return new StringSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(TinyIntType tinyIntType) {
+ return new TinyIntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(SmallIntType smallIntType) {
+ return new SmallIntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(IntType intType) {
+ return new IntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(BigIntType bigIntType) {
+ return new BigIntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(BooleanType booleanType) {
+ return new BooleanSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(DecimalType decimalType) {
+ return new DecimalSerializer(
+ decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ @Override
+ public KeySerializer visit(FloatType floatType) {
+ return new FloatSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(DoubleType doubleType) {
+ return new DoubleSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(DateType dateType) {
+ return new IntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(TimeType timeType) {
+ return new IntSerializer();
+ }
+
+ @Override
+ public KeySerializer visit(TimestampType timestampType) {
+ return new
TimestampSerializer(timestampType.getPrecision());
+ }
+
+ @Override
+ public KeySerializer visit(LocalZonedTimestampType
localZonedTimestampType) {
+ return new
TimestampSerializer(localZonedTimestampType.getPrecision());
+ }
+ });
+ }
+
+ /** Serializer for int type. */
+ class IntSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ keyOut.writeInt((Integer) key);
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return data.readInt(0);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Integer) o);
+ }
+ }
+
+ /** Serializer for long type. */
+ class BigIntSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ keyOut.writeLong((Long) key);
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return data.readLong(0);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Long) o);
+ }
+ }
+
+ /** Serializer for tiny int type. */
+ class TinyIntSerializer implements KeySerializer {
+
+ @Override
+ public byte[] serialize(Object key) {
+ return new byte[] {(byte) key};
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return data.readByte(0);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Byte) o);
+ }
+ }
+
+ /** Serializer for small int type. */
+ class SmallIntSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(2);
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ keyOut.writeShort((Short) key);
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return data.readShort(0);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Short) o);
+ }
+ }
+
+ /** Serializer for boolean type. */
+ class BooleanSerializer implements KeySerializer {
+
+ @Override
+ public byte[] serialize(Object key) {
+ return new byte[] {(Boolean) key ? (byte) 1 : (byte) 0};
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return data.readByte(0) == (byte) 1 ? Boolean.TRUE : Boolean.FALSE;
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Boolean) o);
+ }
+ }
+
+ /** Serializer for float type. */
+ class FloatSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(4);
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ keyOut.writeInt(Float.floatToIntBits((Float) key));
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return Float.intBitsToFloat(data.readInt(0));
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Float) o);
+ }
+ }
+
+ /** Serializer for double type. */
+ class DoubleSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ keyOut.writeLong(Double.doubleToLongBits((Double) key));
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return Double.longBitsToDouble(data.readLong(0));
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Double) o);
+ }
+ }
+
+ /** Serializer for decimal type. */
+ class DecimalSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+ private final int precision;
+ private final int scale;
+
+ public DecimalSerializer(int precision, int scale) {
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ @Override
+ public byte[] serialize(Object key) {
+ if (Decimal.isCompact(precision)) {
+ keyOut.reset();
+ keyOut.writeLong(((Decimal) key).toUnscaledLong());
+ return keyOut.toSlice().copyBytes();
+ }
+ return ((Decimal) key).toUnscaledBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ if (Decimal.isCompact(precision)) {
+ return Decimal.fromUnscaledLong(data.readLong(0), precision,
scale);
+ }
+ return Decimal.fromUnscaledBytes(data.copyBytes(), precision,
scale);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Decimal) o);
+ }
+ }
+
+ /** Serializer for STRING type. */
+ class StringSerializer implements KeySerializer {
+
+ @Override
+ public byte[] serialize(Object key) {
+ return ((BinaryString) key).toBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ return BinaryString.fromBytes(data.copyBytes());
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (BinaryString) o);
+ }
+ }
+
+ /** Serializer for timestamp. */
+ class TimestampSerializer implements KeySerializer {
+ private final MemorySliceOutput keyOut = new MemorySliceOutput(8);
+ private final int precision;
+
+ public TimestampSerializer(int precision) {
+ this.precision = precision;
+ }
+
+ @Override
+ public byte[] serialize(Object key) {
+ keyOut.reset();
+ if (Timestamp.isCompact(precision)) {
+ keyOut.writeLong(((Timestamp) key).getMillisecond());
+ } else {
+ keyOut.writeLong(((Timestamp) key).getMillisecond());
+ keyOut.writeVarLenInt(((Timestamp)
key).getNanoOfMillisecond());
+ }
+ return keyOut.toSlice().copyBytes();
+ }
+
+ @Override
+ public Object deserialize(MemorySlice data) {
+ if (Timestamp.isCompact(precision)) {
+ return Timestamp.fromEpochMillis(data.readLong(0));
+ }
+ MemorySliceInput input = data.toInput();
+ long millis = input.readLong();
+ int nanos = input.readVarLenInt();
+ return Timestamp.fromEpochMillis(millis, nanos);
+ }
+
+ @Override
+ public Comparator<Object> createComparator() {
+ return Comparator.comparing(o -> (Timestamp) o);
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
index 7c88057fb4..be0a2c18b9 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.globalindex.io;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.SeekableInputStream;
import java.io.IOException;
@@ -26,4 +27,6 @@ import java.io.IOException;
public interface GlobalIndexFileReader {
SeekableInputStream getInputStream(String fileName) throws IOException;
+
+ /** Returns the {@link Path} of the index file named {@code fileName}. */
+ Path filePath(String fileName);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
index 850cbb9451..cbc0dd4ea2 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/io/GlobalIndexFileWriter.java
@@ -18,13 +18,14 @@
package org.apache.paimon.globalindex.io;
+import org.apache.paimon.fs.PositionOutputStream;
+
import java.io.IOException;
-import java.io.OutputStream;
/** Global index writer io operators. */
public interface GlobalIndexFileWriter {
String newFileName(String prefix);
- OutputStream newOutputStream(String fileName) throws IOException;
+ /**
+ * Opens an output stream for the index file named {@code fileName}. The stream reports its
+ * write position, which index writers can use to record block offsets.
+ */
+ PositionOutputStream newOutputStream(String fileName) throws IOException;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
index 9f4272096c..5239bb133e 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFooter.java
@@ -26,7 +26,7 @@ import org.apache.paimon.sst.BloomFilterHandle;
import javax.annotation.Nullable;
-import static org.apache.paimon.sst.SstFileWriter.MAGIC_NUMBER;
+import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Footer for a sorted file. */
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
index e5fab5fc5c..c652c90e82 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -52,6 +52,7 @@ import java.io.IOException;
* </pre>
*/
public class SortLookupStoreWriter implements LookupStoreWriter {
+ public static final int MAGIC_NUMBER = 1481571681;
private final SstFileWriter writer;
private final PositionOutputStream out;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
index 589e967d40..625ff565da 100644
--- a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
@@ -61,6 +61,10 @@ public final class MemorySlice implements
Comparable<MemorySlice> {
return segment.getInt(offset + position);
}
+ public short readShort(int position) {
+ return segment.getShort(offset + position);
+ }
+
public long readLong(int position) {
return segment.getLong(offset + position);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
index 90e8e54900..7254f590b5 100644
---
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
@@ -51,6 +51,12 @@ public class MemorySliceOutput {
size += 4;
}
+ public void writeShort(int value) {
+ ensureSize(size + 2);
+ segment.putShort(size, (short) value);
+ size += 2;
+ }
+
public void writeVarLenInt(int value) {
if (value < 0) {
throw new IllegalArgumentException("negative value: v=" + value);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
index 1f1405671b..613ffd80de 100644
--- a/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/sst/SstFileWriter.java
@@ -48,8 +48,6 @@ public class SstFileWriter {
private static final Logger LOG =
LoggerFactory.getLogger(SstFileWriter.class.getName());
- public static final int MAGIC_NUMBER = 1481571681;
-
private final PositionOutputStream out;
private final int blockSize;
private final BlockWriter dataBlockWriter;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
index e61486ecc4..f07b7c6afa 100644
---
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringNavigableMap64.java
@@ -65,6 +65,10 @@ public class RoaringNavigableMap64 implements Iterable<Long>
{
roaring64NavigableMap.and(other.roaring64NavigableMap);
}
+ public void andNot(RoaringNavigableMap64 other) {
+ roaring64NavigableMap.andNot(other.roaring64NavigableMap);
+ }
+
public boolean isEmpty() {
return roaring64NavigableMap.isEmpty();
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
index 43671537ec..7edd6173da 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
@@ -44,7 +46,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
@@ -207,7 +208,8 @@ public class BitmapGlobalIndexTest {
}
@Override
- public OutputStream newOutputStream(String fileName)
throws IOException {
+ public PositionOutputStream newOutputStream(String
fileName)
+ throws IOException {
return fileIO.newOutputStream(new
Path(tempDir.toString(), fileName), true);
}
};
@@ -218,7 +220,17 @@ public class BitmapGlobalIndexTest {
long fileSize = fileIO.getFileSize(path);
GlobalIndexFileReader fileReader =
- prefix -> fileIO.newInputStream(new Path(tempDir.toString(),
prefix));
+ new GlobalIndexFileReader() {
+ @Override
+ public SeekableInputStream getInputStream(String fileName)
throws IOException {
+ return fileIO.newInputStream(new
Path(tempDir.toString(), fileName));
+ }
+
+ @Override
+ public Path filePath(String fileName) {
+ return new Path(tempDir.toString(), fileName);
+ }
+ };
GlobalIndexIOMeta globalIndexMeta =
new GlobalIndexIOMeta(fileName, fileSize, Long.MAX_VALUE,
null);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
new file mode 100644
index 0000000000..3015ffa715
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.IntType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Test class for {@link BTreeFileMetaSelector}. */
+public class BTreeFileMetaSelectorTest {
+ private List<GlobalIndexIOMeta> files;
+
+ @BeforeEach
+ public void setUp() {
+ MemorySliceOutput sliceOutput = new MemorySliceOutput(4);
+
+ BTreeIndexMeta meta1 =
+ new BTreeIndexMeta(writeInt(1, sliceOutput), writeInt(10,
sliceOutput), true);
+ BTreeIndexMeta meta2 =
+ new BTreeIndexMeta(writeInt(15, sliceOutput), writeInt(20,
sliceOutput), false);
+ BTreeIndexMeta meta3 =
+ new BTreeIndexMeta(writeInt(21, sliceOutput), writeInt(30,
sliceOutput), true);
+ BTreeIndexMeta meta4 =
+ new BTreeIndexMeta(writeInt(1, sliceOutput), writeInt(5,
sliceOutput), false);
+ BTreeIndexMeta meta5 =
+ new BTreeIndexMeta(writeInt(19, sliceOutput), writeInt(25,
sliceOutput), true);
+
+ files =
+ Arrays.asList(
+ new GlobalIndexIOMeta("file1", 1, 1L,
meta1.serialize()),
+ new GlobalIndexIOMeta("file2", 1, 1L,
meta2.serialize()),
+ new GlobalIndexIOMeta("file3", 1, 1L,
meta3.serialize()),
+ new GlobalIndexIOMeta("file4", 1, 1L,
meta4.serialize()),
+ new GlobalIndexIOMeta("file5", 1, 1L,
meta5.serialize()));
+ }
+
+ @Test
+ public void testMetaSelector() {
+ DataType dataType = new IntType();
+ FieldRef ref = new FieldRef(1, "testField", dataType);
+ KeySerializer serializer = KeySerializer.create(dataType);
+ BTreeFileMetaSelector selector = new BTreeFileMetaSelector(files,
serializer);
+
+ Optional<List<GlobalIndexIOMeta>> result;
+
+ // 1. test range queries
+ result = selector.visitLessThan(ref, 8);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file4"));
+
+ result = selector.visitLessOrEqual(ref, 20);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file4",
"file5"));
+
+ result = selector.visitGreaterThan(ref, 20);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+ result = selector.visitGreaterOrEqual(ref, 5);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file3",
"file4", "file5"));
+
+ result = selector.visitEqual(ref, 22);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+ result = selector.visitNotEqual(ref, 22);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file3",
"file4", "file5"));
+
+ // 1.1 test range boundaries
+ result = selector.visitLessThan(ref, 15);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file4"));
+
+ result = selector.visitLessOrEqual(ref, 15);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file4"));
+
+ result = selector.visitGreaterThan(ref, 20);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file3", "file5"));
+
+ result = selector.visitGreaterOrEqual(ref, 20);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file2", "file3", "file5"));
+
+ result = selector.visitEqual(ref, 30);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file3"));
+
+ // 1.2 test out of boundaries
+ result = selector.visitLessThan(ref, 1);
+ Assertions.assertThat(result).isNotEmpty();
+ Assertions.assertThat(result.get()).isEmpty();
+
+ result = selector.visitGreaterThan(ref, 30);
+ Assertions.assertThat(result).isNotEmpty();
+ Assertions.assertThat(result.get()).isEmpty();
+
+ // 2. test isNull & isNotNull
+ result = selector.visitIsNull(ref);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file3", "file5"));
+
+ result = selector.visitIsNotNull(ref);
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file3",
"file4", "file5"));
+
+ // 3. test in
+ result = selector.visitIn(ref, Arrays.asList(1, 2, 3, 26, 27, 28));
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file4", "file3"));
+
+ result = selector.visitNotIn(ref, Arrays.asList(1, 7, 19, 30, 31));
+ Assertions.assertThat(result).isNotEmpty();
+ assertFiles(result.get(), Arrays.asList("file1", "file2", "file3",
"file4", "file5"));
+ }
+
+ private void assertFiles(List<GlobalIndexIOMeta> files, List<String>
expected) {
+ Assertions.assertThat(
+ files.stream()
+ .map(GlobalIndexIOMeta::fileName)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ private byte[] writeInt(int value, MemorySliceOutput sliceOutput) {
+ sliceOutput.reset();
+ sliceOutput.writeInt(value);
+ return sliceOutput.toSlice().copyBytes();
+ }
+}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
new file mode 100644
index 0000000000..8cf5a9d840
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.LocalZoneTimestamp;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import
org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
+import org.apache.paimon.testutils.junit.parameterized.Parameters;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeDefaultVisitor;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.DecimalUtils;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/** Test for {@link BTreeGlobalIndexer}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class BTreeGlobalIndexerTest {
+ private DataType dataType;
+ private int dataNum;
+ private List<Pair<Object, Long>> data;
+ private KeySerializer keySerializer;
+ private Comparator<Object> comparator;
+ private FileIO fileIO;
+ private GlobalIndexFileReader fileReader;
+ private GlobalIndexFileWriter fileWriter;
+ private BTreeGlobalIndexer globalIndexer;
+ private Options options;
+
+ @TempDir java.nio.file.Path tempPath;
+
+ public BTreeGlobalIndexerTest(List<Object> args) {
+ this.dataType = (DataType) args.get(0);
+ this.dataNum = (Integer) args.get(1);
+ }
+
+ @SuppressWarnings("unused")
+ @Parameters(name = "dataType&recordNum-{0}")
+ public static List<List<Object>> getVarSeg() {
+ return Arrays.asList(
+ Arrays.asList(new IntType(), 10000),
+ Arrays.asList(new VarCharType(VarCharType.MAX_LENGTH), 10000),
+ Arrays.asList(new CharType(100), 10000),
+ Arrays.asList(new FloatType(), 10000),
+ Arrays.asList(new DecimalType(), 10000),
+ Arrays.asList(new DoubleType(), 10000),
+ Arrays.asList(new BooleanType(), 10000),
+ Arrays.asList(new TinyIntType(), 10000),
+ Arrays.asList(new SmallIntType(), 10000),
+ Arrays.asList(new BigIntType(), 10000),
+ Arrays.asList(new DateType(), 10000),
+ Arrays.asList(new TimestampType(), 10000));
+ }
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ fileIO = LocalFileIO.create();
+ fileWriter =
+ new GlobalIndexFileWriter() {
+ @Override
+ public String newFileName(String prefix) {
+ return "test-btree-" + prefix;
+ }
+
+ @Override
+ public PositionOutputStream newOutputStream(String
fileName)
+ throws IOException {
+ return fileIO.newOutputStream(
+ new Path(new Path(tempPath.toUri()),
fileName), true);
+ }
+ };
+ fileReader =
+ new GlobalIndexFileReader() {
+ @Override
+ public SeekableInputStream getInputStream(String fileName)
throws IOException {
+ return fileIO.newInputStream(
+ new Path(new Path(tempPath.toUri()),
fileName));
+ }
+
+ @Override
+ public Path filePath(String fileName) {
+ return new Path(new Path(tempPath.toUri()), fileName);
+ }
+ };
+ options = new Options();
+ options.set(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE,
MemorySize.ofMebiBytes(8));
+ globalIndexer = new BTreeGlobalIndexer(new DataField(1, "testField",
dataType), options);
+ keySerializer = KeySerializer.create(dataType);
+ comparator = keySerializer.createComparator();
+
+ // generate data and sort by key
+ data = new ArrayList<>(dataNum);
+ DataGenerator dataGenerator = new DataGenerator();
+ for (int i = 0; i < dataNum; i++) {
+ data.add(Pair.of(dataType.accept(dataGenerator), (long) i));
+ }
+ data.sort((p1, p2) -> comparator.compare(p1.getKey(), p2.getKey()));
+ }
+
+ @TestTemplate
+ public void testRangePredicate() throws Exception {
+ GlobalIndexIOMeta written = writeData();
+ FieldRef ref = new FieldRef(1, "testField", dataType);
+
+ try (GlobalIndexReader reader =
+ globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
+ GlobalIndexResult result;
+ Random random = new Random();
+
+ for (int i = 0; i < 5; i++) {
+ Object literal = data.get(random.nextInt(dataNum)).getKey();
+
+ // 1. test <= literal
+ result = reader.visitLessOrEqual(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) <= 0));
+
+ // 2. test < literal
+ result = reader.visitLessThan(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) < 0));
+
+ // 3. test >= literal
+ result = reader.visitGreaterOrEqual(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) >= 0));
+
+ // 4. test > literal
+ result = reader.visitGreaterThan(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) > 0));
+
+ // 5. test equal
+ result = reader.visitEqual(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) == 0));
+
+ // 6. test not equal
+ result = reader.visitNotEqual(ref, literal);
+ assertResult(result, filter(obj -> comparator.compare(obj,
literal) != 0));
+ }
+
+ // 7. test < min
+ Object literal7 = data.get(0).getKey();
+ result = reader.visitLessThan(ref, literal7);
+ Assertions.assertTrue(result.results().isEmpty());
+
+ // 8. test > max
+ Object literal8 = data.get(dataNum - 1).getKey();
+ result = reader.visitGreaterThan(ref, literal8);
+ Assertions.assertTrue(result.results().isEmpty());
+ }
+ }
+
+ @TestTemplate
+ public void testIsNull() throws Exception {
+ // set nulls
+ for (int i = dataNum - 1; i >= dataNum * 0.9; i--) {
+ data.get(i).setLeft(null);
+ }
+ GlobalIndexIOMeta written = writeData();
+ FieldRef ref = new FieldRef(1, "testField", dataType);
+
+ try (GlobalIndexReader reader =
+ globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
+ GlobalIndexResult result;
+
+ result = reader.visitIsNull(ref);
+ assertResult(result, filter(Objects::isNull));
+
+ result = reader.visitIsNotNull(ref);
+ assertResult(result, filter(Objects::nonNull));
+ }
+ }
+
+ @TestTemplate
+ public void testInPredicate() throws Exception {
+ GlobalIndexIOMeta written = writeData();
+ FieldRef ref = new FieldRef(1, "testField", dataType);
+
+ try (GlobalIndexReader reader =
+ globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
+ GlobalIndexResult result;
+ for (int i = 0; i < 10; i++) {
+ Random random = new Random(System.currentTimeMillis());
+ List<Object> literals =
+
data.stream().map(Pair::getKey).collect(Collectors.toList());
+ Collections.shuffle(literals, random);
+ literals = literals.subList(0, (int) (dataNum * 0.1));
+
+ TreeSet<Object> set = new TreeSet<>(comparator);
+ set.addAll(literals);
+
+ // 1. test in
+ result = reader.visitIn(ref, literals);
+ assertResult(result, filter(set::contains));
+
+ // 2. test not in
+ result = reader.visitNotIn(ref, literals);
+ assertResult(result, filter(obj -> !set.contains(obj)));
+ }
+ }
+ }
+
+ private GlobalIndexIOMeta writeData() throws IOException {
+ GlobalIndexWriter indexWriter = globalIndexer.createWriter(fileWriter);
+ for (Pair<Object, Long> pair : data) {
+ indexWriter.write(pair.getKey(), pair.getValue());
+ }
+ List<GlobalIndexWriter.ResultEntry> results = indexWriter.finish();
+ Assertions.assertEquals(1, results.size());
+
+ GlobalIndexWriter.ResultEntry resultEntry = results.get(0);
+ String fileName = resultEntry.fileName();
+ return new GlobalIndexIOMeta(
+ fileName,
+ fileIO.getFileSize(new Path(new Path(tempPath.toUri()),
fileName)),
+ resultEntry.rowRange().to - resultEntry.rowRange().from,
+ resultEntry.meta());
+ }
+
+ private List<Long> filter(Predicate<Object> filter) {
+ return data.stream()
+ .filter(pair -> filter.test(pair.getKey()))
+ .map(Pair::getValue)
+ .collect(Collectors.toList());
+ }
+
+ private void assertResult(GlobalIndexResult indexResult, List<Long>
expected) {
+ Iterator<Long> iter = indexResult.results().iterator();
+ List<Long> result = new ArrayList<>();
+ while (iter.hasNext()) {
+ result.add(iter.next());
+ }
+ org.assertj.core.api.Assertions.assertThat(result)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ /** The Generator to generate test data. */
+ private static class DataGenerator extends DataTypeDefaultVisitor<Object> {
+ Random random = new Random();
+
+ @Override
+ public Object visit(CharType charType) {
+ return BinaryString.fromString("random char " + random.nextInt());
+ }
+
+ @Override
+ public Object visit(VarCharType varCharType) {
+ return BinaryString.fromString("random varchar " +
random.nextInt());
+ }
+
+ @Override
+ public Object visit(BooleanType booleanType) {
+ return random.nextBoolean();
+ }
+
+ @Override
+ public Object visit(DecimalType decimalType) {
+ return DecimalUtils.castFrom(
+ random.nextDouble(), decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ @Override
+ public Object visit(TinyIntType tinyIntType) {
+ return (byte) random.nextInt();
+ }
+
+ @Override
+ public Object visit(SmallIntType smallIntType) {
+ return (short) random.nextInt();
+ }
+
+ @Override
+ public Object visit(IntType intType) {
+ return random.nextInt();
+ }
+
+ @Override
+ public Object visit(BigIntType bigIntType) {
+ return random.nextLong();
+ }
+
+ @Override
+ public Object visit(FloatType floatType) {
+ return random.nextFloat();
+ }
+
+ @Override
+ public Object visit(DoubleType doubleType) {
+ return random.nextDouble();
+ }
+
+ @Override
+ public Object visit(DateType dateType) {
+ return random.nextInt(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public Object visit(TimeType timeType) {
+ return random.nextInt(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public Object visit(TimestampType timestampType) {
+ return Timestamp.fromMicros(random.nextLong());
+ }
+
+ @Override
+ public Object visit(LocalZonedTimestampType localZonedTimestampType) {
+ return LocalZoneTimestamp.fromMicros(random.nextLong());
+ }
+
+ @Override
+ protected Object defaultMethod(DataType dataType) {
+ throw new UnsupportedOperationException(
+ "Btree index do not support " + dataType + " type.");
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
index c48a729950..80c6a41ffc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
@@ -20,13 +20,13 @@ package org.apache.paimon.globalindex;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.index.IndexPathFactory;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.UUID;
/** Helper class for managing global index files. */
@@ -44,6 +44,7 @@ public class GlobalIndexFileReadWrite implements
GlobalIndexFileReader, GlobalIn
return prefix + "-" + "global-index-" + UUID.randomUUID() + ".index";
}
+ @Override
public Path filePath(String fileName) {
return indexPathFactory.toPath(fileName);
}
@@ -52,7 +53,7 @@ public class GlobalIndexFileReadWrite implements
GlobalIndexFileReader, GlobalIn
return fileIO.getFileSize(filePath(fileName));
}
- public OutputStream newOutputStream(String fileName) throws IOException {
+ public PositionOutputStream newOutputStream(String fileName) throws
IOException {
return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
}
diff --git
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
index 93d9f4ffd8..a608df5baa 100644
---
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
+++
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
@@ -53,7 +54,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -157,7 +157,8 @@ public class LuceneVectorGlobalIndexScanTest {
}
@Override
- public OutputStream newOutputStream(String fileName)
throws IOException {
+ public PositionOutputStream newOutputStream(String
fileName)
+ throws IOException {
return fileIO.newOutputStream(new Path(indexDir,
fileName), false);
}
};
diff --git
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
index 20241facdd..d4ccc0eb2e 100644
---
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
+++
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
@@ -20,6 +20,8 @@ package org.apache.paimon.lucene.index;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexResult;
@@ -40,7 +42,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -82,14 +83,24 @@ public class LuceneVectorGlobalIndexTest {
}
@Override
- public OutputStream newOutputStream(String fileName) throws
IOException {
+ public PositionOutputStream newOutputStream(String fileName)
throws IOException {
return fileIO.newOutputStream(new Path(path, fileName), false);
}
};
}
private GlobalIndexFileReader createFileReader(Path path) {
- return fileName -> fileIO.newInputStream(new Path(path, fileName));
+ return new GlobalIndexFileReader() {
+ @Override
+ public SeekableInputStream getInputStream(String fileName) throws
IOException {
+ return fileIO.newInputStream(new Path(path, fileName));
+ }
+
+ @Override
+ public Path filePath(String fileName) {
+ return new Path(path, fileName);
+ }
+ };
}
@Test