This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d470c5a805 [spark] Support drop partition from top level (#6154)
d470c5a805 is described below
commit d470c5a8050f476999a07c10ac8ecf4e8d3ad370
Author: baiyangtx <[email protected]>
AuthorDate: Thu Dec 11 16:53:27 2025 +0800
[spark] Support drop partition from top level (#6154)
Co-authored-by: zhangyongxiang.alpha <[email protected]>
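For reference, the user-facing behavior added here is exercised by the new DDLTestBase case below; the following is a minimal sketch only (the SparkSession config mirrors the added DropPartitionParserTest, while the table name, warehouse path, and sample rows are illustrative):

  // Sketch: drop partitions by a prefix ("top level") of the partition fields.
  val spark = org.apache.spark.sql.SparkSession.builder()
    .master("local[2]")
    .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
    .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
    .config("spark.sql.extensions", "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
    .getOrCreate()
  spark.sql("CREATE TABLE paimon.default.tbl (id INT, data STRING) USING paimon " +
    "PARTITIONED BY (dt STRING, hour STRING, event STRING)")
  spark.sql("INSERT INTO paimon.default.tbl VALUES (1, 'a', '2023-01-01', '00', 'event1')")
  spark.sql("INSERT INTO paimon.default.tbl VALUES (2, 'b', '2023-01-02', '00', 'event1')")
  spark.sql("INSERT INTO paimon.default.tbl VALUES (3, 'c', '2023-01-02', '01', 'event2')")
  // Full partition spec: already supported before this change.
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (dt='2023-01-01', hour='00', event='event1')")
  // New with this commit: a prefix of the partition fields is accepted as well.
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (dt='2023-01-02', hour='00')")
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (dt='2023-01-02')")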
---
.../spark/catalyst/analysis/PaimonAnalysis.scala | 10 +-
.../plans/logical/PaimonDropPartitions.scala | 65 ++++++++++
.../spark/execution/PaimonDropPartitionsExec.scala | 71 +++++++++++
.../paimon/spark/execution/PaimonStrategy.scala | 21 ++++
.../scala/org/apache/spark/sql/PaimonUtils.scala | 8 ++
.../AbstractPaimonSparkSqlExtensionsParser.scala | 3 +-
.../extensions/RewriteSparkDDLCommands.scala | 44 +++++++
.../extensions/UnresolvedPaimonRelation.scala | 49 ++++++++
.../spark/extensions/DropPartitionParserTest.java | 139 +++++++++++++++++++++
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 39 +++++-
.../spark/sql/PaimonPartitionManagementTest.scala | 4 -
11 files changed, 445 insertions(+), 8 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 1ee0687faa..c6e10fabf1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, PaimonTruncateTableCommand}
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.FileStoreTable
@@ -33,13 +34,13 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
import org.apache.spark.sql.connector.catalog.TableCapability
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation}
import org.apache.spark.sql.types._
import scala.collection.mutable
class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
-
+ import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, table) =>
@@ -60,6 +61,11 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
case s @ ShowColumns(PaimonRelation(table), _, _) if s.resolved =>
PaimonShowColumnsCommand(table)
+
+ case d @ PaimonDropPartitions(ResolvedTable(_, _, table: SparkTable, _), parts, _, _)
+ if d.resolved =>
+ PaimonDropPartitions.validate(table, parts.asResolvedPartitionSpecs)
+ d
}
private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, String] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala
new file mode 100644
index 0000000000..7fe80b23ab
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.sql.{types, PaimonUtils}
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2PartitionCommand}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.types.StructType
+
+/** Drop partitions command. */
+case class PaimonDropPartitions(
+ table: LogicalPlan,
+ parts: Seq[PartitionSpec],
+ ifExists: Boolean,
+ purge: Boolean)
+ extends V2PartitionCommand {
+
+ override def allowPartialPartitionSpec: Boolean = true
+ override protected def withNewChildInternal(newChild: LogicalPlan): PaimonDropPartitions =
+ copy(table = newChild)
+}
+
+/**
+ * If a Paimon table has a partition spec like (pt1, pt2, pt3), then the drop partition command
+ * must provide all partition fields or a prefix of them. For example, (pt1 = 'v1', pt2 = 'v2')
+ * is valid, but (pt2 = 'v2') is not.
+ */
+object PaimonDropPartitions {
+ def validate(table: SparkTable, partialSpecs: Seq[ResolvedPartitionSpec]): Unit = {
+ val partitionSchema = table.asPartitionable.partitionSchema()
+ partialSpecs.foreach {
+ partialSpec =>
+ if (!partitionSchema.names.toSeq.startsWith(partialSpec.names)) {
+ val values = partialSpec.names.zipWithIndex.map {
+ case (name, ordinal) =>
+ partialSpec.ident.get(ordinal, partitionSchema.apply(name).dataType).toString
+ }
+ val spec = partialSpec.names
+ .zip(values)
+ .map { case (name, value) => s"$name = '$value'" }
+ .mkString(",")
+ throw PaimonUtils.invalidPartitionSpecError(spec, partitionSchema.fieldNames, table.name)
+ }
+ }
+ }
+}
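To make the validation rule above concrete, here is a short usage sketch against the illustrative paimon.default.tbl from the example near the top of this mail (partitioned by dt, hour, event); only the table layout comes from the tests added in this commit, the rest is illustrative:

  // Accepted: the full spec or any prefix of (dt, hour, event); a prefix with no
  // matching partitions is simply a no-op.
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (dt='2023-01-02')")
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (dt='2023-01-02', hour='00')")
  // Rejected: (hour, event) is not a prefix of the partition fields, so
  // PaimonDropPartitions.validate raises an AnalysisException via
  // PaimonUtils.invalidPartitionSpecError.
  spark.sql("ALTER TABLE paimon.default.tbl DROP PARTITION (hour='00', event='event1')")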
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
new file mode 100644
index 0000000000..7d06bc49a4
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+case class PaimonDropPartitionsExec(
+ table: SparkTable,
+ partSpecs: Seq[ResolvedPartitionSpec],
+ ignoreIfNotExists: Boolean,
+ purge: Boolean,
+ refreshCache: () => Unit)
+ extends LeafV2CommandExec
+ with Logging {
+ override protected def run(): Seq[InternalRow] = {
+ val partitionSchema = table.asPartitionable.partitionSchema()
+ val (partialPartSpecs, fullPartSpecs) =
+ partSpecs.partition(_.ident.numFields != partitionSchema.length)
+
+ val (existsPartIdents, notExistsPartIdents) =
+ fullPartSpecs.map(_.ident).partition(table.partitionExists)
+ if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
+ throw new NoSuchPartitionsException(
+ table.name(),
+ notExistsPartIdents,
+ table.asPartitionable.partitionSchema())
+ }
+ val allExistsPartIdents = existsPartIdents ++ partialPartSpecs.flatMap(expandPartialSpec)
+ logDebug("Try to drop partitions: " + allExistsPartIdents.mkString(","))
+ val isTableAltered = if (allExistsPartIdents.nonEmpty) {
+ allExistsPartIdents
+ .map(
+ partIdent => {
+ if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
+ })
+ .reduce(_ || _)
+ } else false
+
+ if (isTableAltered) refreshCache()
+ Seq.empty
+ }
+
+ private def expandPartialSpec(partialSpec: ResolvedPartitionSpec): Seq[InternalRow] = {
+ table.listPartitionIdentifiers(partialSpec.names.toArray, partialSpec.ident).toSeq
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
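As a quick illustration of the split performed at the top of run() above: specs with fewer fields than the partition schema are treated as partial and are expanded to concrete partition identifiers via listPartitionIdentifiers before being dropped. A tiny standalone sketch using plain Scala collections in place of ResolvedPartitionSpec (field names and values are illustrative):

  // Sketch only: mirrors the partition(...) split in PaimonDropPartitionsExec.run().
  object PartialSpecSplitSketch {
    def main(args: Array[String]): Unit = {
      val partitionFields = Seq("dt", "hour", "event")
      val specs = Seq(
        Map("dt" -> "2023-01-01", "hour" -> "00", "event" -> "event1"), // full spec
        Map("dt" -> "2023-01-02") // partial (prefix) spec
      )
      val (partialSpecs, fullSpecs) = specs.partition(_.size != partitionFields.length)
      println(s"full specs: $fullSpecs") // checked for existence, then dropped/purged directly
      println(s"partial specs: $partialSpecs") // expanded to matching partitions first
    }
  }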
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index 6f21aa23d8..bc0627d89f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, S
import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -30,7 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ShowCreateTable}
import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog}
import org.apache.spark.sql.execution.{PaimonDescribeTableExec, SparkPlan, SparkStrategy}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation}
import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
import scala.collection.JavaConverters._
@@ -39,6 +42,7 @@ case class PaimonStrategy(spark: SparkSession)
with PredicateHelper
with PaimonLookupCatalog {
+ import DataSourceV2Implicits._
protected lazy val catalogManager = spark.sessionState.catalogManager
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -121,6 +125,18 @@ case class PaimonStrategy(spark: SparkSession)
case _ => Nil
}
+ case d @ PaimonDropPartitions(
+ r @ ResolvedTable(_, _, table: SparkTable, _),
+ parts,
+ ifExists,
+ purge) =>
+ PaimonDropPartitionsExec(
+ table,
+ parts.asResolvedPartitionSpecs,
+ ifExists,
+ purge,
+ recacheTable(r)) :: Nil
+
case _ => Nil
}
@@ -146,4 +162,9 @@ case class PaimonStrategy(spark: SparkSession)
}
}
}
+
+ private def recacheTable(r: ResolvedTable)(): Unit = {
+ val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
+ SparkShimLoader.shim.classicApi.recacheByPlan(spark, v2Relation)
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 68b878a018..fe96ad2e14 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.FieldReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
import org.apache.spark.sql.internal.connector.PredicateUtils
@@ -138,4 +139,11 @@ object PaimonUtils {
def classForName(clazz: String): Class[_] = {
SparkUtils.classForName(clazz)
}
+
+ def invalidPartitionSpecError(
+ specKeys: String,
+ partitionColumnNames: Seq[String],
+ tableName: String): Throwable = {
+ QueryCompilationErrors.invalidPartitionSpecError(specKeys, partitionColumnNames, tableName)
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
index f5b3d89a67..27fde13857 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala
@@ -79,7 +79,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = {
Seq(
RewritePaimonViewCommands(sparkSession),
- RewritePaimonFunctionCommands(sparkSession)
+ RewritePaimonFunctionCommands(sparkSession),
+ RewriteSparkDDLCommands(sparkSession)
)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala
new file mode 100644
index 0000000000..5227d1f0f6
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{PaimonDropPartitions, ResolvedIdentifier}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{DropPartitions, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+case class RewriteSparkDDLCommands(spark: SparkSession)
+ extends Rule[LogicalPlan]
+ with LookupCatalog {
+
+ protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+ // A new member was added to CreatePaimonView since Spark 4.0, so unapply pattern matching
+ // is not used here, to ensure compatibility across multiple Spark versions.
+ case DropPartitions(UnresolvedPaimonRelation(aliasedTable), parts, ifExists, purge) =>
+ PaimonDropPartitions(aliasedTable, parts, ifExists, purge)
+
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala
new file mode 100644
index 0000000000..a1eaba02a2
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{PaimonLookupCatalog, TableCatalog}
+
+import scala.util.Try
+
+object UnresolvedPaimonRelation extends PaimonLookupCatalog {
+ protected lazy val catalogManager = SparkSession.active.sessionState.catalogManager
+
+ def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
+ EliminateSubqueryAliases(plan) match {
+ case UnresolvedTable(multipartIdentifier, _, _) if isPaimonTable(multipartIdentifier) =>
+ Some(plan)
+ case _ => None
+ }
+ }
+
+ private def isPaimonTable(multipartIdentifier: Seq[String]): Boolean = {
+ multipartIdentifier match {
+ case CatalogAndIdentifier(catalog: TableCatalog, ident) =>
+ Try(catalog.loadTable(ident))
+ .map(t => t.isInstanceOf[org.apache.paimon.spark.SparkTable])
+ .getOrElse(false)
+ case _ => false
+ }
+ }
+
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java
new file mode 100644
index 0000000000..9e41f2a94a
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/extensions/DropPartitionParserTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.extensions;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.spark.SparkCatalog;
+import org.apache.paimon.spark.SparkGenericCatalog;
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions;
+import org.apache.paimon.utils.FileIOUtils;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.catalyst.parser.ParserInterface;
+import org.apache.spark.sql.catalyst.plans.logical.DropPartitions;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.io.File;
+import java.io.IOException;
+
+import scala.Option;
+
+/** Test for DropPartition parser. */
+public class DropPartitionParserTest {
+
+ private static SparkSession spark = null;
+ private static ParserInterface parser = null;
+ private final String dbName = "test_db";
+ private final String tableName = "test_paimon";
+ private final String hiveTableName = "test_hive";
+ private static Path warehousePath;
+
+ @BeforeAll
+ public static void startSparkSession(@TempDir java.nio.file.Path tempDir) {
+ warehousePath = new Path("file:" + tempDir.toString());
+ // Stop and clear the active session to avoid reusing a previously non-stopped session.
+ Option<SparkSession> optionalSession =
+ SparkSession.getActiveSession().orElse(SparkSession::getDefaultSession);
+ if (!optionalSession.isEmpty()) {
+ optionalSession.get().stop();
+ }
+ SparkSession.clearActiveSession();
+ spark =
+ SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.catalog.paimon", SparkCatalog.class.getName())
+ .config("spark.sql.catalog.paimon.warehouse", warehousePath.toString())
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ SparkGenericCatalog.class.getName())
+ .config(
+ "spark.sql.catalog.spark_catalog.warehouse.dir",
+ warehousePath.toString())
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ .config(
+ "spark.sql.extensions",
+ PaimonSparkSessionExtensions.class.getName())
+ .getOrCreate();
+ parser = spark.sessionState().sqlParser();
+ }
+
+ @AfterAll
+ public static void stopSparkSession() throws IOException {
+ if (spark != null) {
+ FileIOUtils.deleteDirectory(new File(warehousePath.toString()));
+ spark.stop();
+ spark = null;
+ parser = null;
+ }
+ }
+
+ @AfterEach
+ public void clear() {
+ if (spark != null) {
+ spark.sql("DROP TABLE IF EXISTS " + dbName + "." + tableName);
+ spark.sql("DROP TABLE IF EXISTS " + dbName + "." + hiveTableName);
+ spark.sql("DROP DATABASE " + dbName + " CASCADE");
+ }
+ }
+
+ @ParameterizedTest
+ @CsvSource({"false", "true"})
+ public void testOnPaimonCatalog(boolean defaultCatalog)
+ throws ParseException, NoSuchTableException {
+ String catalogName = !defaultCatalog ? "paimon" : "spark_catalog";
+ spark.sql("USE " + catalogName);
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS " + dbName);
+ spark.sql("USE " + dbName);
+ spark.sql(
+ "CREATE TABLE "
+ + tableName
+ + " (id INT, name STRING) USING paimon PARTITIONED BY
(dt STRING) ");
+ TableCatalog catalog =
+ (TableCatalog) spark.sessionState().catalogManager().currentCatalog();
+ Table table = catalog.loadTable(Identifier.of(new String[] {dbName}, tableName));
+ Assertions.assertNotNull(table);
+ LogicalPlan plan =
+ parser.parsePlan("ALTER TABLE " + tableName + " DROP PARTITION(dt='2024-01-01')");
+ Assertions.assertTrue(plan instanceof PaimonDropPartitions);
+ if (defaultCatalog) {
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS "
+ + hiveTableName
+ + " (id INT, name STRING) USING parquet
PARTITIONED BY (dt STRING) ");
+ plan =
+ parser.parsePlan(
+ "ALTER TABLE " + hiveTableName + " DROP
PARTITION(dt='2024-01-01')");
+ Assertions.assertFalse(plan instanceof PaimonDropPartitions);
+ Assertions.assertTrue(plan instanceof DropPartitions);
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 40a992d1a3..958b5a1514 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -24,7 +24,8 @@ import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.types.DataTypes
import org.apache.spark.SparkException
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
import org.junit.jupiter.api.Assertions
import java.sql.{Date, Timestamp}
@@ -558,4 +559,40 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
sql("CREATE TABLE t (id INT) USING paimon1")
}.getMessage.contains("Provider is not supported: paimon1"))
}
+
+ test("Paimon DDL: Drop Partition by partial spec") {
+ withTable("tbl") {
+ spark.sql(
+ s"CREATE TABLE tbl (id int, data string) USING paimon " +
+ s"PARTITIONED BY (dt string, hour string, event string) ")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-01', '00',
'event1')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00',
'event1')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00',
'event2')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '00',
'event3')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '02',
'event1')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '02',
'event2')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-02', '03',
'event1')")
+ spark.sql(s"INSERT INTO tbl VALUES (1, 'a', '2023-01-03', '00',
'event1')")
+ val query = () => spark.sql("SELECT * FROM tbl")
+ assert(query().count() == 8)
+ // drop with the full partition spec
+ spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01', hour='00', event='event1')")
+ assert(query().count() == 7)
+ // drop first + second level
+ spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-02', hour='00')")
+ assert(query().count() == 4)
+ // drop first level
+ spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-02')")
+ assert(query().count() == 1)
+ // drop with no effect (partition already removed)
+ spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01')")
+ assert(query().count() == 1)
+ assertThrows[AnalysisException] {
+ spark.sql("ALTER TABLE tbl DROP PARTITION (hour='00', event='event1')")
+ }
+ assertThrows[NoSuchPartitionsException] {
+ spark.sql("ALTER TABLE tbl DROP PARTITION (dt='2023-01-01', hour='00',
event='event1')")
+ }
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
index 3ed99bad59..6a279be424 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPartitionManagementTest.scala
@@ -148,10 +148,6 @@ class PaimonPartitionManagementTest extends PaimonSparkTestBase {
spark.sql(
"alter table T drop partition (dt=20240101, hh='00'), partition
(dt=20240102, hh='00')")
- assertThrows[AnalysisException] {
- spark.sql("alter table T drop partition (dt=20230816)")
- }
-
assertThrows[AnalysisException] {
spark.sql("alter table T drop partition (hh='1134')")
}