This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 22bc3abc86 [spark] support building BTree index through procedure (#6956)
22bc3abc86 is described below

commit 22bc3abc86dd7458087031462c9c9353c5318e23
Author: Faiz <[email protected]>
AuthorDate: Wed Jan 7 14:18:32 2026 +0800

    [spark] support building BTree index through procedure (#6956)
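
    A usage sketch based on the tests in this commit (table name and option
    values are illustrative; assumes a Spark session `spark` with the Paimon
    catalog configured):

        spark.sql(
          "CALL sys.create_global_index(table => 'test.T', index_column => 'name', " +
            "index_type => 'btree', options => 'btree-index.records-per-range=1000')")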
---
 .../globalindex/btree/BTreeIndexOptions.java       |  14 +-
 ....apache.paimon.globalindex.GlobalIndexerFactory |   1 +
 .../paimon/index/IndexFileMetaSerializer.java      |   2 +-
 .../manifest/IndexManifestEntrySerializer.java     |   2 +-
 .../globalindex/DefaultGlobalIndexBuilder.java     |  51 +++++
 .../spark/globalindex/GlobalIndexBuilder.java      |  57 +-----
 .../globalindex/GlobalIndexBuilderContext.java     |  42 ++--
 .../spark/globalindex/GlobalIndexTopoBuilder.java  |  12 +-
 .../globalindex/btree/BTreeGlobalIndexBuilder.java | 140 +++++++++++++
 .../btree/BTreeGlobalIndexBuilderFactory.java      |  46 +++++
 .../globalindex/btree/BTreeIndexTopoBuilder.java   | 184 +++++++++++++++++
 .../globalindex/btree/IndexFieldsExtractor.java    |  59 ++++++
 .../procedure/CreateGlobalIndexProcedure.java      |  65 ++++--
 ...mon.spark.globalindex.GlobalIndexBuilderFactory |   2 +-
 .../procedure/CreateGlobalIndexProcedureTest.scala | 218 +++++++++++++++++++++
 15 files changed, 805 insertions(+), 90 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
index acac020db3..8d0758a1fc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
@@ -32,7 +32,7 @@ public class BTreeIndexOptions {
                     .withDescription("The compression algorithm to use for 
BTreeIndex");
 
     public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
-            ConfigOptions.key("btree-index.compression")
+            ConfigOptions.key("btree-index.compression-level")
                     .intType()
                     .defaultValue(1)
                     .withDescription("The compression level of the compression 
algorithm");
@@ -54,4 +54,16 @@ public class BTreeIndexOptions {
                     .doubleType()
                     .defaultValue(0.1)
                     .withDescription("The high priority pool ratio to use for 
BTreeIndex");
+
+    public static final ConfigOption<Long> BTREE_INDEX_RECORDS_PER_RANGE =
+            ConfigOptions.key("btree-index.records-per-range")
+                    .longType()
+                    .defaultValue(1000_000L)
+                    .withDescription("The expected number of records per BTree 
Index File.");
+
+    public static final ConfigOption<Integer> BTREE_INDEX_BUILD_MAX_PARALLELISM =
+            ConfigOptions.key("btree-index.build.max-parallelism")
+                    .intType()
+                    .defaultValue(4096)
+                    .withDescription("The max parallelism of Flink/Spark for 
building BTreeIndex.");
 }
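
A sketch of how the two new options above bound the build parallelism; this
mirrors the sizing logic in BTreeIndexTopoBuilder further down, with an
illustrative row count (Scala):

    object RangeSizing {
      val recordsPerRange = 1000000L // btree-index.records-per-range default
      val maxParallelism = 4096      // btree-index.build.max-parallelism default

      // one Spark range partition per expected index file, capped at maxParallelism
      def partitionNum(rowCount: Long): Int =
        math.min(math.max((rowCount / recordsPerRange).toInt, 1), maxParallelism)
    }
    // RangeSizing.partitionNum(10000000L) == 10; a 10-billion-row table caps at 4096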
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
index e1e63b1057..4c3fe70db9 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
+++ b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -14,3 +14,4 @@
 # limitations under the License.
 
 org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
+org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 9e3ead6909..6d98e61248 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -47,7 +47,7 @@ public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
                                 globalIndexMeta.rowRangeStart(),
                                 globalIndexMeta.rowRangeEnd(),
                                 globalIndexMeta.indexFieldId(),
-                                globalIndexMeta.indexMeta() == null
+                                globalIndexMeta.extraFieldIds() == null
                                         ? null
                                         : new GenericArray(globalIndexMeta.extraFieldIds()),
                                 globalIndexMeta.indexMeta());
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index cd7a1efd84..98ab4df6f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -54,7 +54,7 @@ public class IndexManifestEntrySerializer extends VersionedObjectSerializer<Inde
                                 globalIndexMeta.rowRangeStart(),
                                 globalIndexMeta.rowRangeEnd(),
                                 globalIndexMeta.indexFieldId(),
-                                globalIndexMeta.indexMeta() == null
+                                globalIndexMeta.extraFieldIds() == null
                                         ? null
                                         : new GenericArray(globalIndexMeta.extraFieldIds()),
                                 globalIndexMeta.indexMeta());
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index d2aa11b35c..5ec81c7389 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -18,9 +18,60 @@
 
 package org.apache.paimon.spark.globalindex;
 
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LongCounter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
 /** Default {@link GlobalIndexBuilder}. */
 public class DefaultGlobalIndexBuilder extends GlobalIndexBuilder {
     public DefaultGlobalIndexBuilder(GlobalIndexBuilderContext context) {
         super(context);
     }
+
+    @Override
+    public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
+        LongCounter rowCounter = new LongCounter(0);
+        List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
+        List<IndexFileMeta> indexFileMetas =
+                convertToIndexMeta(
+                        context.startOffset(),
+                        context.startOffset() + rowCounter.getValue() - 1,
+                        resultEntries);
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
+        return Collections.singletonList(
+                new CommitMessageImpl(
+                        context.partition(),
+                        0,
+                        null,
+                        dataIncrement,
+                        CompactIncrement.emptyIncrement()));
+    }
+
+    private List<ResultEntry> writePaimonRows(
+            CloseableIterator<InternalRow> rows, LongCounter rowCounter) throws IOException {
+        GlobalIndexSingletonWriter indexWriter = (GlobalIndexSingletonWriter) createIndexWriter();
+
+        InternalRow.FieldGetter getter =
+                InternalRow.createFieldGetter(
+                        context.indexField().type(),
+                        context.readType().getFieldIndex(context.indexField().name()));
+        rows.forEachRemaining(
+                row -> {
+                    Object indexO = getter.getFieldOrNull(row);
+                    indexWriter.write(indexO);
+                    rowCounter.add(1);
+                });
+        return indexWriter.finish();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
index 1bc3b1b8d1..621f5205ae 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -20,19 +20,13 @@ package org.apache.paimon.spark.globalindex;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
 import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.IndexedSplit;
 import org.apache.paimon.globalindex.ResultEntry;
 import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.CloseableIterator;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -47,35 +41,19 @@ public abstract class GlobalIndexBuilder {
         this.context = context;
     }
 
-    public CommitMessage build(IndexedSplit indexedSplit) throws IOException {
-        ReadBuilder builder = context.table().newReadBuilder();
-        builder.withReadType(context.readType());
-        RecordReader<InternalRow> rows = builder.newRead().createReader(indexedSplit);
-        LongCounter rowCounter = new LongCounter(0);
-        List<ResultEntry> resultEntries = writePaimonRows(context, rows, rowCounter);
-        List<IndexFileMeta> indexFileMetas =
-                convertToIndexMeta(context, rowCounter.getValue(), resultEntries);
-        DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
-        return new CommitMessageImpl(
-                context.partition(), 0, null, dataIncrement, CompactIncrement.emptyIncrement());
-    }
+    public abstract List<CommitMessage> build(CloseableIterator<InternalRow> data)
+            throws IOException;
 
-    private static List<IndexFileMeta> convertToIndexMeta(
-            GlobalIndexBuilderContext context, long totalRowCount, List<ResultEntry> entries)
-            throws IOException {
+    protected List<IndexFileMeta> convertToIndexMeta(
+            long rangeStart, long rangeEnd, List<ResultEntry> entries) throws IOException {
         List<IndexFileMeta> results = new ArrayList<>();
-        long rangeEnd = context.startOffset() + totalRowCount - 1;
         for (ResultEntry entry : entries) {
             String fileName = entry.fileName();
             GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite();
             long fileSize = readWrite.fileSize(fileName);
             GlobalIndexMeta globalIndexMeta =
                     new GlobalIndexMeta(
-                            context.startOffset(),
-                            rangeEnd,
-                            context.indexField().id(),
-                            null,
-                            entry.meta());
+                            rangeStart, rangeEnd, context.indexField().id(), null, entry.meta());
             IndexFileMeta indexFileMeta =
                     new IndexFileMeta(
                             context.indexType(),
@@ -88,26 +66,9 @@ public abstract class GlobalIndexBuilder {
         return results;
     }
 
-    private static List<ResultEntry> writePaimonRows(
-            GlobalIndexBuilderContext context,
-            RecordReader<InternalRow> rows,
-            LongCounter rowCounter)
-            throws IOException {
+    protected GlobalIndexWriter createIndexWriter() throws IOException {
         GlobalIndexer globalIndexer =
                 GlobalIndexer.create(context.indexType(), context.indexField(), context.options());
-        GlobalIndexSingletonWriter globalIndexWriter =
-                (GlobalIndexSingletonWriter)
-                        globalIndexer.createWriter(context.globalIndexFileReadWrite());
-        InternalRow.FieldGetter getter =
-                InternalRow.createFieldGetter(
-                        context.indexField().type(),
-                        context.readType().getFieldIndex(context.indexField().name()));
-        rows.forEachRemaining(
-                row -> {
-                    Object indexO = getter.getFieldOrNull(row);
-                    globalIndexWriter.write(indexO);
-                    rowCounter.add(1);
-                });
-        return globalIndexWriter.finish();
+        return globalIndexer.createWriter(context.globalIndexFileReadWrite());
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
index 8e4a4d7fab..dbf2f9a74a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
@@ -27,6 +27,9 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -36,45 +39,57 @@ import java.io.Serializable;
  *
  * <p>This class is serializable to support Spark distributed execution. The partition is stored
  * both as a transient {@link BinaryRow} and as serialized bytes to ensure proper serialization
- * across executor nodes.
+ * across executor nodes. Partition info can be null if the actual index partition is dynamically
+ * extracted from data.
  */
 public class GlobalIndexBuilderContext implements Serializable {
 
     private final FileStoreTable table;
-    private final BinaryRowSerializer binaryRowSerializer;
-    private final byte[] partitionBytes;
+    @Nullable private final BinaryRowSerializer binaryRowSerializer;
+    @Nullable private final byte[] partitionBytes;
     private final RowType readType;
     private final DataField indexField;
     private final String indexType;
     private final long startOffset;
     private final Options options;
+    @Nullable private final Range fullRange;
 
     public GlobalIndexBuilderContext(
             FileStoreTable table,
-            BinaryRow partition,
+            @Nullable BinaryRow partition,
             RowType readType,
             DataField indexField,
             String indexType,
             long startOffset,
-            Options options) {
+            Options options,
+            @Nullable Range fullRange) {
         this.table = table;
         this.readType = readType;
         this.indexField = indexField;
         this.indexType = indexType;
         this.startOffset = startOffset;
         this.options = options;
+        this.fullRange = fullRange;
 
-        this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
-        try {
-            this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (partition != null) {
+            this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
+            try {
+                this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        } else {
+            this.binaryRowSerializer = null;
+            this.partitionBytes = null;
         }
     }
 
+    @Nullable
     public BinaryRow partition() {
         try {
-            return binaryRowSerializer.deserializeFromBytes(partitionBytes);
+            return partitionBytes == null || binaryRowSerializer == null
+                    ? null
+                    : binaryRowSerializer.deserializeFromBytes(partitionBytes);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -109,4 +124,9 @@ public class GlobalIndexBuilderContext implements Serializable {
         IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
         return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
     }
+
+    @Nullable
+    public Range fullRange() {
+        return fullRange;
+    }
 }
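
The context now supports two construction shapes; a sketch in Scala with the
setup values left to the caller (argument order matches the constructor in
this commit; see CreateGlobalIndexProcedure and BTreeIndexTopoBuilder):

    import org.apache.paimon.data.BinaryRow
    import org.apache.paimon.options.Options
    import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext
    import org.apache.paimon.table.FileStoreTable
    import org.apache.paimon.types.{DataField, RowType}
    import org.apache.paimon.utils.Range

    // default per-shard path: concrete partition and start offset, no full range
    def shardContext(table: FileStoreTable, partition: BinaryRow, readType: RowType,
        indexField: DataField, indexType: String, startOffset: Long, options: Options) =
      new GlobalIndexBuilderContext(
        table, partition, readType, indexField, indexType, startOffset, options, null)

    // btree path: the partition is extracted from the data itself, so it is null
    // here, and the full row-id range of the scanned files is passed instead
    def btreeContext(table: FileStoreTable, readType: RowType, indexField: DataField,
        options: Options, fullRange: Range) =
      new GlobalIndexBuilderContext(
        table, null, readType, indexField, "btree", 0, options, fullRange)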
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
index dfa5916cd5..7e4d6ff8b8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
@@ -18,27 +18,27 @@
 
 package org.apache.paimon.spark.globalindex;
 
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.globalindex.IndexedSplit;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 /** User defined topology builder. */
 public interface GlobalIndexTopoBuilder {
 
     List<CommitMessage> buildIndex(
-            JavaSparkContext javaSparkContext,
+            SparkSession spark,
+            DataSourceV2Relation relation,
+            PartitionPredicate partitionPredicate,
             FileStoreTable table,
-            Map<BinaryRow, List<IndexedSplit>> preparedDS,
             String indexType,
             RowType readType,
             DataField indexField,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
new file mode 100644
index 0000000000..f9443b0bdc
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link GlobalIndexBuilder} implementation for BTree Index. The caller of {@link
+ * BTreeGlobalIndexBuilder#build(CloseableIterator) build} must ensure the input data is sorted by
+ * partitions and indexed field.
+ */
+public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder {
+    private static final double FLOATING = 1.2;
+
+    private final IndexFieldsExtractor extractor;
+    private final long recordsPerRange;
+    private BinaryRow currentPart = null;
+    private GlobalIndexParallelWriter currentWriter = null;
+    private LongCounter counter = new LongCounter();
+
+    protected BTreeGlobalIndexBuilder(GlobalIndexBuilderContext context) {
+        super(context);
+        Preconditions.checkNotNull(
+                context.fullRange(), "Full range cannot be null for BTreeGlobalIndexBuilder.");
+
+        FileStoreTable table = context.table();
+        List<String> readColumns = new ArrayList<>(table.partitionKeys());
+        readColumns.addAll(context.readType().getFieldNames());
+        this.extractor =
+                new IndexFieldsExtractor(
+                        SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
+                        table.partitionKeys(),
+                        context.indexField().name());
+
+        // Each partition boundary is derived from sampling, so we introduce a slack factor
+        // to avoid generating too many small files due to sampling variance.
+        this.recordsPerRange =
+                (long)
+                        (context.options().get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE)
+                                * FLOATING);
+    }
+
+    @Override
+    public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
+        List<CommitMessage> commitMessages = new ArrayList<>();
+
+        while (data.hasNext()) {
+            InternalRow row = data.next();
+
+            BinaryRow partRow = extractor.extractPartition(row);
+            // may flush last part data
+            // this is correct only if the input is sorted by <partition, indexedField>
+            if (currentPart != null && !partRow.equals(currentPart)
+                    || counter.getValue() >= recordsPerRange) {
+                flushIndex(commitMessages);
+            }
+
+            createWriterIfNeeded();
+
+            // write <value, rowId> pair to index file
+            currentPart = partRow;
+            counter.add(1);
+            currentWriter.write(extractor.extractIndexField(row), extractor.extractRowId(row));
+        }
+
+        flushIndex(commitMessages);
+
+        return commitMessages;
+    }
+
+    private void flushIndex(List<CommitMessage> resultMessages) throws IOException {
+        if (counter.getValue() == 0 || currentWriter == null || currentPart == null) {
+            return;
+        }
+
+        List<ResultEntry> resultEntries = currentWriter.finish();
+        List<IndexFileMeta> fileMetas =
+                convertToIndexMeta(context.fullRange().from, context.fullRange().to, resultEntries);
+        DataIncrement dataIncrement = DataIncrement.indexIncrement(fileMetas);
+        CommitMessage commitMessage =
+                new CommitMessageImpl(
+                        currentPart, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
+
+        // reset writer
+        currentWriter = null;
+        currentPart = null;
+        counter.reset();
+
+        resultMessages.add(commitMessage);
+    }
+
+    private void createWriterIfNeeded() throws IOException {
+        if (currentWriter == null) {
+            GlobalIndexWriter indexWriter = createIndexWriter();
+            if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
+                throw new RuntimeException(
+                        "Unexpected implementation, the index writer of BTree 
should be an instance of GlobalIndexParallelWriter, but found: "
+                                + indexWriter.getClass().getName());
+            }
+            currentWriter = (GlobalIndexParallelWriter) indexWriter;
+        }
+    }
+}
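
A simplified model of the flush rule above (Scala; the records-per-range value
is the test setting from this commit, and the 1.2 slack factor is FLOATING):

    object FlushRule {
      val recordsPerRange = 1000L                    // btree-index.records-per-range
      val threshold = (recordsPerRange * 1.2).toLong // with slack factor => 1200

      // flush when the paimon partition changes or the open range hits the
      // threshold; correct only because input is sorted by (partition, index field)
      def shouldFlush(partitionChanged: Boolean, buffered: Long): Boolean =
        partitionChanged || buffered >= threshold
    }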
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..38f0997915
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder;
+
+/** The {@link GlobalIndexBuilderFactory} implementation for BTree index type. */
+public class BTreeGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
+
+    // keep this identifier consistent with BTreeGlobalIndexerFactory
+    private static final String IDENTIFIER = "btree";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
+        return new BTreeGlobalIndexBuilder(context);
+    }
+
+    @Override
+    public GlobalIndexTopoBuilder createTopoBuilder() {
+        return new BTreeIndexTopoBuilder();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
new file mode 100644
index 0000000000..e898a0e168
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.SparkRow;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
+import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.PaimonUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.functions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** The {@link GlobalIndexTopoBuilder} for BTree index. */
+public class BTreeIndexTopoBuilder implements GlobalIndexTopoBuilder {
+
+    @Override
+    public List<CommitMessage> buildIndex(
+            SparkSession spark,
+            DataSourceV2Relation relation,
+            PartitionPredicate partitionPredicate,
+            FileStoreTable table,
+            String indexType,
+            RowType readType,
+            DataField indexField,
+            Options options)
+            throws IOException {
+
+        // 1. read the whole dataset of target partitions
+        SnapshotReader snapshotReader = table.newSnapshotReader();
+        if (partitionPredicate != null) {
+            snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
+        }
+
+        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+        Range fullRange = calcRowRange(dataSplits);
+        if (dataSplits.isEmpty() || fullRange == null) {
+            return Collections.emptyList();
+        }
+
+        // we need to read all partition columns for shuffle
+        List<String> selectedColumns = new ArrayList<>();
+        selectedColumns.addAll(table.partitionKeys());
+        selectedColumns.addAll(readType.getFieldNames());
+
+        Dataset<Row> source =
+                PaimonUtils.createDataset(
+                        spark,
+                        ScanPlanHelper$.MODULE$.createNewScanPlan(
+                                dataSplits.toArray(new DataSplit[0]), relation));
+
+        Dataset<Row> selected =
+                source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
+
+        // 2. shuffle and sort by partitions and index keys
+        Column[] sortFields =
+                selectedColumns.stream()
+                        .filter(name -> !SpecialFields.ROW_ID.name().equals(name))
+                        .map(functions::col)
+                        .toArray(Column[]::new);
+
+        long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
+        // this should be super fast since an append-only table can utilize count-star pushdown well.
+        long rowCount = source.count();
+        int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1);
+        int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+        partitionNum = Math.min(partitionNum, maxParallelism);
+
+        // For efficiency, we do not repartition within each paimon partition. Instead, we directly
+        // divide ranges by <partitions, index field>, and each subtask is expected to process
+        // records from multiple partitions. The drawback is that if a Paimon partition spans
+        // multiple Spark partitions, the first and last output files may contain relatively few
+        // records.
+        Dataset<Row> partitioned =
+                selected.repartitionByRange(partitionNum, sortFields)
+                        .sortWithinPartitions(sortFields);
+
+        // 3. write index for each partition & range
+        final GlobalIndexBuilderContext context =
+                new GlobalIndexBuilderContext(
+                        table, null, readType, indexField, indexType, 0, options, fullRange);
+        final RowType rowType =
+                SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
+        JavaRDD<byte[]> written =
+                partitioned
+                        .javaRDD()
+                        .map(row -> (InternalRow) (new SparkRow(rowType, row)))
+                        .mapPartitions(
+                                (FlatMapFunction<Iterator<InternalRow>, byte[]>)
+                                        iter -> {
+                                            CommitMessageSerializer commitMessageSerializer =
+                                                    new CommitMessageSerializer();
+
+                                            GlobalIndexBuilder globalIndexBuilder =
+                                                    GlobalIndexBuilderFactoryUtils
+                                                            .createIndexBuilder(context);
+
+                                            List<CommitMessage> commitMessages =
+                                                    globalIndexBuilder.build(
+                                                            CloseableIterator.adapterForIterator(
+                                                                    iter));
+                                            List<byte[]> messageBytes = new ArrayList<>();
+
+                                            for (CommitMessage commitMessage : commitMessages) {
+                                                messageBytes.add(
+                                                        commitMessageSerializer.serialize(
+                                                                commitMessage));
+                                            }
+
+                                            return messageBytes.iterator();
+                                        });
+
+        // 4. collect all commit messages and return
+        List<byte[]> commitBytes = written.collect();
+        List<CommitMessage> result = new ArrayList<>();
+        CommitMessageSerializer commitMessageSerializer = new CommitMessageSerializer();
+        for (byte[] commitByte : commitBytes) {
+            result.add(
+                    commitMessageSerializer.deserialize(
+                            commitMessageSerializer.getVersion(), commitByte));
+        }
+
+        return result;
+    }
+
+    private Range calcRowRange(List<DataSplit> dataSplits) {
+        long start = Long.MAX_VALUE;
+        long end = Long.MIN_VALUE;
+        for (DataSplit dataSplit : dataSplits) {
+            for (DataFileMeta file : dataSplit.dataFiles()) {
+                if (file.firstRowId() != null) {
+                    start = Math.min(start, file.firstRowId());
+                    end = Math.max(end, file.firstRowId() + file.rowCount());
+                }
+            }
+        }
+        return start == Long.MAX_VALUE ? null : new Range(start, end);
+    }
+}
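
A worked example of calcRowRange above (Scala; the file metadata values are
illustrative): two files covering row ids [0, 100) and [100, 150) yield
Range(0, 150), and files without a firstRowId are skipped:

    // (firstRowId, rowCount) pairs; None mirrors a null firstRowId in DataFileMeta
    def rowRange(files: Seq[(Option[Long], Long)]): Option[(Long, Long)] = {
      val tracked = files.collect { case (Some(first), count) => (first, first + count) }
      if (tracked.isEmpty) None
      else Some((tracked.map(_._1).min, tracked.map(_._2).max))
    }
    // rowRange(Seq((Some(0L), 100L), (Some(100L), 50L))) == Some((0L, 150L))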
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java
new file mode 100644
index 0000000000..bbe328d9d5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** The extractor to get partition, index field and row id from records. */
+public class IndexFieldsExtractor {
+    private final Projection partitionProjection;
+    private final InternalRow.FieldGetter indexFieldGetter;
+    private final int rowIdPos;
+
+    public IndexFieldsExtractor(RowType readType, List<String> partitionKeys, String indexField) {
+        this.partitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
+        int indexFieldPos = readType.getFieldIndex(indexField);
+        this.indexFieldGetter =
+                InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
+        this.rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+    }
+
+    public BinaryRow extractPartition(InternalRow record) {
+        // the projection reuses the returned row, so a copy is necessary
+        return partitionProjection.apply(record).copy();
+    }
+
+    @Nullable
+    public Object extractIndexField(InternalRow record) {
+        return indexFieldGetter.getFieldOrNull(record);
+    }
+
+    public Long extractRowId(InternalRow record) {
+        return record.getLong(rowIdPos);
+    }
+}
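
A usage sketch for the extractor above (Scala; the extractor instance and row
source are assumed to come from the builder, as in BTreeGlobalIndexBuilder):

    import org.apache.paimon.data.{BinaryRow, InternalRow}
    import org.apache.paimon.spark.globalindex.btree.IndexFieldsExtractor

    // the (partition, key, rowId) triple the btree builder consumes per record
    def triple(extractor: IndexFieldsExtractor, row: InternalRow): (BinaryRow, Object, java.lang.Long) =
      (extractor.extractPartition(row),  // copied, safe to hold across rows
       extractor.extractIndexField(row), // may be null
       extractor.extractRowId(row))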
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 48a5b9d7b6..9b7c3e98be 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
 import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
 import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
@@ -36,11 +37,14 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.InstantiationUtil;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.ProcedureUtils;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.StringUtils;
@@ -164,33 +168,39 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
                         ProcedureUtils.putAllOptions(parsedOptions, optionString);
                         Options userOptions = Options.fromMap(parsedOptions);
                         Options tableOptions = new Options(table.options());
-                        long rowsPerShard =
-                                tableOptions
-                                        .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
-                                        .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue());
-                        checkArgument(
-                                rowsPerShard > 0,
-                                "Option 'global-index.row-count-per-shard' must be greater than 0.");
-
-                        // Step 1: generate splits for each partition&&shard
-                        Map<BinaryRow, List<IndexedSplit>> splits =
-                                split(table, partitionPredicate, rowsPerShard);
 
                         List<CommitMessage> indexResults;
-                        // Step 2: build index by certain index system
-                        GlobalIndexTopoBuilder topoBuildr =
+                        // Step 1: build index by certain index system
+                        GlobalIndexTopoBuilder topoBuilder =
                                 GlobalIndexBuilderFactoryUtils.createTopoBuilder(indexType);
-                        if (topoBuildr != null) {
+
+                        if (topoBuilder != null) {
+                            // do not need to prepare index shards for custom topo builder
                             indexResults =
-                                    topoBuildr.buildIndex(
-                                            new JavaSparkContext(spark().sparkContext()),
+                                    topoBuilder.buildIndex(
+                                            spark(),
+                                            relation,
+                                            partitionPredicate,
                                             table,
-                                            splits,
                                             indexType,
                                             readRowType,
                                             indexField,
                                             userOptions);
                         } else {
+                            long rowsPerShard =
+                                    tableOptions
+                                            .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
+                                            .orElse(
+                                                    GLOBAL_INDEX_ROW_COUNT_PER_SHARD
+                                                            .defaultValue());
+                            checkArgument(
+                                    rowsPerShard > 0,
+                                    "Option 'global-index.row-count-per-shard' must be greater than 0.");
+
+                            // generate splits for each partition&&shard
+                            Map<BinaryRow, List<IndexedSplit>> splits =
+                                    split(table, partitionPredicate, rowsPerShard);
+
                             indexResults =
                                     buildIndex(
                                             table,
@@ -201,7 +211,7 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
                                             userOptions);
                         }
 
-                        // Step 3: commit index meta to a new snapshot
+                        // Step 2: commit index meta to a new snapshot
                         commit(table, indexResults);
 
                         return new InternalRow[] {newInternalRow(true)};
@@ -241,7 +251,8 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
                                 indexField,
                                 indexType,
                                 indexedSplit.rowRanges().get(0).from,
-                                options);
+                                options,
+                                null);
 
                 byte[] dsBytes = InstantiationUtil.serializeObject(indexedSplit);
                 taskList.add(Pair.of(builderContext, dsBytes));
@@ -264,8 +275,20 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
                                     GlobalIndexBuilder globalIndexBuilder =
                                             GlobalIndexBuilderFactoryUtils.createIndexBuilder(
                                                     builderContext);
-                                    return commitMessageSerializer.serialize(
-                                            globalIndexBuilder.build(split));
+                                    ReadBuilder builder = builderContext.table().newReadBuilder();
+                                    builder.withReadType(builderContext.readType());
+
+                                    try (RecordReader<org.apache.paimon.data.InternalRow>
+                                                    recordReader =
+                                                            builder.newRead().createReader(split);
+                                            CloseableIterator<org.apache.paimon.data.InternalRow>
+                                                    data = recordReader.toCloseableIterator()) {
+                                        List<CommitMessage> commitMessage =
+                                                globalIndexBuilder.build(data);
+                                        Preconditions.checkState(commitMessage.size() == 1);
+                                        return commitMessageSerializer.serialize(
+                                                commitMessage.get(0));
+                                    }
                                 })
                         .collect();
 
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
similarity index 91%
copy from paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
copy to paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
index e1e63b1057..e99a1bdb2e 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
+++ b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
+org.apache.paimon.spark.globalindex.btree.BTreeGlobalIndexBuilderFactory
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index e4ecff618a..23ed2921c6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.spark.procedure
 
+import org.apache.paimon.globalindex.btree.{BTreeIndexMeta, KeySerializer}
+import org.apache.paimon.memory.MemorySlice
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.VarCharType
 import org.apache.paimon.utils.Range
 
 import org.apache.spark.sql.streaming.StreamTest
@@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
       assert(totalRowCount == 189088L)
     }
   }
+
+  test("create btree global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true',
+                  |  'btree-index.records-per-range' = '1000')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 
'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+      val table = loadTable("T")
+      val btreeEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "btree")
+        .map(_.indexFile())
+      table.store().newGlobalIndexScanBuilder().shardList()
+      assert(btreeEntries.nonEmpty)
+
+      // 1. assert total row count and file count
+      val totalRowCount = btreeEntries.map(_.rowCount()).sum
+      assert(btreeEntries.size == 100)
+      assert(totalRowCount == 100000L)
+
+      // 2. assert global index meta not null
+      btreeEntries.foreach(e => assert(e.globalIndexMeta() != null))
+
+      // 3. assert btree index file range non-overlapping
+      case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object)
+      val keySerializer = KeySerializer.create(new VarCharType())
+      val comparator = keySerializer.createComparator()
+
+      def deserialize(bytes: Array[Byte]): Object = {
+        keySerializer.deserialize(MemorySlice.wrap(bytes))
+      }
+
+      val btreeMetas = btreeEntries
+        .map(_.globalIndexMeta().indexMeta())
+        .map(meta => BTreeIndexMeta.deserialize(meta))
+        .map(
+          m => {
+            assert(m.getFirstKey != null)
+            assert(m.getLastKey != null)
+            MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey))
+          })
+
+      // sort by first key
+      val sorted = btreeMetas.sortWith((m1, m2) => comparator.compare(m1.first, m2.first) < 0)
+
+      // should not overlap
+      sorted.sliding(2).foreach {
+        case Seq(prev: MetaWithKey, next: MetaWithKey) =>
+          assert(comparator.compare(prev.last, next.first) <= 0)
+        case _ => // ignore
+      }
+    }
+  }
+
+  test("create btree global index with multiple partitions") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql(
+            "CALL sys.create_global_index(table => 'test.T', index_column => 
'name', index_type => 'btree'," +
+              " options => 'btree-index.records-per-range=1000')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      assertMultiplePartitionsResult("T", 189088L, 3)
+    }
+  }
+
+  test("create btree index within one spark partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      // force output parallelism = 1
+      val output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column 
=> 'name', index_type => 'btree'," +
+            " options => 
'btree-index.records-per-range=1000,btree-index.build.max-parallelism=1')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      assertMultiplePartitionsResult("T", 100000L, 2)
+    }
+  }
+
+  private def assertMultiplePartitionsResult(
+      tableName: String,
+      rowCount: Long,
+      partCount: Int
+  ): Unit = {
+    val table = loadTable(tableName)
+    val btreeEntries = table
+      .store()
+      .newIndexFileHandler()
+      .scanEntries()
+      .asScala
+      .filter(_.indexFile().indexType() == "btree")
+    table.store().newGlobalIndexScanBuilder().shardList()
+    assert(btreeEntries.nonEmpty)
+
+    // 1. assert total row count
+    val totalRowCount = btreeEntries.map(_.indexFile().rowCount()).sum
+    assert(totalRowCount == rowCount)
+
+    // 2. assert global index meta not null
+    btreeEntries.foreach(e => assert(e.indexFile().globalIndexMeta() != null))
+
+    // 3. assert non-overlapped within each partition
+    val entriesByPart = btreeEntries.groupBy(_.partition())
+    assert(entriesByPart.size == partCount)
+
+    case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object)
+    val keySerializer = KeySerializer.create(new VarCharType())
+    val comparator = keySerializer.createComparator()
+
+    def deserialize(bytes: Array[Byte]): Object = {
+      keySerializer.deserialize(MemorySlice.wrap(bytes))
+    }
+
+    for ((k, v) <- entriesByPart) {
+      val metas = v
+        .map(_.indexFile().globalIndexMeta().indexMeta())
+        .map(bytes => BTreeIndexMeta.deserialize(bytes))
+        .map(
+          m => {
+            assert(m.getFirstKey != null)
+            assert(m.getLastKey != null)
+            MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey))
+          })
+
+      val sorted = metas.sortWith((m1, m2) => comparator.compare(m1.first, m2.first) < 0)
+
+      // should not overlap
+      sorted.sliding(2).foreach {
+        case Seq(prev: MetaWithKey, next: MetaWithKey) =>
+          assert(
+            comparator.compare(prev.last, next.first) <= 0,
+            s"Found overlap for partition ${k.getString(0).toString}. The last 
key ${prev.last}, next first key ${next.first}"
+          )
+        case _ => // ignore
+      }
+    }
+  }
 }
