This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22bc3abc86 [spark] support building BTree index through procedure (#6956)
22bc3abc86 is described below
commit 22bc3abc86dd7458087031462c9c9353c5318e23
Author: Faiz <[email protected]>
AuthorDate: Wed Jan 7 14:18:32 2026 +0800
[spark] support building BTree index through procedure (#6956)
---
.../globalindex/btree/BTreeIndexOptions.java | 14 +-
....apache.paimon.globalindex.GlobalIndexerFactory | 1 +
.../paimon/index/IndexFileMetaSerializer.java | 2 +-
.../manifest/IndexManifestEntrySerializer.java | 2 +-
.../globalindex/DefaultGlobalIndexBuilder.java | 51 +++++
.../spark/globalindex/GlobalIndexBuilder.java | 57 +-----
.../globalindex/GlobalIndexBuilderContext.java | 42 ++--
.../spark/globalindex/GlobalIndexTopoBuilder.java | 12 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 140 +++++++++++++
.../btree/BTreeGlobalIndexBuilderFactory.java | 46 +++++
.../globalindex/btree/BTreeIndexTopoBuilder.java | 184 +++++++++++++++++
.../globalindex/btree/IndexFieldsExtractor.java | 59 ++++++
.../procedure/CreateGlobalIndexProcedure.java | 65 ++++--
...mon.spark.globalindex.GlobalIndexBuilderFactory | 2 +-
.../procedure/CreateGlobalIndexProcedureTest.scala | 218 +++++++++++++++++++++
15 files changed, 805 insertions(+), 90 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
index acac020db3..8d0758a1fc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/btree/BTreeIndexOptions.java
@@ -32,7 +32,7 @@ public class BTreeIndexOptions {
.withDescription("The compression algorithm to use for
BTreeIndex");
public static final ConfigOption<Integer> BTREE_INDEX_COMPRESSION_LEVEL =
- ConfigOptions.key("btree-index.compression")
+ ConfigOptions.key("btree-index.compression-level")
.intType()
.defaultValue(1)
.withDescription("The compression level of the compression
algorithm");
@@ -54,4 +54,16 @@ public class BTreeIndexOptions {
.doubleType()
.defaultValue(0.1)
.withDescription("The high priority pool ratio to use for
BTreeIndex");
+
+ public static final ConfigOption<Long> BTREE_INDEX_RECORDS_PER_RANGE =
+ ConfigOptions.key("btree-index.records-per-range")
+ .longType()
+ .defaultValue(1000_000L)
+ .withDescription("The expected number of records per BTree
Index File.");
+
+ public static final ConfigOption<Integer>
BTREE_INDEX_BUILD_MAX_PARALLELISM =
+ ConfigOptions.key("btree-index.build.max-parallelism")
+ .intType()
+ .defaultValue(4096)
+ .withDescription("The max parallelism of Flink/Spark for
building BTreeIndex.");
}
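The two options added above follow the usual ConfigOption pattern, so they resolve from string properties like any other Paimon option. A minimal sketch of how an explicit value and a default interact (the class name and values are hypothetical; the option keys and defaults are the ones added above):

    import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
    import org.apache.paimon.options.Options;

    import java.util.HashMap;
    import java.util.Map;

    public class BTreeOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> conf = new HashMap<>();
            // explicit value overrides the 1,000,000 default
            conf.put("btree-index.records-per-range", "500000");
            Options options = Options.fromMap(conf);

            long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
            int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
            System.out.println(recordsPerRange); // 500000
            System.out.println(maxParallelism);  // falls back to the default, 4096
        }
    }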
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
index e1e63b1057..4c3fe70db9 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
+++ b/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
@@ -14,3 +14,4 @@
# limitations under the License.
org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
+org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 9e3ead6909..6d98e61248 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -47,7 +47,7 @@ public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
globalIndexMeta.rowRangeStart(),
globalIndexMeta.rowRangeEnd(),
globalIndexMeta.indexFieldId(),
- globalIndexMeta.indexMeta() == null
+ globalIndexMeta.extraFieldIds() == null
? null
: new GenericArray(globalIndexMeta.extraFieldIds()),
globalIndexMeta.indexMeta());
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index cd7a1efd84..98ab4df6f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -54,7 +54,7 @@ public class IndexManifestEntrySerializer extends VersionedObjectSerializer<Inde
globalIndexMeta.rowRangeStart(),
globalIndexMeta.rowRangeEnd(),
globalIndexMeta.indexFieldId(),
- globalIndexMeta.indexMeta() == null
+ globalIndexMeta.extraFieldIds() == null
? null
: new GenericArray(globalIndexMeta.extraFieldIds()),
globalIndexMeta.indexMeta());
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index d2aa11b35c..5ec81c7389 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -18,9 +18,60 @@
package org.apache.paimon.spark.globalindex;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LongCounter;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
/** Default {@link GlobalIndexBuilder}. */
public class DefaultGlobalIndexBuilder extends GlobalIndexBuilder {
public DefaultGlobalIndexBuilder(GlobalIndexBuilderContext context) {
super(context);
}
+
+ @Override
+ public List<CommitMessage> build(CloseableIterator<InternalRow> data)
throws IOException {
+ LongCounter rowCounter = new LongCounter(0);
+ List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
+ List<IndexFileMeta> indexFileMetas =
+ convertToIndexMeta(
+ context.startOffset(),
+ context.startOffset() + rowCounter.getValue() - 1,
+ resultEntries);
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
+ return Collections.singletonList(
+ new CommitMessageImpl(
+ context.partition(),
+ 0,
+ null,
+ dataIncrement,
+ CompactIncrement.emptyIncrement()));
+ }
+
+ private List<ResultEntry> writePaimonRows(
+ CloseableIterator<InternalRow> rows, LongCounter rowCounter)
throws IOException {
+ GlobalIndexSingletonWriter indexWriter = (GlobalIndexSingletonWriter)
createIndexWriter();
+
+ InternalRow.FieldGetter getter =
+ InternalRow.createFieldGetter(
+ context.indexField().type(),
+ context.readType().getFieldIndex(context.indexField().name()));
+ rows.forEachRemaining(
+ row -> {
+ Object indexO = getter.getFieldOrNull(row);
+ indexWriter.write(indexO);
+ rowCounter.add(1);
+ });
+ return indexWriter.finish();
+ }
}
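The rangeEnd passed to convertToIndexMeta above is inclusive, which is why the builder subtracts one. A small worked example of that arithmetic (class name and values hypothetical):

    public class RowRangeSketch {
        public static void main(String[] args) {
            long startOffset = 100L; // first row id covered by this shard
            long rowCount = 50L;     // rows written by this builder
            long rangeEnd = startOffset + rowCount - 1;
            System.out.println("[" + startOffset + ", " + rangeEnd + "]"); // [100, 149]
        }
    }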
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
index 1bc3b1b8d1..621f5205ae 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilder.java
@@ -20,19 +20,13 @@ package org.apache.paimon.spark.globalindex;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.CloseableIterator;
import java.io.IOException;
import java.util.ArrayList;
@@ -47,35 +41,19 @@ public abstract class GlobalIndexBuilder {
this.context = context;
}
- public CommitMessage build(IndexedSplit indexedSplit) throws IOException {
- ReadBuilder builder = context.table().newReadBuilder();
- builder.withReadType(context.readType());
- RecordReader<InternalRow> rows = builder.newRead().createReader(indexedSplit);
- LongCounter rowCounter = new LongCounter(0);
- List<ResultEntry> resultEntries = writePaimonRows(context, rows, rowCounter);
- List<IndexFileMeta> indexFileMetas =
- convertToIndexMeta(context, rowCounter.getValue(), resultEntries);
- DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
- return new CommitMessageImpl(
- context.partition(), 0, null, dataIncrement, CompactIncrement.emptyIncrement());
- }
+ public abstract List<CommitMessage> build(CloseableIterator<InternalRow> data)
+ throws IOException;
- private static List<IndexFileMeta> convertToIndexMeta(
- GlobalIndexBuilderContext context, long totalRowCount,
List<ResultEntry> entries)
- throws IOException {
+ protected List<IndexFileMeta> convertToIndexMeta(
+ long rangeStart, long rangeEnd, List<ResultEntry> entries) throws
IOException {
List<IndexFileMeta> results = new ArrayList<>();
- long rangeEnd = context.startOffset() + totalRowCount - 1;
for (ResultEntry entry : entries) {
String fileName = entry.fileName();
GlobalIndexFileReadWrite readWrite = context.globalIndexFileReadWrite();
long fileSize = readWrite.fileSize(fileName);
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(
- context.startOffset(),
- rangeEnd,
- context.indexField().id(),
- null,
- entry.meta());
+ rangeStart, rangeEnd, context.indexField().id(), null, entry.meta());
IndexFileMeta indexFileMeta =
new IndexFileMeta(
context.indexType(),
@@ -88,26 +66,9 @@ public abstract class GlobalIndexBuilder {
return results;
}
- private static List<ResultEntry> writePaimonRows(
- GlobalIndexBuilderContext context,
- RecordReader<InternalRow> rows,
- LongCounter rowCounter)
- throws IOException {
+ protected GlobalIndexWriter createIndexWriter() throws IOException {
GlobalIndexer globalIndexer =
GlobalIndexer.create(context.indexType(),
context.indexField(), context.options());
- GlobalIndexSingletonWriter globalIndexWriter =
- (GlobalIndexSingletonWriter)
-
globalIndexer.createWriter(context.globalIndexFileReadWrite());
- InternalRow.FieldGetter getter =
- InternalRow.createFieldGetter(
- context.indexField().type(),
-
context.readType().getFieldIndex(context.indexField().name()));
- rows.forEachRemaining(
- row -> {
- Object indexO = getter.getFieldOrNull(row);
- globalIndexWriter.write(indexO);
- rowCounter.add(1);
- });
- return globalIndexWriter.finish();
+ return globalIndexer.createWriter(context.globalIndexFileReadWrite());
}
}
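After this refactor, build() consumes a CloseableIterator<InternalRow> instead of a pre-planned split, so any row source can be adapted in. A minimal sketch using the same adapter the Spark topology later relies on (element type simplified to String purely for illustration):

    import org.apache.paimon.utils.CloseableIterator;

    import java.util.Arrays;
    import java.util.Iterator;

    public class AdapterSketch {
        public static void main(String[] args) throws Exception {
            Iterator<String> plain = Arrays.asList("a", "b").iterator();
            // CloseableIterator.adapterForIterator wraps a plain iterator so it can be
            // passed to build() and closed with try-with-resources
            try (CloseableIterator<String> it = CloseableIterator.adapterForIterator(plain)) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }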
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
index 8e4a4d7fab..dbf2f9a74a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderContext.java
@@ -27,6 +27,9 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
@@ -36,45 +39,57 @@ import java.io.Serializable;
*
* <p>This class is serializable to support Spark distributed execution. The
partition is stored
* both as a transient {@link BinaryRow} and as serialized bytes to ensure
proper serialization
- * across executor nodes.
+ * across executor nodes. Partition info can be null if the actual index
partition is dynamically
+ * extracted from data.
*/
public class GlobalIndexBuilderContext implements Serializable {
private final FileStoreTable table;
- private final BinaryRowSerializer binaryRowSerializer;
- private final byte[] partitionBytes;
+ @Nullable private final BinaryRowSerializer binaryRowSerializer;
+ @Nullable private final byte[] partitionBytes;
private final RowType readType;
private final DataField indexField;
private final String indexType;
private final long startOffset;
private final Options options;
+ @Nullable private final Range fullRange;
public GlobalIndexBuilderContext(
FileStoreTable table,
- BinaryRow partition,
+ @Nullable BinaryRow partition,
RowType readType,
DataField indexField,
String indexType,
long startOffset,
- Options options) {
+ Options options,
+ @Nullable Range fullRange) {
this.table = table;
this.readType = readType;
this.indexField = indexField;
this.indexType = indexType;
this.startOffset = startOffset;
this.options = options;
+ this.fullRange = fullRange;
- this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
- try {
- this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ if (partition != null) {
+ this.binaryRowSerializer = new BinaryRowSerializer(partition.getFieldCount());
+ try {
+ this.partitionBytes = binaryRowSerializer.serializeToBytes(partition);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ this.binaryRowSerializer = null;
+ this.partitionBytes = null;
}
}
+ @Nullable
public BinaryRow partition() {
try {
- return binaryRowSerializer.deserializeFromBytes(partitionBytes);
+ return partitionBytes == null || binaryRowSerializer == null
+ ? null
+ : binaryRowSerializer.deserializeFromBytes(partitionBytes);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -109,4 +124,9 @@ public class GlobalIndexBuilderContext implements Serializable {
IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
return new GlobalIndexFileReadWrite(fileIO, indexPathFactory);
}
+
+ @Nullable
+ public Range fullRange() {
+ return fullRange;
+ }
}
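For context, the partition round-trip that the nullable handling above guards is plain BinaryRowSerializer usage. A sketch of serializing a single-field partition row and reading it back (class name and values hypothetical; the serializer calls are the same ones used in the constructor above):

    import org.apache.paimon.data.BinaryRow;
    import org.apache.paimon.data.BinaryRowWriter;
    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.serializer.BinaryRowSerializer;

    import java.io.IOException;

    public class PartitionRoundTripSketch {
        public static void main(String[] args) throws IOException {
            BinaryRow partition = new BinaryRow(1);
            BinaryRowWriter writer = new BinaryRowWriter(partition);
            writer.writeString(0, BinaryString.fromString("p0"));
            writer.complete();

            BinaryRowSerializer serializer = new BinaryRowSerializer(partition.getFieldCount());
            byte[] bytes = serializer.serializeToBytes(partition);
            BinaryRow copy = serializer.deserializeFromBytes(bytes);
            System.out.println(copy.getString(0)); // p0
        }
    }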
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
index dfa5916cd5..7e4d6ff8b8 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopoBuilder.java
@@ -18,27 +18,27 @@
package org.apache.paimon.spark.globalindex;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.globalindex.IndexedSplit;
import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
-import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
/** User defined topology builder. */
public interface GlobalIndexTopoBuilder {
List<CommitMessage> buildIndex(
- JavaSparkContext javaSparkContext,
+ SparkSession spark,
+ DataSourceV2Relation relation,
+ PartitionPredicate partitionPredicate,
FileStoreTable table,
- Map<BinaryRow, List<IndexedSplit>> preparedDS,
String indexType,
RowType readType,
DataField indexField,
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
new file mode 100644
index 0000000000..f9443b0bdc
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The {@link GlobalIndexBuilder} implementation for BTree Index. The caller of {@link
+ * BTreeGlobalIndexBuilder#build(CloseableIterator) build} must ensure the input data is sorted by
+ * partitions and indexed field.
+ */
+public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder {
+ private static final double FLOATING = 1.2;
+
+ private final IndexFieldsExtractor extractor;
+ private final long recordsPerRange;
+ private BinaryRow currentPart = null;
+ private GlobalIndexParallelWriter currentWriter = null;
+ private LongCounter counter = new LongCounter();
+
+ protected BTreeGlobalIndexBuilder(GlobalIndexBuilderContext context) {
+ super(context);
+ Preconditions.checkNotNull(
+ context.fullRange(), "Full range cannot be null for BTreeGlobalIndexBuilder.");
+
+ FileStoreTable table = context.table();
+ List<String> readColumns = new ArrayList<>(table.partitionKeys());
+ readColumns.addAll(context.readType().getFieldNames());
+ this.extractor =
+ new IndexFieldsExtractor(
+ SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
+ table.partitionKeys(),
+ context.indexField().name());
+
+ // Each partition boundary is derived from sampling, so we introduce a slack factor
+ // to avoid generating too many small files due to sampling variance.
+ this.recordsPerRange =
+ (long)
+ (context.options().get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE)
+ * FLOATING);
+ }
+
+ @Override
+ public List<CommitMessage> build(CloseableIterator<InternalRow> data)
throws IOException {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+
+ while (data.hasNext()) {
+ InternalRow row = data.next();
+
+ BinaryRow partRow = extractor.extractPartition(row);
+ // may flush last part data
+ // this is correct only if the input is sorted by <partition, indexedField>
+ if (currentPart != null && !partRow.equals(currentPart) || counter.getValue() >= recordsPerRange) {
+ flushIndex(commitMessages);
+ }
+
+ createWriterIfNeeded();
+
+ // write <value, rowId> pair to index file
+ currentPart = partRow;
+ counter.add(1);
+ currentWriter.write(extractor.extractIndexField(row), extractor.extractRowId(row));
+ }
+
+ flushIndex(commitMessages);
+
+ return commitMessages;
+ }
+
+ private void flushIndex(List<CommitMessage> resultMessages) throws IOException {
+ if (counter.getValue() == 0 || currentWriter == null || currentPart == null) {
+ return;
+ }
+
+ List<ResultEntry> resultEntries = currentWriter.finish();
+ List<IndexFileMeta> fileMetas =
+ convertToIndexMeta(context.fullRange().from,
context.fullRange().to, resultEntries);
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(fileMetas);
+ CommitMessage commitMessage =
+ new CommitMessageImpl(
+ currentPart, 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
+
+ // reset writer
+ currentWriter = null;
+ currentPart = null;
+ counter.reset();
+
+ resultMessages.add(commitMessage);
+ }
+
+ private void createWriterIfNeeded() throws IOException {
+ if (currentWriter == null) {
+ GlobalIndexWriter indexWriter = createIndexWriter();
+ if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
+ throw new RuntimeException(
+ "Unexpected implementation, the index writer of BTree should be an instance of GlobalIndexParallelWriter, but found: "
+ + indexWriter.getClass().getName());
+ }
+ currentWriter = (GlobalIndexParallelWriter) indexWriter;
+ }
+ }
+}
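The flush policy above (emit a range when the partition changes or when recordsPerRange is reached) only works because the input is globally sorted. A standalone simulation of just that boundary logic, with partitions reduced to strings (all names and values hypothetical):

    import java.util.Arrays;
    import java.util.List;

    public class FlushBoundarySketch {
        public static void main(String[] args) {
            List<String> sortedParts = Arrays.asList("p0", "p0", "p0", "p1", "p1");
            long recordsPerRange = 2;

            String currentPart = null;
            long counter = 0;
            for (String part : sortedParts) {
                // flush on partition change or full range, mirroring build() above
                if ((currentPart != null && !part.equals(currentPart)) || counter >= recordsPerRange) {
                    System.out.println("flush range: part=" + currentPart + ", rows=" + counter);
                    counter = 0;
                }
                currentPart = part;
                counter++;
            }
            System.out.println("flush range: part=" + currentPart + ", rows=" + counter);
            // prints p0/2, p0/1, p1/2; correct only because input is sorted by partition
        }
    }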
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java
new file mode 100644
index 0000000000..38f0997915
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilderFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;
+import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder;
+
+/** The {@link GlobalIndexBuilderFactory} implementation for BTree index type. */
+public class BTreeGlobalIndexBuilderFactory implements GlobalIndexBuilderFactory {
+
+ // keep this identifier consistent with BTreeGlobalIndexerFactory
+ private static final String IDENTIFIER = "btree";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public GlobalIndexBuilder create(GlobalIndexBuilderContext context) {
+ return new BTreeGlobalIndexBuilder(context);
+ }
+
+ @Override
+ public GlobalIndexTopoBuilder createTopoBuilder() {
+ return new BTreeIndexTopoBuilder();
+ }
+}
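This factory is wired in through the META-INF/services registration later in this commit. Under the hood that file is standard java.util.ServiceLoader input; a sketch of what a lookup by identifier could look like (illustrative only, since Paimon routes discovery through its own factory utilities rather than raw ServiceLoader):

    import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory;

    import java.util.ServiceLoader;

    public class FactoryLookupSketch {
        public static void main(String[] args) {
            // iterates all factories registered under META-INF/services
            for (GlobalIndexBuilderFactory factory :
                    ServiceLoader.load(GlobalIndexBuilderFactory.class)) {
                if ("btree".equals(factory.identifier())) {
                    System.out.println("found: " + factory.getClass().getName());
                }
            }
        }
    }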
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
new file mode 100644
index 0000000000..e898a0e168
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.SparkRow;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
+import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
+import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Range;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.PaimonUtils;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.functions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/** The {@link GlobalIndexTopoBuilder} for BTree index. */
+public class BTreeIndexTopoBuilder implements GlobalIndexTopoBuilder {
+
+ @Override
+ public List<CommitMessage> buildIndex(
+ SparkSession spark,
+ DataSourceV2Relation relation,
+ PartitionPredicate partitionPredicate,
+ FileStoreTable table,
+ String indexType,
+ RowType readType,
+ DataField indexField,
+ Options options)
+ throws IOException {
+
+ // 1. read the whole dataset of target partitions
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ if (partitionPredicate != null) {
+ snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
+ }
+
+ List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+ Range fullRange = calcRowRange(dataSplits);
+ if (dataSplits.isEmpty() || fullRange == null) {
+ return Collections.emptyList();
+ }
+
+ // we need to read all partition columns for shuffle
+ List<String> selectedColumns = new ArrayList<>();
+ selectedColumns.addAll(table.partitionKeys());
+ selectedColumns.addAll(readType.getFieldNames());
+
+ Dataset<Row> source =
+ PaimonUtils.createDataset(
+ spark,
+ ScanPlanHelper$.MODULE$.createNewScanPlan(
+ dataSplits.toArray(new DataSplit[0]), relation));
+
+ Dataset<Row> selected =
+ source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
+
+ // 2. shuffle and sort by partitions and index keys
+ Column[] sortFields =
+ selectedColumns.stream()
+ .filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
+ .map(functions::col)
+ .toArray(Column[]::new);
+
+ long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
+ // this should be super fast since an append-only table can utilize count-star pushdown well.
+ long rowCount = source.count();
+ int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1);
+ int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+ partitionNum = Math.min(partitionNum, maxParallelism);
+
+ // For efficiency, we do not repartition within each paimon partition. Instead, we directly
+ // divide ranges by <partitions, index field>, and each subtask is expected to process
+ // records from multiple partitions. The drawback is that if a Paimon partition spans
+ // multiple Spark partitions, the first and last output files may contain relatively few
+ // records.
+ Dataset<Row> partitioned =
+ selected.repartitionByRange(partitionNum, sortFields)
+ .sortWithinPartitions(sortFields);
+
+ // 3. write index for each partition & range
+ final GlobalIndexBuilderContext context =
+ new GlobalIndexBuilderContext(
+ table, null, readType, indexField, indexType, 0, options, fullRange);
+ final RowType rowType =
+ SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
+ JavaRDD<byte[]> written =
+ partitioned
+ .javaRDD()
+ .map(row -> (InternalRow) (new SparkRow(rowType, row)))
+ .mapPartitions(
+ (FlatMapFunction<Iterator<InternalRow>,
byte[]>)
+ iter -> {
+ CommitMessageSerializer
commitMessageSerializer =
+ new
CommitMessageSerializer();
+
+ GlobalIndexBuilder
globalIndexBuilder =
+
GlobalIndexBuilderFactoryUtils
+
.createIndexBuilder(context);
+
+ List<CommitMessage> commitMessages
=
+ globalIndexBuilder.build(
+
CloseableIterator.adapterForIterator(
+ iter));
+ List<byte[]> messageBytes = new
ArrayList<>();
+
+ for (CommitMessage commitMessage :
commitMessages) {
+ messageBytes.add(
+
commitMessageSerializer.serialize(
+
commitMessage));
+ }
+
+ return messageBytes.iterator();
+ });
+
+ // 4. collect all commit messages and return
+ List<byte[]> commitBytes = written.collect();
+ List<CommitMessage> result = new ArrayList<>();
+ CommitMessageSerializer commitMessageSerializer = new CommitMessageSerializer();
+ for (byte[] commitByte : commitBytes) {
+ result.add(
+ commitMessageSerializer.deserialize(
+ commitMessageSerializer.getVersion(), commitByte));
+ }
+
+ return result;
+ }
+
+ private Range calcRowRange(List<DataSplit> dataSplits) {
+ long start = Long.MAX_VALUE;
+ long end = Long.MIN_VALUE;
+ for (DataSplit dataSplit : dataSplits) {
+ for (DataFileMeta file : dataSplit.dataFiles()) {
+ if (file.firstRowId() != null) {
+ start = Math.min(start, file.firstRowId());
+ end = Math.max(end, file.firstRowId() + file.rowCount());
+ }
+ }
+ }
+ return start == Long.MAX_VALUE ? null : new Range(start, end);
+ }
+}
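The parallelism sizing in step 2 above is simple integer math. A worked example with hypothetical numbers (the two options are the ones added to BTreeIndexOptions in this commit):

    public class ParallelismSketch {
        public static void main(String[] args) {
            long rowCount = 10_000_000L;       // what source.count() might return
            long recordsPerRange = 1_000_000L; // btree-index.records-per-range
            int maxParallelism = 4096;         // btree-index.build.max-parallelism

            int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1);
            partitionNum = Math.min(partitionNum, maxParallelism);
            System.out.println(partitionNum); // 10 range partitions for the shuffle
        }
    }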
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java
new file mode 100644
index 0000000000..bbe328d9d5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/IndexFieldsExtractor.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.globalindex.btree;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** The extractor to get partition, index field and row id from records. */
+public class IndexFieldsExtractor {
+ private final Projection partitionProjection;
+ private final InternalRow.FieldGetter indexFieldGetter;
+ private final int rowIdPos;
+
+ public IndexFieldsExtractor(RowType readType, List<String> partitionKeys, String indexField) {
+ this.partitionProjection = CodeGenUtils.newProjection(readType, partitionKeys);
+ int indexFieldPos = readType.getFieldIndex(indexField);
+ this.indexFieldGetter =
+ InternalRow.createFieldGetter(readType.getTypeAt(indexFieldPos), indexFieldPos);
+ this.rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+ }
+
+ public BinaryRow extractPartition(InternalRow record) {
+ // projection will reuse returning record, copy is necessary
+ return partitionProjection.apply(record).copy();
+ }
+
+ @Nullable
+ public Object extractIndexField(InternalRow record) {
+ return indexFieldGetter.getFieldOrNull(record);
+ }
+
+ public Long extractRowId(InternalRow record) {
+ return record.getLong(rowIdPos);
+ }
+}
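The extractor above leans on InternalRow.FieldGetter, which binds a type and position once and then reads that field from any row. A minimal sketch against a GenericRow (schema and values hypothetical):

    import org.apache.paimon.data.BinaryString;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.types.DataTypes;

    public class FieldGetterSketch {
        public static void main(String[] args) {
            InternalRow row = GenericRow.of(1, BinaryString.fromString("name_1"));
            // bind once: STRING type at position 1
            InternalRow.FieldGetter getter = InternalRow.createFieldGetter(DataTypes.STRING(), 1);
            System.out.println(getter.getFieldOrNull(row)); // name_1
        }
    }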
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 48a5b9d7b6..9b7c3e98be 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -25,6 +25,7 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.spark.globalindex.GlobalIndexBuilder;
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext;
import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils;
@@ -36,11 +37,14 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.ProcedureUtils;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.StringUtils;
@@ -164,33 +168,39 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
 ProcedureUtils.putAllOptions(parsedOptions, optionString);
Options userOptions = Options.fromMap(parsedOptions);
Options tableOptions = new Options(table.options());
- long rowsPerShard =
- tableOptions
- .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
- .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue());
- checkArgument(
- rowsPerShard > 0,
- "Option 'global-index.row-count-per-shard' must be greater than 0.");
-
- // Step 1: generate splits for each partition&&shard
- Map<BinaryRow, List<IndexedSplit>> splits =
- split(table, partitionPredicate, rowsPerShard);
 List<CommitMessage> indexResults;
- // Step 2: build index by certain index system
- GlobalIndexTopoBuilder topoBuildr =
+ // Step 1: build index by certain index system
+ GlobalIndexTopoBuilder topoBuilder =
 GlobalIndexBuilderFactoryUtils.createTopoBuilder(indexType);
- if (topoBuildr != null) {
+
+ if (topoBuilder != null) {
+ // do not need to prepare index shards for custom topo builder
 indexResults =
- topoBuildr.buildIndex(
- new JavaSparkContext(spark().sparkContext()),
+ topoBuilder.buildIndex(
+ spark(),
+ relation,
+ partitionPredicate,
 table,
- splits,
 indexType,
 readRowType,
 indexField,
 userOptions);
 } else {
+ long rowsPerShard =
+ tableOptions
+ .getOptional(GLOBAL_INDEX_ROW_COUNT_PER_SHARD)
+ .orElse(GLOBAL_INDEX_ROW_COUNT_PER_SHARD.defaultValue());
+ checkArgument(
+ rowsPerShard > 0,
+ "Option 'global-index.row-count-per-shard' must be greater than 0.");
+
+ // generate splits for each partition&&shard
+ Map<BinaryRow, List<IndexedSplit>> splits =
+ split(table, partitionPredicate, rowsPerShard);
+
 indexResults =
 buildIndex(
 table,
@@ -201,7 +211,7 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
userOptions);
}
- // Step 3: commit index meta to a new snapshot
+ // Step 2: commit index meta to a new snapshot
commit(table, indexResults);
return new InternalRow[] {newInternalRow(true)};
@@ -241,7 +251,8 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
indexField,
indexType,
indexedSplit.rowRanges().get(0).from,
- options);
+ options,
+ null);
byte[] dsBytes = InstantiationUtil.serializeObject(indexedSplit);
taskList.add(Pair.of(builderContext, dsBytes));
@@ -264,8 +275,20 @@ public class CreateGlobalIndexProcedure extends BaseProcedure {
 GlobalIndexBuilder globalIndexBuilder =
 GlobalIndexBuilderFactoryUtils.createIndexBuilder(
 builderContext);
- return commitMessageSerializer.serialize(
- globalIndexBuilder.build(split));
+ ReadBuilder builder = builderContext.table().newReadBuilder();
+ builder.withReadType(builderContext.readType());
+ try (RecordReader<org.apache.paimon.data.InternalRow> recordReader =
+ builder.newRead().createReader(split);
+ CloseableIterator<org.apache.paimon.data.InternalRow> data =
+ recordReader.toCloseableIterator()) {
+ List<CommitMessage> commitMessage =
+ globalIndexBuilder.build(data);
+ Preconditions.checkState(commitMessage.size() == 1);
+ return commitMessageSerializer.serialize(
+ commitMessage.get(0));
+ }
 })
 .collect();
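The per-shard read added above follows the standard ReadBuilder path. The same pattern in isolation, as a helper that counts a split's rows (class and method names hypothetical; the caller supplies the table, read type and split):

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.reader.RecordReader;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.source.ReadBuilder;
    import org.apache.paimon.table.source.Split;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.CloseableIterator;

    public class ShardReadSketch {
        static long countRows(Table table, RowType readType, Split split) throws Exception {
            ReadBuilder builder = table.newReadBuilder();
            builder.withReadType(readType);
            long rows = 0;
            // both resources are closed in reverse order when the block exits
            try (RecordReader<InternalRow> reader = builder.newRead().createReader(split);
                    CloseableIterator<InternalRow> it = reader.toCloseableIterator()) {
                while (it.hasNext()) {
                    it.next();
                    rows++;
                }
            }
            return rows;
        }
    }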
diff --git a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
similarity index 91%
copy from paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
copy to paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
index e1e63b1057..e99a1bdb2e 100644
--- a/paimon-common/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory
+++ b/paimon-spark/paimon-spark-common/src/main/resources/META-INF/services/org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory
+org.apache.paimon.spark.globalindex.btree.BTreeGlobalIndexBuilderFactory
\ No newline at end of file
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index e4ecff618a..23ed2921c6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -18,7 +18,10 @@
package org.apache.paimon.spark.procedure
+import org.apache.paimon.globalindex.btree.{BTreeIndexMeta, KeySerializer}
+import org.apache.paimon.memory.MemorySlice
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.types.VarCharType
import org.apache.paimon.utils.Range
import org.apache.spark.sql.streaming.StreamTest
@@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest
assert(totalRowCount == 189088L)
}
}
+
+ test("create btree global index") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true',
+ | 'btree-index.records-per-range' = '1000')
+ |""".stripMargin)
+
+ val values =
+ (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ val output =
+ spark
+ .sql(
+ "CALL sys.create_global_index(table => 'test.T', index_column =>
'name', index_type => 'btree'," +
+ " options => 'btree-index.records-per-range=1000')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+ val table = loadTable("T")
+ val btreeEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "btree")
+ .map(_.indexFile())
+ table.store().newGlobalIndexScanBuilder().shardList()
+ assert(btreeEntries.nonEmpty)
+
+ // 1. assert total row count and file count
+ val totalRowCount = btreeEntries.map(_.rowCount()).sum
+ assert(btreeEntries.size == 100)
+ assert(totalRowCount == 100000L)
+
+ // 2. assert global index meta not null
+ btreeEntries.foreach(e => assert(e.globalIndexMeta() != null))
+
+ // 3. assert btree index file range non-overlapping
+ case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object)
+ val keySerializer = KeySerializer.create(new VarCharType())
+ val comparator = keySerializer.createComparator()
+
+ def deserialize(bytes: Array[Byte]): Object = {
+ keySerializer.deserialize(MemorySlice.wrap(bytes))
+ }
+
+ val btreeMetas = btreeEntries
+ .map(_.globalIndexMeta().indexMeta())
+ .map(meta => BTreeIndexMeta.deserialize(meta))
+ .map(
+ m => {
+ assert(m.getFirstKey != null)
+ assert(m.getLastKey != null)
+ MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey))
+ })
+
+ // sort by first key
+ val sorted = btreeMetas.sortWith((m1, m2) =>
comparator.compare(m1.first, m2.first) < 0)
+
+ // should not overlap
+ sorted.sliding(2).foreach {
+ case Seq(prev: MetaWithKey, next: MetaWithKey) =>
+ assert(comparator.compare(prev.last, next.first) <= 0)
+ case _ => // ignore
+ }
+ }
+ }
+
+ test("create btree global index with multiple partitions") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ | PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ var values =
+ (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ val output =
+ spark
+ .sql(
+ "CALL sys.create_global_index(table => 'test.T', index_column =>
'name', index_type => 'btree'," +
+ " options => 'btree-index.records-per-range=1000')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ assertMultiplePartitionsResult("T", 189088L, 3)
+ }
+ }
+
+ test("create btree index within one spark partition") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ | PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ var values =
+ (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ // force output parallelism = 1
+ val output =
+ spark
+ .sql("CALL sys.create_global_index(table => 'test.T', index_column
=> 'name', index_type => 'btree'," +
+ " options =>
'btree-index.records-per-range=1000,btree-index.build.max-parallelism=1')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ assertMultiplePartitionsResult("T", 100000L, 2)
+ }
+ }
+
+ private def assertMultiplePartitionsResult(
+ tableName: String,
+ rowCount: Long,
+ partCount: Int
+ ): Unit = {
+ val table = loadTable(tableName)
+ val btreeEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "btree")
+ table.store().newGlobalIndexScanBuilder().shardList()
+ assert(btreeEntries.nonEmpty)
+
+ // 1. assert total row count
+ val totalRowCount = btreeEntries.map(_.indexFile().rowCount()).sum
+ assert(totalRowCount == rowCount)
+
+ // 2. assert global index meta not null
+ btreeEntries.foreach(e => assert(e.indexFile().globalIndexMeta() != null))
+
+ // 3. assert non-overlapped within each partition
+ val entriesByPart = btreeEntries.groupBy(_.partition())
+ assert(entriesByPart.size == partCount)
+
+ case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object)
+ val keySerializer = KeySerializer.create(new VarCharType())
+ val comparator = keySerializer.createComparator()
+
+ def deserialize(bytes: Array[Byte]): Object = {
+ keySerializer.deserialize(MemorySlice.wrap(bytes))
+ }
+
+ for ((k, v) <- entriesByPart) {
+ val metas = v
+ .map(_.indexFile().globalIndexMeta().indexMeta())
+ .map(bytes => BTreeIndexMeta.deserialize(bytes))
+ .map(
+ m => {
+ assert(m.getFirstKey != null)
+ assert(m.getLastKey != null)
+ MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey))
+ })
+
+ val sorted = metas.sortWith((m1, m2) => comparator.compare(m1.first,
m2.first) < 0)
+
+ // should not overlap
+ sorted.sliding(2).foreach {
+ case Seq(prev: MetaWithKey, next: MetaWithKey) =>
+ assert(
+ comparator.compare(prev.last, next.first) <= 0,
+ s"Found overlap for partition ${k.getString(0).toString}. The last
key ${prev.last}, next first key ${next.first}"
+ )
+ case _ => // ignore
+ }
+ }
+ }
}