This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3503b6f2a9 [spark] Introduce drop global index procedure (#6930)
3503b6f2a9 is described below
commit 3503b6f2a902d21711c4ed556cb8dd135109adfc
Author: YeJunHao <[email protected]>
AuthorDate: Tue Dec 30 17:39:51 2025 +0800
[spark] Introduce drop global index procedure (#6930)
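
    This adds a sys.drop_global_index procedure that deletes the global index
    files previously created for a column, optionally restricted to a set of
    partitions. A usage sketch, mirroring the calls exercised in the new test
    (table, column, and partition values are illustrative):

        CALL sys.drop_global_index(table => 'test.T', index_column => 'name', index_type => 'bitmap');
        CALL sys.drop_global_index(table => 'test.T', index_column => 'name', index_type => 'bitmap', partitions => 'pt="p1"');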
---
.../java/org/apache/paimon/io/DataIncrement.java | 9 +
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../spark/procedure/DropGlobalIndexProcedure.java | 210 +++++++++++++++++++++
.../procedure/DropGlobalIndexProcedureTest.scala | 183 ++++++++++++++++++
4 files changed, 404 insertions(+)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
index c3c419b964..d7a5a229e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
@@ -69,6 +69,15 @@ public class DataIncrement {
Collections.emptyList());
}
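+ /** A DataIncrement that only carries index files to be deleted; all other file lists stay empty. */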
+ public static DataIncrement deleteIndexIncrement(List<IndexFileMeta> indexFiles) {
+ return new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ indexFiles);
+ }
+
public List<DataFileMeta> newFiles() {
return newFiles;
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 10d9792317..74bf04d68c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -33,6 +33,7 @@ import org.apache.paimon.spark.procedure.CreateTagProcedure;
import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
import org.apache.paimon.spark.procedure.DeleteTagProcedure;
import org.apache.paimon.spark.procedure.DropFunctionProcedure;
+import org.apache.paimon.spark.procedure.DropGlobalIndexProcedure;
import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
@@ -95,6 +96,7 @@ public class SparkProcedures {
procedureBuilders.put("expire_tags", ExpireTagsProcedure::builder);
procedureBuilders.put("create_branch", CreateBranchProcedure::builder);
procedureBuilders.put("create_global_index",
CreateGlobalIndexProcedure::builder);
+ procedureBuilders.put("drop_global_index",
DropGlobalIndexProcedure::builder);
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
procedureBuilders.put("compact_database",
CompactDatabaseProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java
new file mode 100644
index 0000000000..74e4cc4aea
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Procedure to drop global index files via Spark. */
+public class DropGlobalIndexProcedure extends BaseProcedure {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DropGlobalIndexProcedure.class);
+
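+ // Required: table, index_column, index_type. Optional: partitions, e.g. 'pt="p1"'.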
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.required("index_column",
DataTypes.StringType),
+ ProcedureParameter.required("index_type",
DataTypes.StringType),
+ ProcedureParameter.optional("partitions", StringType),
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true,
Metadata.empty())
+ });
+
+ protected DropGlobalIndexProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public String description() {
+ return "Drop global index files for a given column.";
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String column = args.getString(1);
+ String indexType = args.getString(2).toLowerCase(Locale.ROOT).trim();
+ String partitions =
+ (args.isNullAt(3) || StringUtils.isNullOrWhitespaceOnly(args.getString(3)))
+ ? null
+ : args.getString(3);
+
+ String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : null;
+
+ LOG.info("Starting to drop index for table {} WHERE: {}", tableIdent, finalWhere);
+
+ return modifyPaimonTable(
+ tableIdent,
+ t -> {
+ try {
+ checkArgument(
+ t instanceof FileStoreTable,
+ "Only FileStoreTable supports dropping a global index.");
+ FileStoreTable table = (FileStoreTable) t;
+
+ RowType rowType = table.rowType();
+ checkArgument(
+ rowType.containsField(column),
+ "Column '%s' does not exist in table '%s'.",
+ column,
+ tableIdent);
+ DataSourceV2Relation relation = createRelation(tableIdent);
+ PartitionPredicate partitionPredicate =
+ SparkProcedureUtils.convertToPartitionPredicate(
+ finalWhere, table.schema().logicalPartitionType(), spark(), relation);
+
+ Snapshot snapshot =
+ t.latestSnapshot()
+ .orElseThrow(
+ () -> new IllegalStateException(
+ String.format("Table '%s' has no snapshot.", tableIdent)));
+
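+ // Keep only index files of the requested type whose indexed field id matches
+ // the target column, optionally restricted to the selected partitions.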
+ Filter<IndexManifestEntry> filter =
+ entry ->
+ entry.indexFile().indexType().equals(indexType)
+ && entry.indexFile().globalIndexMeta() != null
+ && entry.indexFile().globalIndexMeta().indexFieldId()
+ == rowType.getField(column).id()
+ && (partitionPredicate == null
+ || partitionPredicate.test(entry.partition()));
+
+ List<IndexManifestEntry> waitDelete =
+ table.store().newIndexFileHandler().scan(snapshot, filter);
+
+ LOG.info("Found {} global index files to delete.", waitDelete.size());
+
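+ // Group the index files to delete by partition, one commit message per partition.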
+ Map<BinaryRow, List<IndexFileMeta>> deleteEntries =
+ waitDelete.stream()
+ .map(IndexManifestEntry::toDeleteEntry)
+ .collect(
+ Collectors.groupingBy(
+ IndexManifestEntry::partition,
+ Collectors.mapping(
+ IndexManifestEntry::indexFile, Collectors.toList())));
+
+ List<CommitMessage> commitMessages = new ArrayList<>();
+
+ for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry :
+ deleteEntries.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ List<IndexFileMeta> indexFileMetas = entry.getValue();
+ commitMessages.add(
+ new CommitMessageImpl(
+ partition,
+ 0,
+ null,
+ DataIncrement.deleteIndexIncrement(indexFileMetas),
+ CompactIncrement.emptyIncrement()));
+ }
+
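+ // Commit the deletions; try-with-resources closes the commit afterwards.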
+ try (TableCommitImpl commit =
+ table.newCommit("drop-global-index-" +
UUID.randomUUID())) {
+ commit.commit(commitMessages);
+ }
+
+ return new InternalRow[] {newInternalRow(true)};
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to drop %s index for column
'%s' on table '%s'.",
+ indexType, column, tableIdent),
+ e);
+ }
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder<DropGlobalIndexProcedure>() {
+ @Override
+ public DropGlobalIndexProcedure doBuild() {
+ return new DropGlobalIndexProcedure(tableCatalog());
+ }
+ };
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala
new file mode 100644
index 0000000000..88caadcfda
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.Range
+
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class DropGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+ test("drop bitmap global index") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ |""".stripMargin)
+
+ val values =
+ (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ var output =
+ spark
+ .sql("CALL sys.create_global_index(table => 'test.T', index_column
=> 'name', index_type => 'bitmap')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ var table = loadTable("T")
+ var bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.nonEmpty)
+ val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
+ assert(totalRowCount == 100000L)
+
+ output = spark
+ .sql("CALL sys.drop_global_index(table => 'test.T', index_column =>
'name', index_type => 'bitmap')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ table = loadTable("T")
+ bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.isEmpty)
+ }
+ }
+
+ test("create bitmap global index with partition") {
+ withTable("T") {
+ spark.sql("""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'bucket' = '-1',
+ | 'global-index.row-count-per-shard' = '10000',
+ | 'row-tracking.enabled' = 'true',
+ | 'data-evolution.enabled' = 'true')
+ | PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ var values =
+ (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ spark.sql(s"INSERT INTO T VALUES $values")
+
+ var output =
+ spark
+ .sql("CALL sys.create_global_index(table => 'test.T', index_column
=> 'name', index_type => 'bitmap')")
+ .collect()
+ .head
+
+ assert(output.getBoolean(0))
+
+ val table = loadTable("T")
+ var bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.nonEmpty)
+
+ var ranges = bitmapEntries
+ .map(
+ s =>
+ new Range(
+ s.indexFile().globalIndexMeta().rowRangeStart(),
+ s.indexFile().globalIndexMeta().rowRangeEnd()))
+ .toList
+ .asJava
+ var mergedRange = Range.sortAndMergeOverlap(ranges, true)
+ assert(mergedRange.size() == 1)
+ assert(mergedRange.get(0).equals(new Range(0, 189087)))
+ val totalRowCount = bitmapEntries
+ .map(
+ x =>
+ x.indexFile().globalIndexMeta().rowRangeEnd()
+ - x.indexFile().globalIndexMeta().rowRangeStart() + 1)
+ .sum
+ assert(totalRowCount == 189088L)
+
+ output = spark
+ .sql("CALL sys.drop_global_index(table => 'test.T', index_column =>
'name', index_type => 'bitmap', partitions => 'pt=\"p1\"')")
+ .collect()
+ .head
+
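+ assert(output.getBoolean(0))
+
+ // Only the p0 and p2 index entries should remain after dropping the p1 files.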
+ bitmapEntries = table
+ .store()
+ .newIndexFileHandler()
+ .scanEntries()
+ .asScala
+ .filter(_.indexFile().indexType() == "bitmap")
+ assert(bitmapEntries.nonEmpty)
+
+ ranges = bitmapEntries
+ .map(
+ s =>
+ new Range(
+ s.indexFile().globalIndexMeta().rowRangeStart(),
+ s.indexFile().globalIndexMeta().rowRangeEnd()))
+ .toList
+ .asJava
+ mergedRange = Range.sortAndMergeOverlap(ranges, true)
+ assert(mergedRange.size() == 3)
+ assert(mergedRange.get(0).equals(new Range(0, 64999)))
+ assert(mergedRange.get(1).equals(new Range(100000, 122221)))
+ assert(mergedRange.get(2).equals(new Range(122322, 155754)))
+ }
+ }
+}