This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 95f1972a5f [spark] Create global index procedure should divide range more precisely (#6763)
95f1972a5f is described below
commit 95f1972a5f8b56a05e95296f0a683cb16955ab8d
Author: YeJunHao <[email protected]>
AuthorDate: Fri Dec 12 13:53:15 2025 +0800
[spark] Create global index procedure should divide range more precisely (#6763)
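
Instead of emitting one DataSplit per shard covering the full shard range [n*rowsPerShard, (n+1)*rowsPerShard - 1], the procedure now sorts each shard's files by first row id, groups files whose row-id spans are adjacent or overlapping, and emits one DataSplit per group with a range clamped to the group's actual coverage and the shard boundaries. The standalone sketch below illustrates that grouping-and-clamping logic; FileSpan, groupAndPrint and printClampedRange are illustrative placeholders, not Paimon's actual API.

// Minimal sketch of the grouping-and-clamping logic (placeholder types, not Paimon's API).
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class ShardGroupingSketch {

    /** Placeholder for a data file's row-id span: [firstRowId, firstRowId + rowCount - 1]. */
    static final class FileSpan {
        final long firstRowId;
        final long rowCount;

        FileSpan(long firstRowId, long rowCount) {
            this.firstRowId = firstRowId;
            this.rowCount = rowCount;
        }

        long lastRowId() {
            return firstRowId + rowCount - 1;
        }
    }

    /** Splits one shard's files into contiguous groups and prints each clamped range. */
    static void groupAndPrint(List<FileSpan> shardFiles, long shardStart, long shardEnd) {
        // Sort by firstRowId so contiguity can be detected in a single pass.
        shardFiles.sort(Comparator.comparingLong((FileSpan f) -> f.firstRowId));

        List<FileSpan> currentGroup = new ArrayList<>();
        long currentGroupEnd = -1;
        for (FileSpan file : shardFiles) {
            if (currentGroup.isEmpty() || file.firstRowId <= currentGroupEnd + 1) {
                // Adjacent or overlapping: extend the current group.
                currentGroup.add(file);
                currentGroupEnd = Math.max(currentGroupEnd, file.lastRowId());
            } else {
                // Gap detected: close the current group and start a new one.
                printClampedRange(currentGroup, shardStart, shardEnd);
                currentGroup = new ArrayList<>();
                currentGroup.add(file);
                currentGroupEnd = file.lastRowId();
            }
        }
        if (!currentGroup.isEmpty()) {
            printClampedRange(currentGroup, shardStart, shardEnd);
        }
    }

    /** Clamps the group's covered row range to the shard boundaries, as the procedure now does. */
    static void printClampedRange(List<FileSpan> group, long shardStart, long shardEnd) {
        long groupMin = group.get(0).firstRowId;
        long groupMax = group.stream().mapToLong(FileSpan::lastRowId).max().getAsLong();
        long from = Math.max(groupMin, shardStart);
        long to = Math.min(groupMax, shardEnd);
        System.out.println("group of " + group.size() + " file(s) -> range [" + from + ", " + to + "]");
    }

    public static void main(String[] args) {
        // Files [0, 99] and [100, 199] are contiguous; [500, 599] is separated by a gap.
        List<FileSpan> files = new ArrayList<>(Arrays.asList(
                new FileSpan(500, 100), new FileSpan(0, 100), new FileSpan(100, 100)));
        // With shard [0, 999] this prints ranges [0, 199] and [500, 599],
        // matching the new CreateGlobalIndexProcedureTest expectations.
        groupAndPrint(files, 0, 999);
    }
}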
---
.../procedure/CreateGlobalIndexProcedure.java | 99 +++++++++++---
.../procedure/CreateGlobalIndexProcedureTest.java | 143 ++++++++++++++++++---
.../procedure/CreateGlobalIndexProcedureTest.scala | 75 +++++++++++
3 files changed, 278 insertions(+), 39 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 2a28cdf323..d20dc22ff8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -60,6 +60,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -337,30 +338,58 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
// Create DataSplit for each shard with exact ranges
List<IndexedSplit> shardSplits = new ArrayList<>();
for (Map.Entry<Long, List<DataFileMeta>> shardEntry : filesByShard.entrySet()) {
- long startRowId = shardEntry.getKey();
+ long shardStart = shardEntry.getKey();
+ long shardEnd = shardStart + rowsPerShard - 1;
List<DataFileMeta> shardFiles = shardEntry.getValue();
if (shardFiles.isEmpty()) {
continue;
}
- // Use exact shard boundaries: [n*rowsPerShard, (n+1)*rowsPerShard - 1]
- long minRowId = startRowId;
- long maxRowId = startRowId + rowsPerShard - 1;
- Range range = new Range(minRowId, maxRowId);
-
- // Create DataSplit for this shard
- DataSplit dataSplit =
- DataSplit.builder()
- .withPartition(partition)
- .withBucket(0)
- .withDataFiles(shardFiles)
- .withBucketPath(pathFactory.apply(partition, 0).toString())
- .rawConvertible(false)
- .build();
-
- shardSplits.add(
- new IndexedSplit(dataSplit, Collections.singletonList(range), null));
+ // Sort files by firstRowId to ensure sequential order
+ shardFiles.sort(Comparator.comparingLong(DataFileMeta::nonNullFirstRowId));
+
+ // Group contiguous files and create separate DataSplits for each group
+ List<DataFileMeta> currentGroup = new ArrayList<>();
+ long currentGroupEnd = -1;
+
+ for (DataFileMeta file : shardFiles) {
+ long fileStart = file.nonNullFirstRowId();
+ long fileEnd = fileStart + file.rowCount() - 1;
+
+ if (currentGroup.isEmpty()) {
+ // Start a new group
+ currentGroup.add(file);
+ currentGroupEnd = fileEnd;
+ } else if (fileStart <= currentGroupEnd + 1) {
+ // File is contiguous with current group (adjacent or overlapping)
+ currentGroup.add(file);
+ currentGroupEnd = Math.max(currentGroupEnd, fileEnd);
+ } else {
+ // Gap detected, finalize current group and start a new one
+ createDataSplitForGroup(
+ currentGroup,
+ shardStart,
+ shardEnd,
+ partition,
+ pathFactory,
+ shardSplits);
+ currentGroup = new ArrayList<>();
+ currentGroup.add(file);
+ currentGroupEnd = fileEnd;
+ }
+ }
+
+ // Don't forget to process the last group
+ if (!currentGroup.isEmpty()) {
+ createDataSplitForGroup(
+ currentGroup,
+ shardStart,
+ shardEnd,
+ partition,
+ pathFactory,
+ shardSplits);
+ }
}
if (!shardSplits.isEmpty()) {
@@ -371,6 +400,40 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
return result;
}
+ private static void createDataSplitForGroup(
+ List<DataFileMeta> files,
+ long shardStart,
+ long shardEnd,
+ BinaryRow partition,
+ BiFunction<BinaryRow, Integer, Path> pathFactory,
+ List<IndexedSplit> shardSplits) {
+ // Calculate the actual row range covered by the files
+ long groupMinRowId = files.get(0).nonNullFirstRowId();
+ long groupMaxRowId =
+ files.stream()
+ .mapToLong(f -> f.nonNullFirstRowId() + f.rowCount() - 1)
+ .max()
+ .getAsLong();
+
+ // Clamp to shard boundaries
+ // Range.from >= shardStart, Range.to <= shardEnd
+ long rangeFrom = Math.max(groupMinRowId, shardStart);
+ long rangeTo = Math.min(groupMaxRowId, shardEnd);
+
+ Range range = new Range(rangeFrom, rangeTo);
+
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(partition)
+ .withBucket(0)
+ .withDataFiles(files)
+ .withBucketPath(pathFactory.apply(partition, 0).toString())
+ .rawConvertible(false)
+ .build();
+
+ shardSplits.add(new IndexedSplit(dataSplit, Collections.singletonList(range), null));
+ }
+
public static ProcedureBuilder builder() {
return new BaseProcedure.Builder<CreateGlobalIndexProcedure>() {
@Override
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
index a525901753..8fdc3bebe8 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.java
@@ -75,7 +75,7 @@ public class CreateGlobalIndexProcedureTest {
assertThat(shardSplits).hasSize(1);
// Should be in shard [0, 999]
- Range expectedRange = new Range(0L, 999L);
+ Range expectedRange = new Range(0L, 99L);
assertThat(shardSplits.get(0).rowRanges()).containsExactly(expectedRange);
DataSplit split = shardSplits.get(0).dataSplit();
@@ -89,7 +89,7 @@ public class CreateGlobalIndexProcedureTest {
BinaryRow partition = createPartition(0);
// Create a file that spans 3 shards (rows 500-2500, shard size 1000)
- // This should be in shards [0,999], [1000,1999], and [2000,2999]
+ // File covers [500, 2500]
DataFileMeta file = createDataFileMeta(500L, 2001L);
ManifestEntry entry = createManifestEntry(partition, file);
@@ -106,10 +106,13 @@ public class CreateGlobalIndexProcedureTest {
List<IndexedSplit> shardSplits = result.get(partition);
assertThat(shardSplits).hasSize(3);
- // Verify all three shards contain the file
- Range shard0 = new Range(0L, 999L);
+ // Verify all three shards contain the file with ranges clamped to actual coverage
+ // Shard [0, 999]: file overlaps [500, 999]
+ Range shard0 = new Range(500L, 999L);
+ // Shard [1000, 1999]: file overlaps [1000, 1999]
Range shard1 = new Range(1000L, 1999L);
- Range shard2 = new Range(2000L, 2999L);
+ // Shard [2000, 2999]: file overlaps [2000, 2500]
+ Range shard2 = new Range(2000L, 2500L);
assertThat(
shardSplits.stream()
@@ -126,16 +129,18 @@ public class CreateGlobalIndexProcedureTest {
// Create a partition
BinaryRow partition = createPartition(0);
- // Create multiple files in the same shard
+ // Create multiple contiguous files in the same shard
+ // file1: [0, 99], file2: [100, 199], file3: [200, 299]
DataFileMeta file1 = createDataFileMeta(0L, 100L);
DataFileMeta file2 = createDataFileMeta(100L, 100L);
DataFileMeta file3 = createDataFileMeta(200L, 100L);
+ // Add entries in non-sorted order to verify sorting
List<ManifestEntry> entries =
Arrays.asList(
+ createManifestEntry(partition, file3),
createManifestEntry(partition, file1),
- createManifestEntry(partition, file2),
- createManifestEntry(partition, file3));
+ createManifestEntry(partition, file2));
Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
entriesByPartition.put(partition, entries);
@@ -150,10 +155,11 @@ public class CreateGlobalIndexProcedureTest {
List<IndexedSplit> shardSplits = result.get(partition);
assertThat(shardSplits).hasSize(1);
- Range expectedRange = new Range(0L, 999L);
+ Range expectedRange = new Range(0L, 299L);
DataSplit split = shardSplits.get(0).dataSplit();
assertThat(split.dataFiles()).hasSize(3);
- assertThat(split.dataFiles()).containsExactlyInAnyOrder(file1, file2, file3);
+ // Files should be sorted by firstRowId
+ assertThat(split.dataFiles()).containsExactly(file1, file2, file3);
}
@Test
@@ -162,9 +168,10 @@ public class CreateGlobalIndexProcedureTest {
BinaryRow partition = createPartition(0);
// Create files in different shards
- DataFileMeta file1 = createDataFileMeta(0L, 100L); // Shard [0, 999]
- DataFileMeta file2 = createDataFileMeta(1000L, 100L); // Shard [1000, 1999]
- DataFileMeta file3 = createDataFileMeta(2000L, 100L); // Shard [2000, 2999]
+ // file1: [0, 99], file2: [1000, 1099], file3: [2000, 2099]
+ DataFileMeta file1 = createDataFileMeta(0L, 100L);
+ DataFileMeta file2 = createDataFileMeta(1000L, 100L);
+ DataFileMeta file3 = createDataFileMeta(2000L, 100L);
List<ManifestEntry> entries =
Arrays.asList(
@@ -185,10 +192,10 @@ public class CreateGlobalIndexProcedureTest {
List<IndexedSplit> shardSplits = result.get(partition);
assertThat(shardSplits).hasSize(3);
- // Verify each shard has the correct file
- Range shard0 = new Range(0L, 999L);
- Range shard1 = new Range(1000L, 1999L);
- Range shard2 = new Range(2000L, 2999L);
+ // Verify each shard has the correct file with range clamped to actual coverage
+ Range shard0 = new Range(0L, 99L);
+ Range shard1 = new Range(1000L, 1099L);
+ Range shard2 = new Range(2000L, 2099L);
Map<Range, DataSplit> shardToSplit =
shardSplits.stream()
@@ -207,7 +214,9 @@ public class CreateGlobalIndexProcedureTest {
BinaryRow partition2 = createPartition(1);
// Create files for each partition
+ // file1: firstRowId=0, covers [0, 1049], spans 11 shards with size 100
DataFileMeta file1 = createDataFileMeta(0L, 1050L);
+ // file2: firstRowId=1050, covers [1050, 2049], spans 11 shards with size 100
DataFileMeta file2 = createDataFileMeta(1050L, 1000L);
Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
@@ -231,7 +240,7 @@ public class CreateGlobalIndexProcedureTest {
assertThat(shardSplits1).hasSize(11);
IndexedSplit split =
shardSplits1.stream()
- .filter(f -> f.rowRanges().contains(new Range(1000, 1099)))
+ .filter(f -> f.rowRanges().contains(new Range(1000, 1049)))
.findFirst()
.get();
assertThat(split.dataSplit().dataFiles()).containsExactly(file1);
@@ -241,7 +250,7 @@ public class CreateGlobalIndexProcedureTest {
assertThat(shardSplits2).hasSize(11);
split =
shardSplits2.stream()
- .filter(f -> f.rowRanges().contains(new Range(1000, 1099)))
+ .filter(f -> f.rowRanges().contains(new Range(1050, 1099)))
.findFirst()
.get();
assertThat(split.dataSplit().dataFiles()).containsExactly(file2);
@@ -289,14 +298,106 @@ public class CreateGlobalIndexProcedureTest {
CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
entriesByPartition, 10L, pathFactory);
- // Verify - file should span 3 shards: [0,9], [10,19], [20,29]
+ // Verify - file [0, 24] spans 3 shards with ranges clamped:
+ // Shard [0, 9]: Range [0, 9]
+ // Shard [10, 19]: Range [10, 19]
+ // Shard [20, 29]: Range [20, 24]
assertThat(result).hasSize(1);
List<IndexedSplit> shardSplits = result.get(partition);
assertThat(shardSplits).hasSize(3);
assertThat(shardSplits.stream().flatMap(s -> s.rowRanges().stream()))
.containsExactlyInAnyOrder(
- new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 29L));
+ new Range(0L, 9L), new Range(10L, 19L), new Range(20L, 24L));
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionNonContiguousFiles() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create non-contiguous files within the same shard
+ // file1: [100, 199], file2: [300, 399] - gap between them
+ DataFileMeta file1 = createDataFileMeta(100L, 100L);
+ DataFileMeta file2 = createDataFileMeta(300L, 100L);
+
+ List<ManifestEntry> entries =
+ Arrays.asList(
+ createManifestEntry(partition, file1),
+ createManifestEntry(partition, file2));
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, entries);
+
+ // Execute
+ Map<BinaryRow, List<IndexedSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify - should create 2 separate DataSplits due to gap
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = new HashMap<>();
+ for (IndexedSplit split : result.get(partition)) {
+ shardSplits.put(split.rowRanges().get(0), split.dataSplit());
+ }
+ assertThat(shardSplits).hasSize(2);
+
+ // First group: file1 with Range [100, 199]
+ Range range1 = new Range(100L, 199L);
+ assertThat(shardSplits).containsKey(range1);
+ assertThat(shardSplits.get(range1).dataFiles()).containsExactly(file1);
+
+ // Second group: file2 with Range [300, 399]
+ Range range2 = new Range(300L, 399L);
+ assertThat(shardSplits).containsKey(range2);
+ assertThat(shardSplits.get(range2).dataFiles()).containsExactly(file2);
+ }
+
+ @Test
+ void testGroupFilesIntoShardsByPartitionMixedContiguousAndNonContiguous() {
+ // Create a partition
+ BinaryRow partition = createPartition(0);
+
+ // Create a mix of contiguous and non-contiguous files
+ // Group 1: file1 [0, 99], file2 [100, 199] - contiguous
+ // Gap
+ // Group 2: file3 [500, 599]
+ DataFileMeta file1 = createDataFileMeta(0L, 100L);
+ DataFileMeta file2 = createDataFileMeta(100L, 100L);
+ DataFileMeta file3 = createDataFileMeta(500L, 100L);
+
+ // Add in non-sorted order
+ List<ManifestEntry> entries =
+ Arrays.asList(
+ createManifestEntry(partition, file3),
+ createManifestEntry(partition, file1),
+ createManifestEntry(partition, file2));
+
+ Map<BinaryRow, List<ManifestEntry>> entriesByPartition = new HashMap<>();
+ entriesByPartition.put(partition, entries);
+
+ // Execute
+ Map<BinaryRow, List<IndexedSplit>> result =
+ CreateGlobalIndexProcedure.groupFilesIntoShardsByPartition(
+ entriesByPartition, 1000L, pathFactory);
+
+ // Verify - should create 2 DataSplits
+ assertThat(result).hasSize(1);
+ Map<Range, DataSplit> shardSplits = new HashMap<>();
+ for (IndexedSplit split : result.get(partition)) {
+ shardSplits.put(split.rowRanges().get(0), split.dataSplit());
+ }
+ assertThat(shardSplits).hasSize(2);
+
+ // First group: file1 + file2 (contiguous), Range [0, 199]
+ Range range1 = new Range(0L, 199L);
+ assertThat(shardSplits).containsKey(range1);
+ assertThat(shardSplits.get(range1).dataFiles()).containsExactly(file1, file2);
+
+ // Second group: file3, Range [500, 599]
+ Range range2 = new Range(500L, 599L);
+ assertThat(shardSplits).containsKey(range2);
+ assertThat(shardSplits.get(range2).dataFiles()).containsExactly(file3);
}
private BinaryRow createPartition(int i) {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index eba8d51625..e4ecff618a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -19,10 +19,12 @@
package org.apache.paimon.spark.procedure
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.Range
import org.apache.spark.sql.streaming.StreamTest
import scala.collection.JavaConverters._
+import scala.collection.immutable
class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
@@ -62,4 +64,77 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
assert(totalRowCount == 100000L)
}
}
+
+ test("create bitmap global index with partition") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ | PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ var values =
+ (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ val output =
+ spark
+ .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'bitmap')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ val table = loadTable("T")
+ val bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.nonEmpty)
+
+ val ranges = bitmapEntries
+ .map(
+ s =>
+ new Range(
+ s.indexFile().globalIndexMeta().rowRangeStart(),
+ s.indexFile().globalIndexMeta().rowRangeEnd()))
+ .toList
+ .asJava
+ val mergedRange = Range.sortAndMergeOverlap(ranges, true)
+ assert(mergedRange.size() == 1)
+ assert(mergedRange.get(0).equals(new Range(0, 189087)))
+ val totalRowCount = bitmapEntries
+ .map(
+ x =>
+ x.indexFile()
+ .globalIndexMeta()
+ .rowRangeEnd() - x.indexFile().globalIndexMeta().rowRangeStart() + 1)
+ .sum
+ assert(totalRowCount == 189088L)
+ }
+ }
}