This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3503b6f2a9 [spark] Introduce drop global index procedure (#6930)
3503b6f2a9 is described below

commit 3503b6f2a902d21711c4ed556cb8dd135109adfc
Author: YeJunHao <[email protected]>
AuthorDate: Tue Dec 30 17:39:51 2025 +0800

    [spark] Introduce drop global index procedure (#6930)
---
 .../java/org/apache/paimon/io/DataIncrement.java   |   9 +
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/DropGlobalIndexProcedure.java  | 210 +++++++++++++++++++++
 .../procedure/DropGlobalIndexProcedureTest.scala   | 183 ++++++++++++++++++
 4 files changed, 404 insertions(+)
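
For reference, the new procedure is invoked through Spark SQL. A minimal
sketch mirroring the test added in this commit (the database name `test`,
table `T`, and column `name` are illustrative, not fixed names):

    // Drop all bitmap global index files built on column `name`.
    spark.sql(
      "CALL sys.drop_global_index(" +
        "table => 'test.T', index_column => 'name', index_type => 'bitmap')")

The call returns a single boolean `result` column indicating success.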

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
index c3c419b964..d7a5a229e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataIncrement.java
@@ -69,6 +69,15 @@ public class DataIncrement {
                 Collections.emptyList());
     }
 
+    public static DataIncrement deleteIndexIncrement(List<IndexFileMeta> indexFiles) {
+        return new DataIncrement(
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                Collections.emptyList(),
+                indexFiles);
+    }
+
     public List<DataFileMeta> newFiles() {
         return newFiles;
     }
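
The new deleteIndexIncrement helper builds a DataIncrement whose only
payload is index files to delete; DropGlobalIndexProcedure (later in this
diff) wraps one such increment per partition in a CommitMessageImpl. A
hedged Scala sketch of that usage (`partition` and `indexFiles` are
hypothetical placeholders; the constructor call mirrors the procedure
code in this commit):

    import org.apache.paimon.data.BinaryRow
    import org.apache.paimon.index.IndexFileMeta
    import org.apache.paimon.io.{CompactIncrement, DataIncrement}
    import org.apache.paimon.table.sink.CommitMessageImpl

    val partition: BinaryRow = ???                      // placeholder: affected partition
    val indexFiles: java.util.List[IndexFileMeta] = ??? // placeholder: index files to delete

    // A commit message that removes index files and changes nothing else.
    val message = new CommitMessageImpl(
      partition,
      0,    // bucket: the procedure emits bucket 0 for index-only messages
      null, // total bucket count, unused for this kind of message
      DataIncrement.deleteIndexIncrement(indexFiles),
      CompactIncrement.emptyIncrement())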
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 10d9792317..74bf04d68c 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -33,6 +33,7 @@ import org.apache.paimon.spark.procedure.CreateTagProcedure;
 import org.apache.paimon.spark.procedure.DeleteBranchProcedure;
 import org.apache.paimon.spark.procedure.DeleteTagProcedure;
 import org.apache.paimon.spark.procedure.DropFunctionProcedure;
+import org.apache.paimon.spark.procedure.DropGlobalIndexProcedure;
 import org.apache.paimon.spark.procedure.ExpirePartitionsProcedure;
 import org.apache.paimon.spark.procedure.ExpireSnapshotsProcedure;
 import org.apache.paimon.spark.procedure.ExpireTagsProcedure;
@@ -95,6 +96,7 @@ public class SparkProcedures {
         procedureBuilders.put("expire_tags", ExpireTagsProcedure::builder);
         procedureBuilders.put("create_branch", CreateBranchProcedure::builder);
         procedureBuilders.put("create_global_index", 
CreateGlobalIndexProcedure::builder);
+        procedureBuilders.put("drop_global_index", 
DropGlobalIndexProcedure::builder);
         procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
         procedureBuilders.put("compact", CompactProcedure::builder);
         procedureBuilders.put("compact_database", 
CompactDatabaseProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java
new file mode 100644
index 0000000000..74e4cc4aea
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.spark.utils.SparkProcedureUtils;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/** Procedure to drop global index files via Spark. */
+public class DropGlobalIndexProcedure extends BaseProcedure {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DropGlobalIndexProcedure.class);
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", DataTypes.StringType),
+                ProcedureParameter.required("index_column", 
DataTypes.StringType),
+                ProcedureParameter.required("index_type", 
DataTypes.StringType),
+                ProcedureParameter.optional("partitions", StringType),
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.BooleanType, true, 
Metadata.empty())
+                    });
+
+    protected DropGlobalIndexProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public String description() {
+        return "Drop global index files for a given column.";
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+        String column = args.getString(1);
+        String indexType = args.getString(2).toLowerCase(Locale.ROOT).trim();
+        String partitions =
+                (args.isNullAt(3) || StringUtils.isNullOrWhitespaceOnly(args.getString(3)))
+                        ? null
+                        : args.getString(3);
+
+        String finalWhere = partitions != null ? SparkProcedureUtils.toWhere(partitions) : null;
+
+        LOG.info("Starting to drop index for table " + tableIdent + " WHERE: " 
+ finalWhere);
+
+        return modifyPaimonTable(
+                tableIdent,
+                t -> {
+                    try {
+                        checkArgument(
+                                t instanceof FileStoreTable,
+                                "Only FileStoreTable supports global index 
creation.");
+                        FileStoreTable table = (FileStoreTable) t;
+
+                        RowType rowType = table.rowType();
+                        checkArgument(
+                                rowType.containsField(column),
+                                "Column '%s' does not exist in table '%s'.",
+                                column,
+                                tableIdent);
+                        DataSourceV2Relation relation = createRelation(tableIdent);
+                        PartitionPredicate partitionPredicate =
+                                SparkProcedureUtils.convertToPartitionPredicate(
+                                        finalWhere,
+                                        table.schema().logicalPartitionType(),
+                                        spark(),
+                                        relation);
+
+                        Snapshot snapshot =
+                                t.latestSnapshot()
+                                        .orElseThrow(
+                                                () ->
+                                                        new IllegalStateException(
+                                                                String.format(
+                                                                        "Table '%s' has no snapshot.",
+                                                                        tableIdent)));
+
+                        Filter<IndexManifestEntry> filter =
+                                entry ->
+                                        entry.indexFile().indexType().equals(indexType)
+                                                && entry.indexFile().globalIndexMeta() != null
+                                                && entry.indexFile()
+                                                                .globalIndexMeta()
+                                                                .indexFieldId()
+                                                        == rowType.getField(column).id()
+                                                && (partitionPredicate == null
+                                                        || partitionPredicate.test(
+                                                                entry.partition()));
+
+                        List<IndexManifestEntry> waitDelete =
+                                table.store().newIndexFileHandler().scan(snapshot, filter);
+
+                        LOG.info("Found {} global index file(s) to delete.", waitDelete.size());
+
+                        Map<BinaryRow, List<IndexFileMeta>> deleteEntries =
+                                waitDelete.stream()
+                                        .map(IndexManifestEntry::toDeleteEntry)
+                                        .collect(
+                                                Collectors.groupingBy(
+                                                        IndexManifestEntry::partition,
+                                                        Collectors.mapping(
+                                                                IndexManifestEntry::indexFile,
+                                                                Collectors.toList())));
+
+                        List<CommitMessage> commitMessages = new ArrayList<>();
+
+                        for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry :
+                                deleteEntries.entrySet()) {
+                            BinaryRow partition = entry.getKey();
+                            List<IndexFileMeta> indexFileMetas = entry.getValue();
+                            commitMessages.add(
+                                    new CommitMessageImpl(
+                                            partition,
+                                            0,
+                                            null,
+                                            DataIncrement.deleteIndexIncrement(indexFileMetas),
+                                            CompactIncrement.emptyIncrement()));
+                        }
+
+                        try (TableCommitImpl commit =
+                                table.newCommit("drop-global-index-" + UUID.randomUUID())) {
+                            commit.commit(commitMessages);
+                        }
+
+                        return new InternalRow[] {newInternalRow(true)};
+                    } catch (Exception e) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Failed to drop %s index for column 
'%s' on table '%s'.",
+                                        indexType, column, tableIdent),
+                                e);
+                    }
+                });
+    }
+
+    public static ProcedureBuilder builder() {
+        return new Builder<DropGlobalIndexProcedure>() {
+            @Override
+            public DropGlobalIndexProcedure doBuild() {
+                return new DropGlobalIndexProcedure(tableCatalog());
+            }
+        };
+    }
+}
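
A note on the optional `partitions` argument: the procedure rewrites it
into a WHERE clause via SparkProcedureUtils.toWhere and compiles that into
a PartitionPredicate, so only index files in matching partitions are
dropped; when the argument is absent, the index is dropped table-wide. A
hedged sketch using the partition spec syntax from the test below:

    // Drop the bitmap index for column `name` in partition pt=p1 only;
    // index files in other partitions are kept.
    spark.sql(
      "CALL sys.drop_global_index(" +
        "table => 'test.T', index_column => 'name', index_type => 'bitmap', " +
        "partitions => 'pt=\"p1\"')")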
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala
new file mode 100644
index 0000000000..88caadcfda
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/DropGlobalIndexProcedureTest.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.Range
+
+import org.apache.spark.sql.streaming.StreamTest
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class DropGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest {
+
+  test("drop bitmap global index") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |""".stripMargin)
+
+      val values =
+        (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      var output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column 
=> 'name', index_type => 'bitmap')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      var table = loadTable("T")
+      var bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      assert(bitmapEntries.nonEmpty)
+      val totalRowCount = bitmapEntries.map(_.indexFile().rowCount()).sum
+      assert(totalRowCount == 100000L)
+
+      output = spark
+        .sql("CALL sys.drop_global_index(table => 'test.T', index_column => 
'name', index_type => 'bitmap')")
+        .collect()
+        .head
+
+      assert(output.getBoolean(0))
+
+      table = loadTable("T")
+      bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      assert(bitmapEntries.isEmpty)
+    }
+  }
+
+  test("create bitmap global index with partition") {
+    withTable("T") {
+      spark.sql("""
+                  |CREATE TABLE T (id INT, name STRING, pt STRING)
+                  |TBLPROPERTIES (
+                  |  'bucket' = '-1',
+                  |  'global-index.row-count-per-shard' = '10000',
+                  |  'row-tracking.enabled' = 'true',
+                  |  'data-evolution.enabled' = 'true')
+                  |  PARTITIONED BY (pt)
+                  |""".stripMargin)
+
+      var values =
+        (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+      spark.sql(s"INSERT INTO T VALUES $values")
+
+      var output =
+        spark
+          .sql("CALL sys.create_global_index(table => 'test.T', index_column 
=> 'name', index_type => 'bitmap')")
+          .collect()
+          .head
+
+      assert(output.getBoolean(0))
+
+      val table = loadTable("T")
+      var bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      assert(bitmapEntries.nonEmpty)
+
+      var ranges = bitmapEntries
+        .map(
+          s =>
+            new Range(
+              s.indexFile().globalIndexMeta().rowRangeStart(),
+              s.indexFile().globalIndexMeta().rowRangeEnd()))
+        .toList
+        .asJava
+      var mergedRange = Range.sortAndMergeOverlap(ranges, true)
+      assert(mergedRange.size() == 1)
+      assert(mergedRange.get(0).equals(new Range(0, 189087)))
+      val totalRowCount = bitmapEntries
+        .map(
+          x =>
+            x.indexFile()
+              .globalIndexMeta()
+              .rowRangeEnd() - x.indexFile().globalIndexMeta().rowRangeStart() + 1)
+        .sum
+      assert(totalRowCount == 189088L)
+
+      output = spark
+        .sql("CALL sys.drop_global_index(table => 'test.T', index_column => 
'name', index_type => 'bitmap', partitions => 'pt=\"p1\"')")
+        .collect()
+        .head
+
+      bitmapEntries = table
+        .store()
+        .newIndexFileHandler()
+        .scanEntries()
+        .asScala
+        .filter(_.indexFile().indexType() == "bitmap")
+      assert(bitmapEntries.nonEmpty)
+
+      ranges = bitmapEntries
+        .map(
+          s =>
+            new Range(
+              s.indexFile().globalIndexMeta().rowRangeStart(),
+              s.indexFile().globalIndexMeta().rowRangeEnd()))
+        .toList
+        .asJava
+      mergedRange = Range.sortAndMergeOverlap(ranges, true)
+      assert(mergedRange.size() == 3)
+      assert(mergedRange.get(0).equals(new Range(0, 64999)))
+      assert(mergedRange.get(1).equals(new Range(100000, 122221)))
+      assert(mergedRange.get(2).equals(new Range(122322, 155754)))
+    }
+  }
+}
