This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e14a811106 [spark] Introduce global file index builder on spark (#6684)
e14a811106 is described below
commit e14a8111063de4108a26f8a3e880bd15aae7af08
Author: YeJunHao <[email protected]>
AuthorDate: Thu Nov 27 19:52:12 2025 +0800
[spark] Introduce global file index builder on spark (#6684)
---
.../org/apache/paimon/table/SpecialFields.java | 16 +
.../paimon/globalindex/GlobalIndexReader.java | 9 +-
.../paimon/globalindex/GlobalIndexWriter.java | 2 +-
.../globalindex/wrap/FileIndexReaderWrapper.java | 53 ++-
.../main/java/org/apache/paimon/utils/Range.java | 13 +-
.../bitmapindex/BitmapGlobalIndexTest.java | 49 ++-
.../paimon/globalindex/GlobalIndexEvaluator.java | 17 +-
.../globalindex/GlobalIndexFileReadWrite.java | 4 +
.../paimon/manifest/IndexManifestFileHandler.java | 35 ++
.../apache/paimon/table/source/ReadBuilder.java | 1 +
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../spark/globalindex/GlobalIndexBuilder.java | 144 ++++++++
.../globalindex/GlobalIndexBuilderContext.java | 129 +++++++
.../globalindex/GlobalIndexBuilderFactory.java | 27 ++
.../GlobalIndexBuilderFactoryUtils.java | 73 ++++
.../bitmap/BitmapGlobalIndexBuilder.java | 35 ++
.../bitmap/BitmapGlobalIndexBuilderFactory.java | 43 +++
.../procedure/CreateGlobalIndexProcedure.java | 400 +++++++++++++++++++++
...mon.spark.globalindex.GlobalIndexBuilderFactory | 18 +
.../procedure/CreateGlobalIndexProcedureTest.java | 321 +++++++++++++++++
.../procedure/CreateGlobalIndexProcedureTest.scala | 65 ++++
21 files changed, 1384 insertions(+), 72 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
index d89776747b..182682637e 100644
--- a/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
+++ b/paimon-api/src/main/java/org/apache/paimon/table/SpecialFields.java
@@ -168,4 +168,20 @@ public class SpecialFields {
: SpecialFields.SEQUENCE_NUMBER);
return new RowType(fieldsWithRowTracking);
}
+
+ public static RowType rowTypeWithRowId(RowType rowType) {
+ List<DataField> fieldsWithRowTracking = new ArrayList<>(rowType.getFields());
+
+ fieldsWithRowTracking.forEach(
+ f -> {
+ if (ROW_ID.name().equals(f.name())) {
+ throw new IllegalArgumentException(
+ "Row tracking field name '"
+ + f.name()
+ + "' conflicts with existing field
names.");
+ }
+ });
+ fieldsWithRowTracking.add(SpecialFields.ROW_ID);
+ return new RowType(fieldsWithRowTracking);
+ }
}
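For context, rowTypeWithRowId appends the ROW_ID special field to a projected read type and rejects schemas that already use that name. A minimal usage sketch (the "user_id" column is illustrative, not part of this commit):

    // Sketch only: project a single column, then append the row-id field.
    RowType projected = rowType.project(Collections.singletonList("user_id"));
    // Throws IllegalArgumentException if a field named like ROW_ID already exists.
    RowType readType = SpecialFields.rowTypeWithRowId(projected);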
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
index 2339634243..2ab40805f8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexReader.java
@@ -23,23 +23,22 @@ import org.apache.paimon.predicate.TransformPredicate;
import java.io.Closeable;
import java.util.List;
-import java.util.Optional;
/** Index reader for global index, return {@link GlobalIndexResult}. */
-public interface GlobalIndexReader extends FunctionVisitor<Optional<GlobalIndexResult>>, Closeable {
+public interface GlobalIndexReader extends FunctionVisitor<GlobalIndexResult>, Closeable {
@Override
- default Optional<GlobalIndexResult> visitAnd(List<Optional<GlobalIndexResult>> children) {
+ default GlobalIndexResult visitAnd(List<GlobalIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
@Override
- default Optional<GlobalIndexResult> visitOr(List<Optional<GlobalIndexResult>> children) {
+ default GlobalIndexResult visitOr(List<GlobalIndexResult> children) {
throw new UnsupportedOperationException("Should not invoke this");
}
@Override
- default Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
+ default GlobalIndexResult visit(TransformPredicate predicate) {
throw new UnsupportedOperationException("Should not invoke this");
}
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
index d04d71aa5e..4dd89eb6b3 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexWriter.java
@@ -27,7 +27,7 @@ import java.util.List;
/** Index writer for global index. */
public interface GlobalIndexWriter {
- void write(Object key);
+ void write(@Nullable Object key);
List<ResultEntry> finish();
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
index 416b355c36..8a10911ee9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/wrap/FileIndexReaderWrapper.java
@@ -27,7 +27,6 @@ import org.apache.paimon.predicate.FieldRef;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.function.Function;
/** A {@link GlobalIndexReader} wrapper for {@link FileIndexReader}. */
@@ -47,68 +46,68 @@ public class FileIndexReaderWrapper implements GlobalIndexReader {
}
@Override
- public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
- return Optional.ofNullable(transform.apply(reader.visitIsNotNull(fieldRef)));
+ public GlobalIndexResult visitIsNotNull(FieldRef fieldRef) {
+ return transform.apply(reader.visitIsNotNull(fieldRef));
}
@Override
- public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
- return Optional.ofNullable(transform.apply(reader.visitIsNull(fieldRef)));
+ public GlobalIndexResult visitIsNull(FieldRef fieldRef) {
+ return transform.apply(reader.visitIsNull(fieldRef));
}
@Override
- public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitStartsWith(fieldRef, literal)));
+ public GlobalIndexResult visitStartsWith(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitStartsWith(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitEndsWith(fieldRef, literal)));
+ public GlobalIndexResult visitEndsWith(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitEndsWith(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitContains(fieldRef, literal)));
+ public GlobalIndexResult visitContains(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitContains(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitLessThan(fieldRef, literal)));
+ public GlobalIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitLessThan(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitGreaterOrEqual(fieldRef, literal)));
+ public GlobalIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitGreaterOrEqual(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitNotEqual(fieldRef, literal)));
+ public GlobalIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitNotEqual(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitLessOrEqual(fieldRef, literal)));
+ public GlobalIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitLessOrEqual(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitEqual(fieldRef, literal)));
+ public GlobalIndexResult visitEqual(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitEqual(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, Object literal) {
- return Optional.ofNullable(transform.apply(reader.visitGreaterThan(fieldRef, literal)));
+ public GlobalIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
+ return transform.apply(reader.visitGreaterThan(fieldRef, literal));
}
@Override
- public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, List<Object> literals) {
- return Optional.ofNullable(transform.apply(reader.visitIn(fieldRef, literals)));
+ public GlobalIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
+ return transform.apply(reader.visitIn(fieldRef, literals));
}
@Override
- public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, List<Object> literals) {
- return Optional.ofNullable(transform.apply(reader.visitNotIn(fieldRef, literals)));
+ public GlobalIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
+ return transform.apply(reader.visitNotIn(fieldRef, literals));
}
@Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 7d1640c361..2de1bc923d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -18,10 +18,13 @@
package org.apache.paimon.utils;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
/** Range represents from (inclusive) and to (inclusive). */
-public class Range {
+public class Range implements Serializable {
public final long from;
public final long to;
@@ -49,6 +52,14 @@ public class Range {
return from > other.to;
}
+ public List<Long> toListLong() {
+ List<Long> longs = new ArrayList<>();
+ for (long i = from; i <= to; i++) {
+ longs.add(i);
+ }
+ return longs;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) {
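Note that toListLong materializes one boxed Long per row id; the TODO on ReadBuilder.withRowIds below flags the same List<Long> representation as memory-hungry for wide ranges. A minimal sketch of the expected behavior, given the inclusive [from, to] semantics documented on Range:

    // Sketch only: expand an inclusive range into explicit row ids.
    Range range = new Range(0, 3);
    assert range.toListLong().equals(Arrays.asList(0L, 1L, 2L, 3L));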
diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
index 6019def954..fe57146e51 100644
--- a/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/bitmapindex/BitmapGlobalIndexTest.java
@@ -93,26 +93,23 @@ public class BitmapGlobalIndexTest {
writer.write(o);
}
});
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 4));
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(2));
- assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1, 3));
- assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 2, 4));
- assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
- .get()
- .iterator()
- .hasNext();
+ assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
}
private void testIntType(int version) throws Exception {
@@ -128,24 +125,24 @@ public class BitmapGlobalIndexTest {
writer.write(o);
}
});
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 0))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0));
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, 1))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1));
- assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(2));
- assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(0, 1, 2)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 1));
- assert !reader.visitEqual(fieldRef, 2).get().iterator().hasNext();
+ assert !reader.visitEqual(fieldRef, 2).iterator().hasNext();
}
private void testBooleanType(int version) throws Exception {
@@ -161,11 +158,11 @@ public class BitmapGlobalIndexTest {
writer.write(o);
}
});
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, Boolean.TRUE))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 2));
- assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(4));
@@ -202,13 +199,12 @@ public class BitmapGlobalIndexTest {
long time2 = System.currentTimeMillis();
GlobalIndexResult result =
reader.visitEqual(
- fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)))
- .get();
+ fieldRef, BinaryString.fromString(prefix + (approxCardinality / 2)));
RoaringBitmap32 resultBm = ((BitmapIndexResultWrapper) result).getBitmapIndexResult().get();
System.out.println("read time: " + (System.currentTimeMillis() - time2));
assert resultBm.equals(middleBm);
long time3 = System.currentTimeMillis();
- GlobalIndexResult resultNull = reader.visitIsNull(fieldRef).get();
+ GlobalIndexResult resultNull = reader.visitIsNull(fieldRef);
RoaringBitmap32 resultNullBm =
((BitmapIndexResultWrapper) resultNull).getBitmapIndexResult().get();
System.out.println("read null bitmap time: " + (System.currentTimeMillis() - time3));
@@ -277,26 +273,23 @@ public class BitmapGlobalIndexTest {
a.pointTo(c.getSegments(), c.getOffset(), c.getSizeInBytes());
writer.write(null);
});
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, a))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0));
- assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b).get())
+ assert ((BitmapIndexResultWrapper) reader.visitEqual(fieldRef, b))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(3));
- assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(1, 2, 4, 5));
- assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIn(fieldRef, Arrays.asList(a, b)))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 3));
- assert !reader.visitEqual(fieldRef, BinaryString.fromString("c"))
- .get()
- .iterator()
- .hasNext();
+ assert !reader.visitEqual(fieldRef, BinaryString.fromString("c")).iterator().hasNext();
}
private void testAllNull(int version) throws Exception {
@@ -312,10 +305,10 @@ public class BitmapGlobalIndexTest {
writer.write(o);
}
});
- assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef).get())
+ assert ((BitmapIndexResultWrapper) reader.visitIsNull(fieldRef))
.getBitmapIndexResult()
.get()
.equals(RoaringBitmap32.bitmapOf(0, 1, 2));
- assert !reader.visitIsNotNull(fieldRef).get().iterator().hasNext();
+ assert !reader.visitIsNotNull(fieldRef).iterator().hasNext();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 36e22fd144..da827d568f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -75,21 +75,18 @@ public class GlobalIndexEvaluator
Collection<GlobalIndexReader> readers =
indexReadersCache.computeIfAbsent(fieldId, readersFunction::apply);
for (GlobalIndexReader fileIndexReader : readers) {
- Optional<GlobalIndexResult> childResult =
+ GlobalIndexResult childResult =
predicate.function().visit(fileIndexReader, fieldRef, predicate.literals());
// AND Operation
- if (childResult.isPresent()) {
- if (compoundResult.isPresent()) {
- GlobalIndexResult r1 = compoundResult.get();
- GlobalIndexResult r2 = childResult.get();
- compoundResult = Optional.of(r1.and(r2));
- } else {
- compoundResult = childResult;
- }
+ if (compoundResult.isPresent()) {
+ GlobalIndexResult r1 = compoundResult.get();
+ compoundResult = Optional.of(r1.and(childResult));
+ } else {
+ compoundResult = Optional.of(childResult);
}
- if (compoundResult.isPresent() && !compoundResult.get().iterator().hasNext()) {
+ if (!compoundResult.get().iterator().hasNext()) {
return compoundResult;
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
index cad2c67354..c48a729950 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexFileReadWrite.java
@@ -48,6 +48,10 @@ public class GlobalIndexFileReadWrite implements GlobalIndexFileReader, GlobalIn
return indexPathFactory.toPath(fileName);
}
+ public long fileSize(String fileName) throws IOException {
+ return fileIO.getFileSize(filePath(fileName));
+ }
+
public OutputStream newOutputStream(String fileName) throws IOException {
return fileIO.newOutputStream(indexPathFactory.toPath(fileName), true);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index 035420df28..ebe6e2e421 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -37,6 +37,7 @@ import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkState;
@@ -91,6 +92,10 @@ public class IndexManifestFileHandler {
}
private IndexManifestFileCombiner getIndexManifestFileCombine(String indexType) {
+ if (!DELETION_VECTORS_INDEX.equals(indexType) && !HASH_INDEX.equals(indexType)) {
+ return new GlobalFileNameCombiner();
+ }
+
if (DELETION_VECTORS_INDEX.equals(indexType) && BucketMode.BUCKET_UNAWARE == bucketMode) {
return new GlobalCombiner();
} else {
@@ -191,6 +196,36 @@ public class IndexManifestFileHandler {
}
}
+ /** We combine the previous and new index files by file name. */
+ static class GlobalFileNameCombiner implements IndexManifestFileCombiner {
+
+ @Override
+ public List<IndexManifestEntry> combine(
+ List<IndexManifestEntry> prevIndexFiles, List<IndexManifestEntry> newIndexFiles) {
+ Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+ for (IndexManifestEntry entry : prevIndexFiles) {
+ indexEntries.put(entry.indexFile().fileName(), entry);
+ }
+
+ // The deleted entry is processed first to avoid overwriting a new entry.
+ List<IndexManifestEntry> removed =
+ newIndexFiles.stream()
+ .filter(f -> f.kind() == FileKind.DELETE)
+ .collect(Collectors.toList());
+ List<IndexManifestEntry> added =
+ newIndexFiles.stream()
+ .filter(f -> f.kind() == FileKind.ADD)
+ .collect(Collectors.toList());
+ for (IndexManifestEntry entry : removed) {
+ indexEntries.remove(entry.indexFile().fileName());
+ }
+ for (IndexManifestEntry entry : added) {
+ indexEntries.put(entry.indexFile().fileName(), entry);
+ }
+ return new ArrayList<>(indexEntries.values());
+ }
+ }
+
private static BucketIdentifier identifier(IndexManifestEntry indexManifestEntry) {
return new BucketIdentifier(
indexManifestEntry.partition(),
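Unlike the bucket-aware combiners above, GlobalFileNameCombiner keys entries purely by index file name and applies deletes before adds. A self-contained sketch of the merge semantics, with plain strings standing in for IndexManifestEntry (names are illustrative):

    // Sketch only: name-keyed merge, deletes applied before adds.
    static Collection<String> combineByName(
            List<String> prev, List<String> deletes, List<String> adds) {
        Map<String, String> byName = new LinkedHashMap<>();
        prev.forEach(f -> byName.put(f, f));
        deletes.forEach(byName::remove); // delete first, so a same-name ADD survives
        adds.forEach(f -> byName.put(f, f));
        return byName.values();
    }
    // combineByName([idx-1, idx-2], [idx-1], [idx-3]) -> [idx-2, idx-3]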
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 6250d5f50f..1d341a502c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -164,6 +164,7 @@ public interface ReadBuilder extends Serializable {
*
* @param indices the row ids to be read
*/
+ // TODO: support List<Range>; List<Long> wastes memory for wide ranges
ReadBuilder withRowIds(List<Long> indices);
/** Delete stats in scan plan result. */
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 32909a2caa..99ed1e83a3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CopyFilesProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
import org.apache.paimon.spark.procedure.CreateFunctionProcedure;
+import org.apache.paimon.spark.procedure.CreateGlobalIndexProcedure;
import org.apache.paimon.spark.procedure.CreateTagFromTimestampProcedure;
import org.apache.paimon.spark.procedure.CreateTagProcedure;
import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
@@ -92,6 +93,7 @@ public class SparkProcedures {
procedureBuilders.put("delete_tag", DeleteTagProcedure::builder);
procedureBuilders.put("expire_tags", ExpireTagsProcedure::builder);
procedureBuilders.put("create_branch", CreateBranchProcedure::builder);
+ procedureBuilders.put("create_global_index",
CreateGlobalIndexProcedure::builder);
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("rescale", RescaleProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
new file mode 100644
index 0000000000..dbe9f7c7b0
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.GlobalIndexer;
+import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestEntrySerializer;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.InstantiationUtil;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Builds the actual global index files and generates the corresponding index metas. */
+public abstract class GlobalIndexBuilder {
+
+ protected final GlobalIndexBuilderContext context;
+
+ protected GlobalIndexBuilder(GlobalIndexBuilderContext context) {
+ this.context = context;
+ }
+
+ public List<IndexManifestEntry> build(DataSplit dataSplit) throws IOException {
+ final GlobalIndexBuilderContext buildContext = this.context;
+ JavaSparkContext javaSparkContext =
+ new JavaSparkContext(buildContext.spark().sparkContext());
+ byte[] dsBytes = InstantiationUtil.serializeObject(dataSplit);
+ IndexManifestEntrySerializer indexManifestEntrySerializer =
+ new IndexManifestEntrySerializer();
+ return javaSparkContext.parallelize(Collections.singletonList(dsBytes))
+ .map(
+ splitBytes -> {
+ DataSplit split =
+ InstantiationUtil.deserializeObject(
+ splitBytes, GlobalIndexBuilder.class.getClassLoader());
+ ReadBuilder builder = buildContext.table().newReadBuilder();
+ builder.withRowIds(buildContext.range().toListLong())
+ .withReadType(buildContext.readType());
+ RecordReader<InternalRow> rows = builder.newRead().createReader(split);
+ List<GlobalIndexWriter.ResultEntry> resultEntries =
+ writePaimonRows(buildContext, rows);
+ return convertToEntry(buildContext, resultEntries);
+ })
+ .flatMap(
+ e ->
+ e.stream()
+ .map(
+ entry -> {
+ try {
+ return indexManifestEntrySerializer
+ .serializeToBytes(entry);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ })
+ .iterator())
+ .collect().stream()
+ .map(
+ e -> {
+ try {
+ return indexManifestEntrySerializer.deserializeFromBytes(e);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ private static List<IndexManifestEntry> convertToEntry(
+ GlobalIndexBuilderContext context, List<GlobalIndexWriter.ResultEntry> entries)
+ throws IOException {
+ List<IndexManifestEntry> results = new ArrayList<>();
+ for (GlobalIndexWriter.ResultEntry entry : entries) {
+ String fileName = entry.fileName();
+ Range range = entry.rowRange().addOffset(context.range().from);
+ GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite();
+ long fileSize = readWrite.fileSize(fileName);
+ GlobalIndexMeta globalIndexMeta =
+ new GlobalIndexMeta(
+ range.from, range.to, context.indexField().id(), null, entry.meta());
+ IndexFileMeta indexFileMeta =
+ new IndexFileMeta(
+ BitmapGlobalIndexerFactory.IDENTIFIER,
+ fileName,
+ fileSize,
+ range.to - range.from + 1,
+ globalIndexMeta);
+ results.add(
+ new IndexManifestEntry(
+ FileKind.ADD, context.partitionFromBytes(), 0, indexFileMeta));
+ }
+ return results;
+ }
+
+ private static List<GlobalIndexWriter.ResultEntry> writePaimonRows(
+ GlobalIndexBuilderContext context, RecordReader<InternalRow> rows) throws IOException {
+ GlobalIndexer globalIndexer =
+ GlobalIndexer.create(
+ context.indexType(), context.indexField().type(), context.options());
+ GlobalIndexWriter globalIndexWriter =
+ globalIndexer.createWriter(context.globalIndexFileReadWrite());
+ InternalRow.FieldGetter getter =
+ InternalRow.createFieldGetter(
+ context.indexField().type(),
+ context.readType().getFieldIndex(context.indexField().name()));
+ rows.forEachRemaining(
+ row -> {
+ Object indexO = getter.getFieldOrNull(row);
+ globalIndexWriter.write(indexO);
+ });
+ return globalIndexWriter.finish();
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
new file mode 100644
index 0000000000..1911d9fa4e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
+import org.apache.paimon.index.IndexPathFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.sql.SparkSession;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Context containing all necessary information for building global indexes.
+ *
+ * <p>This class is serializable to support Spark distributed execution. The partition is stored
+ * both as a transient {@link BinaryRow} and as serialized bytes to ensure proper serialization
+ * across executor nodes.
+ */
+public class GlobalIndexBuilderContext implements Serializable {
+
+ private final transient SparkSession spark;
+ private final FileStoreTable table;
+ private final BinaryRowSerializer binaryRowSerializer;
+ private final transient BinaryRow partition;
+ private final byte[] partitionBytes;
+ private final RowType readType;
+ private final DataField indexField;
+ private final String indexType;
+ private final Range rowRange;
+ private final Options options;
+
+ public GlobalIndexBuilderContext(
+ SparkSession spark,
+ FileStoreTable table,
+ BinaryRow partition,
+ RowType readType,
+ DataField indexField,
+ String indexType,
+ Range rowRange,
+ Options options) {
+ this.spark = spark;
+ this.table = table;
+ this.partition = partition;
+ this.readType = readType;
+ this.indexField = indexField;
+ this.indexType = indexType;
+ this.rowRange = rowRange;
+ this.options = options;
+
+ this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
+ try {
+ this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public SparkSession spark() {
+ return spark;
+ }
+
+ public BinaryRow partitionFromBytes() {
+ try {
+ return binaryRowSerializer.deserializeFromBytes(partitionBytes);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public BinaryRow partition() {
+ return partition;
+ }
+
+ public FileStoreTable table() {
+ return table;
+ }
+
+ public RowType readType() {
+ return readType;
+ }
+
+ public DataField indexField() {
+ return indexField;
+ }
+
+ public String indexType() {
+ return indexType;
+ }
+
+ public Range range() {
+ return rowRange;
+ }
+
+ public Options options() {
+ return options;
+ }
+
+ public GlobalIndexFileReadWrite globalIndexFileReadWrite() {
+ FileIO fileIO = table.fileIO();
+ IndexPathFactory indexPathFactory =
+ table.store().pathFactory().indexFileFactory(partitionFromBytes(), 0);
+ return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
+ }
+}
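The transient-plus-bytes pattern above is what actually crosses the executor boundary: the SparkSession and BinaryRow fields do not travel with Java serialization, so the partition round-trips through BinaryRowSerializer. A condensed sketch using the same calls as the constructor and partitionFromBytes():

    // Driver side: capture the partition as bytes.
    BinaryRowSerializer ser = new BinaryRowSerializer(partition.getFieldCount());
    byte[] bytes = ser.serializeToBytes(partition);
    // Executor side: the transient BinaryRow is gone, rebuild it from bytes.
    BinaryRow restored = ser.deserializeFromBytes(bytes);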
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..ace48b1120
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+/** Global index builder factory on Spark. */
+public interface GlobalIndexBuilderFactory {
+
+ String identifier();
+
+ GlobalIndexBuilder create(GlobalIndexBuilderContext context);
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java
new file mode 100644
index 0000000000..23575e532a
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderFactoryUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+/**
+ * Utility class for loading {@link GlobalIndexBuilderFactory} implementations via Java's {@link
+ * ServiceLoader} mechanism.
+ *
+ * <p>Factories are loaded once during class initialization and cached for subsequent lookups.
+ */
+public class GlobalIndexBuilderFactoryUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderFactoryUtils.class);
+
+ private static final Map<String, GlobalIndexBuilderFactory> FACTORIES = new HashMap<>();
+
+ static {
+ ServiceLoader<GlobalIndexBuilderFactory> serviceLoader =
+ ServiceLoader.load(GlobalIndexBuilderFactory.class);
+
+ for (GlobalIndexBuilderFactory factory : serviceLoader) {
+ String identifier = factory.identifier();
+ if (FACTORIES.put(identifier, factory) != null) {
+ LOG.warn(
+ "Found multiple GlobalIndexBuilderFactory
implementations for type '{}'. "
+ + "Using the last one loaded.",
+ identifier);
+ }
+ }
+ }
+
+ /**
+ * Loads the global index builder factory for the specified type.
+ *
+ * @param indexType The type of index (e.g., "bitmap")
+ * @return The corresponding factory
+ * @throws IllegalArgumentException If no factory is found for the specified type
+ */
+ public static GlobalIndexBuilderFactory load(String indexType) {
+ GlobalIndexBuilderFactory factory = FACTORIES.get(indexType);
+ if (factory == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "No GlobalIndexBuilderFactory found for index type
'%s'. "
+ + "Available types: %s",
+ indexType, FACTORIES.keySet()));
+ }
+ return factory;
+ }
+}
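Because discovery goes through ServiceLoader, wiring in a new index type needs only a factory implementation plus a services entry (see the META-INF/services file added below). A hypothetical third-party factory, with invented package name and identifier:

    package com.example.paimon; // hypothetical package

    import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
    import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
    import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;

    public class MyGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {

        @Override
        public String identifier() {
            // Matched against the procedure's index_type argument.
            return "my-index";
        }

        @Override
        public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
            throw new UnsupportedOperationException("illustrative stub");
        }
    }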
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java
new file mode 100644
index 0000000000..affd6a363a
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilder.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.bitmap;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+
+/**
+ * Builder for creating bitmap-based global indexes.
+ *
+ * <p>This implementation does not apply any custom transformations to the input dataset, allowing
+ * the data to be processed as-is for bitmap index creation.
+ */
+public class BitmapGlobalIndexBuilder extends GlobalIndexBuilder {
+
+ protected BitmapGlobalIndexBuilder(GlobalIndexBuilderContext context) {
+ super(context);
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..7881bce916
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/bitmap/BitmapGlobalIndexBuilderFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.bitmap;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+
+/**
+ * Factory for creating bitmap-based global index builders.
+ *
+ * <p>This factory is automatically discovered via Java's ServiceLoader mechanism.
+ */
+public class BitmapGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
+
+ private static final String IDENTIFIER = "bitmap";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
+ return new BitmapGlobalIndexBuilder(context);
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
new file mode 100644
index 0000000000..b375573e3d
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.ProcedureUtils;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ROW_COUNT_PER_SHARD;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Procedure to build global index files via Spark. */
+public class CreateGlobalIndexProcedure extends BaseProcedure {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CreateGlobalIndexProcedure.class);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.required("index_column",
DataTypes.StringType),
+ ProcedureParameter.required("index_type",
DataTypes.StringType),
+ ProcedureParameter.optional("partitions", StringType),
+ ProcedureParameter.optional("options", DataTypes.StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected CreateGlobalIndexProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public String description() {
+ return "Create global index files for a given column.";
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String column = args.getString(1);
+ String indexType = args.getString(2).toLowerCase(Locale.ROOT).trim();
+ String partitions =
+ (args.isNullAt(3) || StringUtils.isNullOrWhitespaceOnly(args.getString(3)))
+ ? null
+ : args.getString(3);
+ String optionString = args.isNullAt(4) ? null : args.getString(4);
+
+ String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : null;
+
+ // Early validation: check if the index type is supported
+ GlobalIndexBuilderFactory globalIndexBuilderFactory =
+ GlobalIndexBuilderFactoryUtils.load(indexType);
+
+ LOG.info("Starting to build index for table " + tableIdent + " WHERE:
" + finalWhere);
+
+ return modifyPaimonTable(
+ tableIdent,
+ t -> {
+ try {
+ checkArgument(
+ t instanceof FileStoreTable,
+ "Only FileStoreTable supports global index
creation.");
+ FileStoreTable table = (FileStoreTable) t;
+ checkArgument(
+ table.coreOptions().rowTrackingEnabled(),
+ "Table '%s' must enable
'row-tracking.enabled=true' before creating global index.",
+ tableIdent);
+
+ RowType rowType = table.rowType();
+ checkArgument(
+ rowType.containsField(column),
+ "Column '%s' does not exist in table '%s'.",
+ column,
+ tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent);
+ PartitionPredicate partitionPredicate =
+ SparkProcedureUtils.convertToPartitionPredicate(
+ finalWhere,
+ table.schema().logicalPartitionType(),
+ spark(),
+ relation);
+
+ DataField indexField = rowType.getField(column);
+ RowType projectedRowType =
+ rowType.project(Collections.singletonList(column));
+ RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType);
+
+ HashMap<String, String> parsedOptions = new HashMap<>();
+ ProcedureUtils.putAllOptions(parsedOptions, optionString);
+ Options userOptions = Options.fromMap(parsedOptions);
+ Options tableOptions = new Options(table.options());
+ long rowsPerShard =
+ tableOptions
+ .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
+ .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue());
+ checkArgument(
+ rowsPerShard > 0,
+ "Option 'global-index.row-count-per-shard'
must be greater than 0.");
+
+ // Step 1: generate splits for each partition and shard
+ Map<BinaryRow, Map<Range, DataSplit>> splits =
+ split(table, partitionPredicate, rowsPerShard);
+
+ // Step 2: build index by certain index system
+ List<IndexManifestEntry> indexResults =
+ buildIndex(
+ table,
+ splits,
+ indexType,
+ readRowType,
+ indexField,
+ userOptions,
+ globalIndexBuilderFactory);
+
+ // Step 3: commit index meta to a new snapshot
+ commit(table, indexResults);
+
+ return new InternalRow[] {newInternalRow(true)};
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to create %s index for column
'%s' on table '%s'.",
+ indexType, column, tableIdent),
+ e);
+ }
+ });
+ }
+
+ private List<IndexManifestEntry> buildIndex(
+ FileStoreTable table,
+ Map<BinaryRow, Map<Range, DataSplit>> preparedDS,
+ String indexType,
+ RowType readType,
+ DataField indexField,
+ Options options,
+ GlobalIndexBuilderFactory globalIndexBuilderFactory) {
+ ExecutorService executor = Executors.newCachedThreadPool();
+ List<Future<List<IndexManifestEntry>>> futures = new ArrayList<>();
+ try {
+ for (Map.Entry<BinaryRow, Map<Range, DataSplit>> entry : preparedDS.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ Map<Range, DataSplit> partitions = entry.getValue();
+
+ for (Map.Entry<Range, DataSplit> partitionEntry : partitions.entrySet()) {
+ Range startOffset = partitionEntry.getKey();
+ DataSplit partitionDS = partitionEntry.getValue();
+ GlobalIndexBuilderContext builderContext =
+ new GlobalIndexBuilderContext(
+ spark(),
+ table,
+ partition,
+ readType,
+ indexField,
+ indexType,
+ startOffset,
+ options);
+
+ futures.add(
+ executor.submit(
+ () -> {
+ GlobalIndexBuilder globalIndexBuilder =
+ globalIndexBuilderFactory.create(builderContext);
+ return globalIndexBuilder.build(partitionDS);
+ }));
+ }
+ }
+
+ List<IndexManifestEntry> entries = new ArrayList<>();
+ for (Future<List<IndexManifestEntry>> future : futures) {
+ try {
+ entries.addAll(future.get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Index creation was
interrupted", e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Build index failed",
e.getCause());
+ }
+ }
+
+ return entries;
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ private void commit(FileStoreTable table, List<IndexManifestEntry> indexResults)
+ throws Exception {
+ Map<BinaryRow, List<IndexFileMeta>> partitionResults =
+ indexResults.stream()
+ .map(s -> Pair.of(s.partition(), s.indexFile()))
+ .collect(
+ Collectors.groupingBy(
+ Pair::getKey,
+ Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry : partitionResults.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ List<IndexFileMeta> indexFiles = entry.getValue();
+ commitMessages.add(
+ new CommitMessageImpl(
+ partition,
+ 0,
+ null,
+ DataIncrement.indexIncrement(indexFiles),
+ CompactIncrement.emptyIncrement()));
+ }
+
+ try (TableCommitImpl commit = table.newCommit("global-index-create-" + UUID.randomUUID())) {
+ commit.commit(commitMessages);
+ }
+ }
+
+ protected Map<BinaryRow, Map<Range, DataSplit>> split(
+ FileStoreTable table, PartitionPredicate partitions, long rowsPerShard) {
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ // Get all manifest entries from the table scan
+ List<ManifestEntry> entries =
+ table.store().newScan().withPartitionFilter(partitions).plan().files();
+
+ // Group manifest entries by partition
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition =
+ entries.stream().collect(Collectors.groupingBy(ManifestEntry::partition));
+
+ return groupFilesIntoShardsByPartition(
+ entriesByPartition, rowsPerShard, pathFactory::bucketPath);
+ }
+
+ /**
+ * Groups files into shards by partition. This method is extracted from split() to make it more
+ * testable.
+ *
+ * @param entriesByPartition manifest entries grouped by partition
+ * @param rowsPerShard number of rows per shard
+ * @param pathFactory path factory for creating bucket paths
+ * @return map of partition to shard splits
+ */
+ public static Map<BinaryRow, Map<Range, DataSplit>> groupFilesIntoShardsByPartition(
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition,
+ long rowsPerShard,
+ BiFunction<BinaryRow, Integer, Path> pathFactory) {
+ Map<BinaryRow, Map<Range, DataSplit>> result = new HashMap<>();
+
+ for (Map.Entry<BinaryRow, List<ManifestEntry>> partitionEntry : entriesByPartition.entrySet()) {
+ BinaryRow partition = partitionEntry.getKey();
+ List<ManifestEntry> partitionEntries = partitionEntry.getValue();
+
+ // Group files into shards - a file may belong to multiple shards
+ Map<Long, List<DataFileMeta>> filesByShard = new HashMap<>();
+
+ for (ManifestEntry entry : partitionEntries) {
+ DataFileMeta file = entry.file();
+ Long firstRowId = file.firstRowId();
+ if (firstRowId == null) {
+ continue; // Skip files without row tracking
+ }
+
+ // Calculate the row ID range this file covers
+ long fileStartRowId = firstRowId;
+ long fileEndRowId = firstRowId + file.rowCount() - 1;
+
+ // Calculate which shards this file overlaps with
+ long startShardId = fileStartRowId / rowsPerShard;
+ long endShardId = fileEndRowId / rowsPerShard;
+
+ // Add this file to all shards it overlaps with
+ for (long shardId = startShardId; shardId <= endShardId; shardId++) {
+ long shardStartRowId = shardId * rowsPerShard;
+ filesByShard.computeIfAbsent(shardStartRowId, k -> new ArrayList<>()).add(file);
+ }
+ }
+
+ // Create DataSplit for each shard with exact ranges
+ Map<Range, DataSplit> shardSplits = new HashMap<>();
+
+ for (Map.Entry<Long, List<DataFileMeta>> shardEntry : filesByShard.entrySet()) {
+ long startRowId = shardEntry.getKey();
+ List<DataFileMeta> shardFiles = shardEntry.getValue();
+
+ if (shardFiles.isEmpty()) {
+ continue;
+ }
+
+ // Use exact shard boundaries: [n*rowsPerShard, (n+1)*rowsPerShard - 1]
+ long minRowId = startRowId;
+ long maxRowId = startRowId + rowsPerShard - 1;
+ Range range = new Range(minRowId, maxRowId);
+
+ // Create DataSplit for this shard
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(0)
+ .withDataFiles(shardFiles)
+ .withBucketPath(pathFactory.apply(partition, 0).toString())
+ .rawConvertible(false)
+ .build();
+
+ shardSplits.put(range, dataSplit);
+ }
+
+ if (!shardSplits.isEmpty()) {
+ result.put(partition, shardSplits);
+ }
+ }
+
+ return result;
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<CreateGlobalIndexProcedure>() {
+ @Override
+ public CreateGlobalIndexProcedure doBuild() {
+ return new CreateGlobalIndexProcedure(tableCatalog());
+ }
+ };
+ }
+}
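The shard assignment in groupFilesIntoShardsByPartition is plain integer division over row ids, so a file straddling a boundary is attached to every shard it overlaps. A worked example with illustrative numbers:

    long rowsPerShard = 1000L;
    long firstRowId = 950L;                      // file starts near a shard boundary
    long rowCount = 100L;
    long fileEnd = firstRowId + rowCount - 1;    // 1049
    long startShard = firstRowId / rowsPerShard; // 0
    long endShard = fileEnd / rowsPerShard;      // 1
    // The file lands in shard 0 (rows 0-999) and shard 1 (rows 1000-1999);
    // each shard's DataSplit then reads only its own row-id range.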
diff --git a/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
new file mode 100644
index 0000000000..704354af89
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.spark.globalindex.bitmap.BitmapGlobalIndexBuilderFactory
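This entry registers the bitmap factory for standard java.util.ServiceLoader discovery. A minimal sketch of the lookup such a registration enables; how the procedure actually selects a factory for the requested index type is not shown here, so the matching step in the comment is an assumption, not the confirmed API:

    import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;

    import java.util.ServiceLoader;

    public class FactoryLookupSketch {
        public static void main(String[] args) {
            // Iterates every factory listed under
            // META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
            // on the classpath, including BitmapGlobalIndexBuilderFactory above.
            for (GlobalIndexBuilderFactory factory :
                    ServiceLoader.load(GlobalIndexBuilderFactory.class)) {
                // A real lookup would match the factory against the requested
                // index type (e.g. 'bitmap'); the matching logic is omitted.
                System.out.println(factory.getClass().getName());
            }
        }
    }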
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
new file mode 100644
index 0000000000..f455f017a3
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.PojoManifestEntry;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CreateGlobalIndexProcedure}. */
+public class CreateGlobalIndexProcedureTest {
+
+ private final BiFunction<BinaryRow, Integer, Path> pathFactory =
+ (a, b) -> new Path(UUID.randomUUID().toString());
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionSingleFileInSingleShard() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create a single file that fits entirely in one shard (rows 0-99, shard size 1000)
+ DataFileMeta file = createDataFileMeta(0L, 100L);
+ ManifestEntry entry = createManifestEntry(partition, file);
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, Collections.singletonList(entry));
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify
+ assertThat(result).hasSize(1);
+ assertThat(result).containsKey(partition);
+
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(1);
+
+ // Should be in shard [0, 999]
+ Range expectedRange = new Range(0L, 999L);
+ assertThat(shardSplits).containsKey(expectedRange);
+
+ DataSplit split = shardSplits.get(expectedRange);
+ assertThat(split.dataFiles()).hasSize(1);
+ assertThat(split.dataFiles().get(0)).isEqualTo(file);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionFileSpanningMultipleShards() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create a file that spans 3 shards (rows 500-2500, shard size 1000)
+ // This should be in shards [0,999], [1000,1999], and [2000,2999]
+ DataFileMeta file = createDataFileMeta(500L, 2001L);
+ ManifestEntry entry = createManifestEntry(partition, file);
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, Collections.singletonList(entry));
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(3);
+
+ // Verify all three shards contain the file
+ Range shard0 = new Range(0L, 999L);
+ Range shard1 = new Range(1000L, 1999L);
+ Range shard2 = new Range(2000L, 2999L);
+
+ assertThat(shardSplits).containsKeys(shard0, shard1, shard2);
+ assertThat(shardSplits.get(shard0).dataFiles()).contains(file);
+ assertThat(shardSplits.get(shard1).dataFiles()).contains(file);
+ assertThat(shardSplits.get(shard2).dataFiles()).contains(file);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionMultipleFilesInSameShard() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create multiple files in the same shard
+ DataFileMeta file1 = createDataFileMeta(0L, 100L);
+ DataFileMeta file2 = createDataFileMeta(100L, 100L);
+ DataFileMeta file3 = createDataFileMeta(200L, 100L);
+
+ List<ManifestEntry> entries =
+ Arrays.asList(
+ createManifestEntry(partition, file1),
+ createManifestEntry(partition, file2),
+ createManifestEntry(partition, file3));
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, entries);
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(1);
+
+ Range expectedRange = new Range(0L, 999L);
+ DataSplit split = shardSplits.get(expectedRange);
+ assertThat(split.dataFiles()).hasSize(3);
+ assertThat(split.dataFiles()).containsExactlyInAnyOrder(file1, file2, file3);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionMultipleFilesInDifferentShards() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create files in different shards
+ DataFileMeta file1 = createDataFileMeta(0L, 100L); // Shard [0, 999]
+ DataFileMeta file2 = createDataFileMeta(1000L, 100L); // Shard [1000, 1999]
+ DataFileMeta file3 = createDataFileMeta(2000L, 100L); // Shard [2000, 2999]
+
+ List<ManifestEntry> entries =
+ Arrays.asList(
+ createManifestEntry(partition, file1),
+ createManifestEntry(partition, file2),
+ createManifestEntry(partition, file3));
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, entries);
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(3);
+
+ // Verify each shard has the correct file
+ Range shard0 = new Range(0L, 999L);
+ Range shard1 = new Range(1000L, 1999L);
+ Range shard2 = new Range(2000L, 2999L);
+
+ assertThat(shardSplits.get(shard0).dataFiles()).containsExactly(file1);
+ assertThat(shardSplits.get(shard1).dataFiles()).containsExactly(file2);
+ assertThat(shardSplits.get(shard2).dataFiles()).containsExactly(file3);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionMultiplePartitions() {
+ // Create two partitions
+ BinaryRow partition1 = createPartition(0);
+ BinaryRow partition2 = createPartition(1);
+
+ // Create files for each partition
+ DataFileMeta file1 = createDataFileMeta(0L, 1050L);
+ DataFileMeta file2 = createDataFileMeta(1050L, 1000L);
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(
+ partition1, Collections.singletonList(createManifestEntry(partition1, file1)));
+ entriesByPartition.put(
+ partition2, Collections.singletonList(createManifestEntry(partition2, file2)));
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 100L, pathFactory);
+
+ // Verify
+ assertThat(result).hasSize(2);
+ assertThat(result).containsKeys(partition1, partition2);
+
+ // Verify partition1
+ Map<Range, DataSplit> shardSplits1 = result.get(partition1);
+ assertThat(shardSplits1).hasSize(11);
+ assertThat(shardSplits1).containsKey(new Range(1000L, 1099L));
+ assertThat(shardSplits1.get(new Range(1000L, 1099L)).dataFiles()).containsExactly(file1);
+
+ // Verify partition2
+ Map<Range, DataSplit> shardSplits2 = result.get(partition2);
+ assertThat(shardSplits2).hasSize(11);
+ assertThat(shardSplits2).containsKey(new Range(1000L, 1099L));
+ assertThat(shardSplits2.get(new Range(1000L, 1099L)).dataFiles()).containsExactly(file2);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionExactShardBoundaries() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create a file that ends exactly at shard boundary (rows 0-999, shard size 1000)
+ DataFileMeta file = createDataFileMeta(0L, 1000L);
+ ManifestEntry entry = createManifestEntry(partition, file);
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, Collections.singletonList(entry));
+
+ // Execute
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify - file ending at row 999 should be in shard [0,999] only
+ // File covers rows [0, 999]
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(1);
+ assertThat(shardSplits).containsKey(new Range(0L, 999L));
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionSmallShardSize() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create a file with small shard size (rows 0-24, shard size 10)
+ DataFileMeta file = createDataFileMeta(0L, 25L);
+ ManifestEntry entry = createManifestEntry(partition, file);
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, Collections.singletonList(entry));
+
+ // Execute with shard size of 10
+ Map<BinaryRow, Map<Range, DataSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 10L, pathFactory);
+
+ // Verify - file should span 3 shards: [0,9], [10,19], [20,29]
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = result.get(partition);
+ assertThat(shardSplits).hasSize(3);
+
+ assertThat(shardSplits)
+ .containsKeys(new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 29L));
+ }
+
+ private BinaryRow createPartition(int i) {
+ BinaryRow binaryRow = new BinaryRow(1);
+ BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
+ binaryRowWriter.writeInt(0, i);
+ binaryRowWriter.complete();
+ return binaryRow;
+ }
+
+ private DataFileMeta createDataFileMeta(Long firstRowId, Long rowCount) {
+ return new PojoDataFileMeta(
+ "test-file-" + UUID.randomUUID(),
+ 1024L,
+ rowCount,
+ BinaryRow.EMPTY_ROW,
+ BinaryRow.EMPTY_ROW,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ 0L,
+ 0,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ firstRowId,
+ null);
+ }
+
+ private ManifestEntry createManifestEntry(BinaryRow partition, DataFileMeta file) {
+ return new PojoManifestEntry(
+ FileKind.ADD,
+ partition,
+ 0, // bucket
+ 1, // totalBuckets
+ file);
+ }
+}
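These tests retrieve splits with freshly constructed Range keys (e.g. shardSplits.get(new Range(0L, 999L))), which works only if Range implements value-based equals and hashCode. A minimal sketch of the contract the tests rely on, not of Range's actual implementation:

    import org.apache.paimon.utils.Range;

    public class RangeKeySketch {
        public static void main(String[] args) {
            Range a = new Range(0L, 999L);
            Range b = new Range(0L, 999L);
            // HashMap lookups with a new Range instance succeed only when
            // distinct instances with equal endpoints compare equal and
            // hash identically.
            assert a.equals(b) && a.hashCode() == b.hashCode();
        }
    }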
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
new file mode 100644
index 0000000000..11eeeab65a
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.streaming.StreamTest
+
+import scala.collection.JavaConverters._
+
+class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+ test("create bitmap global index") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ |""".stripMargin)
+
+ val values =
+ (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ val output =
+ spark
+ .sql("CALL sys.create_global_index(table => 'test.T', index_column
=> 'name', index_type => 'bitmap')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ val table = loadTable("T")
+ val bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.nonEmpty)
+ val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
+ assert(totalRowCount == 100000L)
+ }
+ }
+}