This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4dee829fa3 [core] Refactor GlobalIndexBatchScan to DataEvolutionBatchScan (#6804)
4dee829fa3 is described below
commit 4dee829fa38321dd00e31bc2ae76ee0a16312ade
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Dec 11 23:10:50 2025 +0800
[core] Refactor GlobalIndexBatchScan to DataEvolutionBatchScan (#6804)
---
.../java/org/apache/paimon/AbstractFileStore.java | 2 +-
.../src/main/java/org/apache/paimon/FileStore.java | 2 +-
...xBatchScan.java => DataEvolutionBatchScan.java} | 155 +++++++++++----------
.../paimon/privilege/PrivilegedFileStore.java | 4 +-
.../paimon/table/AbstractFileStoreTable.java | 21 ++-
.../org/apache/paimon/table/FileStoreTable.java | 6 -
.../paimon/table/source/AbstractDataTableScan.java | 6 +-
.../apache/paimon/table/source/InnerTableScan.java | 5 -
.../paimon/table/source/ReadBuilderImpl.java | 16 +--
.../paimon/table/DataEvolutionTableTest.java | 2 +-
.../index/LuceneVectorGlobalIndexerFactory.java | 6 +-
.../procedure/CreateGlobalIndexProcedureTest.scala | 2 +-
12 files changed, 105 insertions(+), 122 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index baf03f514f..3c25c39c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -498,7 +498,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
}
@Override
- public GlobalIndexScanBuilder newIndexScanBuilder() {
+ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
return new GlobalIndexScanBuilderImpl(
options.toConfiguration(),
schema.logicalRowType(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index b73b9276a3..1ccd19ca11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -126,5 +126,5 @@ public interface FileStore<T> {
void setSnapshotCache(Cache<Path, Snapshot> cache);
- GlobalIndexScanBuilder newIndexScanBuilder();
+ GlobalIndexScanBuilder newGlobalIndexScanBuilder();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
similarity index 61%
rename from paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java
rename to paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 264edea6f0..8294a48612 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -28,6 +28,8 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
@@ -40,22 +42,31 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import static org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
-/** Scan with global index. */
-public class GlobalIndexBatchScan implements InnerTableScan {
- private final FileStoreTable wrapped;
- private final InnerTableScan batchScan;
- private GlobalIndexResult globalIndexResult;
+/** Scan for data evolution table. */
+public class DataEvolutionBatchScan implements DataTableScan {
+
+ private final FileStoreTable table;
+ private final DataTableBatchScan batchScan;
+
private Predicate filter;
+ private List<Range> rowRanges;
+ private ScoreGetter scoreGetter;
- public GlobalIndexBatchScan(FileStoreTable wrapped, InnerTableScan batchScan) {
- this.wrapped = wrapped;
+ public DataEvolutionBatchScan(FileStoreTable wrapped, DataTableBatchScan batchScan) {
+ this.table = wrapped;
this.batchScan = batchScan;
}
+ @Override
+ public DataTableScan withShard(int indexOfThisSubtask, int numberOfParallelSubtasks) {
+ return batchScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
+ }
+
@Override
public InnerTableScan withFilter(Predicate predicate) {
this.filter = predicate;
@@ -69,12 +80,6 @@ public class GlobalIndexBatchScan implements InnerTableScan {
return this;
}
- @Nullable
- @Override
- public PartitionPredicate partitionFilter() {
- return batchScan.partitionFilter();
- }
-
@Override
public InnerTableScan withBucket(int bucket) {
batchScan.withBucket(bucket);
@@ -143,64 +148,72 @@ public class GlobalIndexBatchScan implements InnerTableScan {
@Override
public InnerTableScan withRowRanges(List<Range> rowRanges) {
- if (rowRanges != null) {
- this.globalIndexResult = GlobalIndexResult.fromRanges(rowRanges);
- }
+ this.rowRanges = rowRanges;
return this;
}
- public InnerTableScan withGlobalIndexResult(GlobalIndexResult globalIndexResult) {
- this.globalIndexResult = globalIndexResult;
- return this;
+ @Override
+ public List<PartitionEntry> listPartitionEntries() {
+ return batchScan.listPartitionEntries();
+ }
+
+ @Override
+ public Plan plan() {
+ generateRowRanges();
+ if (rowRanges != null) {
+ batchScan.withRowRanges(rowRanges);
+ }
+ List<Split> splits = batchScan.plan().splits();
+ return tryWrapToIndexSplits(splits);
}
- private void configureGlobalIndex(InnerTableScan scan) {
- if (globalIndexResult == null && filter != null) {
- PartitionPredicate partitionPredicate = scan.partitionFilter();
- GlobalIndexScanBuilder globalIndexScanBuilder = wrapped.newIndexScanBuilder();
- Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(wrapped);
- globalIndexScanBuilder
- .withPartitionPredicate(partitionPredicate)
- .withSnapshot(snapshot);
- List<Range> indexedRowRanges = globalIndexScanBuilder.shardList();
- if (!indexedRowRanges.isEmpty()) {
- List<Range> nonIndexedRowRanges =
- new Range(0, snapshot.nextRowId() - 1).exclude(indexedRowRanges);
- Optional<GlobalIndexResult> combined =
- parallelScan(
- indexedRowRanges,
- globalIndexScanBuilder,
- filter,
- wrapped.coreOptions().globalIndexThreadNum());
- if (combined.isPresent()) {
- GlobalIndexResult globalIndexResultTemp = combined.get();
- if (!nonIndexedRowRanges.isEmpty()) {
- for (Range range : nonIndexedRowRanges) {
- globalIndexResultTemp.or(GlobalIndexResult.fromRange(range));
- }
+ private void generateRowRanges() {
+ if (rowRanges != null) {
+ return;
+ }
+ if (filter == null) {
+ return;
+ }
+ if (!table.coreOptions().globalIndexEnabled()) {
+ return;
+ }
+
+ PartitionPredicate partitionPredicate =
+ batchScan.snapshotReader().manifestsReader().partitionFilter();
+ GlobalIndexScanBuilder indexScanBuilder = table.store().newGlobalIndexScanBuilder();
+ Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+ indexScanBuilder.withPartitionPredicate(partitionPredicate).withSnapshot(snapshot);
+ List<Range> indexedRowRanges = indexScanBuilder.shardList();
+ if (!indexedRowRanges.isEmpty()) {
+ Long nextRowId = Objects.requireNonNull(snapshot.nextRowId());
+ List<Range> nonIndexedRowRanges = new Range(0, nextRowId - 1).exclude(indexedRowRanges);
+ Optional<GlobalIndexResult> combined =
+ parallelScan(
+ indexedRowRanges,
+ indexScanBuilder,
+ filter,
+ table.coreOptions().globalIndexThreadNum());
+ if (combined.isPresent()) {
+ GlobalIndexResult result = combined.get();
+ if (!nonIndexedRowRanges.isEmpty()) {
+ for (Range range : nonIndexedRowRanges) {
+ result.or(GlobalIndexResult.fromRange(range));
}
+ }
- this.globalIndexResult = globalIndexResultTemp;
+ rowRanges = result.results().toRangeList();
+ if (result instanceof TopkGlobalIndexResult) {
+ scoreGetter = ((TopkGlobalIndexResult) result).scoreGetter();
}
}
}
-
- if (this.globalIndexResult != null) {
- scan.withRowRanges(this.globalIndexResult.results().toRangeList());
- }
- }
-
- @Override
- public Plan plan() {
- configureGlobalIndex(batchScan);
- List<Split> splits = batchScan.plan().splits();
- return wrap(splits);
}
- private Plan wrap(List<Split> splits) {
- if (globalIndexResult == null) {
+ private Plan tryWrapToIndexSplits(List<Split> splits) {
+ if (rowRanges == null) {
return () -> splits;
}
+
List<Split> indexedSplits = new ArrayList<>();
for (Split split : splits) {
DataSplit dataSplit = (DataSplit) split;
@@ -212,22 +225,17 @@ public class GlobalIndexBatchScan implements InnerTableScan {
fromDataFile = Range.mergeSortedAsPossible(fromDataFile);
- List<Range> expected =
- Range.and(fromDataFile, globalIndexResult.results().toRangeList());
+ List<Range> expected = Range.and(fromDataFile, rowRanges);
float[] scores = null;
- if (globalIndexResult instanceof TopkGlobalIndexResult) {
- ScoreGetter scoreFunction =
- ((TopkGlobalIndexResult) globalIndexResult).scoreGetter();
- if (scoreFunction != null) {
- int size = expected.stream().mapToInt(r -> (int) (r.to - r.from + 1)).sum();
- scores = new float[size];
-
- int index = 0;
- for (Range range : expected) {
- for (long i = range.from; i <= range.to; i++) {
- scores[index++] = scoreFunction.score(i);
- }
+ if (scoreGetter != null) {
+ int size = expected.stream().mapToInt(r -> (int) (r.to - r.from + 1)).sum();
+ scores = new float[size];
+
+ int index = 0;
+ for (Range range : expected) {
+ for (long i = range.from; i <= range.to; i++) {
+ scores[index++] = scoreGetter.score(i);
}
}
}
@@ -236,9 +244,4 @@ public class GlobalIndexBatchScan implements InnerTableScan {
}
return () -> indexedSplits;
}
-
- @Override
- public List<PartitionEntry> listPartitionEntries() {
- return batchScan.listPartitionEntries();
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 9b1f8692bb..917301a2e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -241,7 +241,7 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
}
@Override
- public GlobalIndexScanBuilder newIndexScanBuilder() {
- return wrapped.newIndexScanBuilder();
+ public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
+ return wrapped.newGlobalIndexScanBuilder();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index e1ec6ff6c4..c7d9952ada 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
@@ -45,6 +46,7 @@ import org.apache.paimon.table.sink.RowKindGenerator;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.WriteSelector;
import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.DataTableStreamScan;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.StreamDataTableScan;
@@ -273,13 +275,18 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
@Override
- public DataTableBatchScan newScan() {
- return new DataTableBatchScan(
- tableSchema,
- schemaManager(),
- coreOptions(),
- newSnapshotReader(),
- catalogEnvironment.tableQueryAuth(coreOptions()));
+ public DataTableScan newScan() {
+ DataTableBatchScan scan =
+ new DataTableBatchScan(
+ tableSchema,
+ schemaManager(),
+ coreOptions(),
+ newSnapshotReader(),
+ catalogEnvironment.tableQueryAuth(coreOptions()));
+ if (coreOptions().dataEvolutionEnabled()) {
+ return new DataEvolutionBatchScan(this, scan);
+ }
+ return scan;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index a543d0fbbc..b07465a258 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.TableSchema;
@@ -131,11 +130,6 @@ public interface FileStoreTable extends DataTable {
RowKeyExtractor createRowKeyExtractor();
- /** Returns a new global index scan builder. */
- default GlobalIndexScanBuilder newIndexScanBuilder() {
- return store().newIndexScanBuilder();
- }
-
/**
* Get {@link DataTable} with branch identified by {@code branchName}. Note that this method
* does not keep dynamic options in current table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c33a3f58f1..100c696f4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -179,10 +179,8 @@ abstract class AbstractDataTableScan implements DataTableScan {
return this;
}
- @Nullable
- @Override
- public PartitionPredicate partitionFilter() {
- return snapshotReader.manifestsReader().partitionFilter();
+ public SnapshotReader snapshotReader() {
+ return snapshotReader;
}
public CoreOptions options() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index b4d147d885..ce05f9c2c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -91,9 +91,4 @@ public interface InnerTableScan extends TableScan {
// do nothing, should implement this if need
return this;
}
-
- @Nullable
- default PartitionPredicate partitionFilter() {
- return null;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index c40ef86554..056eed5a10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -21,14 +21,11 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.variant.VariantAccessInfo;
import org.apache.paimon.data.variant.VariantAccessInfoUtils;
-import org.apache.paimon.globalindex.GlobalIndexBatchScan;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.InnerTable;
-import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Range;
@@ -186,11 +183,7 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public TableScan newScan() {
- InnerTableScan tableScan = table.newScan();
- if (searchGlobalIndex(table)) {
- tableScan = new GlobalIndexBatchScan((FileStoreTable) table, tableScan);
- }
- tableScan = configureScan(tableScan);
+ InnerTableScan tableScan = configureScan(table.newScan());
if (limit != null) {
tableScan.withLimit(limit);
}
@@ -239,13 +232,6 @@ public class ReadBuilderImpl implements ReadBuilder {
return scan;
}
- private boolean searchGlobalIndex(Table table) {
- return table instanceof FileStoreTable
- && ((FileStoreTable) table).coreOptions().dataEvolutionEnabled()
- && (((FileStoreTable) table).coreOptions().globalIndexEnabled()
- || this.rowRanges != null);
- }
-
@Override
public TableRead newRead() {
InnerTableRead read = table.newRead().withFilter(filter);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index de941ed6ff..8845e22de9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -977,7 +977,7 @@ public class DataEvolutionTableTest extends TableTestBase {
private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, Predicate predicate)
throws Exception {
- GlobalIndexScanBuilder indexScanBuilder = table.newIndexScanBuilder();
+ GlobalIndexScanBuilder indexScanBuilder = table.store().newGlobalIndexScanBuilder();
List<Range> ranges = indexScanBuilder.shardList();
GlobalIndexResult globalFileIndexResult = GlobalIndexResult.createEmpty();
for (Range range : ranges) {
diff --git a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
index e5e66cf0b8..c195965750 100644
--- a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
+++ b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
@@ -21,7 +21,7 @@ package org.apache.paimon.lucene.index;
import org.apache.paimon.globalindex.GlobalIndexer;
import org.apache.paimon.globalindex.GlobalIndexerFactory;
import org.apache.paimon.options.Options;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
/** Factory for creating Lucene vector index. */
public class LuceneVectorGlobalIndexerFactory implements GlobalIndexerFactory {
@@ -34,7 +34,7 @@ public class LuceneVectorGlobalIndexerFactory implements GlobalIndexerFactory {
}
@Override
- public GlobalIndexer create(DataType type, Options options) {
- return new LuceneVectorGlobalIndexer(type, options);
+ public GlobalIndexer create(DataField field, Options options) {
+ return new LuceneVectorGlobalIndexer(field.type(), options);
}
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index 11eeeab65a..eba8d51625 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -56,7 +56,7 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
.scanEntries()
.asScala
.filter(_.indexFile().indexType() == "bitmap")
- table.newIndexScanBuilder().shardList()
+ table.store().newGlobalIndexScanBuilder().shardList()
assert(bitmapEntries.nonEmpty)
val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
assert(totalRowCount == 100000L)