This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d470c5a805 [spark] Support drop partition from top level (#6154)
d470c5a805 is described below

commit d470c5a8050f476999a07c10ac8ecf4e8d3ad370
Author: baiyangtx <[email protected]>
AuthorDate: Thu Dec 11 16:53:27 2025 +0800

    [spark] Support drop partition from top level (#6154)
    
    Co-authored-by: zhangyongxiang.alpha <[email protected]>
---
 .../spark/catalyst/analysis/PaimonAnalysis.scala   |  10 +-
 .../plans/logical/PaimonDropPartitions.scala       |  65 ++++++++++
 .../spark/execution/PaimonDropPartitionsExec.scala |  71 +++++++++++
 .../paimon/spark/execution/PaimonStrategy.scala    |  21 ++++
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |   8 ++
 .../AbstractPaimonSparkSqlExtensionsParser.scala   |   3 +-
 .../extensions/RewriteSparkDDLCommands.scala       |  44 +++++++
 .../extensions/UnresolvedPaimonRelation.scala      |  49 ++++++++
 .../spark/extensions/DropPartitionParserTest.java  | 139 +++++++++++++++++++++
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  |  39 +++++-
 .../spark/sql/PaimonPartitionManagementTest.scala  |   4 -
 11 files changed, 445 insertions(+), 8 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 1ee0687faa..c6e10fabf1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.catalyst.analysis
 import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
 import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, 
PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, 
PaimonTruncateTableCommand}
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
@@ -33,13 +34,13 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.TableCapability
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, 
DataSourceV2Relation}
 import org.apache.spark.sql.types._
 
 import scala.collection.mutable
 
 class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
-
+  import DataSourceV2Implicits._
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
 
     case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, 
table) =>
@@ -60,6 +61,11 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
 
     case s @ ShowColumns(PaimonRelation(table), _, _) if s.resolved =>
       PaimonShowColumnsCommand(table)
+
+    case d @ PaimonDropPartitions(ResolvedTable(_, _, table: SparkTable, _), 
parts, _, _)
+        if d.resolved =>
+      PaimonDropPartitions.validate(table, parts.asResolvedPartitionSpecs)
+      d
   }
 
   private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, 
String] = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala
new file mode 100644
index 0000000000..7fe80b23ab
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.sql.{types, PaimonUtils}
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, 
ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
V2PartitionCommand}
+import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * Logical plan node for dropping partitions of a Paimon table.
+ *
+ * `allowPartialPartitionSpec` is overridden to true, so a spec may name only some of the
+ * partition columns.
+ *
+ * @param table
+ *   plan node of the target table; it is the child replaced by `withNewChildInternal`
+ * @param parts
+ *   partition specs to drop
+ * @param ifExists
+ *   passed to the physical command as its "ignore if not exists" flag
+ * @param purge
+ *   when true the physical command calls `purgePartition` instead of `dropPartition`
+ */
+case class PaimonDropPartitions(
+    table: LogicalPlan,
+    parts: Seq[PartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean)
+  extends V2PartitionCommand {
+
+  override def allowPartialPartitionSpec: Boolean = true
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
PaimonDropPartitions =
+    copy(table = newChild)
+}
+
+/**
+ * Validation for the partition specs of a drop partition command.
+ *
+ * If a paimon table has partition fields (pt1, pt2, pt3), then the drop partition command must
+ * provide all fields or a leading prefix of them. For example, (pt1 = 'v1', pt2 = 'v2') is
+ * valid, but (pt2 = 'v2') is not.
+ */
+object PaimonDropPartitions {
+
+  /**
+   * Checks that every spec names a leading prefix of the table's partition columns.
+   *
+   * @param table
+   *   the Paimon table whose partition schema the specs are compared with
+   * @param partialSpecs
+   *   resolved partition specs of the drop partition command
+   * @throws Throwable
+   *   the error built by `PaimonUtils.invalidPartitionSpecError`, raised for the first spec whose
+   *   names are not a prefix of the partition column names
+   */
+  def validate(table: SparkTable, partialSpecs: Seq[ResolvedPartitionSpec]): Unit = {
+    val partitionSchema = table.asPartitionable.partitionSchema()
+    val partitionNames = partitionSchema.names.toSeq
+    partialSpecs.foreach {
+      partialSpec =>
+        if (!partitionNames.startsWith(partialSpec.names)) {
+          // Render the offending spec as "name = 'value'" pairs for the error message.
+          val spec = partialSpec.names.zipWithIndex
+            .map {
+              case (name, ordinal) =>
+                // String.valueOf keeps a null partition value from raising a
+                // NullPointerException while the invalid-spec error is being built.
+                val value =
+                  String.valueOf(partialSpec.ident.get(ordinal, partitionSchema(name).dataType))
+                s"$name = '$value'"
+            }
+            .mkString(",")
+          throw PaimonUtils.invalidPartitionSpecError(spec, partitionSchema.fieldNames, table.name)
+        }
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
new file mode 100644
index 0000000000..7d06bc49a4
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, 
ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+/**
+ * Physical command that drops partitions of a Paimon table.
+ *
+ * Specs whose identifier has as many fields as the partition schema are treated as full specs
+ * and checked with `table.partitionExists`. All other specs are treated as partial and expanded
+ * to the matching partitions through `table.listPartitionIdentifiers`.
+ *
+ * @param table
+ *   the Paimon table to drop partitions from
+ * @param partSpecs
+ *   resolved partition specs, full or partial
+ * @param ignoreIfNotExists
+ *   when false, a full spec that matches no partition raises [[NoSuchPartitionsException]]
+ * @param purge
+ *   when true `table.purgePartition` is called, otherwise `table.dropPartition`
+ * @param refreshCache
+ *   callback invoked once if at least one drop call reported a change
+ */
+case class PaimonDropPartitionsExec(
+    table: SparkTable,
+    partSpecs: Seq[ResolvedPartitionSpec],
+    ignoreIfNotExists: Boolean,
+    purge: Boolean,
+    refreshCache: () => Unit)
+  extends LeafV2CommandExec
+  with Logging {
+
+  override protected def run(): Seq[InternalRow] = {
+    val partitionSchema = table.asPartitionable.partitionSchema()
+    val (partialPartSpecs, fullPartSpecs) =
+      partSpecs.partition(_.ident.numFields != partitionSchema.length)
+
+    // Only full specs are checked for existence; a partial spec that matches nothing simply
+    // expands to no partitions below.
+    val (existsPartIdents, notExistsPartIdents) =
+      fullPartSpecs.map(_.ident).partition(table.partitionExists)
+    if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
+      throw new NoSuchPartitionsException(table.name(), notExistsPartIdents, partitionSchema)
+    }
+
+    val allExistsPartIdents = existsPartIdents ++ partialPartSpecs.flatMap(expandPartialSpec)
+    logDebug("Try to drop partitions: " + allExistsPartIdents.mkString(","))
+
+    // The drop call is evaluated before `||` so that every partition is processed even after an
+    // earlier one has already reported a change; an empty list yields false.
+    val isTableAltered = allExistsPartIdents.foldLeft(false) {
+      (altered, partIdent) =>
+        val dropped =
+          if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
+        dropped || altered
+    }
+
+    if (isTableAltered) refreshCache()
+    Seq.empty
+  }
+
+  /** Lists the identifiers of the partitions matched by a partial spec. */
+  private def expandPartialSpec(partialSpec: ResolvedPartitionSpec): Seq[InternalRow] = {
+    table.listPartitionIdentifiers(partialSpec.names.toArray, partialSpec.ident).toSeq
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 6f21aa23d8..bc0627d89f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.spark.{SparkCatalog, 
SparkGenericCatalog, SparkTable, S
 import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
 import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
 import 
org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, 
RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
@@ -30,7 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, 
GenericInternalRow
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, 
DescribeRelation, LogicalPlan, ShowCreateTable}
 import org.apache.spark.sql.connector.catalog.{Identifier, 
PaimonLookupCatalog, TableCatalog}
 import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan, 
SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, 
DataSourceV2Relation}
 import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
 
 import scala.collection.JavaConverters._
 
@@ -39,6 +42,7 @@ case class PaimonStrategy(spark: SparkSession)
   with PredicateHelper
   with PaimonLookupCatalog {
 
+  import DataSourceV2Implicits._
   protected lazy val catalogManager = spark.sessionState.catalogManager
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -121,6 +125,18 @@ case class PaimonStrategy(spark: SparkSession)
         case _ => Nil
       }
 
+    case d @ PaimonDropPartitions(
+          r @ ResolvedTable(_, _, table: SparkTable, _),
+          parts,
+          ifExists,
+          purge) =>
+      PaimonDropPartitionsExec(
+        table,
+        parts.asResolvedPartitionSpecs,
+        ifExists,
+        purge,
+        recacheTable(r)) :: Nil
+
     case _ => Nil
   }
 
@@ -146,4 +162,9 @@ case class PaimonStrategy(spark: SparkSession)
       }
     }
   }
+
+  /**
+   * Builds a `DataSourceV2Relation` for the resolved table and hands it to the shim's
+   * `recacheByPlan`.
+   *
+   * The second, empty parameter list lets `recacheTable(r)` be passed where a `() => Unit`
+   * callback is expected, as done for `PaimonDropPartitionsExec`.
+   * NOTE(review): presumably this refreshes cached query results that read the table -- confirm
+   * against the shim implementation.
+   */
+  private def recacheTable(r: ResolvedTable)(): Unit = {
+    val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), 
Some(r.identifier))
+    SparkShimLoader.shim.classicApi.recacheByPlan(spark, v2Relation)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 68b878a018..fe96ad2e14 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, 
Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.FieldReference
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils
@@ -138,4 +139,11 @@ object PaimonUtils {
   def classForName(clazz: String): Class[_] = {
     SparkUtils.classForName(clazz)
   }
+
+  /**
+   * Builds the error for an invalid partition spec by delegating to
+   * `QueryCompilationErrors.invalidPartitionSpecError`.
+   *
+   * NOTE(review): presumably exposed here because this object lives in the `org.apache.spark.sql`
+   * package and can reach Spark-internal helpers -- confirm.
+   *
+   * @param specKeys
+   *   textual rendering of the offending partition spec
+   * @param partitionColumnNames
+   *   partition column names of the table
+   * @param tableName
+   *   name of the table the spec was applied to
+   * @return
+   *   the error to be thrown by the caller
+   */
+  def invalidPartitionSpecError(
+      specKeys: String,
+      partitionColumnNames: Seq[String],
+      tableName: String): Throwable = {
+    QueryCompilationErrors.invalidPartitionSpecError(specKeys, 
partitionColumnNames, tableName)
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index f5b3d89a67..27fde13857 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -79,7 +79,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val 
delegate: ParserInterf
   private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] 
= {
     Seq(
       RewritePaimonViewCommands(sparkSession),
-      RewritePaimonFunctionCommands(sparkSession)
+      RewritePaimonFunctionCommands(sparkSession),
+      RewriteSparkDDLCommands(sparkSession)
     )
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala
new file mode 100644
index 0000000000..5227d1f0f6
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{PaimonDropPartitions, 
ResolvedIdentifier}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{DropPartitions, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+/**
+ * Rule that replaces Spark's `DropPartitions` with [[PaimonDropPartitions]] when the target of
+ * the command is matched by `UnresolvedPaimonRelation`.
+ */
+case class RewriteSparkDDLCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = 
spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp 
{
+
+    // Only a DropPartitions whose table matches UnresolvedPaimonRelation is rewritten; the table
+    // plan, the partition specs and the ifExists/purge flags are passed through unchanged.
+    // NOTE(review): this constructor pattern assumes DropPartitions has exactly these four
+    // fields in every supported Spark version -- confirm.
+    case DropPartitions(UnresolvedPaimonRelation(aliasedTable), parts, 
ifExists, purge) =>
+      PaimonDropPartitions(aliasedTable, parts, ifExists, purge)
+
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala
new file mode 100644
index 0000000000..a1eaba02a2
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
UnresolvedTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{PaimonLookupCatalog, 
TableCatalog}
+
+import scala.util.Try
+
+/**
+ * Extractor that matches an `UnresolvedTable` (possibly wrapped in subquery aliases) whose
+ * identifier loads as a Paimon `SparkTable`.
+ */
+object UnresolvedPaimonRelation extends PaimonLookupCatalog {
+  // Evaluated once, on first access, from the session that is active at that moment, and then
+  // reused for the lifetime of this object.
+  // NOTE(review): if the active session is later replaced (e.g. stopped and recreated), this
+  // may still point at the earlier session's catalog manager -- confirm.
+  protected lazy val catalogManager = 
SparkSession.active.sessionState.catalogManager
+
+  /**
+   * Returns the original `plan` (aliases included) when, after eliminating subquery aliases, it
+   * is an `UnresolvedTable` that refers to a Paimon table; otherwise `None`.
+   */
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
+    EliminateSubqueryAliases(plan) match {
+      case UnresolvedTable(multipartIdentifier, _, _) if 
isPaimonTable(multipartIdentifier) =>
+        Some(plan)
+      case _ => None
+    }
+  }
+
+  /**
+   * Loads the table through its catalog and checks whether it is a Paimon `SparkTable`.
+   *
+   * A failure captured by `Try` while loading (for example a missing table) and an identifier
+   * that does not resolve to a `TableCatalog` both count as "not a Paimon table".
+   */
+  private def isPaimonTable(multipartIdentifier: Seq[String]): Boolean = {
+    multipartIdentifier match {
+      case CatalogAndIdentifier(catalog: TableCatalog, ident) =>
+        Try(catalog.loadTable(ident))
+          .map(t => t.isInstanceOf[org.apache.paimon.spark.SparkTable])
+          .getOrElse(false)
+      case _ => false
+    }
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java
new file mode 100644
index 0000000000..9e41f2a94a
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.extensions;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.SparkCatalog;
+import org.apache.paimon.spark.SparkGenericCatalog;
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions;
+import org.apache.paimon.utils.FileIOUtils;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitions;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.io.File;
+import java.io.IOException;
+
+import scala.Option;
+
+/**
+ * Test for DropPartition parser.
+ *
+ * <p>Checks that the session's SQL parser returns a {@link PaimonDropPartitions} plan for {@code
+ * ALTER TABLE ... DROP PARTITION} on a Paimon table, and a plain {@link DropPartitions} plan for
+ * a parquet table.
+ */
+public class DropPartitionParserTest {
+
+    private static SparkSession spark = null;
+    private static ParserInterface parser = null;
+    private final String dbName = "test_db";
+    private final String tableName = "test_paimon";
+    private final String hiveTableName = "test_hive";
+    private static Path warehousePath;
+
+    /**
+     * Starts a local session with a {@code paimon} catalog, a {@link SparkGenericCatalog} as
+     * {@code spark_catalog} and the Paimon session extensions, all using the temp directory as
+     * warehouse.
+     */
+    @BeforeAll
+    public static void startSparkSession(@TempDir java.nio.file.Path tempDir) {
+        warehousePath = new Path("file:" + tempDir.toString());
+        // Stops and clears active session to avoid loading previous 
non-stopped session.
+        Option<SparkSession> optionalSession =
+                
SparkSession.getActiveSession().orElse(SparkSession::getDefaultSession);
+        if (!optionalSession.isEmpty()) {
+            optionalSession.get().stop();
+        }
+        SparkSession.clearActiveSession();
+        spark =
+                SparkSession.builder()
+                        .master("local[2]")
+                        .config("spark.sql.catalog.paimon", 
SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.paimon.warehouse", 
warehousePath.toString())
+                        .config(
+                                "spark.sql.catalog.spark_catalog",
+                                SparkGenericCatalog.class.getName())
+                        .config(
+                                
"spark.sql.catalog.spark_catalog.warehouse.dir",
+                                warehousePath.toString())
+                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
+                        .config(
+                                "spark.sql.extensions",
+                                PaimonSparkSessionExtensions.class.getName())
+                        .getOrCreate();
+        parser = spark.sessionState().sqlParser();
+    }
+
+    /** Deletes the warehouse directory and stops the session started for this class. */
+    @AfterAll
+    public static void stopSparkSession() throws IOException {
+        if (spark != null) {
+            FileIOUtils.deleteDirectory(new File(warehousePath.toString()));
+            spark.stop();
+            spark = null;
+            parser = null;
+        }
+    }
+
+    /** Drops both test tables and the test database after each parameterized run. */
+    @AfterEach
+    public void clear() {
+        if (spark != null) {
+            spark.sql("DROP TABLE IF EXISTS " + dbName + "." + tableName);
+            spark.sql("DROP TABLE IF EXISTS " + dbName + "." + hiveTableName);
+            spark.sql("DROP DATABASE " + dbName + " CASCADE");
+        }
+    }
+
+    /**
+     * Parses a DROP PARTITION statement against a Paimon table.
+     *
+     * @param defaultCatalog when true the test runs in {@code spark_catalog}, otherwise in the
+     *     {@code paimon} catalog
+     */
+    @ParameterizedTest
+    @CsvSource({"false", "true"})
+    public void testOnPaimonCatalog(boolean defaultCatalog)
+            throws ParseException, NoSuchTableException {
+        String catalogName = !defaultCatalog ? "paimon" : "spark_catalog";
+        spark.sql("USE " + catalogName);
+
+        spark.sql("CREATE DATABASE IF NOT EXISTS " + dbName);
+        spark.sql("USE " + dbName);
+        spark.sql(
+                "CREATE TABLE "
+                        + tableName
+                        + " (id INT, name STRING) USING paimon PARTITIONED BY 
(dt STRING) ");
+        TableCatalog catalog =
+                (TableCatalog) 
spark.sessionState().catalogManager().currentCatalog();
+        Table table = catalog.loadTable(Identifier.of(new String[] {dbName}, 
tableName));
+        Assertions.assertNotNull(table);
+        // The statement is only parsed, not executed: the Paimon table must already yield the
+        // Paimon-specific plan at this stage.
+        LogicalPlan plan =
+                parser.parsePlan("ALTER TABLE " + tableName + " DROP 
PARTITION(dt='2024-01-01')");
+        Assertions.assertTrue(plan instanceof PaimonDropPartitions);
+        if (defaultCatalog) {
+            // A parquet table in the same catalog must keep Spark's own DropPartitions plan.
+            spark.sql(
+                    "CREATE TABLE IF NOT EXISTS "
+                            + hiveTableName
+                            + " (id INT, name STRING) USING parquet 
PARTITIONED BY (dt STRING) ");
+            plan =
+                    parser.parsePlan(
+                            "ALTER TABLE " + hiveTableName + " DROP 
PARTITION(dt='2024-01-01')");
+            Assertions.assertFalse(plan instanceof PaimonDropPartitions);
+            Assertions.assertTrue(plan instanceof DropPartitions);
+        }
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 40a992d1a3..958b5a1514 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -24,7 +24,8 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.types.DataTypes
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
 import org.junit.jupiter.api.Assertions
 
 import java.sql.{Date, Timestamp}
@@ -558,4 +559,40 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
       sql("CREATE TABLE t (id INT) USING paimon1")
     }.getMessage.contains("Provider is not supported: paimon1"))
   }
+
+  // Covers DROP PARTITION with full specs, prefix (partial) specs, a non-prefix spec and a
+  // full spec that matches no partition.
+  test("Paimon DDL: Drop Partition by partial spec") {
+    withTable("tbl") {
+      spark.sql(
+        s"CREATE TABLE tbl (id int, data string) USING paimon " +
+          s"PARTITIONED BY (dt string, hour string, event string) ")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-01', '00', 
'event1')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00', 
'event1')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00', 
'event2')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00', 
'event3')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '02', 
'event1')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '02', 
'event2')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '03', 
'event1')")
+      spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-03', '00', 
'event1')")
+      val query = () => spark.sql("SELECT * FROM tbl")
+      assert(query().count() == 8)
+      // drop with all three partition levels given: removes one row
+      spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01', hour='00', 
event='event1')")
+      assert(query().count() == 7)
+      // drop by first + second level: removes the three events of 2023-01-02 / hour 00
+      spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-02', hour='00')")
+      assert(query().count() == 4)
+      // drop by first level only: removes the remaining three rows of 2023-01-02
+      spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-02')")
+      assert(query().count() == 1)
+      // drop with no effect: a partial spec that matches no partition is not an error
+      spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01')")
+      assert(query().count() == 1)
+      // a spec that is not a leading prefix of (dt, hour, event) is rejected
+      assertThrows[AnalysisException] {
+        spark.sql("ALTER TABLE tbl DROP PARTITION (hour='00', event='event1')")
+      }
+      // a full spec for an already dropped partition fails without IF EXISTS
+      assertThrows[NoSuchPartitionsException] {
+        spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01', hour='00', 
event='event1')")
+      }
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 3ed99bad59..6a279be424 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -148,10 +148,6 @@ class PaimonPartitionManagementTest extends 
PaimonSparkTestBase {
             spark.sql(
               "alter table T drop partition (dt=20240101, hh='00'), partition 
(dt=20240102, hh='00')")
 
-            assertThrows[AnalysisException] {
-              spark.sql("alter table T drop partition (dt=20230816)")
-            }
-
             assertThrows[AnalysisException] {
               spark.sql("alter table T drop partition (hh='1134')")
             }

Reply via email to