This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bec9ead373 [core] supports multiple partitions in
BTreeGlobalIndexBuilder (#7191)
bec9ead373 is described below
commit bec9ead373252737db903256ccffec0e29677b4b
Author: Faiz <[email protected]>
AuthorDate: Tue Feb 3 15:36:59 2026 +0800
[core] supports multiple partitions in BTreeGlobalIndexBuilder (#7191)
---
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 22 +-
.../btree/BTreeGlobalIndexBuilderTest.java | 241 +++++++++++++++++++++
2 files changed, 256 insertions(+), 7 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 9dc9da7e6e..a4b63c5918 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -46,6 +46,7 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
import javax.annotation.Nullable;
@@ -57,6 +58,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
import static
org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
@@ -77,6 +79,8 @@ public class BTreeGlobalIndexBuilder implements Serializable {
private String indexType;
private DataField indexField;
+
+    // readRowType consists of the partition fields, the indexed field and the _ROW_ID field
private RowType readRowType;
private RowIdIndexFieldsExtractor extractor;
@@ -92,6 +96,10 @@ public class BTreeGlobalIndexBuilder implements Serializable
{
public BTreeGlobalIndexBuilder withIndexType(String indexType) {
this.indexType = indexType;
+ Preconditions.checkArgument(
+ BTreeGlobalIndexerFactory.IDENTIFIER.equals(indexType),
+ "BTreeGlobalInderBuilder only supports %s index type",
+ BTreeGlobalIndexerFactory.IDENTIFIER);
return this;
}
@@ -102,15 +110,14 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
indexField,
table.fullName());
this.indexField = rowType.getField(indexField);
- this.readRowType =
- SpecialFields.rowTypeWithRowId(new
RowType(singletonList(this.indexField)));
List<String> readColumns = new ArrayList<>(table.partitionKeys());
- readColumns.addAll(readRowType.getFieldNames());
+ readColumns.addAll(
+ SpecialFields.rowTypeWithRowId(new
RowType(singletonList(this.indexField)))
+ .getFieldNames());
+ this.readRowType =
SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns);
this.extractor =
new RowIdIndexFieldsExtractor(
-
SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
- table.partitionKeys(),
- this.indexField.name());
+ this.readRowType, table.partitionKeys(),
this.indexField.name());
return this;
}
@@ -141,7 +148,8 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
BinaryExternalSortBuffer.create(
ioManager,
readRowType,
- new int[] {0},
+ // sort by <partition, indexed_field>
+ IntStream.range(0, readRowType.getFieldCount() -
1).toArray(),
options.writeBufferSize(),
options.pageSize(),
options.localSortMaxNumFileHandles(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
new file mode 100644
index 0000000000..d37ad973da
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.index.GlobalIndexMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for {@link BTreeGlobalIndexBuilder}.
+ *
+ * <p>Each test writes {@link #PART_ROW_NUM} rows into the partitions {@code p0} and {@code p1},
+ * builds a btree global index on column {@code f0}, and then verifies per partition that the key
+ * ranges of the produced index files do not overlap and that their row counts add up to the
+ * number of rows written.
+ */
+public class BTreeGlobalIndexBuilderTest extends TableTestBase {
+
+    /** Number of rows written into every partition. */
+    private static final long PART_ROW_NUM = 1000L;
+
+    /** Index type under test; shared with the production builder instead of a repeated literal. */
+    private static final String INDEX_TYPE = BTreeGlobalIndexerFactory.IDENTIFIER;
+
+    /** Serializer for the INT keys of the indexed column {@code f0}. */
+    private static final KeySerializer KEY_SERIALIZER = KeySerializer.create(DataTypes.INT());
+
+    /** Orders deserialized keys the same way the serializer does. */
+    private static final Comparator<Object> COMPARATOR = KEY_SERIALIZER.createComparator();
+
+    /**
+     * Declares a table partitioned by {@code dt} with row tracking and data evolution enabled and
+     * the btree records-per-range option set to 100.
+     */
+    @Override
+    public Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("dt", DataTypes.STRING());
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        schemaBuilder.option(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE.key(), "100");
+        schemaBuilder.partitionKeys(Collections.singletonList("dt"));
+        return schemaBuilder.build();
+    }
+
+    /** Creates the default table and fills the partitions {@code p0} and {@code p1}. */
+    private void write() throws Exception {
+        createTableDefault();
+
+        BatchWriteBuilder builder = getTableDefault().newBatchWriteBuilder();
+        writePartition(builder, "p0");
+        writePartition(builder, "p1");
+    }
+
+    /**
+     * Writes {@link #PART_ROW_NUM} rows {@code (partition, i, "f1_" + i)} and commits them.
+     *
+     * <p>Both the write and the commit are closed via try-with-resources so that neither is left
+     * open after the batch has been committed.
+     *
+     * @param builder write builder of the table under test
+     * @param partition value of the {@code dt} partition column
+     */
+    private void writePartition(BatchWriteBuilder builder, String partition) throws Exception {
+        try (BatchTableWrite write = builder.newWrite()) {
+            for (int i = 0; i < PART_ROW_NUM; i++) {
+                write.write(
+                        GenericRow.of(
+                                BinaryString.fromString(partition),
+                                i,
+                                BinaryString.fromString("f1_" + i)));
+            }
+            // The commit is created only after all rows are written, as in a regular batch job.
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(write.prepareCommit());
+            }
+        }
+    }
+
+    /**
+     * Builds the btree index on {@code f0} and commits the resulting index files.
+     *
+     * @param partitionPredicate restricts the partitions to index; {@code null} is passed through
+     *     to the builder unchanged (used by the tests to index every partition)
+     */
+    private void createIndex(PartitionPredicate partitionPredicate) throws Exception {
+        FileStoreTable table = getTableDefault();
+
+        BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table);
+        builder.withIndexField("f0");
+        builder.withIndexType(INDEX_TYPE);
+        builder.withPartitionPredicate(partitionPredicate);
+        List<CommitMessage> commitMessages = builder.build(builder.scan(), ioManager);
+
+        try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+            commit.commit(commitMessages);
+        }
+    }
+
+    @Test
+    public void testCreateIndexForSinglePartition() throws Exception {
+        write();
+
+        FileStoreTable table = getTableDefault();
+        RowType partType = table.rowType().project("dt");
+        Predicate predicate =
+                PartitionPredicate.createPartitionPredicate(
+                        partType, Collections.singletonMap("dt", BinaryString.fromString("p0")));
+
+        createIndex(PartitionPredicate.fromPredicate(partType, predicate));
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = gatherIndexMetas(table);
+
+        Assertions.assertEquals(1, metasByParts.size());
+        // Only the partition selected by the predicate may have been indexed.
+        for (BinaryRow partition : metasByParts.keySet()) {
+            Assertions.assertEquals("p0", partition.getString(0).toString());
+        }
+
+        metasByParts.forEach(this::assertFilesNonOverlapping);
+    }
+
+    @Test
+    public void testCreateIndexForMultiplePartitions() throws Exception {
+        write();
+
+        // No partition predicate: every partition is indexed.
+        createIndex(null);
+
+        FileStoreTable table = getTableDefault();
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = gatherIndexMetas(table);
+
+        Assertions.assertEquals(2, metasByParts.size());
+
+        metasByParts.forEach(this::assertFilesNonOverlapping);
+    }
+
+    /**
+     * Collects the btree index files of the latest snapshot, grouped by partition.
+     *
+     * @param table table whose latest snapshot is scanned
+     * @return for each partition, the (file name, stats) pairs of its index files
+     */
+    private Map<BinaryRow, List<Pair<String, FileStats>>> gatherIndexMetas(FileStoreTable table) {
+        IndexFileHandler handler = table.store().newIndexFileHandler();
+
+        Snapshot snapshot =
+                table.latestSnapshot()
+                        .orElseThrow(
+                                () ->
+                                        new AssertionError(
+                                                "Table should have a snapshot after the index commit"));
+        List<IndexManifestEntry> entries = handler.scan(snapshot, INDEX_TYPE);
+
+        Map<BinaryRow, List<Pair<String, FileStats>>> metasByParts = new HashMap<>();
+        for (IndexManifestEntry entry : entries) {
+            IndexFileMeta indexFileMeta = entry.indexFile();
+            // FileStats.fromIndexFileMeta asserts that the global index meta is present.
+            metasByParts
+                    .computeIfAbsent(entry.partition(), part -> new ArrayList<>())
+                    .add(
+                            Pair.of(
+                                    indexFileMeta.fileName(),
+                                    FileStats.fromIndexFileMeta(indexFileMeta)));
+        }
+
+        return metasByParts;
+    }
+
+    /**
+     * Asserts that the index files of one partition cover disjoint key ranges and that together
+     * they hold exactly {@link #PART_ROW_NUM} rows.
+     *
+     * <p>The files are ordered by first key on a private copy, so the caller's list is not
+     * reordered. An empty list is accepted without further checks.
+     *
+     * @param partition partition the files belong to; its first field is printed in failures
+     * @param metas (file name, stats) pairs of the partition's index files
+     */
+    private void assertFilesNonOverlapping(
+            BinaryRow partition, List<Pair<String, FileStats>> metas) {
+        if (metas.isEmpty()) {
+            return;
+        }
+
+        List<Pair<String, FileStats>> sorted = new ArrayList<>(metas);
+        sorted.sort((m1, m2) -> COMPARATOR.compare(m1.getValue().firstKey, m2.getValue().firstKey));
+
+        String lastFileName = sorted.get(0).getKey();
+        FileStats lastMeta = sorted.get(0).getValue();
+        long rowCount = lastMeta.rowCount;
+        for (int i = 1; i < sorted.size(); i++) {
+            String fileName = sorted.get(i).getKey();
+            FileStats fileMeta = sorted.get(i).getValue();
+            rowCount += fileMeta.rowCount;
+
+            // Adjacent files may share a boundary key, hence "<=" rather than "<".
+            Assertions.assertTrue(
+                    COMPARATOR.compare(lastMeta.lastKey, fileMeta.firstKey) <= 0,
+                    String.format(
+                            "In partition %s, key range [%s:%s] of file %s overlaps with adjacent file %s [%s:%s]",
+                            partition.getString(0),
+                            lastMeta.firstKey,
+                            lastMeta.lastKey,
+                            lastFileName,
+                            fileName,
+                            fileMeta.firstKey,
+                            fileMeta.lastKey));
+
+            lastFileName = fileName;
+            lastMeta = fileMeta;
+        }
+
+        Assertions.assertEquals(
+                PART_ROW_NUM,
+                rowCount,
+                String.format(
+                        "In partition %s, total row count of all btree index files not equals to original data row count.",
+                        partition.getString(0)));
+    }
+
+    /** Decodes a serialized index key with {@link #KEY_SERIALIZER}. */
+    static Object deserialize(byte[] bytes) {
+        return KEY_SERIALIZER.deserialize(MemorySlice.wrap(bytes));
+    }
+
+    /** Row count and key bounds of a single btree index file, used for assertions. */
+    private static class FileStats {
+
+        final long rowCount;
+        final Object firstKey;
+        final Object lastKey;
+
+        FileStats(long rowCount, Object firstKey, Object lastKey) {
+            this.rowCount = rowCount;
+            this.firstKey = firstKey;
+            this.lastKey = lastKey;
+        }
+
+        /**
+         * Reads the stats from an index file's metadata.
+         *
+         * @param meta index file metadata; its global index meta must be present
+         * @return the file's row count together with its deserialized first and last key
+         */
+        static FileStats fromIndexFileMeta(IndexFileMeta meta) {
+            GlobalIndexMeta globalIndexMeta = meta.globalIndexMeta();
+            Assertions.assertNotNull(globalIndexMeta, "Global index meta should not be null");
+            BTreeIndexMeta btreeMeta = BTreeIndexMeta.deserialize(globalIndexMeta.indexMeta());
+
+            return new FileStats(
+                    meta.rowCount(),
+                    deserialize(btreeMeta.getFirstKey()),
+                    deserialize(btreeMeta.getLastKey()));
+        }
+    }
+}