This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e14a811106 [spark] Introduce global file index builder on spark (#6684)
e14a811106 is described below

commit e14a8111063de4108a26f8a3e880bd15aae7af08
Author: YeJunHao <[email protected]>
AuthorDate: Thu Nov 27 19:52:12 2025 +0800

    [spark] Introduce global file index builder on spark (#6684)
---
 .../org/apache/paimon/table/SpecialFields.java     |  16 +
 .../paimon/globalindex/GlobalIndexReader.java      |   9 +-
 .../paimon/globalindex/GlobalIndexWriter.java      |   2 +-
 .../globalindex/wrap/FileIndexReaderWrapper.java   |  53 ++-
 .../main/java/org/apache/paimon/utils/Range.java   |  13 +-
 .../bitmapindex/BitmapGlobalIndexTest.java         |  49 ++-
 .../paimon/globalindex/GlobalIndexEvaluator.java   |  17 +-
 .../globalindex/GlobalIndexFileReadWrite.java      |   4 +
 .../paimon/manifest/IndexManifestFileHandler.java  |  35 ++
 .../apache/paimon/table/source/ReadBuilder.java    |   1 +
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/globalindex/GlobalIndexBuilder.java      | 144 ++++++++
 .../globalindex/GlobalIndexBuilderContext.java     | 129 +++++++
 .../globalindex/GlobalIndexBuilderFactory.java     |  27 ++
 .../GlobalIndexBuilderFactoryUtils.java            |  73 ++++
 .../bitmap/BitmapGlobalIndexBuilder.java           |  35 ++
 .../bitmap/BitmapGlobalIndexBuilderFactory.java    |  43 +++
 .../procedure/CreateGlobalIndexProcedure.java      | 400 +++++++++++++++++++++
 ...mon.spark.globalindex.GlobalIndexBuilderFactory |  18 +
 .../procedure/CreateGlobalIndexProcedureTest.java  | 321 +++++++++++++++++
 .../procedure/CreateGlobalIndexProcedureTest.scala |  65 ++++
 21 files changed, 1384 insertions(+), 72 deletions(-)

diff --git 
a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java 
b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index d89776747b..182682637e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -168,4 +168,20 @@ public class SpecialFields {
                         : SpecialFields.SEQUENCE_NUMBER);
         return new RowType(fieldsWithRowTracking);
     }
+
+    public static RowType rowTypeWithRowId(RowType rowType) {
+        List<DataField> fieldsWithRowTracking = new 
ArrayList<>(rowType.getFields());
+
+        fieldsWithRowTracking.forEach(
+                f -> {
+                    if (ROW_ID.name().equals(f.name())) {
+                        throw new IllegalArgumentException(
+                                "Row tracking field name '"
+                                        + f.name()
+                                        + "' conflicts with existing field 
names.");
+                    }
+                });
+        fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+        return new RowType(fieldsWithRowTracking);
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
index 2339634243..2ab40805f8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
@@ -23,23 +23,22 @@ import org.apache.paimon.predicate.TransformPredicate;
 
 import java.io.Closeable;
 import java.util.List;
-import java.util.Optional;
 
 /** Index reader for global index, return {@link GlobalIndexResult}. */
-public interface GlobalIndexReader extends 
FunctionVisitor<Optional<GlobalIndexResult>>, Closeable {
+public interface GlobalIndexReader extends FunctionVisitor<GlobalIndexResult>, 
Closeable {
 
     @Override
-    default Optional<GlobalIndexResult> 
visitAnd(List<Optional<GlobalIndexResult>> children) {
+    default GlobalIndexResult visitAnd(List<GlobalIndexResult> children) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
 
     @Override
-    default Optional<GlobalIndexResult> 
visitOr(List<Optional<GlobalIndexResult>> children) {
+    default GlobalIndexResult visitOr(List<GlobalIndexResult> children) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
 
     @Override
-    default Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
+    default GlobalIndexResult visit(TransformPredicate predicate) {
         throw new UnsupportedOperationException("Should not invoke this");
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
index d04d71aa5e..4dd89eb6b3 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
@@ -27,7 +27,7 @@ import java.util.List;
 /** Index writer for global index. */
 public interface GlobalIndexWriter {
 
-    void write(Object key);
+    void write(@Nullable Object key);
 
     List<ResultEntry> finish();
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
index 416b355c36..8a10911ee9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
@@ -27,7 +27,6 @@ import org.apache.paimon.predicate.FieldRef;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.function.Function;
 
 /** A {@link GlobalIndexReader} wrapper for {@link FileIndexReader}. */
@@ -47,68 +46,68 @@ public class FileIndexReaderWrapper implements 
GlobalIndexReader {
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
-        return 
Optional.ofNullable(transform.apply(reader.visitIsNotNull(fieldRef)));
+    public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+        return transform.apply(reader.visitIsNotNull(fieldRef));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
-        return 
Optional.ofNullable(transform.apply(reader.visitIsNull(fieldRef)));
+    public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+        return transform.apply(reader.visitIsNull(fieldRef));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, 
Object literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitStartsWith(fieldRef, literal)));
+    public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object 
literal) {
+        return transform.apply(reader.visitStartsWith(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object 
literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitEndsWith(fieldRef, literal)));
+    public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+        return transform.apply(reader.visitEndsWith(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object 
literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitContains(fieldRef, literal)));
+    public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+        return transform.apply(reader.visitContains(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object 
literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitLessThan(fieldRef, literal)));
+    public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+        return transform.apply(reader.visitLessThan(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, 
Object literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitGreaterOrEqual(fieldRef, 
literal)));
+    public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object 
literal) {
+        return transform.apply(reader.visitGreaterOrEqual(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object 
literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitNotEqual(fieldRef, literal)));
+    public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+        return transform.apply(reader.visitNotEqual(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, 
Object literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitLessOrEqual(fieldRef, 
literal)));
+    public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object 
literal) {
+        return transform.apply(reader.visitLessOrEqual(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object 
literal) {
-        return Optional.ofNullable(transform.apply(reader.visitEqual(fieldRef, 
literal)));
+    public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+        return transform.apply(reader.visitEqual(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, 
Object literal) {
-        return 
Optional.ofNullable(transform.apply(reader.visitGreaterThan(fieldRef, 
literal)));
+    public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object 
literal) {
+        return transform.apply(reader.visitGreaterThan(fieldRef, literal));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> 
literals) {
-        return Optional.ofNullable(transform.apply(reader.visitIn(fieldRef, 
literals)));
+    public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) 
{
+        return transform.apply(reader.visitIn(fieldRef, literals));
     }
 
     @Override
-    public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, 
List<Object> literals) {
-        return Optional.ofNullable(transform.apply(reader.visitNotIn(fieldRef, 
literals)));
+    public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> 
literals) {
+        return transform.apply(reader.visitNotIn(fieldRef, literals));
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 7d1640c361..2de1bc923d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -18,10 +18,13 @@
 
 package org.apache.paimon.utils;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
 
 /** Range represents from (inclusive) and to (inclusive). */
-public class Range {
+public class Range implements Serializable {
 
     public final long from;
     public final long to;
@@ -49,6 +52,14 @@ public class Range {
         return from > other.to;
     }
 
+    public List<Long> toListLong() {
+        List<Long> longs = new ArrayList<>();
+        for (long i = from; i <= to; i++) {
+            longs.add(i);
+        }
+        return longs;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
index 6019def954..fe57146e51 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -93,26 +93,23 @@ public class BitmapGlobalIndexTest {
                                 writer.write(o);
                             }
                         });
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
a).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 4));
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
b).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(2));
-        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(1, 3));
-        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(a, b)).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(a, b)))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 2, 4));
-        assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
-                .get()
-                .iterator()
-                .hasNext();
+        assert !reader.visitEqual(fieldRef, 
BinaryString.fromString("c")).iterator().hasNext();
     }
 
     private void testIntType(int version) throws Exception {
@@ -128,24 +125,24 @@ public class BitmapGlobalIndexTest {
                                 writer.write(o);
                             }
                         });
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
0).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0));
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
1).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(1));
-        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(2));
-        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(0, 1, 2)).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(0, 1, 2)))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 1));
 
-        assert !reader.visitEqual(fieldRef, 2).get().iterator().hasNext();
+        assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
     }
 
     private void testBooleanType(int version) throws Exception {
@@ -161,11 +158,11 @@ public class BitmapGlobalIndexTest {
                                 writer.write(o);
                             }
                         });
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
Boolean.TRUE).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
Boolean.TRUE))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 2));
-        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(4));
@@ -202,13 +199,12 @@ public class BitmapGlobalIndexTest {
         long time2 = System.currentTimeMillis();
         GlobalIndexResult result =
                 reader.visitEqual(
-                                fieldRef, BinaryString.fromString(prefix + 
(approxCardinality / 2)))
-                        .get();
+                        fieldRef, BinaryString.fromString(prefix + 
(approxCardinality / 2)));
         RoaringBitmap32 resultBm = ((BitmapIndexResultWrapper) 
result).getBitmapIndexResult().get();
         System.out.println("read time: " + (System.currentTimeMillis() - 
time2));
         assert resultBm.equals(middleBm);
         long time3 = System.currentTimeMillis();
-        GlobalIndexResult resultNull = reader.visitIsNull(fieldRef).get();
+        GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
         RoaringBitmap32 resultNullBm =
                 ((BitmapIndexResultWrapper) 
resultNull).getBitmapIndexResult().get();
         System.out.println("read null bitmap time: " + 
(System.currentTimeMillis() - time3));
@@ -277,26 +273,23 @@ public class BitmapGlobalIndexTest {
                             a.pointTo(c.getSegments(), c.getOffset(), 
c.getSizeInBytes());
                             writer.write(null);
                         });
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
a).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0));
-        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 
b).get())
+        assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(3));
-        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
-        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(a, b)).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, 
Arrays.asList(a, b)))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 3));
-        assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
-                .get()
-                .iterator()
-                .hasNext();
+        assert !reader.visitEqual(fieldRef, 
BinaryString.fromString("c")).iterator().hasNext();
     }
 
     private void testAllNull(int version) throws Exception {
@@ -312,10 +305,10 @@ public class BitmapGlobalIndexTest {
                                 writer.write(o);
                             }
                         });
-        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+        assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
                 .getBitmapIndexResult()
                 .get()
                 .equals(RoaringBitmap32.bitmapOf(0, 1, 2));
-        assert !reader.visitIsNotNull(fieldRef).get().iterator().hasNext();
+        assert !reader.visitIsNotNull(fieldRef).iterator().hasNext();
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 36e22fd144..da827d568f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -75,21 +75,18 @@ public class GlobalIndexEvaluator
         Collection<GlobalIndexReader> readers =
                 indexReadersCache.computeIfAbsent(fieldId, 
readersFunction::apply);
         for (GlobalIndexReader fileIndexReader : readers) {
-            Optional<GlobalIndexResult> childResult =
+            GlobalIndexResult childResult =
                     predicate.function().visit(fileIndexReader, fieldRef, 
predicate.literals());
 
             // AND Operation
-            if (childResult.isPresent()) {
-                if (compoundResult.isPresent()) {
-                    GlobalIndexResult r1 = compoundResult.get();
-                    GlobalIndexResult r2 = childResult.get();
-                    compoundResult = Optional.of(r1.and(r2));
-                } else {
-                    compoundResult = childResult;
-                }
+            if (compoundResult.isPresent()) {
+                GlobalIndexResult r1 = compoundResult.get();
+                compoundResult = Optional.of(r1.and(childResult));
+            } else {
+                compoundResult = Optional.of(childResult);
             }
 
-            if (compoundResult.isPresent() && 
!compoundResult.get().iterator().hasNext()) {
+            if (!compoundResult.get().iterator().hasNext()) {
                 return compoundResult;
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
index cad2c67354..c48a729950 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
@@ -48,6 +48,10 @@ public class GlobalIndexFileReadWrite implements 
GlobalIndexFileReader, GlobalIn
         return indexPathFactory.toPath(fileName);
     }
 
+    public long fileSize(String fileName) throws IOException {
+        return fileIO.getFileSize(filePath(fileName));
+    }
+
     public OutputStream newOutputStream(String fileName) throws IOException {
         return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index 035420df28..ebe6e2e421 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -37,6 +37,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -91,6 +92,10 @@ public class IndexManifestFileHandler {
     }
 
     private IndexManifestFileCombiner getIndexManifestFileCombine(String 
indexType) {
+        if (!DELETION_VECTORS_INDEX.equals(indexType) && 
!HASH_INDEX.equals(indexType)) {
+            return new GlobalFileNameCombiner();
+        }
+
         if (DELETION_VECTORS_INDEX.equals(indexType) && 
BucketMode.BUCKET_UNAWARE == bucketMode) {
             return new GlobalCombiner();
         } else {
@@ -191,6 +196,36 @@ public class IndexManifestFileHandler {
         }
     }
 
+    /** We combine the previous and new index files by file name. */
+    static class GlobalFileNameCombiner implements IndexManifestFileCombiner {
+
+        @Override
+        public List<IndexManifestEntry> combine(
+                List<IndexManifestEntry> prevIndexFiles, 
List<IndexManifestEntry> newIndexFiles) {
+            Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+            for (IndexManifestEntry entry : prevIndexFiles) {
+                indexEntries.put(entry.indexFile().fileName(), entry);
+            }
+
+            // Process deletions before additions so a deletion never removes a newly added entry.
+            List<IndexManifestEntry> removed =
+                    newIndexFiles.stream()
+                            .filter(f -> f.kind() == FileKind.DELETE)
+                            .collect(Collectors.toList());
+            List<IndexManifestEntry> added =
+                    newIndexFiles.stream()
+                            .filter(f -> f.kind() == FileKind.ADD)
+                            .collect(Collectors.toList());
+            for (IndexManifestEntry entry : removed) {
+                indexEntries.remove(entry.indexFile().fileName());
+            }
+            for (IndexManifestEntry entry : added) {
+                indexEntries.put(entry.indexFile().fileName(), entry);
+            }
+            return new ArrayList<>(indexEntries.values());
+        }
+    }
+
     private static BucketIdentifier identifier(IndexManifestEntry 
indexManifestEntry) {
         return new BucketIdentifier(
                 indexManifestEntry.partition(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 6250d5f50f..1d341a502c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -164,6 +164,7 @@ public interface ReadBuilder extends Serializable {
      *
      * @param indices the row ids to be read
      */
+    // TODO: support List<Range>; List<Long> wastes memory
     ReadBuilder withRowIds(List<Long> indices);
 
     /** Delete stats in scan plan result. */
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 32909a2caa..99ed1e83a3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.procedure.CompactProcedure;
 import org.apache.paimon.spark.procedure.CopyFilesProcedure;
 import org.apache.paimon.spark.procedure.CreateBranchProcedure;
 import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
+import org.apache.paimon.spark.procedure.CreateGlobalIndexProcedure;
 import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
 import org.apache.paimon.spark.procedure.CreateTagProcedure;
 import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
@@ -92,6 +93,7 @@ public class SparkProcedures {
         procedureBuilders.put("delete_tag", DeleteTagProcedure::builder);
         procedureBuilders.put("expire_tags", ExpireTagsProcedure::builder);
         procedureBuilders.put("create_branch", CreateBranchProcedure::builder);
+        procedureBuilders.put("create_global_index", 
CreateGlobalIndexProcedure::builder);
         procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
         procedureBuilders.put("compact", CompactProcedure::builder);
         procedureBuilders.put("rescale", RescaleProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
new file mode 100644
index 0000000000..dbe9f7c7b0
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestEntrySerializer;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Base class that builds global index files and generates the corresponding index metas. */
+public abstract class GlobalIndexBuilder {
+
+    protected final GlobalIndexBuilderContext context;
+
+    protected GlobalIndexBuilder(GlobalIndexBuilderContext context) {
+        this.context = context;
+    }
+
+    public List<IndexManifestEntry> build(DataSplit dataSplit) throws 
IOException {
+        final GlobalIndexBuilderContext buildContext = this.context;
+        JavaSparkContext javaSparkContext =
+                new JavaSparkContext(buildContext.spark().sparkContext());
+        byte[] dsBytes = InstantiationUtil.serializeObject(dataSplit);
+        IndexManifestEntrySerializer indexManifestEntrySerializer =
+                new IndexManifestEntrySerializer();
+        return javaSparkContext.parallelize(Collections.singletonList(dsBytes))
+                .map(
+                        splitBytes -> {
+                            DataSplit split =
+                                    InstantiationUtil.deserializeObject(
+                                            splitBytes, 
GlobalIndexBuilder.class.getClassLoader());
+                            ReadBuilder builder = 
buildContext.table().newReadBuilder();
+                            
builder.withRowIds(buildContext.range().toListLong())
+                                    .withReadType(buildContext.readType());
+                            RecordReader<InternalRow> rows = 
builder.newRead().createReader(split);
+                            List<GlobalIndexWriter.ResultEntry> resultEntries =
+                                    writePaimonRows(buildContext, rows);
+                            return convertToEntry(buildContext, resultEntries);
+                        })
+                .flatMap(
+                        e ->
+                                e.stream()
+                                        .map(
+                                                entry -> {
+                                                    try {
+                                                        return 
indexManifestEntrySerializer
+                                                                
.serializeToBytes(entry);
+                                                    } catch (IOException ex) {
+                                                        throw new 
RuntimeException(ex);
+                                                    }
+                                                })
+                                        .iterator())
+                .collect().stream()
+                .map(
+                        e -> {
+                            try {
+                                return 
indexManifestEntrySerializer.deserializeFromBytes(e);
+                            } catch (IOException ex) {
+                                throw new RuntimeException(ex);
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+
+    private static List<IndexManifestEntry> convertToEntry(
+            GlobalIndexBuilderContext context, 
List<GlobalIndexWriter.ResultEntry> entries)
+            throws IOException {
+        List<IndexManifestEntry> results = new ArrayList<>();
+        for (GlobalIndexWriter.ResultEntry entry : entries) {
+            String fileName = entry.fileName();
+            Range range = entry.rowRange().addOffset(context.range().from);
+            GlobalIndexFileReadWrite readWrite = 
context.globalIndexFileReadWrite();
+            long fileSize = readWrite.fileSize(fileName);
+            GlobalIndexMeta globalIndexMeta =
+                    new GlobalIndexMeta(
+                            range.from, range.to, context.indexField().id(), 
null, entry.meta());
+            IndexFileMeta indexFileMeta =
+                    new IndexFileMeta(
+                            BitmapGlobalIndexerFactory.IDENTIFIER,
+                            fileName,
+                            fileSize,
+                            range.to - range.from + 1,
+                            globalIndexMeta);
+            results.add(
+                    new IndexManifestEntry(
+                            FileKind.ADD, context.partitionFromBytes(), 0, 
indexFileMeta));
+        }
+        return results;
+    }
+
+    private static List<GlobalIndexWriter.ResultEntry> writePaimonRows(
+            GlobalIndexBuilderContext context, RecordReader<InternalRow> rows) 
throws IOException {
+        GlobalIndexer globalIndexer =
+                GlobalIndexer.create(
+                        context.indexType(), context.indexField().type(), 
context.options());
+        GlobalIndexWriter globalIndexWriter =
+                globalIndexer.createWriter(context.globalIndexFileReadWrite());
+        InternalRow.FieldGetter getter =
+                InternalRow.createFieldGetter(
+                        context.indexField().type(),
+                        
context.readType().getFieldIndex(context.indexField().name()));
+        rows.forEachRemaining(
+                row -> {
+                    Object indexO = getter.getFieldOrNull(row);
+                    globalIndexWriter.write(indexO);
+                });
+        return globalIndexWriter.finish();
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
new file mode 100644
index 0000000000..1911d9fa4e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Context containing all necessary information for building a global index over one row range of
+ * one partition.
+ *
+ * <p>This class is serializable to support Spark distributed execution. The partition is stored
+ * both as a transient {@link BinaryRow} and as serialized bytes. After Java deserialization only
+ * the bytes are left, so {@link #partition()} and {@link #partitionFromBytes()} rebuild the row
+ * from them. The {@link SparkSession} is transient too, which makes {@link #spark()} return {@code
+ * null} on a deserialized copy.
+ */
+public class GlobalIndexBuilderContext implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final transient SparkSession spark;
+    private final FileStoreTable table;
+    private final BinaryRowSerializer binaryRowSerializer;
+    private final transient BinaryRow partition;
+    private final byte[] partitionBytes;
+    private final RowType readType;
+    private final DataField indexField;
+    private final String indexType;
+    private final Range rowRange;
+    private final Options options;
+
+    /**
+     * Creates a context and eagerly serializes the partition so it survives Java serialization.
+     *
+     * @param spark session used on the creating side; not serialized
+     * @param table table the index is built for
+     * @param partition partition the indexed rows belong to; must not be {@code null}
+     * @param readType row type of the rows handed to the index writer
+     * @param indexField field the index is built on
+     * @param indexType identifier of the index implementation
+     * @param rowRange row range covered by this build
+     * @param options options passed on to the index implementation
+     * @throws RuntimeException if the partition cannot be serialized
+     */
+    public GlobalIndexBuilderContext(
+            SparkSession spark,
+            FileStoreTable table,
+            BinaryRow partition,
+            RowType readType,
+            DataField indexField,
+            String indexType,
+            Range rowRange,
+            Options options) {
+        this.spark = spark;
+        this.table = table;
+        this.partition = partition;
+        this.readType = readType;
+        this.indexField = indexField;
+        this.indexType = indexType;
+        this.rowRange = rowRange;
+        this.options = options;
+
+        this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
+        try {
+            this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the Spark session, or {@code null} on a copy restored by Java serialization. */
+    public SparkSession spark() {
+        return spark;
+    }
+
+    /**
+     * Rebuilds the partition from its serialized bytes on every call.
+     *
+     * @throws RuntimeException if the bytes cannot be deserialized
+     */
+    public BinaryRow partitionFromBytes() {
+        try {
+            return binaryRowSerializer.deserializeFromBytes(partitionBytes);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the partition this context was created with.
+     *
+     * <p>The field is transient, so it is {@code null} once the context has been through Java
+     * serialization; in that case the partition is rebuilt from the serialized bytes instead of
+     * handing {@code null} to the caller.
+     */
+    public BinaryRow partition() {
+        return partition != null ? partition : partitionFromBytes();
+    }
+
+    public FileStoreTable table() {
+        return table;
+    }
+
+    public RowType readType() {
+        return readType;
+    }
+
+    public DataField indexField() {
+        return indexField;
+    }
+
+    public String indexType() {
+        return indexType;
+    }
+
+    public Range range() {
+        return rowRange;
+    }
+
+    public Options options() {
+        return options;
+    }
+
+    /**
+     * Creates a reader/writer for index files of this context's partition.
+     *
+     * <p>The partition is taken from the serialized bytes so the method also works on a
+     * deserialized copy; the index path factory is requested for bucket {@code 0}.
+     */
+    public GlobalIndexFileReadWrite globalIndexFileReadWrite() {
+        FileIO fileIO = table.fileIO();
+        IndexPathFactory indexPathFactory =
+                table.store().pathFactory().indexFileFactory(partitionFromBytes(), 0);
+        return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..ace48b1120
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+/**
+ * Global index builder factory on spark.
+ *
+ * <p>A factory is selected by the string returned from {@link #identifier()} and then asked to
+ * create a {@link GlobalIndexBuilder} for a given {@link GlobalIndexBuilderContext}.
+ */
+public interface GlobalIndexBuilderFactory {
+
+    /** Returns the index type name under which this factory is looked up. */
+    String identifier();
+
+    /**
+     * Creates a builder for the given context.
+     *
+     * @param context information describing what to index
+     * @return a builder bound to {@code context}
+     */
+    GlobalIndexBuilder create(GlobalIndexBuilderContext context);
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java
new file mode 100644
index 0000000000..23575e532a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * Utility class for loading {@link GlobalIndexBuilderFactory} implementations via Java's {@link
+ * ServiceLoader} mechanism.
+ *
+ * <p>Factories are loaded once during class initialization and cached for subsequent lookups. The
+ * class only exposes static methods and cannot be instantiated.
+ */
+public final class GlobalIndexBuilderFactoryUtils {
+
+    // Declared before FACTORIES on purpose: discoverFactories() logs through it.
+    private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderFactoryUtils.class);
+
+    /** Discovered factories keyed by identifier; populated once and never modified afterwards. */
+    private static final Map<String, GlobalIndexBuilderFactory> FACTORIES = discoverFactories();
+
+    private GlobalIndexBuilderFactoryUtils() {}
+
+    /**
+     * Collects every {@link GlobalIndexBuilderFactory} visible to {@link ServiceLoader}. When two
+     * factories report the same identifier the one iterated later replaces the earlier one and a
+     * warning is logged.
+     */
+    private static Map<String, GlobalIndexBuilderFactory> discoverFactories() {
+        Map<String, GlobalIndexBuilderFactory> discovered = new HashMap<>();
+        for (GlobalIndexBuilderFactory factory :
+                ServiceLoader.load(GlobalIndexBuilderFactory.class)) {
+            String identifier = factory.identifier();
+            if (discovered.put(identifier, factory) != null) {
+                LOG.warn(
+                        "Found multiple GlobalIndexBuilderFactory implementations for type '{}'. "
+                                + "Using the last one loaded.",
+                        identifier);
+            }
+        }
+        return discovered;
+    }
+
+    /**
+     * Loads the global index builder factory for the specified type.
+     *
+     * @param indexType The type of index (e.g., "bitmap")
+     * @return The corresponding factory
+     * @throws IllegalArgumentException If no factory is found for the specified type
+     */
+    public static GlobalIndexBuilderFactory load(String indexType) {
+        GlobalIndexBuilderFactory factory = FACTORIES.get(indexType);
+        if (factory == null) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "No GlobalIndexBuilderFactory found for index type '%s'. "
+                                    + "Available types: %s",
+                            indexType, FACTORIES.keySet()));
+        }
+        return factory;
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java
new file mode 100644
index 0000000000..affd6a363a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.bitmap;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+
+/**
+ * Builder for creating bitmap-based global indexes.
+ *
+ * <p>The class declares nothing besides a constructor that forwards to {@link
+ * GlobalIndexBuilder}, so all build behavior is inherited from the base class.
+ */
+public class BitmapGlobalIndexBuilder extends GlobalIndexBuilder {
+
+    /**
+     * Creates a builder bound to the given context.
+     *
+     * @param context build context, passed unchanged to the {@link GlobalIndexBuilder} constructor
+     */
+    protected BitmapGlobalIndexBuilder(GlobalIndexBuilderContext context) {
+        super(context);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..7881bce916
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.bitmap;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+
+/**
+ * Factory for creating bitmap-based global index builders.
+ *
+ * <p>This factory is automatically discovered via Java's ServiceLoader mechanism.
+ */
+public class BitmapGlobalIndexBuilderFactory implements 
GlobalIndexBuilderFactory {
+
+    /** Index type name reported by {@link #identifier()}. */
+    private static final String IDENTIFIER = "bitmap";
+
+    /** Returns {@code "bitmap"}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Creates a new {@link BitmapGlobalIndexBuilder} for the given context.
+     *
+     * @param context build context handed to the builder
+     * @return a fresh builder on every call
+     */
+    @Override
+    public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
+        return new BitmapGlobalIndexBuilder(context);
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
new file mode 100644
index 0000000000..b375573e3d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ProcedureUtils;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ROW_COUNT_PER_SHARD;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Procedure to build global index files via Spark. */
+public class CreateGlobalIndexProcedure extends BaseProcedure {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreateGlobalIndexProcedure.class);
+
+    // Procedure arguments. call() reads them by position, so the order here is significant:
+    // 0 = table, 1 = index_column, 2 = index_type, 3 = partitions (optional), 4 = options 
+    // (optional). All of them are strings.
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", DataTypes.StringType),
+                ProcedureParameter.required("index_column", 
DataTypes.StringType),
+                ProcedureParameter.required("index_type", 
DataTypes.StringType),
+                ProcedureParameter.optional("partitions", StringType),
+                ProcedureParameter.optional("options", DataTypes.StringType)
+            };
+
+    // Result schema: a single nullable boolean column named "result".
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    /**
+     * Creates the procedure for the given catalog.
+     *
+     * @param tableCatalog catalog passed to the {@link BaseProcedure} constructor
+     */
+    protected CreateGlobalIndexProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    /** Returns the shared parameter array (not a copy). */
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    /** Returns the schema of the rows produced by {@code call}. */
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    /** Returns a one-line, human-readable description of the procedure. */
+    @Override
+    public String description() {
+        return "Create global index files for a given column.";
+    }
+
+    /**
+     * Builds a global index on one column of a table and commits the produced index files.
+     *
+     * <p>Arguments are read by position, in the order declared in {@code PARAMETERS}: table
+     * identifier, index column, index type, optional partition filter, optional index options. The
+     * index type is lower-cased and trimmed and resolved to a {@link GlobalIndexBuilderFactory}
+     * before the table is touched, so an unsupported type fails early.
+     *
+     * @param args procedure arguments
+     * @return a single row holding {@code true} once the index has been built and committed
+     * @throws IllegalArgumentException if no builder factory is registered for the index type
+     * @throws RuntimeException wrapping any failure (including failed argument checks) raised
+     *     while validating the table, building the index or committing it
+     */
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String column = args.getString(1);
+        String indexType = args.getString(2).toLowerCase(Locale.ROOT).trim();
+        // A missing or blank partition argument means "no partition filter".
+        String partitions =
+                (args.isNullAt(3) || StringUtils.isNullOrWhitespaceOnly(args.getString(3)))
+                        ? null
+                        : args.getString(3);
+        String optionString = args.isNullAt(4) ? null : args.getString(4);
+
+        String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : null;
+
+        // Early validation: check if the index type is supported
+        GlobalIndexBuilderFactory globalIndexBuilderFactory =
+                GlobalIndexBuilderFactoryUtils.load(indexType);
+
+        // Parameterized logging: the message is only assembled when INFO is enabled.
+        LOG.info("Starting to build index for table {} WHERE: {}", tableIdent, finalWhere);
+
+        return modifyPaimonTable(
+                tableIdent,
+                t -> {
+                    try {
+                        checkArgument(
+                                t instanceof FileStoreTable,
+                                "Only FileStoreTable supports global index creation.");
+                        FileStoreTable table = (FileStoreTable) t;
+                        checkArgument(
+                                table.coreOptions().rowTrackingEnabled(),
+                                "Table '%s' must enable 'row-tracking.enabled=true' before creating global index.",
+                                tableIdent);
+
+                        RowType rowType = table.rowType();
+                        checkArgument(
+                                rowType.containsField(column),
+                                "Column '%s' does not exist in table '%s'.",
+                                column,
+                                tableIdent);
+                        DataSourceV2Relation relation = createRelation(tableIdent);
+                        PartitionPredicate partitionPredicate =
+                                SparkProcedureUtils.convertToPartitionPredicate(
+                                        finalWhere,
+                                        table.schema().logicalPartitionType(),
+                                        spark(),
+                                        relation);
+
+                        // Only the indexed column plus the row id is read from the table.
+                        DataField indexField = rowType.getField(column);
+                        RowType projectedRowType =
+                                rowType.project(Collections.singletonList(column));
+                        RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType);
+
+                        HashMap<String, String> parsedOptions = new HashMap<>();
+                        ProcedureUtils.putAllOptions(parsedOptions, optionString);
+                        Options userOptions = Options.fromMap(parsedOptions);
+                        Options tableOptions = new Options(table.options());
+                        // NOTE(review): the shard size is taken from the table options only; a
+                        // value passed through the procedure's "options" argument is not
+                        // consulted here. Confirm this is intended.
+                        long rowsPerShard =
+                                tableOptions
+                                        .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
+                                        .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue());
+                        checkArgument(
+                                rowsPerShard > 0,
+                                "Option 'global-index.row-count-per-shard' must be greater than 0.");
+
+                        // Step 1: generate splits for each partition&&shard
+                        Map<BinaryRow, Map<Range, DataSplit>> splits =
+                                split(table, partitionPredicate, rowsPerShard);
+
+                        // Step 2: build index by certain index system
+                        List<IndexManifestEntry> indexResults =
+                                buildIndex(
+                                        table,
+                                        splits,
+                                        indexType,
+                                        readRowType,
+                                        indexField,
+                                        userOptions,
+                                        globalIndexBuilderFactory);
+
+                        // Step 3: commit index meta to a new snapshot
+                        commit(table, indexResults);
+
+                        return new InternalRow[] {newInternalRow(true)};
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Failed to create %s index for column '%s' on table '%s'.",
+                                        indexType, column, tableIdent),
+                                e);
+                    }
+                });
+    }
+
+    /**
+     * Builds the index for every (partition, shard) split concurrently and gathers the results.
+     *
+     * <p>One task per shard is submitted to a cached thread pool, so the number of threads grows
+     * with the number of shards. Results are collected in submission order. If a shard fails, or
+     * the calling thread is interrupted while waiting, every task that has not finished yet is
+     * cancelled so that it does not keep running after the procedure has already failed.
+     *
+     * @param table table the index is built for
+     * @param preparedDS splits to index, keyed by partition and then by row range
+     * @param indexType identifier of the index implementation
+     * @param readType row type of the rows read for indexing
+     * @param indexField field the index is built on
+     * @param options options handed to each builder context
+     * @param globalIndexBuilderFactory factory creating one builder per shard
+     * @return the index manifest entries of all shards
+     * @throws RuntimeException if a shard fails (cause preserved) or the wait is interrupted (the
+     *     interrupt flag is restored)
+     */
+    private List<IndexManifestEntry> buildIndex(
+            FileStoreTable table,
+            Map<BinaryRow, Map<Range, DataSplit>> preparedDS,
+            String indexType,
+            RowType readType,
+            DataField indexField,
+            Options options,
+            GlobalIndexBuilderFactory globalIndexBuilderFactory) {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<Future<List<IndexManifestEntry>>> futures = new ArrayList<>();
+        boolean succeeded = false;
+        try {
+            for (Map.Entry<BinaryRow, Map<Range, DataSplit>> entry : preparedDS.entrySet()) {
+                BinaryRow partition = entry.getKey();
+                Map<Range, DataSplit> shards = entry.getValue();
+
+                for (Map.Entry<Range, DataSplit> shard : shards.entrySet()) {
+                    Range rowRange = shard.getKey();
+                    DataSplit shardSplit = shard.getValue();
+                    GlobalIndexBuilderContext builderContext =
+                            new GlobalIndexBuilderContext(
+                                    spark(),
+                                    table,
+                                    partition,
+                                    readType,
+                                    indexField,
+                                    indexType,
+                                    rowRange,
+                                    options);
+
+                    futures.add(
+                            executor.submit(
+                                    () -> {
+                                        GlobalIndexBuilder globalIndexBuilder =
+                                                globalIndexBuilderFactory.create(builderContext);
+                                        return globalIndexBuilder.build(shardSplit);
+                                    }));
+                }
+            }
+
+            List<IndexManifestEntry> entries = new ArrayList<>();
+            for (Future<List<IndexManifestEntry>> future : futures) {
+                try {
+                    entries.addAll(future.get());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException("Index creation was interrupted", e);
+                } catch (ExecutionException e) {
+                    throw new RuntimeException("Build index failed", e.getCause());
+                }
+            }
+
+            succeeded = true;
+            return entries;
+        } finally {
+            if (!succeeded) {
+                // shutdown() alone lets already submitted shards run to completion; cancel them
+                // because their results would be thrown away. No-op for finished tasks.
+                for (Future<List<IndexManifestEntry>> future : futures) {
+                    future.cancel(true);
+                }
+            }
+            executor.shutdown();
+        }
+    }
+
+    /**
+     * Commits the given index files in a single commit, using one commit message per partition.
+     *
+     * <p>Each message is created for bucket {@code 0} with an index-only data increment and an
+     * empty compact increment.
+     *
+     * @param table table to commit to
+     * @param indexResults index manifest entries produced by the build step
+     * @throws Exception if committing or closing the commit fails
+     */
+    private void commit(FileStoreTable table, List<IndexManifestEntry> indexResults)
+            throws Exception {
+        // partition -> index files written for that partition
+        Map<BinaryRow, List<IndexFileMeta>> filesByPartition =
+                indexResults.stream()
+                        .map(result -> Pair.of(result.partition(), result.indexFile()))
+                        .collect(
+                                Collectors.groupingBy(
+                                        Pair::getKey,
+                                        Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+        List<CommitMessage> messages =
+                filesByPartition.entrySet().stream()
+                        .<CommitMessage>map(
+                                group ->
+                                        new CommitMessageImpl(
+                                                group.getKey(),
+                                                0,
+                                                null,
+                                                DataIncrement.indexIncrement(group.getValue()),
+                                                CompactIncrement.emptyIncrement()))
+                        .collect(Collectors.toList());
+
+        String commitUser = "global-index-create-" + UUID.randomUUID();
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+            tableCommit.commit(messages);
+        }
+    }
+
+    /**
+     * Plans the data files accepted by the partition filter and arranges them into per-partition,
+     * per-shard splits.
+     *
+     * @param table table to scan
+     * @param partitions partition filter handed to the scan
+     * @param rowsPerShard number of rows per shard
+     * @return map of partition to shard splits
+     */
+    protected Map<BinaryRow, Map<Range, DataSplit>> split(
+            FileStoreTable table, PartitionPredicate partitions, long rowsPerShard) {
+        FileStorePathFactory paths = table.store().pathFactory();
+
+        // Plan the scan and bucket its manifest entries by partition in one pass.
+        Map<BinaryRow, List<ManifestEntry>> filesPerPartition =
+                table.store().newScan().withPartitionFilter(partitions).plan().files().stream()
+                        .collect(Collectors.groupingBy(ManifestEntry::partition));
+
+        return groupFilesIntoShardsByPartition(
+                filesPerPartition,
+                rowsPerShard,
+                (partition, bucket) -> paths.bucketPath(partition, bucket));
+    }
+
+    /**
+     * Groups files into shards by partition. This method is extracted from split() to make it more
+     * testable.
+     *
+     * <p>Shard {@code n} covers the row ids {@code [n * rowsPerShard, (n + 1) * rowsPerShard - 1]}.
+     * A file is added to every shard its row-id range overlaps, so one file can appear in several
+     * shards. Files without a first row id are ignored, and partitions that end up with no shard
+     * are left out of the result. Every split is created for bucket {@code 0}.
+     *
+     * @param entriesByPartition manifest entries grouped by partition
+     * @param rowsPerShard number of rows per shard; must be greater than 0
+     * @param pathFactory path factory for creating bucket paths
+     * @return map of partition to shard splits
+     * @throws IllegalArgumentException if {@code rowsPerShard} is not positive
+     */
+    public static Map<BinaryRow, Map<Range, DataSplit>> groupFilesIntoShardsByPartition(
+            Map<BinaryRow, List<ManifestEntry>> entriesByPartition,
+            long rowsPerShard,
+            BiFunction<BinaryRow, Integer, Path> pathFactory) {
+        // Guards the divisions below: 0 would throw ArithmeticException and a negative value
+        // would produce meaningless shard ranges.
+        checkArgument(
+                rowsPerShard > 0, "rowsPerShard must be greater than 0, but is %s.", rowsPerShard);
+
+        Map<BinaryRow, Map<Range, DataSplit>> result = new HashMap<>();
+
+        for (Map.Entry<BinaryRow, List<ManifestEntry>> partitionEntry :
+                entriesByPartition.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+
+            // shard start row id -> files overlapping that shard
+            Map<Long, List<DataFileMeta>> filesByShard = new HashMap<>();
+            for (ManifestEntry entry : partitionEntry.getValue()) {
+                DataFileMeta file = entry.file();
+                Long firstRowId = file.firstRowId();
+                if (firstRowId == null) {
+                    continue; // Skip files without row tracking
+                }
+
+                // Inclusive row-id range covered by the file, mapped to inclusive shard ids.
+                long fileEndRowId = firstRowId + file.rowCount() - 1;
+                long startShardId = firstRowId / rowsPerShard;
+                long endShardId = fileEndRowId / rowsPerShard;
+
+                for (long shardId = startShardId; shardId <= endShardId; shardId++) {
+                    filesByShard
+                            .computeIfAbsent(shardId * rowsPerShard, k -> new ArrayList<>())
+                            .add(file);
+                }
+            }
+
+            if (filesByShard.isEmpty()) {
+                continue;
+            }
+
+            // Same for every shard of the partition, so resolve it once.
+            String bucketPath = pathFactory.apply(partition, 0).toString();
+
+            // Create DataSplit for each shard with exact ranges. Every list in filesByShard holds
+            // at least one file because a list is only created when a file is added to it.
+            Map<Range, DataSplit> shardSplits = new HashMap<>();
+            for (Map.Entry<Long, List<DataFileMeta>> shardEntry : filesByShard.entrySet()) {
+                long startRowId = shardEntry.getKey();
+
+                // Use exact shard boundaries: [n*rowsPerShard, (n+1)*rowsPerShard - 1]
+                Range range = new Range(startRowId, startRowId + rowsPerShard - 1);
+
+                DataSplit dataSplit =
+                        DataSplit.builder()
+                                .withPartition(partition)
+                                .withBucket(0)
+                                .withDataFiles(shardEntry.getValue())
+                                .withBucketPath(bucketPath)
+                                .rawConvertible(false)
+                                .build();
+
+                shardSplits.put(range, dataSplit);
+            }
+
+            result.put(partition, shardSplits);
+        }
+
+        return result;
+    }
+
+    /**
+     * Returns a builder that creates this procedure.
+     *
+     * <p>The returned builder constructs a {@link CreateGlobalIndexProcedure} from the value of
+     * its own {@code tableCatalog()} at the time {@code doBuild()} is called.
+     */
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<CreateGlobalIndexProcedure>() {
+            @Override
+            public CreateGlobalIndexProcedure doBuild() {
+                return new CreateGlobalIndexProcedure(tableCatalog());
+            }
+        };
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
 
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
new file mode 100644
index 0000000000..704354af89
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.spark.globalindex.bitmap.BitmapGlobalIndexBuilderFactory
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
new file mode 100644
index 0000000000..f455f017a3
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.PojoManifestEntry;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link CreateGlobalIndexProcedure#groupFilesIntoShardsByPartition}: files are
+ * assigned to every fixed-size row-id shard they overlap, separately for each partition.
+ */
+public class CreateGlobalIndexProcedureTest {
+
+    /** Bucket path factory stub; returns a fresh random path on every call. */
+    private final BiFunction<BinaryRow, Integer, Path> pathFactory =
+            (a, b) -> new Path(UUID.randomUUID().toString());
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionSingleFileInSingleShard() {
+        BinaryRow partition = createPartition(0);
+
+        // Create a single file that fits entirely in one shard (rows 0-99, shard size 1000)
+        DataFileMeta file = createDataFileMeta(0L, 100L);
+        ManifestEntry entry = createManifestEntry(partition, file);
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, Collections.singletonList(entry));
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify
+        assertThat(result).hasSize(1);
+        assertThat(result).containsKey(partition);
+
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(1);
+
+        // Should be in shard [0, 999]
+        Range expectedRange = new Range(0L, 999L);
+        assertThat(shardSplits).containsKey(expectedRange);
+
+        DataSplit split = shardSplits.get(expectedRange);
+        assertThat(split.dataFiles()).hasSize(1);
+        assertThat(split.dataFiles().get(0)).isEqualTo(file);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionFileSpanningMultipleShards() {
+        BinaryRow partition = createPartition(0);
+
+        // Create a file that spans 3 shards (rows 500-2500, shard size 1000)
+        // This should be in shards [0,999], [1000,1999], and [2000,2999]
+        DataFileMeta file = createDataFileMeta(500L, 2001L);
+        ManifestEntry entry = createManifestEntry(partition, file);
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, Collections.singletonList(entry));
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(3);
+
+        // Verify all three shards contain the file
+        Range shard0 = new Range(0L, 999L);
+        Range shard1 = new Range(1000L, 1999L);
+        Range shard2 = new Range(2000L, 2999L);
+
+        assertThat(shardSplits).containsKeys(shard0, shard1, shard2);
+        assertThat(shardSplits.get(shard0).dataFiles()).contains(file);
+        assertThat(shardSplits.get(shard1).dataFiles()).contains(file);
+        assertThat(shardSplits.get(shard2).dataFiles()).contains(file);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionMultipleFilesInSameShard() {
+        BinaryRow partition = createPartition(0);
+
+        // Three contiguous 100-row files (rows 0-299), all inside shard [0, 999]
+        DataFileMeta file1 = createDataFileMeta(0L, 100L);
+        DataFileMeta file2 = createDataFileMeta(100L, 100L);
+        DataFileMeta file3 = createDataFileMeta(200L, 100L);
+
+        List<ManifestEntry> entries =
+                Arrays.asList(
+                        createManifestEntry(partition, file1),
+                        createManifestEntry(partition, file2),
+                        createManifestEntry(partition, file3));
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, entries);
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(1);
+
+        Range expectedRange = new Range(0L, 999L);
+        DataSplit split = shardSplits.get(expectedRange);
+        assertThat(split.dataFiles()).hasSize(3);
+        assertThat(split.dataFiles()).containsExactlyInAnyOrder(file1, file2, file3);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionMultipleFilesInDifferentShards() {
+        BinaryRow partition = createPartition(0);
+
+        // Create files in different shards
+        DataFileMeta file1 = createDataFileMeta(0L, 100L); // Shard [0, 999]
+        DataFileMeta file2 = createDataFileMeta(1000L, 100L); // Shard [1000, 1999]
+        DataFileMeta file3 = createDataFileMeta(2000L, 100L); // Shard [2000, 2999]
+
+        List<ManifestEntry> entries =
+                Arrays.asList(
+                        createManifestEntry(partition, file1),
+                        createManifestEntry(partition, file2),
+                        createManifestEntry(partition, file3));
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, entries);
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(3);
+
+        // Verify each shard has the correct file
+        Range shard0 = new Range(0L, 999L);
+        Range shard1 = new Range(1000L, 1999L);
+        Range shard2 = new Range(2000L, 2999L);
+
+        assertThat(shardSplits.get(shard0).dataFiles()).containsExactly(file1);
+        assertThat(shardSplits.get(shard1).dataFiles()).containsExactly(file2);
+        assertThat(shardSplits.get(shard2).dataFiles()).containsExactly(file3);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionMultiplePartitions() {
+        BinaryRow partition1 = createPartition(0);
+        BinaryRow partition2 = createPartition(1);
+
+        // file1 covers rows 0-1049 and file2 covers rows 1050-2049 (shard size 100)
+        DataFileMeta file1 = createDataFileMeta(0L, 1050L);
+        DataFileMeta file2 = createDataFileMeta(1050L, 1000L);
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(
+                partition1, Collections.singletonList(createManifestEntry(partition1, file1)));
+        entriesByPartition.put(
+                partition2, Collections.singletonList(createManifestEntry(partition2, file2)));
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 100L, pathFactory);
+
+        // Verify
+        assertThat(result).hasSize(2);
+        assertThat(result).containsKeys(partition1, partition2);
+
+        // Verify partition1: 11 shards, [0, 99] .. [1000, 1099]
+        Map<Range, DataSplit> shardSplits1 = result.get(partition1);
+        assertThat(shardSplits1).hasSize(11);
+        assertThat(shardSplits1).containsKey(new Range(1000L, 1099L));
+        assertThat(shardSplits1.get(new Range(1000L, 1099L)).dataFiles()).containsExactly(file1);
+
+        // Verify partition2: shards align to multiples of 100, so the first one is [1000, 1099]
+        Map<Range, DataSplit> shardSplits2 = result.get(partition2);
+        assertThat(shardSplits2).hasSize(11);
+        assertThat(shardSplits2).containsKey(new Range(1000L, 1099L));
+        assertThat(shardSplits2.get(new Range(1000L, 1099L)).dataFiles()).containsExactly(file2);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionExactShardBoundaries() {
+        BinaryRow partition = createPartition(0);
+
+        // Create a file that ends exactly at shard boundary (rows 0-999, shard size 1000)
+        DataFileMeta file = createDataFileMeta(0L, 1000L);
+        ManifestEntry entry = createManifestEntry(partition, file);
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, Collections.singletonList(entry));
+
+        // Execute
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify - file ending at row 999 should be in shard [0,999] only
+        // File covers rows [0, 999]
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(1);
+        assertThat(shardSplits).containsKey(new Range(0L, 999L));
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionSmallShardSize() {
+        BinaryRow partition = createPartition(0);
+
+        // Create a file with small shard size (rows 0-24, shard size 10)
+        DataFileMeta file = createDataFileMeta(0L, 25L);
+        ManifestEntry entry = createManifestEntry(partition, file);
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+        entriesByPartition.put(partition, Collections.singletonList(entry));
+
+        // Execute with shard size of 10
+        Map<BinaryRow, Map<Range, DataSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 10L, pathFactory);
+
+        // Verify - file should span 3 shards: [0,9], [10,19], [20,29]
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = result.get(partition);
+        assertThat(shardSplits).hasSize(3);
+
+        assertThat(shardSplits)
+                .containsKeys(new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 29L));
+    }
+
+    /** Builds a single-field partition row whose int field 0 is set to {@code i}. */
+    private BinaryRow createPartition(int i) {
+        BinaryRow binaryRow = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+        binaryRowWriter.writeInt(0, i);
+        binaryRowWriter.complete();
+        return binaryRow;
+    }
+
+    /** Builds file metadata with the given first row id and row count; the rest is stubbed. */
+    private DataFileMeta createDataFileMeta(Long firstRowId, Long rowCount) {
+        return new PojoDataFileMeta(
+                "test-file-" + UUID.randomUUID(),
+                1024L,
+                rowCount,
+                BinaryRow.EMPTY_ROW,
+                BinaryRow.EMPTY_ROW,
+                SimpleStats.EMPTY_STATS,
+                SimpleStats.EMPTY_STATS,
+                0L,
+                0L,
+                0L,
+                0,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                firstRowId,
+                null);
+    }
+
+    /** Wraps {@code file} in an ADD manifest entry for {@code partition} (bucket 0 of 1). */
+    private ManifestEntry createManifestEntry(BinaryRow partition, DataFileMeta file) {
+        return new PojoManifestEntry(
+                FileKind.ADD,
+                partition,
+                0, // bucket
+                1, // totalBuckets
+                file);
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
new file mode 100644
index 0000000000..11eeeab65a
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.streaming.StreamTest
+
+import scala.collection.JavaConverters._
+
+class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+  test("create bitmap global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'bitmap')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      table.newIndexScanBuilder().shardList() // NOTE(review): result is discarded
+      assert(bitmapEntries.nonEmpty)
+      val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 100000L) // index row counts add up to the 100000 inserted rows
+    }
+  }
+}

Reply via email to