This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 95f1972a5f [spark] Create global index procedure should divide range 
more precisely (#6763)
95f1972a5f is described below

commit 95f1972a5f8b56a05e95296f0a683cb16955ab8d
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 12 13:53:15 2025 +0800

    [spark] Create global index procedure should divide range more precisely 
(#6763)
---
 .../procedure/CreateGlobalIndexProcedure.java      |  99 +++++++++++---
 .../procedure/CreateGlobalIndexProcedureTest.java  | 143 ++++++++++++++++++---
 .../procedure/CreateGlobalIndexProcedureTest.scala |  75 +++++++++++
 3 files changed, 278 insertions(+), 39 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 2a28cdf323..d20dc22ff8 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -337,30 +338,58 @@ public class CreateGlobalIndexProcedure extends 
BaseProcedure {
             // Create DataSplit for each shard with exact ranges
             List<IndexedSplit> shardSplits = new ArrayList<>();
             for (Map.Entry<Long, List<DataFileMeta>> shardEntry : 
filesByShard.entrySet()) {
-                long startRowId = shardEntry.getKey();
+                long shardStart = shardEntry.getKey();
+                long shardEnd = shardStart + rowsPerShard - 1;
                 List<DataFileMeta> shardFiles = shardEntry.getValue();
 
                 if (shardFiles.isEmpty()) {
                     continue;
                 }
 
-                // Use exact shard boundaries: [n*rowsPerShard, 
(n+1)*rowsPerShard - 1]
-                long minRowId = startRowId;
-                long maxRowId = startRowId + rowsPerShard - 1;
-                Range range = new Range(minRowId, maxRowId);
-
-                // Create DataSplit for this shard
-                DataSplit dataSplit =
-                        DataSplit.builder()
-                                .withPartition(partition)
-                                .withBucket(0)
-                                .withDataFiles(shardFiles)
-                                .withBucketPath(pathFactory.apply(partition, 
0).toString())
-                                .rawConvertible(false)
-                                .build();
-
-                shardSplits.add(
-                        new IndexedSplit(dataSplit, 
Collections.singletonList(range), null));
+                // Sort files by firstRowId to ensure sequential order
+                
shardFiles.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
+
+                // Group contiguous files and create separate DataSplits for 
each group
+                List<DataFileMeta> currentGroup = new ArrayList<>();
+                long currentGroupEnd = -1;
+
+                for (DataFileMeta file : shardFiles) {
+                    long fileStart = file.nonNullFirstRowId();
+                    long fileEnd = fileStart + file.rowCount() - 1;
+
+                    if (currentGroup.isEmpty()) {
+                        // Start a new group
+                        currentGroup.add(file);
+                        currentGroupEnd = fileEnd;
+                    } else if (fileStart <= currentGroupEnd + 1) {
+                        // File is contiguous with current group (adjacent or 
overlapping)
+                        currentGroup.add(file);
+                        currentGroupEnd = Math.max(currentGroupEnd, fileEnd);
+                    } else {
+                        // Gap detected, finalize current group and start a 
new one
+                        createDataSplitForGroup(
+                                currentGroup,
+                                shardStart,
+                                shardEnd,
+                                partition,
+                                pathFactory,
+                                shardSplits);
+                        currentGroup = new ArrayList<>();
+                        currentGroup.add(file);
+                        currentGroupEnd = fileEnd;
+                    }
+                }
+
+                // Don't forget to process the last group
+                if (!currentGroup.isEmpty()) {
+                    createDataSplitForGroup(
+                            currentGroup,
+                            shardStart,
+                            shardEnd,
+                            partition,
+                            pathFactory,
+                            shardSplits);
+                }
             }
 
             if (!shardSplits.isEmpty()) {
@@ -371,6 +400,40 @@ public class CreateGlobalIndexProcedure extends 
BaseProcedure {
         return result;
     }
 
+    private static void createDataSplitForGroup(
+            List<DataFileMeta> files,
+            long shardStart,
+            long shardEnd,
+            BinaryRow partition,
+            BiFunction<BinaryRow, Integer, Path> pathFactory,
+            List<IndexedSplit> shardSplits) {
+        // Calculate the actual row range covered by the files
+        long groupMinRowId = files.get(0).nonNullFirstRowId();
+        long groupMaxRowId =
+                files.stream()
+                        .mapToLong(f -> f.nonNullFirstRowId() + f.rowCount() - 
1)
+                        .max()
+                        .getAsLong();
+
+        // Clamp to shard boundaries
+        // Range.from >= shardStart, Range.to <= shardEnd
+        long rangeFrom = Math.max(groupMinRowId, shardStart);
+        long rangeTo = Math.min(groupMaxRowId, shardEnd);
+
+        Range range = new Range(rangeFrom, rangeTo);
+
+        DataSplit dataSplit =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withDataFiles(files)
+                        .withBucketPath(pathFactory.apply(partition, 
0).toString())
+                        .rawConvertible(false)
+                        .build();
+
+        shardSplits.add(new IndexedSplit(dataSplit, 
Collections.singletonList(range), null));
+    }
+
     public static ProcedureBuilder builder() {
         return new BaseProcedure.Builder<CreateGlobalIndexProcedure>() {
             @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
index a525901753..8fdc3bebe8 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
@@ -75,7 +75,7 @@ public class CreateGlobalIndexProcedureTest {
         assertThat(shardSplits).hasSize(1);
 
         // Should be in shard [0, 999]
-        Range expectedRange = new Range(0L, 999L);
+        Range expectedRange = new Range(0L, 99L);
         
assertThat(shardSplits.get(0).rowRanges()).containsExactly(expectedRange);
 
         DataSplit split = shardSplits.get(0).dataSplit();
@@ -89,7 +89,7 @@ public class CreateGlobalIndexProcedureTest {
         BinaryRow partition = createPartition(0);
 
         // Create a file that spans 3 shards (rows 500-2500, shard size 1000)
-        // This should be in shards [0,999], [1000,1999], and [2000,2999]
+        // File covers [500, 2500]
         DataFileMeta file = createDataFileMeta(500L, 2001L);
         ManifestEntry entry = createManifestEntry(partition, file);
 
@@ -106,10 +106,13 @@ public class CreateGlobalIndexProcedureTest {
         List<IndexedSplit> shardSplits = result.get(partition);
         assertThat(shardSplits).hasSize(3);
 
-        // Verify all three shards contain the file
-        Range shard0 = new Range(0L, 999L);
+        // Verify all three shards contain the file with ranges clamped to 
actual coverage
+        // Shard [0, 999]: file overlaps [500, 999]
+        Range shard0 = new Range(500L, 999L);
+        // Shard [1000, 1999]: file overlaps [1000, 1999]
         Range shard1 = new Range(1000L, 1999L);
-        Range shard2 = new Range(2000L, 2999L);
+        // Shard [2000, 2999]: file overlaps [2000, 2500]
+        Range shard2 = new Range(2000L, 2500L);
 
         assertThat(
                         shardSplits.stream()
@@ -126,16 +129,18 @@ public class CreateGlobalIndexProcedureTest {
         // Create a partition
         BinaryRow partition = createPartition(0);
 
-        // Create multiple files in the same shard
+        // Create multiple contiguous files in the same shard
+        // file1: [0, 99], file2: [100, 199], file3: [200, 299]
         DataFileMeta file1 = createDataFileMeta(0L, 100L);
         DataFileMeta file2 = createDataFileMeta(100L, 100L);
         DataFileMeta file3 = createDataFileMeta(200L, 100L);
 
+        // Add entries in non-sorted order to verify sorting
         List<ManifestEntry> entries =
                 Arrays.asList(
+                        createManifestEntry(partition, file3),
                         createManifestEntry(partition, file1),
-                        createManifestEntry(partition, file2),
-                        createManifestEntry(partition, file3));
+                        createManifestEntry(partition, file2));
 
         Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new 
HashMap<>();
         entriesByPartition.put(partition, entries);
@@ -150,10 +155,11 @@ public class CreateGlobalIndexProcedureTest {
         List<IndexedSplit> shardSplits = result.get(partition);
         assertThat(shardSplits).hasSize(1);
 
-        Range expectedRange = new Range(0L, 999L);
+        Range expectedRange = new Range(0L, 299L);
         DataSplit split = shardSplits.get(0).dataSplit();
         assertThat(split.dataFiles()).hasSize(3);
-        assertThat(split.dataFiles()).containsExactlyInAnyOrder(file1, file2, 
file3);
+        // Files should be sorted by firstRowId
+        assertThat(split.dataFiles()).containsExactly(file1, file2, file3);
     }
 
     @Test
@@ -162,9 +168,10 @@ public class CreateGlobalIndexProcedureTest {
         BinaryRow partition = createPartition(0);
 
         // Create files in different shards
-        DataFileMeta file1 = createDataFileMeta(0L, 100L); // Shard [0, 999]
-        DataFileMeta file2 = createDataFileMeta(1000L, 100L); // Shard [1000, 
1999]
-        DataFileMeta file3 = createDataFileMeta(2000L, 100L); // Shard [2000, 
2999]
+        // file1: [0, 99], file2: [1000, 1099], file3: [2000, 2099]
+        DataFileMeta file1 = createDataFileMeta(0L, 100L);
+        DataFileMeta file2 = createDataFileMeta(1000L, 100L);
+        DataFileMeta file3 = createDataFileMeta(2000L, 100L);
 
         List<ManifestEntry> entries =
                 Arrays.asList(
@@ -185,10 +192,10 @@ public class CreateGlobalIndexProcedureTest {
         List<IndexedSplit> shardSplits = result.get(partition);
         assertThat(shardSplits).hasSize(3);
 
-        // Verify each shard has the correct file
-        Range shard0 = new Range(0L, 999L);
-        Range shard1 = new Range(1000L, 1999L);
-        Range shard2 = new Range(2000L, 2999L);
+        // Verify each shard has the correct file with range clamped to actual 
coverage
+        Range shard0 = new Range(0L, 99L);
+        Range shard1 = new Range(1000L, 1099L);
+        Range shard2 = new Range(2000L, 2099L);
 
         Map<Range, DataSplit> shardToSplit =
                 shardSplits.stream()
@@ -207,7 +214,9 @@ public class CreateGlobalIndexProcedureTest {
         BinaryRow partition2 = createPartition(1);
 
         // Create files for each partition
+        // file1: firstRowId=0, covers [0, 1049], spans 11 shards with size 100
         DataFileMeta file1 = createDataFileMeta(0L, 1050L);
+        // file2: firstRowId=1050, covers [1050, 2049], spans 11 shards with 
size 100
         DataFileMeta file2 = createDataFileMeta(1050L, 1000L);
 
         Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new 
HashMap<>();
@@ -231,7 +240,7 @@ public class CreateGlobalIndexProcedureTest {
         assertThat(shardSplits1).hasSize(11);
         IndexedSplit split =
                 shardSplits1.stream()
-                        .filter(f -> f.rowRanges().contains(new Range(1000, 
1099)))
+                        .filter(f -> f.rowRanges().contains(new Range(1000, 
1049)))
                         .findFirst()
                         .get();
         assertThat(split.dataSplit().dataFiles()).containsExactly(file1);
@@ -241,7 +250,7 @@ public class CreateGlobalIndexProcedureTest {
         assertThat(shardSplits2).hasSize(11);
         split =
                 shardSplits2.stream()
-                        .filter(f -> f.rowRanges().contains(new Range(1000, 
1099)))
+                        .filter(f -> f.rowRanges().contains(new Range(1050, 
1099)))
                         .findFirst()
                         .get();
         assertThat(split.dataSplit().dataFiles()).containsExactly(file2);
@@ -289,14 +298,106 @@ public class CreateGlobalIndexProcedureTest {
                 CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
                         entriesByPartition, 10L, pathFactory);
 
-        // Verify - file should span 3 shards: [0,9], [10,19], [20,29]
+        // Verify - file [0, 24] spans 3 shards with ranges clamped:
+        // Shard [0, 9]: Range [0, 9]
+        // Shard [10, 19]: Range [10, 19]
+        // Shard [20, 29]: Range [20, 24]
         assertThat(result).hasSize(1);
         List<IndexedSplit> shardSplits = result.get(partition);
         assertThat(shardSplits).hasSize(3);
 
         assertThat(shardSplits.stream().flatMap(s -> s.rowRanges().stream()))
                 .containsExactlyInAnyOrder(
-                        new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 
29L));
+                        new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 
24L));
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionNonContiguousFiles() {
+        // Create a partition
+        BinaryRow partition = createPartition(0);
+
+        // Create non-contiguous files within the same shard
+        // file1: [100, 199], file2: [300, 399] - gap between them
+        DataFileMeta file1 = createDataFileMeta(100L, 100L);
+        DataFileMeta file2 = createDataFileMeta(300L, 100L);
+
+        List<ManifestEntry> entries =
+                Arrays.asList(
+                        createManifestEntry(partition, file1),
+                        createManifestEntry(partition, file2));
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new 
HashMap<>();
+        entriesByPartition.put(partition, entries);
+
+        // Execute
+        Map<BinaryRow, List<IndexedSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify - should create 2 separate DataSplits due to gap
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = new HashMap<>();
+        for (IndexedSplit split : result.get(partition)) {
+            shardSplits.put(split.rowRanges().get(0), split.dataSplit());
+        }
+        assertThat(shardSplits).hasSize(2);
+
+        // First group: file1 with Range [100, 199]
+        Range range1 = new Range(100L, 199L);
+        assertThat(shardSplits).containsKey(range1);
+        assertThat(shardSplits.get(range1).dataFiles()).containsExactly(file1);
+
+        // Second group: file2 with Range [300, 399]
+        Range range2 = new Range(300L, 399L);
+        assertThat(shardSplits).containsKey(range2);
+        assertThat(shardSplits.get(range2).dataFiles()).containsExactly(file2);
+    }
+
+    @Test
+    void testGroupFilesIntoShardsByPartitionMixedContiguousAndNonContiguous() {
+        // Create a partition
+        BinaryRow partition = createPartition(0);
+
+        // Create a mix of contiguous and non-contiguous files
+        // Group 1: file1 [0, 99], file2 [100, 199] - contiguous
+        // Gap
+        // Group 2: file3 [500, 599]
+        DataFileMeta file1 = createDataFileMeta(0L, 100L);
+        DataFileMeta file2 = createDataFileMeta(100L, 100L);
+        DataFileMeta file3 = createDataFileMeta(500L, 100L);
+
+        // Add in non-sorted order
+        List<ManifestEntry> entries =
+                Arrays.asList(
+                        createManifestEntry(partition, file3),
+                        createManifestEntry(partition, file1),
+                        createManifestEntry(partition, file2));
+
+        Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new 
HashMap<>();
+        entriesByPartition.put(partition, entries);
+
+        // Execute
+        Map<BinaryRow, List<IndexedSplit>> result =
+                CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+                        entriesByPartition, 1000L, pathFactory);
+
+        // Verify - should create 2 DataSplits
+        assertThat(result).hasSize(1);
+        Map<Range, DataSplit> shardSplits = new HashMap<>();
+        for (IndexedSplit split : result.get(partition)) {
+            shardSplits.put(split.rowRanges().get(0), split.dataSplit());
+        }
+        assertThat(shardSplits).hasSize(2);
+
+        // First group: file1 + file2 (contiguous), Range [0, 199]
+        Range range1 = new Range(0L, 199L);
+        assertThat(shardSplits).containsKey(range1);
+        assertThat(shardSplits.get(range1).dataFiles()).containsExactly(file1, 
file2);
+
+        // Second group: file3, Range [500, 599]
+        Range range2 = new Range(500L, 599L);
+        assertThat(shardSplits).containsKey(range2);
+        assertThat(shardSplits.get(range2).dataFiles()).containsExactly(file3);
     }
 
     private BinaryRow createPartition(int i) {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index eba8d51625..e4ecff618a 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -19,10 +19,12 @@
 package org.apache.paimon.spark.procedure
 
 import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.Range
 
 import org.apache.spark.sql.streaming.StreamTest
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable
 
 class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with 
StreamTest {
 
@@ -62,4 +64,77 @@ class CreateGlobalIndexProcedureTest extends 
PaimonSparkTestBase with StreamTest
       assert(totalRowCount == 100000L)
     }
   }
+
+  test("create bitmap global index with partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      val output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column 
=> 'name', index_type => 'bitmap')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      val bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      assert(bitmapEntries.nonEmpty)
+
+      val ranges = bitmapEntries
+        .map(
+          s =>
+            new Range(
+              s.indexFile().globalIndexMeta().rowRangeStart(),
+              s.indexFile().globalIndexMeta().rowRangeEnd()))
+        .toList
+        .asJava
+      val mergedRange = Range.sortAndMergeOverlap(ranges, true)
+      assert(mergedRange.size() == 1)
+      assert(mergedRange.get(0).equals(new Range(0, 189087)))
+      val totalRowCount = bitmapEntries
+        .map(
+          x =>
+            x.indexFile()
+              .globalIndexMeta()
+              .rowRangeEnd() - x.indexFile().globalIndexMeta().rowRangeStart() 
+ 1)
+        .sum
+      assert(totalRowCount == 189088L)
+    }
+  }
 }

Reply via email to