This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ba53392df [spark] Format table support show partitions (#5798)
8ba53392df is described below
commit 8ba53392df5fb473667e1e910b6a5e79fb565f7a
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jun 26 22:06:41 2025 +0800
[spark] Format table support show partitions (#5798)
---
.../spark/sql/execution/PaimonFormatTable.scala | 79 ++++++++++++++++------
.../paimon/spark/sql/FormatTableTestBase.scala | 20 ++++++
2 files changed, 80 insertions(+), 19 deletions(-)
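For context: this change wires Spark's SupportsPartitionManagement into the
partitioned format-table implementations (csv, orc, parquet, json), so SHOW
PARTITIONS now resolves against the table's file index. A minimal usage
sketch, mirroring the test added below (the session `spark`, table name, and
data are illustrative, not part of the commit):

    // Any of the four formats behaves the same way here.
    spark.sql("CREATE TABLE t (id INT, p1 INT, p2 STRING) USING csv PARTITIONED BY (p1, p2)")
    spark.sql("INSERT INTO t VALUES (1, 1, '1')")
    spark.sql("INSERT INTO t VALUES (3, 2, '2')")
    // Yields one row per partition directory: "p1=1/p2=1" and "p1=2/p2=2".
    spark.sql("SHOW PARTITIONS t").show()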
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
index 842009b4a5..8e6eea0127 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.execution
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
+import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
@@ -29,6 +32,8 @@ import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFile
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import java.util
+
import scala.collection.JavaConverters._
object PaimonFormatTable {
@@ -108,6 +113,45 @@ object PaimonFormatTable {
}
// Paimon Format Table
+trait PartitionedFormatTable extends SupportsPartitionManagement {
+
+ val partitionSchema_ : StructType
+
+ val fileIndex: PartitioningAwareFileIndex
+
+ override def partitionSchema(): StructType = partitionSchema_
+
+ override def listPartitionIdentifiers(
+ names: Array[String],
+ ident: InternalRow): Array[InternalRow] = {
+ val partitionFilters = names.zipWithIndex.map {
+ case (name, index) =>
+ val f = partitionSchema().apply(name)
+ EqualTo(
+ AttributeReference(f.name, f.dataType, f.nullable)(),
+ Literal(ident.get(index, f.dataType), f.dataType))
+ }.toSeq
+ fileIndex.listFiles(partitionFilters, Seq.empty).map(_.values).toArray
+ }
+
+ override def createPartition(ident: InternalRow, properties: util.Map[String, String]): Unit = {
+ throw new UnsupportedOperationException()
+ }
+
+ override def dropPartition(ident: InternalRow): Boolean = {
+ throw new UnsupportedOperationException()
+ }
+
+ override def replacePartitionMetadata(
+ ident: InternalRow,
+ properties: util.Map[String, String]): Unit = {
+ throw new UnsupportedOperationException()
+ }
+
+ override def loadPartitionMetadata(ident: InternalRow): util.Map[String, String] = {
+ Map.empty[String, String].asJava
+ }
+}
class PartitionedCSVTable(
name: String,
@@ -116,16 +160,16 @@ class PartitionedCSVTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
- partitionSchema: StructType
-) extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
-
+ override val partitionSchema_ : StructType)
+ extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+ with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
PaimonFormatTable.createFileIndex(
options,
sparkSession,
paths,
userSpecifiedSchema,
- partitionSchema)
+ partitionSchema())
}
}
@@ -136,8 +180,9 @@ class PartitionedOrcTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
- partitionSchema: StructType
-) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+ override val partitionSchema_ : StructType
+) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+ with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
PaimonFormatTable.createFileIndex(
@@ -145,7 +190,7 @@ class PartitionedOrcTable(
sparkSession,
paths,
userSpecifiedSchema,
- partitionSchema)
+ partitionSchema())
}
}
@@ -156,14 +201,9 @@ class PartitionedParquetTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
- partitionSchema: StructType
-) extends ParquetTable(
- name,
- sparkSession,
- options,
- paths,
- userSpecifiedSchema,
- fallbackFileFormat) {
+ override val partitionSchema_ : StructType
+) extends ParquetTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+ with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
PaimonFormatTable.createFileIndex(
@@ -171,7 +211,7 @@ class PartitionedParquetTable(
sparkSession,
paths,
userSpecifiedSchema,
- partitionSchema)
+ partitionSchema())
}
}
@@ -182,8 +222,9 @@ class PartitionedJsonTable(
paths: Seq[String],
userSpecifiedSchema: Option[StructType],
fallbackFileFormat: Class[_ <: FileFormat],
- partitionSchema: StructType)
- extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+ override val partitionSchema_ : StructType)
+ extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat)
+ with PartitionedFormatTable {
override lazy val fileIndex: PartitioningAwareFileIndex = {
PaimonFormatTable.createFileIndex(
@@ -191,6 +232,6 @@ class PartitionedJsonTable(
sparkSession,
paths,
userSpecifiedSchema,
- partitionSchema)
+ partitionSchema())
}
}
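The heart of the change is the listPartitionIdentifiers override above: it
converts a (possibly partial) partition identifier into catalyst EqualTo
filters and lets the PartitioningAwareFileIndex prune the directory listing.
A standalone sketch of that filter construction, under an assumed two-column
partition schema (illustrative values, not part of the commit):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // Assumed partition schema (p1 INT, p2 STRING) and identifier (p1 = 1, p2 = 'a').
    val partitionSchema = StructType(Seq(StructField("p1", IntegerType), StructField("p2", StringType)))
    val names = Array("p1", "p2") // columns bound in the identifier; may be a prefix of the schema
    val ident = InternalRow(1, UTF8String.fromString("a"))

    // One equality predicate per named partition column, as in the trait:
    val partitionFilters = names.zipWithIndex.map {
      case (name, index) =>
        val f = partitionSchema(name)
        EqualTo(
          AttributeReference(f.name, f.dataType, f.nullable)(),
          Literal(ident.get(index, f.dataType), f.dataType))
    }.toSeq
    // fileIndex.listFiles(partitionFilters, Seq.empty) then lists only the
    // matching partition directories, and .map(_.values) recovers the
    // partition rows that SHOW PARTITIONS renders.

The write-side hooks (createPartition, dropPartition, replacePartitionMetadata)
throw UnsupportedOperationException, which fits format tables: their partitions
are implied by the directory layout rather than managed as catalog metadata.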
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 56c14aa64b..8c5b96a7d6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -58,6 +58,26 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
}
}
+ test("Format table: show partitions") {
+ for (format <- Seq("csv", "orc", "parquet", "json")) {
+ withTable("t") {
+ sql(s"CREATE TABLE t (id INT, p1 INT, p2 STRING) USING $format
PARTITIONED BY (p1, p2)")
+ sql("INSERT INTO t VALUES (1, 1, '1')")
+ sql("INSERT INTO t VALUES (2, 1, '1')")
+ sql("INSERT INTO t VALUES (3, 2, '1')")
+ sql("INSERT INTO t VALUES (3, 2, '2')")
+
+ checkAnswer(
+ spark.sql("SHOW PARTITIONS T"),
+ Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+
+ checkAnswer(spark.sql("SHOW PARTITIONS T PARTITION (p1=1)"),
Seq(Row("p1=1/p2=1")))
+
+ checkAnswer(spark.sql("SHOW PARTITIONS T PARTITION (p1=2, p2='2')"),
Seq(Row("p1=2/p2=2")))
+ }
+ }
+ }
+
test("Format table: CTAS with partitioned table") {
withTable("t1", "t2") {
sql("CREATE TABLE t1 (id INT, p1 INT, p2 INT) USING csv PARTITIONED BY
(p1, p2)")