This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dee829fa3 [core] Refactor GlobalIndexBatchScan to DataEvolutionBatchScan (#6804)
4dee829fa3 is described below

commit 4dee829fa38321dd00e31bc2ae76ee0a16312ade
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Dec 11 23:10:50 2025 +0800

    [core] Refactor GlobalIndexBatchScan to DataEvolutionBatchScan (#6804)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   2 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   2 +-
 ...xBatchScan.java => DataEvolutionBatchScan.java} | 155 +++++++++++----------
 .../paimon/privilege/PrivilegedFileStore.java      |   4 +-
 .../paimon/table/AbstractFileStoreTable.java       |  21 ++-
 .../org/apache/paimon/table/FileStoreTable.java    |   6 -
 .../paimon/table/source/AbstractDataTableScan.java |   6 +-
 .../apache/paimon/table/source/InnerTableScan.java |   5 -
 .../paimon/table/source/ReadBuilderImpl.java       |  16 +--
 .../paimon/table/DataEvolutionTableTest.java       |   2 +-
 .../index/LuceneVectorGlobalIndexerFactory.java    |   6 +-
 .../procedure/CreateGlobalIndexProcedureTest.scala |   2 +-
 12 files changed, 105 insertions(+), 122 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index baf03f514f..3c25c39c70 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -498,7 +498,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
     }
 
     @Override
-    public GlobalIndexScanBuilder newIndexScanBuilder() {
+    public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
         return new GlobalIndexScanBuilderImpl(
                 options.toConfiguration(),
                 schema.logicalRowType(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index b73b9276a3..1ccd19ca11 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -126,5 +126,5 @@ public interface FileStore<T> {
 
     void setSnapshotCache(Cache<Path, Snapshot> cache);
 
-    GlobalIndexScanBuilder newIndexScanBuilder();
+    GlobalIndexScanBuilder newGlobalIndexScanBuilder();
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
similarity index 61%
rename from paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java
rename to paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
index 264edea6f0..8294a48612 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/DataEvolutionBatchScan.java
@@ -28,6 +28,8 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.snapshot.TimeTravelUtil;
@@ -40,22 +42,31 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import static 
org.apache.paimon.globalindex.GlobalIndexScanBuilder.parallelScan;
 
-/** Scan with global index. */
-public class GlobalIndexBatchScan implements InnerTableScan {
-    private final FileStoreTable wrapped;
-    private final InnerTableScan batchScan;
-    private GlobalIndexResult globalIndexResult;
+/** Scan for data evolution table. */
+public class DataEvolutionBatchScan implements DataTableScan {
+
+    private final FileStoreTable table;
+    private final DataTableBatchScan batchScan;
+
     private Predicate filter;
+    private List<Range> rowRanges;
+    private ScoreGetter scoreGetter;
 
-    public GlobalIndexBatchScan(FileStoreTable wrapped, InnerTableScan 
batchScan) {
-        this.wrapped = wrapped;
+    public DataEvolutionBatchScan(FileStoreTable wrapped, DataTableBatchScan 
batchScan) {
+        this.table = wrapped;
         this.batchScan = batchScan;
     }
 
+    @Override
+    public DataTableScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        return batchScan.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
+    }
+
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
         this.filter = predicate;
@@ -69,12 +80,6 @@ public class GlobalIndexBatchScan implements InnerTableScan {
         return this;
     }
 
-    @Nullable
-    @Override
-    public PartitionPredicate partitionFilter() {
-        return batchScan.partitionFilter();
-    }
-
     @Override
     public InnerTableScan withBucket(int bucket) {
         batchScan.withBucket(bucket);
@@ -143,64 +148,72 @@ public class GlobalIndexBatchScan implements InnerTableScan {
 
     @Override
     public InnerTableScan withRowRanges(List<Range> rowRanges) {
-        if (rowRanges != null) {
-            this.globalIndexResult = GlobalIndexResult.fromRanges(rowRanges);
-        }
+        this.rowRanges = rowRanges;
         return this;
     }
 
-    public InnerTableScan withGlobalIndexResult(GlobalIndexResult 
globalIndexResult) {
-        this.globalIndexResult = globalIndexResult;
-        return this;
+    @Override
+    public List<PartitionEntry> listPartitionEntries() {
+        return batchScan.listPartitionEntries();
+    }
+
+    @Override
+    public Plan plan() {
+        generateRowRanges();
+        if (rowRanges != null) {
+            batchScan.withRowRanges(rowRanges);
+        }
+        List<Split> splits = batchScan.plan().splits();
+        return tryWrapToIndexSplits(splits);
     }
 
-    private void configureGlobalIndex(InnerTableScan scan) {
-        if (globalIndexResult == null && filter != null) {
-            PartitionPredicate partitionPredicate = scan.partitionFilter();
-            GlobalIndexScanBuilder globalIndexScanBuilder = 
wrapped.newIndexScanBuilder();
-            Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(wrapped);
-            globalIndexScanBuilder
-                    .withPartitionPredicate(partitionPredicate)
-                    .withSnapshot(snapshot);
-            List<Range> indexedRowRanges = globalIndexScanBuilder.shardList();
-            if (!indexedRowRanges.isEmpty()) {
-                List<Range> nonIndexedRowRanges =
-                        new Range(0, snapshot.nextRowId() - 
1).exclude(indexedRowRanges);
-                Optional<GlobalIndexResult> combined =
-                        parallelScan(
-                                indexedRowRanges,
-                                globalIndexScanBuilder,
-                                filter,
-                                wrapped.coreOptions().globalIndexThreadNum());
-                if (combined.isPresent()) {
-                    GlobalIndexResult globalIndexResultTemp = combined.get();
-                    if (!nonIndexedRowRanges.isEmpty()) {
-                        for (Range range : nonIndexedRowRanges) {
-                            
globalIndexResultTemp.or(GlobalIndexResult.fromRange(range));
-                        }
+    private void generateRowRanges() {
+        if (rowRanges != null) {
+            return;
+        }
+        if (filter == null) {
+            return;
+        }
+        if (!table.coreOptions().globalIndexEnabled()) {
+            return;
+        }
+
+        PartitionPredicate partitionPredicate =
+                batchScan.snapshotReader().manifestsReader().partitionFilter();
+        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
+        Snapshot snapshot = TimeTravelUtil.tryTravelOrLatest(table);
+        
indexScanBuilder.withPartitionPredicate(partitionPredicate).withSnapshot(snapshot);
+        List<Range> indexedRowRanges = indexScanBuilder.shardList();
+        if (!indexedRowRanges.isEmpty()) {
+            Long nextRowId = Objects.requireNonNull(snapshot.nextRowId());
+            List<Range> nonIndexedRowRanges = new Range(0, nextRowId - 
1).exclude(indexedRowRanges);
+            Optional<GlobalIndexResult> combined =
+                    parallelScan(
+                            indexedRowRanges,
+                            indexScanBuilder,
+                            filter,
+                            table.coreOptions().globalIndexThreadNum());
+            if (combined.isPresent()) {
+                GlobalIndexResult result = combined.get();
+                if (!nonIndexedRowRanges.isEmpty()) {
+                    for (Range range : nonIndexedRowRanges) {
+                        result.or(GlobalIndexResult.fromRange(range));
                     }
+                }
 
-                    this.globalIndexResult = globalIndexResultTemp;
+                rowRanges = result.results().toRangeList();
+                if (result instanceof TopkGlobalIndexResult) {
+                    scoreGetter = ((TopkGlobalIndexResult) 
result).scoreGetter();
                 }
             }
         }
-
-        if (this.globalIndexResult != null) {
-            scan.withRowRanges(this.globalIndexResult.results().toRangeList());
-        }
-    }
-
-    @Override
-    public Plan plan() {
-        configureGlobalIndex(batchScan);
-        List<Split> splits = batchScan.plan().splits();
-        return wrap(splits);
     }
 
-    private Plan wrap(List<Split> splits) {
-        if (globalIndexResult == null) {
+    private Plan tryWrapToIndexSplits(List<Split> splits) {
+        if (rowRanges == null) {
             return () -> splits;
         }
+
         List<Split> indexedSplits = new ArrayList<>();
         for (Split split : splits) {
             DataSplit dataSplit = (DataSplit) split;
@@ -212,22 +225,17 @@ public class GlobalIndexBatchScan implements InnerTableScan {
 
             fromDataFile = Range.mergeSortedAsPossible(fromDataFile);
 
-            List<Range> expected =
-                    Range.and(fromDataFile, 
globalIndexResult.results().toRangeList());
+            List<Range> expected = Range.and(fromDataFile, rowRanges);
 
             float[] scores = null;
-            if (globalIndexResult instanceof TopkGlobalIndexResult) {
-                ScoreGetter scoreFunction =
-                        ((TopkGlobalIndexResult) 
globalIndexResult).scoreGetter();
-                if (scoreFunction != null) {
-                    int size = expected.stream().mapToInt(r -> (int) (r.to - 
r.from + 1)).sum();
-                    scores = new float[size];
-
-                    int index = 0;
-                    for (Range range : expected) {
-                        for (long i = range.from; i <= range.to; i++) {
-                            scores[index++] = scoreFunction.score(i);
-                        }
+            if (scoreGetter != null) {
+                int size = expected.stream().mapToInt(r -> (int) (r.to - 
r.from + 1)).sum();
+                scores = new float[size];
+
+                int index = 0;
+                for (Range range : expected) {
+                    for (long i = range.from; i <= range.to; i++) {
+                        scores[index++] = scoreGetter.score(i);
                     }
                 }
             }
@@ -236,9 +244,4 @@ public class GlobalIndexBatchScan implements InnerTableScan {
         }
         return () -> indexedSplits;
     }
-
-    @Override
-    public List<PartitionEntry> listPartitionEntries() {
-        return batchScan.listPartitionEntries();
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 9b1f8692bb..917301a2e5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -241,7 +241,7 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
     }
 
     @Override
-    public GlobalIndexScanBuilder newIndexScanBuilder() {
-        return wrapped.newIndexScanBuilder();
+    public GlobalIndexScanBuilder newGlobalIndexScanBuilder() {
+        return wrapped.newGlobalIndexScanBuilder();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index e1ec6ff6c4..c7d9952ada 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
@@ -45,6 +46,7 @@ import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.WriteSelector;
 import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.DataTableStreamScan;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.StreamDataTableScan;
@@ -273,13 +275,18 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     }
 
     @Override
-    public DataTableBatchScan newScan() {
-        return new DataTableBatchScan(
-                tableSchema,
-                schemaManager(),
-                coreOptions(),
-                newSnapshotReader(),
-                catalogEnvironment.tableQueryAuth(coreOptions()));
+    public DataTableScan newScan() {
+        DataTableBatchScan scan =
+                new DataTableBatchScan(
+                        tableSchema,
+                        schemaManager(),
+                        coreOptions(),
+                        newSnapshotReader(),
+                        catalogEnvironment.tableQueryAuth(coreOptions()));
+        if (coreOptions().dataEvolutionEnabled()) {
+            return new DataEvolutionBatchScan(this, scan);
+        }
+        return scan;
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index a543d0fbbc..b07465a258 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.options.ExpireConfig;
 import org.apache.paimon.schema.TableSchema;
@@ -131,11 +130,6 @@ public interface FileStoreTable extends DataTable {
 
     RowKeyExtractor createRowKeyExtractor();
 
-    /** Returns a new global index scan builder. */
-    default GlobalIndexScanBuilder newIndexScanBuilder() {
-        return store().newIndexScanBuilder();
-    }
-
     /**
      * Get {@link DataTable} with branch identified by {@code branchName}. 
Note that this method
      * does not keep dynamic options in current table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c33a3f58f1..100c696f4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -179,10 +179,8 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
-    @Nullable
-    @Override
-    public PartitionPredicate partitionFilter() {
-        return snapshotReader.manifestsReader().partitionFilter();
+    public SnapshotReader snapshotReader() {
+        return snapshotReader;
     }
 
     public CoreOptions options() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index b4d147d885..ce05f9c2c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -91,9 +91,4 @@ public interface InnerTableScan extends TableScan {
         // do nothing, should implement this if need
         return this;
     }
-
-    @Nullable
-    default PartitionPredicate partitionFilter() {
-        return null;
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index c40ef86554..056eed5a10 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -21,14 +21,11 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.data.variant.VariantAccessInfoUtils;
-import org.apache.paimon.globalindex.GlobalIndexBatchScan;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.InnerTable;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
@@ -186,11 +183,7 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public TableScan newScan() {
-        InnerTableScan tableScan = table.newScan();
-        if (searchGlobalIndex(table)) {
-            tableScan = new GlobalIndexBatchScan((FileStoreTable) table, 
tableScan);
-        }
-        tableScan = configureScan(tableScan);
+        InnerTableScan tableScan = configureScan(table.newScan());
         if (limit != null) {
             tableScan.withLimit(limit);
         }
@@ -239,13 +232,6 @@ public class ReadBuilderImpl implements ReadBuilder {
         return scan;
     }
 
-    private boolean searchGlobalIndex(Table table) {
-        return table instanceof FileStoreTable
-                && ((FileStoreTable) 
table).coreOptions().dataEvolutionEnabled()
-                && (((FileStoreTable) table).coreOptions().globalIndexEnabled()
-                        || this.rowRanges != null);
-    }
-
     @Override
     public TableRead newRead() {
         InnerTableRead read = table.newRead().withFilter(filter);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index de941ed6ff..8845e22de9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -977,7 +977,7 @@ public class DataEvolutionTableTest extends TableTestBase {
 
     private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, 
Predicate predicate)
             throws Exception {
-        GlobalIndexScanBuilder indexScanBuilder = table.newIndexScanBuilder();
+        GlobalIndexScanBuilder indexScanBuilder = 
table.store().newGlobalIndexScanBuilder();
         List<Range> ranges = indexScanBuilder.shardList();
         GlobalIndexResult globalFileIndexResult = 
GlobalIndexResult.createEmpty();
         for (Range range : ranges) {
diff --git a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
index e5e66cf0b8..c195965750 100644
--- a/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
+++ b/paimon-lucene/src/main/java/org/apache/paimon/lucene/index/LuceneVectorGlobalIndexerFactory.java
@@ -21,7 +21,7 @@ package org.apache.paimon.lucene.index;
 import org.apache.paimon.globalindex.GlobalIndexer;
 import org.apache.paimon.globalindex.GlobalIndexerFactory;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataField;
 
 /** Factory for creating Lucene vector index. */
 public class LuceneVectorGlobalIndexerFactory implements GlobalIndexerFactory {
@@ -34,7 +34,7 @@ public class LuceneVectorGlobalIndexerFactory implements GlobalIndexerFactory {
     }
 
     @Override
-    public GlobalIndexer create(DataType type, Options options) {
-        return new LuceneVectorGlobalIndexer(type, options);
+    public GlobalIndexer create(DataField field, Options options) {
+        return new LuceneVectorGlobalIndexer(field.type(), options);
     }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index 11eeeab65a..eba8d51625 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -56,7 +56,7 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
         .scanEntries()
         .asScala
         .filter(_.indexFile().indexType() == "bitmap")
-      table.newIndexScanBuilder().shardList()
+      table.store().newGlobalIndexScanBuilder().shardList()
       assert(bitmapEntries.nonEmpty)
       val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
       assert(totalRowCount == 100000L)

Reply via email to