This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 732f4ee35e [core] Refactor Global index writer and reader for Btree
732f4ee35e is described below
commit 732f4ee35e12dc30f2706e70fbd48ddac006d1db
Author: JingsongLi <[email protected]>
AuthorDate: Fri Dec 26 23:38:00 2025 +0800
[core] Refactor Global index writer and reader for Btree
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 21 +-
.../paimon/globalindex/GlobalIndexIOMeta.java | 11 +-
.../globalindex/GlobalIndexParallelWriter.java | 26 ++
.../paimon/globalindex/GlobalIndexReader.java | 19 +-
.../globalindex/GlobalIndexSingletonWriter.java | 26 ++
.../paimon/globalindex/GlobalIndexWriter.java | 41 +---
.../globalindex/OffsetGlobalIndexReader.java | 38 ++-
.../org/apache/paimon/globalindex/ResultEntry.java | 47 ++++
.../paimon/globalindex/UnionGlobalIndexReader.java | 47 ++--
.../globalindex/bitmap/BitmapGlobalIndex.java | 18 +-
.../globalindex/btree/BTreeGlobalIndexer.java | 14 +-
.../paimon/globalindex/btree/BTreeIndexReader.java | 273 +++++++++++----------
.../paimon/globalindex/btree/BTreeIndexWriter.java | 49 ++--
.../globalindex/wrap/FileIndexReaderWrapper.java | 33 +--
.../globalindex/wrap/FileIndexWriterWrapper.java | 9 +-
.../org/apache/paimon/predicate/VectorSearch.java | 3 +-
.../bitmapindex/BitmapGlobalIndexTest.java | 69 ++++--
.../btree/BTreeFileMetaSelectorTest.java | 10 +-
.../globalindex/btree/BTreeGlobalIndexerTest.java | 39 +--
.../globalindex/RowRangeGlobalIndexScanner.java | 11 +-
.../paimon/table/DataEvolutionTableTest.java | 16 +-
.../index/LuceneVectorGlobalIndexReader.java | 66 +++--
.../index/LuceneVectorGlobalIndexWriter.java | 10 +-
.../index/LuceneVectorGlobalIndexScanTest.java | 15 +-
.../lucene/index/LuceneVectorGlobalIndexTest.java | 44 ++--
.../spark/globalindex/GlobalIndexBuilder.java | 38 ++-
26 files changed, 549 insertions(+), 444 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 694480882d..8f17c85503 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -67,13 +67,17 @@ public class GlobalIndexEvaluator
vectorSearch =
vectorSearch.withIncludeRowIds(compoundResult.get().results());
}
for (GlobalIndexReader fileIndexReader : readers) {
- GlobalIndexResult childResult =
vectorSearch.visit(fileIndexReader);
+ Optional<GlobalIndexResult> childResult =
vectorSearch.visit(fileIndexReader);
+ if (!childResult.isPresent()) {
+ continue;
+ }
+ GlobalIndexResult result = childResult.get();
// AND Operation
if (compoundResult.isPresent()) {
GlobalIndexResult r1 = compoundResult.get();
- compoundResult = Optional.of(r1.and(childResult));
+ compoundResult = Optional.of(r1.and(result));
} else {
- compoundResult = Optional.of(childResult);
+ compoundResult = Optional.of(result);
}
if (compoundResult.get().results().isEmpty()) {
@@ -93,15 +97,20 @@ public class GlobalIndexEvaluator
Collection<GlobalIndexReader> readers =
indexReadersCache.computeIfAbsent(fieldId,
readersFunction::apply);
for (GlobalIndexReader fileIndexReader : readers) {
- GlobalIndexResult childResult =
+ Optional<GlobalIndexResult> childResult =
predicate.function().visit(fileIndexReader, fieldRef,
predicate.literals());
+ if (!childResult.isPresent()) {
+ continue;
+ }
+
+ GlobalIndexResult result = childResult.get();
// AND Operation
if (compoundResult.isPresent()) {
GlobalIndexResult r1 = compoundResult.get();
- compoundResult = Optional.of(r1.and(childResult));
+ compoundResult = Optional.of(r1.and(result));
} else {
- compoundResult = Optional.of(childResult);
+ compoundResult = Optional.of(result);
}
if (compoundResult.get().results().isEmpty()) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
index 497a9448c5..a09d2437c1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexIOMeta.java
@@ -26,13 +26,11 @@ public class GlobalIndexIOMeta {
private final String fileName;
private final long fileSize;
- private final long rangeEnd;
private final byte[] metadata;
- public GlobalIndexIOMeta(String fileName, long fileSize, long rangeEnd,
byte[] metadata) {
+ public GlobalIndexIOMeta(String fileName, long fileSize, byte[] metadata) {
this.fileName = fileName;
this.fileSize = fileSize;
- this.rangeEnd = rangeEnd;
this.metadata = metadata;
}
@@ -44,10 +42,6 @@ public class GlobalIndexIOMeta {
return fileSize;
}
- public long rangeEnd() {
- return rangeEnd;
- }
-
public byte[] metadata() {
return metadata;
}
@@ -63,13 +57,12 @@ public class GlobalIndexIOMeta {
GlobalIndexIOMeta that = (GlobalIndexIOMeta) o;
return Objects.equals(fileName, that.fileName)
&& fileSize == that.fileSize
- && rangeEnd == that.rangeEnd
&& Arrays.equals(metadata, that.metadata);
}
@Override
public int hashCode() {
- int result = Objects.hash(fileName, fileSize, rangeEnd);
+ int result = Objects.hash(fileName, fileSize);
result = 31 * result + Arrays.hashCode(metadata);
return result;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexParallelWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexParallelWriter.java
new file mode 100644
index 0000000000..13351b10b8
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexParallelWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import javax.annotation.Nullable;
+
+/** Parallel Index writer for global index with relative row id (from 0 to
rowCnt - 1). */
+public interface GlobalIndexParallelWriter extends GlobalIndexWriter {
+ void write(@Nullable Object key, long relativeRowId);
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
index 475fced409..d62c2a9f74 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
@@ -24,26 +24,27 @@ import org.apache.paimon.predicate.VectorSearch;
import java.io.Closeable;
import java.util.List;
+import java.util.Optional;
/** Index reader for global index, return {@link GlobalIndexResult}. */
-public interface GlobalIndexReader extends FunctionVisitor<GlobalIndexResult>,
Closeable {
+public interface GlobalIndexReader extends
FunctionVisitor<Optional<GlobalIndexResult>>, Closeable {
@Override
- default GlobalIndexResult visitAnd(List<GlobalIndexResult> children) {
- throw new UnsupportedOperationException("Should not invoke this");
+ default Optional<GlobalIndexResult>
visitAnd(List<Optional<GlobalIndexResult>> children) {
+ throw new UnsupportedOperationException();
}
@Override
- default GlobalIndexResult visitOr(List<GlobalIndexResult> children) {
- throw new UnsupportedOperationException("Should not invoke this");
+ default Optional<GlobalIndexResult>
visitOr(List<Optional<GlobalIndexResult>> children) {
+ throw new UnsupportedOperationException();
}
@Override
- default GlobalIndexResult visit(TransformPredicate predicate) {
- throw new UnsupportedOperationException("Should not invoke this");
+ default Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
+ throw new UnsupportedOperationException();
}
- default GlobalIndexResult visitVectorSearch(VectorSearch vectorSearch) {
- throw new UnsupportedOperationException("Should not invoke this");
+ default Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
+ throw new UnsupportedOperationException();
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexSingletonWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexSingletonWriter.java
new file mode 100644
index 0000000000..d8a06d874b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexSingletonWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import javax.annotation.Nullable;
+
+/** Index writer for global index. */
+public interface GlobalIndexSingletonWriter extends GlobalIndexWriter {
+ void write(@Nullable Object key);
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
index 5e241665df..7e97f89319 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
@@ -18,49 +18,10 @@
package org.apache.paimon.globalindex;
-import org.apache.paimon.utils.Range;
-
-import javax.annotation.Nullable;
-
import java.util.List;
-/** Index writer for global index. */
+/** Parallel Index writer for global index with relative row id (from 0 to
rowCnt - 1). */
public interface GlobalIndexWriter {
- void write(@Nullable Object key);
-
- default void write(@Nullable Object key, long rowId) {
- throw new UnsupportedOperationException("Not supported for default.");
- }
-
List<ResultEntry> finish();
-
- /** Write result meta. */
- class ResultEntry {
- private final String fileName;
- private final Range rowRange;
- @Nullable private final byte[] meta;
-
- public ResultEntry(String fileName, @Nullable byte[] meta, Range
rowRange) {
- this.fileName = fileName;
- this.meta = meta;
- this.rowRange = rowRange;
- }
-
- public String fileName() {
- return fileName;
- }
-
- public Range rowRange() {
- return rowRange;
- }
-
- public byte[] meta() {
- return meta;
- }
-
- public static ResultEntry of(String fileName, byte[] meta, Range
rowRange) {
- return new ResultEntry(fileName, meta, rowRange);
- }
- }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
index 3974d532ef..ecd5b3fd66 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/OffsetGlobalIndexReader.java
@@ -23,6 +23,7 @@ import org.apache.paimon.predicate.VectorSearch;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
/**
* A {@link GlobalIndexReader} that wraps another reader and applies an offset
to all row IDs in the
@@ -39,85 +40,82 @@ public class OffsetGlobalIndexReader implements GlobalIndexReader {
}
@Override
- public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
return applyOffset(wrapped.visitIsNotNull(fieldRef));
}
@Override
- public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
return applyOffset(wrapped.visitIsNull(fieldRef));
}
@Override
- public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
return applyOffset(wrapped.visitStartsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitEndsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitContains(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitLike(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitLessThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef,
Object literal) {
return applyOffset(wrapped.visitGreaterOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitNotEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
return applyOffset(wrapped.visitLessOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object
literal) {
return applyOffset(wrapped.visitEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
return applyOffset(wrapped.visitGreaterThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals)
{
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object>
literals) {
return applyOffset(wrapped.visitIn(fieldRef, literals));
}
@Override
- public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object>
literals) {
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
return applyOffset(wrapped.visitNotIn(fieldRef, literals));
}
@Override
- public GlobalIndexResult visitVectorSearch(VectorSearch vectorSearch) {
+ public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
return applyOffset(wrapped.visitVectorSearch(vectorSearch));
}
- private GlobalIndexResult applyOffset(GlobalIndexResult result) {
- if (result == null) {
- throw new IllegalStateException("Wrapped reader should not return
null");
- }
- return result.offset(offset);
+ private Optional<GlobalIndexResult>
applyOffset(Optional<GlobalIndexResult> result) {
+ return result.map(r -> r.offset(offset));
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
new file mode 100644
index 0000000000..69938fdbdf
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/ResultEntry.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import javax.annotation.Nullable;
+
+/** Write result meta. */
+public class ResultEntry {
+
+ private final String fileName;
+ private final long rowCount;
+ @Nullable private final byte[] meta;
+
+ public ResultEntry(String fileName, long rowCount, @Nullable byte[] meta) {
+ this.fileName = fileName;
+ this.rowCount = rowCount;
+ this.meta = meta;
+ }
+
+ public String fileName() {
+ return fileName;
+ }
+
+ public long rowCount() {
+ return rowCount;
+ }
+
+ public byte[] meta() {
+ return meta;
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index 1bf04148ca..6e10e38f0a 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -23,6 +23,7 @@ import org.apache.paimon.predicate.VectorSearch;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.function.Function;
/**
@@ -38,88 +39,92 @@ public class UnionGlobalIndexReader implements GlobalIndexReader {
}
@Override
- public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
return union(reader -> reader.visitIsNotNull(fieldRef));
}
@Override
- public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
return union(reader -> reader.visitIsNull(fieldRef));
}
@Override
- public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
return union(reader -> reader.visitStartsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitEndsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitContains(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitLike(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitLessThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef,
Object literal) {
return union(reader -> reader.visitGreaterOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitNotEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
return union(reader -> reader.visitLessOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object
literal) {
return union(reader -> reader.visitEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
return union(reader -> reader.visitGreaterThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals)
{
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object>
literals) {
return union(reader -> reader.visitIn(fieldRef, literals));
}
@Override
- public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object>
literals) {
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
return union(reader -> reader.visitNotIn(fieldRef, literals));
}
@Override
- public GlobalIndexResult visitVectorSearch(VectorSearch vectorSearch) {
+ public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
return union(reader -> reader.visitVectorSearch(vectorSearch));
}
- private GlobalIndexResult union(Function<GlobalIndexReader,
GlobalIndexResult> visitor) {
- GlobalIndexResult result = null;
+ private Optional<GlobalIndexResult> union(
+ Function<GlobalIndexReader, Optional<GlobalIndexResult>> visitor) {
+ Optional<GlobalIndexResult> result = Optional.empty();
for (GlobalIndexReader reader : readers) {
- GlobalIndexResult current = visitor.apply(reader);
- if (current == null) {
- throw new IllegalStateException("Reader should not return
null");
+ Optional<GlobalIndexResult> current = visitor.apply(reader);
+ if (!current.isPresent()) {
+ continue;
}
- result = result == null ? current : result.or(current);
+ if (!result.isPresent()) {
+ result = current;
+ }
+ result = Optional.of(result.get().or(current.get()));
}
return result;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
index 8d498219b9..b8a64602c2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/bitmap/BitmapGlobalIndex.java
@@ -27,16 +27,16 @@ import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.globalindex.wrap.FileIndexReaderWrapper;
import org.apache.paimon.globalindex.wrap.FileIndexWriterWrapper;
-import org.apache.paimon.utils.Range;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -50,7 +50,8 @@ public class BitmapGlobalIndex implements GlobalIndexer {
}
@Override
- public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter)
throws IOException {
+ public GlobalIndexSingletonWriter createWriter(GlobalIndexFileWriter
fileWriter)
+ throws IOException {
FileIndexWriter writer = index.createWriter();
return new FileIndexWriterWrapper(
fileWriter, writer, BitmapGlobalIndexerFactory.IDENTIFIER);
@@ -62,17 +63,16 @@ public class BitmapGlobalIndex implements GlobalIndexer {
GlobalIndexIOMeta indexMeta = files.get(0);
SeekableInputStream input =
fileReader.getInputStream(indexMeta.fileName());
FileIndexReader reader = index.createReader(input, 0, (int)
indexMeta.fileSize());
- return new FileIndexReaderWrapper(
- reader, r -> toGlobalResult(indexMeta.rangeEnd(), r), input);
+ return new FileIndexReaderWrapper(reader, this::toGlobalResult, input);
}
- private GlobalIndexResult toGlobalResult(long rangeEnd, FileIndexResult
result) {
+ private Optional<GlobalIndexResult> toGlobalResult(FileIndexResult result)
{
if (FileIndexResult.REMAIN == result) {
- return GlobalIndexResult.fromRange(new Range(0L, rangeEnd));
+ return Optional.empty();
} else if (FileIndexResult.SKIP == result) {
- return GlobalIndexResult.createEmpty();
+ return Optional.of(GlobalIndexResult.createEmpty());
}
BitmapIndexResult bitmapResult = (BitmapIndexResult) result;
- return GlobalIndexResult.create(() ->
bitmapResult.get().toNavigable64());
+ return Optional.of(GlobalIndexResult.create(() ->
bitmapResult.get().toNavigable64()));
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
index 04e661b95d..2ffba2fc9d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexer.java
@@ -22,17 +22,17 @@ import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.UnionGlobalIndexReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.LazyField;
-import org.apache.paimon.utils.Preconditions;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -79,7 +79,7 @@ public class BTreeGlobalIndexer implements GlobalIndexer {
}
@Override
- public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter)
throws IOException {
+ public BTreeIndexWriter createWriter(GlobalIndexFileWriter fileWriter)
throws IOException {
long blockSize =
options.get(BTreeIndexOptions.BTREE_INDEX_BLOCK_SIZE).getBytes();
CompressOptions compressOptions =
new CompressOptions(
@@ -95,8 +95,10 @@ public class BTreeGlobalIndexer implements GlobalIndexer {
@Override
public GlobalIndexReader createReader(
GlobalIndexFileReader fileReader, List<GlobalIndexIOMeta> files)
throws IOException {
- // Single reader only supports read one index file
- Preconditions.checkState(files.size() == 1);
- return new BTreeIndexReader(keySerializer, fileReader, files.get(0),
cacheManager.get());
+ List<GlobalIndexReader> readers = new ArrayList<>();
+ for (GlobalIndexIOMeta meta : files) {
+ readers.add(new BTreeIndexReader(keySerializer, fileReader, meta,
cacheManager.get()));
+ }
+ return new UnionGlobalIndexReader(readers);
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
index dc0fba2175..65b0d5744c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexReader.java
@@ -44,6 +44,7 @@ import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.zip.CRC32;
/** The {@link GlobalIndexReader} implementation for btree index. */
@@ -151,178 +152,194 @@ public class BTreeIndexReader implements GlobalIndexReader {
}
@Override
- public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
// nulls are stored separately in null bitmap.
- return GlobalIndexResult.create(
- () -> {
- try {
- return fullData();
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return allNonNullRows();
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
// nulls are stored separately in null bitmap.
- return GlobalIndexResult.create(nullBitmap::get);
+ return Optional.of(GlobalIndexResult.create(nullBitmap::get));
}
@Override
- public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
// todo: `startsWith` can also be covered by btree index.
- return GlobalIndexResult.create(
- () -> {
- try {
- return fullData();
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return allNonNullRows();
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return fullData();
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return allNonNullRows();
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return fullData();
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return allNonNullRows();
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return fullData();
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return allNonNullRows();
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return rangeQuery(minKey, literal, true, false);
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return rangeQuery(minKey, literal, true,
false);
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return rangeQuery(literal, maxKey, true, true);
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return rangeQuery(literal, maxKey, true, true);
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- RoaringNavigableMap64 result = fullData();
- result.andNot(rangeQuery(literal, literal, true,
true));
- return result;
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ RoaringNavigableMap64 result =
allNonNullRows();
+ result.andNot(rangeQuery(literal, literal,
true, true));
+ return result;
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object
literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return rangeQuery(minKey, literal, true, true);
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return rangeQuery(minKey, literal, true, true);
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return rangeQuery(literal, literal, true, true);
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object
literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return rangeQuery(literal, literal, true,
true);
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object
literal) {
- return GlobalIndexResult.create(
- () -> {
- try {
- return rangeQuery(literal, maxKey, false, true);
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ return rangeQuery(literal, maxKey, false,
true);
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals)
{
- return GlobalIndexResult.create(
- () -> {
- try {
- RoaringNavigableMap64 result = new
RoaringNavigableMap64();
- for (Object literal : literals) {
- result.or(rangeQuery(literal, literal, true,
true));
- }
- return result;
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object>
literals) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ RoaringNavigableMap64 result = new
RoaringNavigableMap64();
+ for (Object literal : literals) {
+ result.or(rangeQuery(literal, literal,
true, true));
+ }
+ return result;
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
@Override
- public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object>
literals) {
- return GlobalIndexResult.create(
- () -> {
- try {
- RoaringNavigableMap64 result = fullData();
- result.andNot(this.visitIn(fieldRef,
literals).results());
- return result;
- } catch (IOException ioe) {
- throw new RuntimeException("fail to read btree index
file.", ioe);
- }
- });
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ try {
+ RoaringNavigableMap64 result =
allNonNullRows();
+ result.andNot(this.visitIn(fieldRef,
literals).get().results());
+ return result;
+ } catch (IOException ioe) {
+ throw new RuntimeException("fail to read btree
index file.", ioe);
+ }
+ }));
}
- private RoaringNavigableMap64 fullData() throws IOException {
+ private RoaringNavigableMap64 allNonNullRows() throws IOException {
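+ // Scan the whole key range [minKey, maxKey] rather than returning every row, so that rows
+ // with null keys (kept separately in the null bitmap) are excluded from the result. This is
+ // most beneficial when many values are null.
+ // TODO: avoid the full key-range scan when there are only few null values.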
return rangeQuery(minKey, maxKey, true, true);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
index 97c5f6430a..a08b378947 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexWriter.java
@@ -20,7 +20,9 @@ package org.apache.paimon.globalindex.btree;
import org.apache.paimon.compression.BlockCompressionFactory;
import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.memory.MemorySliceOutput;
@@ -28,7 +30,6 @@ import org.apache.paimon.sst.BlockHandle;
import org.apache.paimon.sst.BloomFilterHandle;
import org.apache.paimon.sst.SstFileWriter;
import org.apache.paimon.utils.LazyField;
-import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RoaringNavigableMap64;
import javax.annotation.Nullable;
@@ -41,9 +42,9 @@ import java.util.List;
import java.util.zip.CRC32;
/**
- * The {@link GlobalIndexWriter} implementation for BTree index. Note that
users must keep written
- * keys monotonically incremental. All null keys are stored in a separate
bitmap, which will be
- * serialized and appended to the file end on close. The layout is as below:
+ * The {@link GlobalIndexParallelWriter} implementation for BTree index: unlike a
+ * {@link GlobalIndexSingletonWriter}, each key is written together with its explicit row id. Note
+ * that users must write keys sorted in non-decreasing order. All null keys are stored in a
+ * separate bitmap, which will be serialized and appended to the file end on close. The layout is
+ * as below:
*
* <pre>
* +-----------------------------------+------+
@@ -66,24 +67,21 @@ import java.util.zip.CRC32;
* <p>For efficiency, we combine entries with the same keys and store a
compact list of row ids for
* each key.
*/
-public class BTreeIndexWriter implements GlobalIndexWriter {
+public class BTreeIndexWriter implements GlobalIndexParallelWriter {
private final String fileName;
private final PositionOutputStream out;
- private long maxRowId = Long.MIN_VALUE;
- private long minRowId = Long.MAX_VALUE;
-
private final SstFileWriter writer;
private final KeySerializer keySerializer;
private final Comparator<Object> comparator;
+ private final List<Long> currentRowIds = new ArrayList<>();
+ private final LazyField<RoaringNavigableMap64> nullBitmap =
+ new LazyField<>(RoaringNavigableMap64::new);
private Object firstKey = null;
private Object lastKey = null;
- private final List<Long> currentRowIds = new ArrayList<>();
-
- // for nulls
- LazyField<RoaringNavigableMap64> nullBitmap = new
LazyField<>(RoaringNavigableMap64::new);
+ private long rowCount = 0;
public BTreeIndexWriter(
GlobalIndexFileWriter indexFileWriter,
@@ -99,14 +97,9 @@ public class BTreeIndexWriter implements GlobalIndexWriter {
this.writer = new SstFileWriter(out, blockSize, null,
compressionFactory);
}
- @Override
- public void write(@Nullable Object key) {
- throw new UnsupportedOperationException(
- "BTree index writer should explicitly specify row id for each
key");
- }
-
@Override
public void write(@Nullable Object key, long rowId) {
+ rowCount++;
if (key == null) {
nullBitmap.get().add(rowId);
return;
@@ -126,8 +119,6 @@ public class BTreeIndexWriter implements GlobalIndexWriter {
if (firstKey == null) {
firstKey = key;
}
- minRowId = Math.min(minRowId, rowId);
- maxRowId = Math.max(maxRowId, rowId);
}
private void flush() throws IOException {
@@ -180,15 +171,13 @@ public class BTreeIndexWriter implements
GlobalIndexWriter {
throw new RuntimeException("Should never write an empty btree
index file.");
}
- return Collections.singletonList(
- ResultEntry.of(
- fileName,
- new BTreeIndexMeta(
- keySerializer.serialize(firstKey),
- keySerializer.serialize(lastKey),
- nullBitmap.initialized())
- .serialize(),
- new Range(minRowId, maxRowId)));
+ byte[] metaBytes =
+ new BTreeIndexMeta(
+ keySerializer.serialize(firstKey),
+ keySerializer.serialize(lastKey),
+ nullBitmap.initialized())
+ .serialize();
+ return Collections.singletonList(new ResultEntry(fileName, rowCount,
metaBytes));
}
@Nullable
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
index ab7d16a71c..2fb8fe3065 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
@@ -27,18 +27,19 @@ import org.apache.paimon.predicate.FieldRef;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
import java.util.function.Function;
/** A {@link GlobalIndexReader} wrapper for {@link FileIndexReader}. */
public class FileIndexReaderWrapper implements GlobalIndexReader {
private final FileIndexReader reader;
- private final Function<FileIndexResult, GlobalIndexResult> transform;
+ private final Function<FileIndexResult, Optional<GlobalIndexResult>>
transform;
private final Closeable closeable;
public FileIndexReaderWrapper(
FileIndexReader reader,
- Function<FileIndexResult, GlobalIndexResult> transform,
+ Function<FileIndexResult, Optional<GlobalIndexResult>> transform,
Closeable closeable) {
this.reader = reader;
this.transform = transform;
@@ -46,72 +47,72 @@ public class FileIndexReaderWrapper implements
GlobalIndexReader {
}
@Override
- public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
return transform.apply(reader.visitIsNotNull(fieldRef));
}
@Override
- public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
return transform.apply(reader.visitIsNull(fieldRef));
}
@Override
- public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
return transform.apply(reader.visitStartsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitEndsWith(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitContains(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitLike(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitLessThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef,
Object literal) {
return transform.apply(reader.visitGreaterOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitNotEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
return transform.apply(reader.visitLessOrEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object
literal) {
return transform.apply(reader.visitEqual(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object
literal) {
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
return transform.apply(reader.visitGreaterThan(fieldRef, literal));
}
@Override
- public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals)
{
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object>
literals) {
return transform.apply(reader.visitIn(fieldRef, literals));
}
@Override
- public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object>
literals) {
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
return transform.apply(reader.visitNotIn(fieldRef, literals));
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
index b7bbb5b2a7..536da1ac8c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexWriterWrapper.java
@@ -20,16 +20,16 @@ package org.apache.paimon.globalindex.wrap;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexReader;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
-import org.apache.paimon.utils.Range;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
/** A {@link GlobalIndexReader} wrapper for {@link FileIndexWriter}. */
-public class FileIndexWriterWrapper implements GlobalIndexWriter {
+public class FileIndexWriterWrapper implements GlobalIndexSingletonWriter {
private final GlobalIndexFileWriter fileWriter;
private final FileIndexWriter writer;
@@ -58,8 +58,7 @@ public class FileIndexWriterWrapper implements
GlobalIndexWriter {
} catch (Exception e) {
throw new RuntimeException("Failed to write global index file:
" + fileName, e);
}
- return Collections.singletonList(
- ResultEntry.of(fileName, null, new Range(0, count - 1)));
+ return Collections.singletonList(new ResultEntry(fileName, count,
null));
} else {
return Collections.emptyList();
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
b/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
index 9097317821..a6042ec7e9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/VectorSearch.java
@@ -25,6 +25,7 @@ import org.apache.paimon.utils.RoaringNavigableMap64;
import javax.annotation.Nullable;
import java.io.Serializable;
+import java.util.Optional;
/** VectorSearch to perform vector similarity search. * */
public class VectorSearch implements Serializable {
@@ -75,7 +76,7 @@ public class VectorSearch implements Serializable {
return this;
}
- public GlobalIndexResult visit(GlobalIndexReader visitor) {
+ public Optional<GlobalIndexResult> visit(GlobalIndexReader visitor) {
return visitor.visitVectorSearch(this);
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
index 7edd6173da..18f0a75e2d 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -28,7 +28,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndex;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
@@ -94,14 +94,22 @@ public class BitmapGlobalIndexTest {
}
});
assert reader.visitEqual(fieldRef, a)
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 4));
- assert reader.visitEqual(fieldRef,
b).results().equals(RoaringNavigableMap64.bitmapOf(2));
- assert
reader.visitIsNull(fieldRef).results().equals(RoaringNavigableMap64.bitmapOf(1,
3));
+ assert reader.visitEqual(fieldRef, b)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(2));
+ assert reader.visitIsNull(fieldRef)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(1, 3));
assert reader.visitIn(fieldRef, Arrays.asList(a, b))
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 2, 4));
- assert reader.visitEqual(fieldRef,
BinaryString.fromString("c")).results().isEmpty();
+ assert reader.visitEqual(fieldRef,
BinaryString.fromString("c")).get().results().isEmpty();
}
private void testIntType(int version) throws Exception {
@@ -117,14 +125,24 @@ public class BitmapGlobalIndexTest {
writer.write(o);
}
});
- assert reader.visitEqual(fieldRef,
0).results().equals(RoaringNavigableMap64.bitmapOf(0));
- assert reader.visitEqual(fieldRef,
1).results().equals(RoaringNavigableMap64.bitmapOf(1));
- assert
reader.visitIsNull(fieldRef).results().equals(RoaringNavigableMap64.bitmapOf(2));
+ assert reader.visitEqual(fieldRef, 0)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(0));
+ assert reader.visitEqual(fieldRef, 1)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(1));
+ assert reader.visitIsNull(fieldRef)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(2));
assert reader.visitIn(fieldRef, Arrays.asList(0, 1, 2))
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 1));
- assert reader.visitEqual(fieldRef, 2).results().isEmpty();
+ assert reader.visitEqual(fieldRef, 2).get().results().isEmpty();
}
private void testBooleanType(int version) throws Exception {
@@ -141,9 +159,13 @@ public class BitmapGlobalIndexTest {
}
});
assert reader.visitEqual(fieldRef, Boolean.TRUE)
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 2));
- assert
reader.visitIsNull(fieldRef).results().equals(RoaringNavigableMap64.bitmapOf(4));
+ assert reader.visitIsNull(fieldRef)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(4));
}
private void testHighCardinality(
@@ -177,11 +199,12 @@ public class BitmapGlobalIndexTest {
long time2 = System.currentTimeMillis();
GlobalIndexResult result =
reader.visitEqual(
- fieldRef, BinaryString.fromString(prefix +
(approxCardinality / 2)));
+ fieldRef, BinaryString.fromString(prefix +
(approxCardinality / 2)))
+ .get();
System.out.println("read time: " + (System.currentTimeMillis() -
time2));
assert result.results().equals(middleBm.toNavigable64());
long time3 = System.currentTimeMillis();
- GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
+ GlobalIndexResult resultNull = reader.visitIsNull(fieldRef).get();
System.out.println("read null bitmap time: " +
(System.currentTimeMillis() - time3));
assert resultNull.results().equals(nullBm.toNavigable64());
}
@@ -190,7 +213,7 @@ public class BitmapGlobalIndexTest {
int writerVersion,
Integer indexBlockSize,
DataType dataType,
- Consumer<GlobalIndexWriter> consumer)
+ Consumer<GlobalIndexSingletonWriter> consumer)
throws Exception {
Options options = new Options();
options.setInteger(BitmapFileIndex.VERSION, writerVersion);
@@ -213,7 +236,7 @@ public class BitmapGlobalIndexTest {
return fileIO.newOutputStream(new
Path(tempDir.toString(), fileName), true);
}
};
- GlobalIndexWriter globalIndexWriter =
bitmapGlobalIndex.createWriter(fileWriter);
+ GlobalIndexSingletonWriter globalIndexWriter =
bitmapGlobalIndex.createWriter(fileWriter);
consumer.accept(globalIndexWriter);
String fileName = globalIndexWriter.finish().get(0).fileName();
Path path = new Path(tempDir.toString(), fileName);
@@ -232,8 +255,7 @@ public class BitmapGlobalIndexTest {
}
};
- GlobalIndexIOMeta globalIndexMeta =
- new GlobalIndexIOMeta(fileName, fileSize, Long.MAX_VALUE,
null);
+ GlobalIndexIOMeta globalIndexMeta = new GlobalIndexIOMeta(fileName,
fileSize, null);
return bitmapGlobalIndex.createReader(
fileReader, Collections.singletonList(globalIndexMeta));
@@ -259,15 +281,23 @@ public class BitmapGlobalIndexTest {
a.pointTo(c.getSegments(), c.getOffset(),
c.getSizeInBytes());
writer.write(null);
});
- assert reader.visitEqual(fieldRef,
a).results().equals(RoaringNavigableMap64.bitmapOf(0));
- assert reader.visitEqual(fieldRef,
b).results().equals(RoaringNavigableMap64.bitmapOf(3));
+ assert reader.visitEqual(fieldRef, a)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(0));
+ assert reader.visitEqual(fieldRef, b)
+ .get()
+ .results()
+ .equals(RoaringNavigableMap64.bitmapOf(3));
assert reader.visitIsNull(fieldRef)
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(1, 2, 4, 5));
assert reader.visitIn(fieldRef, Arrays.asList(a, b))
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 3));
- assert reader.visitEqual(fieldRef,
BinaryString.fromString("c")).results().isEmpty();
+ assert reader.visitEqual(fieldRef,
BinaryString.fromString("c")).get().results().isEmpty();
}
private void testAllNull(int version) throws Exception {
@@ -284,8 +314,9 @@ public class BitmapGlobalIndexTest {
}
});
assert reader.visitIsNull(fieldRef)
+ .get()
.results()
.equals(RoaringNavigableMap64.bitmapOf(0, 1, 2));
- assert reader.visitIsNotNull(fieldRef).results().isEmpty();
+ assert reader.visitIsNotNull(fieldRef).get().results().isEmpty();
}
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
index 3015ffa715..51f72e4346 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeFileMetaSelectorTest.java
@@ -54,11 +54,11 @@ public class BTreeFileMetaSelectorTest {
files =
Arrays.asList(
- new GlobalIndexIOMeta("file1", 1, 1L,
meta1.serialize()),
- new GlobalIndexIOMeta("file2", 1, 1L,
meta2.serialize()),
- new GlobalIndexIOMeta("file3", 1, 1L,
meta3.serialize()),
- new GlobalIndexIOMeta("file4", 1, 1L,
meta4.serialize()),
- new GlobalIndexIOMeta("file5", 1, 1L,
meta5.serialize()));
+ new GlobalIndexIOMeta("file1", 1, meta1.serialize()),
+ new GlobalIndexIOMeta("file2", 1, meta2.serialize()),
+ new GlobalIndexIOMeta("file3", 1, meta3.serialize()),
+ new GlobalIndexIOMeta("file4", 1, meta4.serialize()),
+ new GlobalIndexIOMeta("file5", 1, meta5.serialize()));
}
@Test
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
index 8cf5a9d840..7acf024542 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexerTest.java
@@ -27,9 +27,10 @@ import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexReader;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.options.MemorySize;
@@ -76,6 +77,8 @@ import java.util.TreeSet;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link BTreeGlobalIndexer}. */
@ExtendWith(ParameterizedTestExtension.class)
public class BTreeGlobalIndexerTest {
@@ -174,38 +177,38 @@ public class BTreeGlobalIndexerTest {
Object literal = data.get(random.nextInt(dataNum)).getKey();
// 1. test <= literal
- result = reader.visitLessOrEqual(ref, literal);
+ result = reader.visitLessOrEqual(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) <= 0));
// 2. test < literal
- result = reader.visitLessThan(ref, literal);
+ result = reader.visitLessThan(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) < 0));
// 3. test >= literal
- result = reader.visitGreaterOrEqual(ref, literal);
+ result = reader.visitGreaterOrEqual(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) >= 0));
// 4. test > literal
- result = reader.visitGreaterThan(ref, literal);
+ result = reader.visitGreaterThan(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) > 0));
// 5. test equal
- result = reader.visitEqual(ref, literal);
+ result = reader.visitEqual(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) == 0));
// 6. test not equal
- result = reader.visitNotEqual(ref, literal);
+ result = reader.visitNotEqual(ref, literal).get();
assertResult(result, filter(obj -> comparator.compare(obj,
literal) != 0));
}
// 7. test < min
Object literal7 = data.get(0).getKey();
- result = reader.visitLessThan(ref, literal7);
+ result = reader.visitLessThan(ref, literal7).get();
Assertions.assertTrue(result.results().isEmpty());
// 8. test > max
Object literal8 = data.get(dataNum - 1).getKey();
- result = reader.visitGreaterThan(ref, literal8);
+ result = reader.visitGreaterThan(ref, literal8).get();
Assertions.assertTrue(result.results().isEmpty());
}
}
@@ -223,10 +226,10 @@ public class BTreeGlobalIndexerTest {
globalIndexer.createReader(fileReader,
Collections.singletonList(written))) {
GlobalIndexResult result;
- result = reader.visitIsNull(ref);
+ result = reader.visitIsNull(ref).get();
assertResult(result, filter(Objects::isNull));
- result = reader.visitIsNotNull(ref);
+ result = reader.visitIsNotNull(ref).get();
assertResult(result, filter(Objects::nonNull));
}
}
@@ -250,30 +253,29 @@ public class BTreeGlobalIndexerTest {
set.addAll(literals);
// 1. test in
- result = reader.visitIn(ref, literals);
+ result = reader.visitIn(ref, literals).get();
assertResult(result, filter(set::contains));
// 2. test not in
- result = reader.visitNotIn(ref, literals);
+ result = reader.visitNotIn(ref, literals).get();
assertResult(result, filter(obj -> !set.contains(obj)));
}
}
}
private GlobalIndexIOMeta writeData() throws IOException {
- GlobalIndexWriter indexWriter = globalIndexer.createWriter(fileWriter);
+ GlobalIndexParallelWriter indexWriter =
globalIndexer.createWriter(fileWriter);
for (Pair<Object, Long> pair : data) {
indexWriter.write(pair.getKey(), pair.getValue());
}
- List<GlobalIndexWriter.ResultEntry> results = indexWriter.finish();
+ List<ResultEntry> results = indexWriter.finish();
Assertions.assertEquals(1, results.size());
- GlobalIndexWriter.ResultEntry resultEntry = results.get(0);
+ ResultEntry resultEntry = results.get(0);
String fileName = resultEntry.fileName();
return new GlobalIndexIOMeta(
fileName,
fileIO.getFileSize(new Path(new Path(tempPath.toUri()),
fileName)),
- resultEntry.rowRange().to - resultEntry.rowRange().from,
resultEntry.meta());
}
@@ -290,8 +292,7 @@ public class BTreeGlobalIndexerTest {
while (iter.hasNext()) {
result.add(iter.next());
}
- org.assertj.core.api.Assertions.assertThat(result)
- .containsExactlyInAnyOrderElementsOf(expected);
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expected);
}
/** The Generator to generate test data. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
index 73e7cbf19f..fc87057027 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowRangeGlobalIndexScanner.java
@@ -103,14 +103,6 @@ public class RowRangeGlobalIndexScanner implements
Closeable {
this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction);
}
- public Optional<GlobalIndexResult> scan(Predicate predicate) {
- return globalIndexEvaluator.evaluate(predicate, null);
- }
-
- public Optional<GlobalIndexResult> scan(@Nullable VectorSearch
vectorSearch) {
- return globalIndexEvaluator.evaluate(null, vectorSearch);
- }
-
public Optional<GlobalIndexResult> scan(
Predicate predicate, @Nullable VectorSearch vectorSearch) {
return globalIndexEvaluator.evaluate(predicate, vectorSearch);
@@ -161,8 +153,7 @@ public class RowRangeGlobalIndexScanner implements
Closeable {
private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
GlobalIndexMeta globalIndex = meta.globalIndexMeta();
checkNotNull(globalIndex);
- return new GlobalIndexIOMeta(
- meta.fileName(), meta.fileSize(), meta.rowCount() - 1,
globalIndex.indexMeta());
+ return new GlobalIndexIOMeta(meta.fileName(), meta.fileSize(),
globalIndex.indexMeta());
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index a1460257fa..b104ba20f2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -30,11 +30,12 @@ import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.GlobalIndexerFactory;
import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
import org.apache.paimon.index.GlobalIndexMeta;
@@ -1014,19 +1015,20 @@ public class DataEvolutionTableTest extends
TableTestBase {
GlobalIndexerFactory globalIndexerFactory =
GlobalIndexerFactoryUtils.load(BitmapGlobalIndexerFactory.IDENTIFIER);
GlobalIndexer globalIndexer = globalIndexerFactory.create(indexField,
new Options());
- GlobalIndexWriter globaIndexBuilder =
globalIndexer.createWriter(indexFileReadWrite);
+ GlobalIndexSingletonWriter globaIndexBuilder =
+ (GlobalIndexSingletonWriter)
globalIndexer.createWriter(indexFileReadWrite);
reader.forEachRemaining(r -> globaIndexBuilder.write(r.getString(0)));
- List<GlobalIndexWriter.ResultEntry> results =
globaIndexBuilder.finish();
+ List<ResultEntry> results = globaIndexBuilder.finish();
List<IndexFileMeta> indexFileMetaList = new ArrayList<>();
- for (GlobalIndexWriter.ResultEntry result : results) {
+ for (ResultEntry result : results) {
String fileName = result.fileName();
- Range range = result.rowRange();
long fileSize =
fileIO.getFileSize(indexFileReadWrite.filePath(fileName));
GlobalIndexMeta globalIndexMeta =
- new GlobalIndexMeta(range.from, range.to, indexField.id(),
null, result.meta());
+ new GlobalIndexMeta(
+ 0, result.rowCount() - 1, indexField.id(), null,
result.meta());
indexFileMetaList.add(
new IndexFileMeta(
BitmapGlobalIndexerFactory.IDENTIFIER,
@@ -1057,7 +1059,7 @@ public class DataEvolutionTableTest extends TableTestBase
{
for (Range range : ranges) {
try (RowRangeGlobalIndexScanner scanner =
indexScanBuilder.withRowRange(range).build()) {
- Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate);
+ Optional<GlobalIndexResult> globalIndexResult =
scanner.scan(predicate, null);
if (!globalIndexResult.isPresent()) {
throw new RuntimeException("Can't find index result by
scan");
}
diff --git
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexReader.java
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexReader.java
index 362ac41163..c6acf36f69 100644
---
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexReader.java
+++
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexReader.java
@@ -30,7 +30,6 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RoaringNavigableMap64;
import org.apache.lucene.document.Document;
@@ -50,6 +49,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
@@ -67,7 +67,6 @@ public class LuceneVectorGlobalIndexReader implements
GlobalIndexReader {
private final List<LuceneIndexMMapDirectory> directories;
private final List<GlobalIndexIOMeta> ioMetas;
private final GlobalIndexFileReader fileReader;
- private final GlobalIndexResult defaultResult;
private volatile boolean indicesLoaded = false;
private final DataType fieldType;
@@ -78,11 +77,10 @@ public class LuceneVectorGlobalIndexReader implements
GlobalIndexReader {
this.fieldType = fieldType;
this.searchers = new ArrayList<>();
this.directories = new ArrayList<>();
- this.defaultResult = GlobalIndexResult.fromRange(new Range(0,
ioMetas.get(0).rangeEnd()));
}
@Override
- public GlobalIndexResult visitVectorSearch(VectorSearch vectorSearch) {
+ public Optional<GlobalIndexResult> visitVectorSearch(VectorSearch
vectorSearch) {
try {
ensureLoadIndices(fileReader, ioMetas);
Query query = query(vectorSearch, fieldType);
@@ -178,7 +176,7 @@ public class LuceneVectorGlobalIndexReader implements
GlobalIndexReader {
}
}
- private GlobalIndexResult search(Query query, int limit) throws
IOException {
+ private Optional<GlobalIndexResult> search(Query query, int limit) throws
IOException {
PriorityQueue<ScoredRow> result =
new PriorityQueue<>(Comparator.comparingDouble(sr ->
sr.score));
for (IndexSearcher searcher : searchers) {
@@ -209,7 +207,7 @@ public class LuceneVectorGlobalIndexReader implements
GlobalIndexReader {
id2scores.put(rowId, scoredRow.score);
roaringBitmap64.add(rowId);
}
- return new LuceneVectorSearchGlobalIndexResult(roaringBitmap64,
id2scores);
+ return Optional.of(new
LuceneVectorSearchGlobalIndexResult(roaringBitmap64, id2scores));
}
/** Helper class to store row ID with its score. */
@@ -255,72 +253,72 @@ public class LuceneVectorGlobalIndexReader implements
GlobalIndexReader {
// =================== unsupported =====================
@Override
- public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object
literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitLike(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object
literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object
literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object
literal) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals)
{
- return defaultResult;
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object>
literals) {
+ return Optional.empty();
}
@Override
- public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object>
literals) {
- return defaultResult;
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
+ return Optional.empty();
}
}
diff --git
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexWriter.java
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexWriter.java
index 9c363fa779..bc77a3551f 100644
---
a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexWriter.java
+++
b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexWriter.java
@@ -18,10 +18,10 @@
package org.apache.paimon.lucene.index;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.types.DataType;
-import org.apache.paimon.utils.Range;
import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.lucene912.Lucene912Codec;
@@ -43,7 +43,7 @@ import java.util.List;
* <p>This implementation uses Lucene's native KnnFloatVectorField with HNSW
algorithm for efficient
* approximate nearest neighbor search.
*/
-public class LuceneVectorGlobalIndexWriter implements GlobalIndexWriter {
+public class LuceneVectorGlobalIndexWriter implements
GlobalIndexSingletonWriter {
private final GlobalIndexFileWriter fileWriter;
private final LuceneVectorIndexOptions vectorIndexOptions;
@@ -106,9 +106,7 @@ public class LuceneVectorGlobalIndexWriter implements
GlobalIndexWriter {
this.vectorIndexOptions.writeBufferSize(),
out);
}
- long minRowIdInBatch = vectorIndices.get(0).id();
- long maxRowIdInBatch = vectorIndices.get(vectorIndices.size() -
1).id();
- results.add(ResultEntry.of(fileName, null, new Range(minRowIdInBatch,
maxRowIdInBatch)));
+ results.add(new ResultEntry(fileName, count, null));
vectorIndices.clear();
}
diff --git
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
index a608df5baa..e9e476ede6 100644
---
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
+++
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexScanTest.java
@@ -26,7 +26,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
@@ -170,27 +170,22 @@ public class LuceneVectorGlobalIndexScanTest {
writer.write(vec);
}
- List<GlobalIndexWriter.ResultEntry> entries = writer.finish();
+ List<ResultEntry> entries = writer.finish();
List<IndexFileMeta> metas = new ArrayList<>();
int fieldId = rowType.getFieldIndex(vectorFieldName);
- for (GlobalIndexWriter.ResultEntry entry : entries) {
+ for (ResultEntry entry : entries) {
long fileSize = fileIO.getFileSize(new Path(indexDir,
entry.fileName()));
GlobalIndexMeta globalMeta =
- new GlobalIndexMeta(
- entry.rowRange().from,
- entry.rowRange().to,
- fieldId,
- null,
- entry.meta());
+ new GlobalIndexMeta(0, vectors.length - 1, fieldId, null,
entry.meta());
metas.add(
new IndexFileMeta(
LuceneVectorGlobalIndexerFactory.IDENTIFIER,
entry.fileName(),
fileSize,
- entry.rowRange().to - entry.rowRange().from + 1,
+ entry.rowCount(),
globalMeta));
}
return metas;
diff --git
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
index d4ccc0eb2e..2848a04f72 100644
---
a/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
+++
b/paimon-lucene/src/test/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexTest.java
@@ -25,7 +25,7 @@ import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.globalindex.GlobalIndexIOMeta;
import org.apache.paimon.globalindex.GlobalIndexResult;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
import org.apache.paimon.globalindex.io.GlobalIndexFileWriter;
import org.apache.paimon.options.Options;
@@ -45,6 +45,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.Random;
import java.util.UUID;
@@ -122,24 +123,23 @@ public class LuceneVectorGlobalIndexTest {
List<float[]> testVectors = generateRandomVectors(numVectors,
dimension);
testVectors.forEach(writer::write);
- List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+ List<ResultEntry> results = writer.finish();
assertThat(results).hasSize(1);
- GlobalIndexWriter.ResultEntry result = results.get(0);
+ ResultEntry result = results.get(0);
GlobalIndexFileReader fileReader =
createFileReader(metricIndexPath);
List<GlobalIndexIOMeta> metas = new ArrayList<>();
metas.add(
new GlobalIndexIOMeta(
result.fileName(),
fileIO.getFileSize(new Path(metricIndexPath,
result.fileName())),
- result.rowRange().to - result.rowRange().from,
result.meta()));
try (LuceneVectorGlobalIndexReader reader =
new LuceneVectorGlobalIndexReader(fileReader, metas,
vectorType)) {
VectorSearch vectorSearch = new
VectorSearch(testVectors.get(0), 3, fieldName);
- GlobalIndexResult searchResult =
reader.visitVectorSearch(vectorSearch);
- assertThat(searchResult).isNotNull();
+ Optional<GlobalIndexResult> searchResult =
reader.visitVectorSearch(vectorSearch);
+ assertThat(searchResult).isPresent();
}
}
}
@@ -160,24 +160,23 @@ public class LuceneVectorGlobalIndexTest {
List<float[]> testVectors = generateRandomVectors(numVectors,
dimension);
testVectors.forEach(writer::write);
- List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+ List<ResultEntry> results = writer.finish();
assertThat(results).hasSize(1);
- GlobalIndexWriter.ResultEntry result = results.get(0);
+ ResultEntry result = results.get(0);
GlobalIndexFileReader fileReader = createFileReader(dimIndexPath);
List<GlobalIndexIOMeta> metas = new ArrayList<>();
metas.add(
new GlobalIndexIOMeta(
result.fileName(),
fileIO.getFileSize(new Path(dimIndexPath,
result.fileName())),
- result.rowRange().to - result.rowRange().from,
result.meta()));
try (LuceneVectorGlobalIndexReader reader =
new LuceneVectorGlobalIndexReader(fileReader, metas,
vectorType)) {
// Verify search works with this dimension
VectorSearch vectorSearch = new
VectorSearch(testVectors.get(0), 5, fieldName);
- GlobalIndexResult searchResult =
reader.visitVectorSearch(vectorSearch);
+ Optional<GlobalIndexResult> searchResult =
reader.visitVectorSearch(vectorSearch);
assertThat(searchResult).isNotNull();
}
}
@@ -218,17 +217,16 @@ public class LuceneVectorGlobalIndexTest {
new LuceneVectorGlobalIndexWriter(fileWriter, vectorType,
indexOptions);
Arrays.stream(vectors).forEach(writer::write);
- List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+ List<ResultEntry> results = writer.finish();
assertThat(results).hasSize(2);
GlobalIndexFileReader fileReader = createFileReader(indexPath);
List<GlobalIndexIOMeta> metas = new ArrayList<>();
- for (GlobalIndexWriter.ResultEntry result : results) {
+ for (ResultEntry result : results) {
metas.add(
new GlobalIndexIOMeta(
result.fileName(),
fileIO.getFileSize(new Path(indexPath,
result.fileName())),
- result.rowRange().to - result.rowRange().from,
result.meta()));
}
@@ -236,7 +234,8 @@ public class LuceneVectorGlobalIndexTest {
new LuceneVectorGlobalIndexReader(fileReader, metas,
vectorType)) {
VectorSearch vectorSearch = new VectorSearch(vectors[0], 1,
fieldName);
LuceneVectorSearchGlobalIndexResult result =
- (LuceneVectorSearchGlobalIndexResult)
reader.visitVectorSearch(vectorSearch);
+ (LuceneVectorSearchGlobalIndexResult)
+ reader.visitVectorSearch(vectorSearch).get();
assertThat(result.results().getLongCardinality()).isEqualTo(1);
long expectedRowId = 0;
assertThat(containsRowId(result, expectedRowId)).isTrue();
@@ -246,12 +245,16 @@ public class LuceneVectorGlobalIndexTest {
filterResults.add(expectedRowId);
vectorSearch =
new VectorSearch(vectors[0], 1,
fieldName).withIncludeRowIds(filterResults);
- result = (LuceneVectorSearchGlobalIndexResult)
reader.visitVectorSearch(vectorSearch);
+ result =
+ (LuceneVectorSearchGlobalIndexResult)
+ reader.visitVectorSearch(vectorSearch).get();
assertThat(containsRowId(result, expectedRowId)).isTrue();
float[] queryVector = new float[] {0.85f, 0.15f};
vectorSearch = new VectorSearch(queryVector, 2, fieldName);
- result = (LuceneVectorSearchGlobalIndexResult)
reader.visitVectorSearch(vectorSearch);
+ result =
+ (LuceneVectorSearchGlobalIndexResult)
+ reader.visitVectorSearch(vectorSearch).get();
assertThat(result.results().getLongCardinality()).isEqualTo(2);
long rowId1 = 1;
long rowId2 = 3;
@@ -282,30 +285,29 @@ public class LuceneVectorGlobalIndexTest {
new LuceneVectorGlobalIndexWriter(fileWriter, byteVectorType,
indexOptions);
Arrays.stream(vectors).forEach(writer::write);
- List<GlobalIndexWriter.ResultEntry> results = writer.finish();
+ List<ResultEntry> results = writer.finish();
assertThat(results).hasSize(2);
GlobalIndexFileReader fileReader = createFileReader(indexPath);
List<GlobalIndexIOMeta> metas = new ArrayList<>();
- for (GlobalIndexWriter.ResultEntry result : results) {
+ for (ResultEntry result : results) {
metas.add(
new GlobalIndexIOMeta(
result.fileName(),
fileIO.getFileSize(new Path(indexPath,
result.fileName())),
- result.rowRange().to - result.rowRange().from,
result.meta()));
}
try (LuceneVectorGlobalIndexReader reader =
new LuceneVectorGlobalIndexReader(fileReader, metas,
byteVectorType)) {
VectorSearch vectorSearch = new VectorSearch(vectors[0], 1,
fieldName);
- GlobalIndexResult result = reader.visitVectorSearch(vectorSearch);
+ GlobalIndexResult result =
reader.visitVectorSearch(vectorSearch).get();
assertThat(result.results().getLongCardinality()).isEqualTo(1);
assertThat(containsRowId(result, 0)).isTrue();
byte[] queryVector = new byte[] {85, 15};
vectorSearch = new VectorSearch(queryVector, 2, fieldName);
- result = reader.visitVectorSearch(vectorSearch);
+ result = reader.visitVectorSearch(vectorSearch).get();
assertThat(result.results().getLongCardinality()).isEqualTo(2);
assertThat(containsRowId(result, 1)).isTrue();
assertThat(containsRowId(result, 3)).isTrue();
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
index e66db5b8b5..1bc3b1b8d1 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -20,9 +20,10 @@ package org.apache.paimon.spark.globalindex;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.IndexedSplit;
+import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
@@ -31,7 +32,7 @@ import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.LongCounter;
import java.io.IOException;
import java.util.ArrayList;
@@ -50,43 +51,53 @@ public abstract class GlobalIndexBuilder {
ReadBuilder builder = context.table().newReadBuilder();
builder.withReadType(context.readType());
RecordReader<InternalRow> rows =
builder.newRead().createReader(indexedSplit);
- List<GlobalIndexWriter.ResultEntry> resultEntries =
writePaimonRows(context, rows);
- List<IndexFileMeta> indexFileMetas = convertToIndexMeta(context,
resultEntries);
+ LongCounter rowCounter = new LongCounter(0);
+ List<ResultEntry> resultEntries = writePaimonRows(context, rows,
rowCounter);
+ List<IndexFileMeta> indexFileMetas =
+ convertToIndexMeta(context, rowCounter.getValue(),
resultEntries);
DataIncrement dataIncrement =
DataIncrement.indexIncrement(indexFileMetas);
return new CommitMessageImpl(
context.partition(), 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
}
private static List<IndexFileMeta> convertToIndexMeta(
- GlobalIndexBuilderContext context,
List<GlobalIndexWriter.ResultEntry> entries)
+ GlobalIndexBuilderContext context, long totalRowCount,
List<ResultEntry> entries)
throws IOException {
List<IndexFileMeta> results = new ArrayList<>();
- for (GlobalIndexWriter.ResultEntry entry : entries) {
+ long rangeEnd = context.startOffset() + totalRowCount - 1;
+ for (ResultEntry entry : entries) {
String fileName = entry.fileName();
- Range range = entry.rowRange().addOffset(context.startOffset());
GlobalIndexFileReadWrite readWrite =
context.globalIndexFileReadWrite();
long fileSize = readWrite.fileSize(fileName);
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(
- range.from, range.to, context.indexField().id(),
null, entry.meta());
+ context.startOffset(),
+ rangeEnd,
+ context.indexField().id(),
+ null,
+ entry.meta());
IndexFileMeta indexFileMeta =
new IndexFileMeta(
context.indexType(),
fileName,
fileSize,
- range.to - range.from + 1,
+ entry.rowCount(),
globalIndexMeta);
results.add(indexFileMeta);
}
return results;
}
- private static List<GlobalIndexWriter.ResultEntry> writePaimonRows(
- GlobalIndexBuilderContext context, RecordReader<InternalRow> rows)
throws IOException {
+ private static List<ResultEntry> writePaimonRows(
+ GlobalIndexBuilderContext context,
+ RecordReader<InternalRow> rows,
+ LongCounter rowCounter)
+ throws IOException {
GlobalIndexer globalIndexer =
GlobalIndexer.create(context.indexType(),
context.indexField(), context.options());
- GlobalIndexWriter globalIndexWriter =
- globalIndexer.createWriter(context.globalIndexFileReadWrite());
+ GlobalIndexSingletonWriter globalIndexWriter =
+ (GlobalIndexSingletonWriter)
+
globalIndexer.createWriter(context.globalIndexFileReadWrite());
InternalRow.FieldGetter getter =
InternalRow.createFieldGetter(
context.indexField().type(),
@@ -95,6 +106,7 @@ public abstract class GlobalIndexBuilder {
row -> {
Object indexO = getter.getFieldOrNull(row);
globalIndexWriter.write(indexO);
+ rowCounter.add(1);
});
return globalIndexWriter.finish();
}