This is an automated email from the ASF dual-hosted git repository.
xuzifu666 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 300cc67c20 [spark] show table extended (#4603)
300cc67c20 is described below
commit 300cc67c208c4b86e2edf58ad1981b86649fe892
Author: Yann Byron <[email protected]>
AuthorDate: Tue Dec 3 11:35:16 2024 +0800
[spark] show table extended (#4603)
* [spark] show table extended
* update
* [update] doc
---
docs/content/spark/auxiliary.md | 11 ++
.../paimon/spark/PaimonPartitionManagement.scala | 2 +-
.../analysis/PaimonResolvePartitionSpec.scala | 75 +++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 2 +
.../scala/org/apache/spark/sql/PaimonUtils.scala | 19 ++++
...logUtils.scala => PaimonCatalogImplicits.scala} | 25 +----
.../sql/connector/catalog/PaimonCatalogUtils.scala | 3 +
.../apache/spark/sql/paimon/shims/SparkShim.scala | 4 +
.../paimon/spark/sql/DescribeTableTest.scala | 70 ++++++++++++
.../catalyst/analysis/Spark3ResolutionRules.scala | 56 ++++++++++
.../commands/PaimonShowTablePartitionCommand.scala | 96 ++++++++++++++++
.../commands/PaimonShowTablesExtendedCommand.scala | 123 +++++++++++++++++++++
.../apache/spark/sql/paimon/shims/Spark3Shim.scala | 8 +-
.../catalyst/analysis/Spark4ResolutionRules.scala} | 26 +----
.../apache/spark/sql/paimon/shims/Spark4Shim.scala | 9 +-
15 files changed, 486 insertions(+), 43 deletions(-)
diff --git a/docs/content/spark/auxiliary.md b/docs/content/spark/auxiliary.md
index 6330ca27ce..5de0289565 100644
--- a/docs/content/spark/auxiliary.md
+++ b/docs/content/spark/auxiliary.md
@@ -96,6 +96,17 @@ SHOW PARTITIONS my_table;
SHOW PARTITIONS my_table PARTITION (dt=20230817);
```
+## Show Table Extended
+The SHOW TABLE EXTENDED statement is used to list table or partition information.
+
+```sql
+-- Lists tables that satisfy regular expressions
+SHOW TABLE EXTENDED IN db_name LIKE 'test*';
+
+-- Lists the specified partition information for the table
+SHOW TABLE EXTENDED IN db_name LIKE 'table_name' PARTITION(pt = '2024');
+```
+
## Analyze table
The ANALYZE TABLE statement collects statistics about the table, that are to be used by the query optimizer to find a better query execution plan.
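The new DescribeTableTest below exercises the statement end to end; for orientation, a condensed sketch of that flow (database and table names come from the test; the result shape — the four columns namespace, tableName, isTemporary, information — follows its assertions):

```sql
-- assumes the Paimon catalog configured in the test setup is the current catalog
CREATE DATABASE test_show;
USE test_show;
CREATE TABLE s2 (id INT, pt STRING) PARTITIONED BY (pt);
INSERT INTO s2 VALUES (1, '2024'), (2, '2024'), (3, '2025');

-- one row per matching table: namespace, tableName, isTemporary, information
SHOW TABLE EXTENDED IN test_show LIKE '*';

-- one row for the matching partition; information includes "Partition Values"
SHOW TABLE EXTENDED IN test_show LIKE 's2' PARTITION (pt = '2024');
```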
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 9a305ca59a..840f1341a6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -100,7 +100,7 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
}
override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
- throw new UnsupportedOperationException("Load partition is not supported")
+ Map.empty[String, String].asJava
}
override def listPartitionIdentifiers(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
new file mode 100644
index 0000000000..5d6a5a063c
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonResolvePartitionSpec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.spark.sql.PaimonUtils.{normalizePartitionSpec, requireExactMatchedPartitionSpec}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec.conf
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+object PaimonResolvePartitionSpec {
+
+ def resolve(
+ catalog: TableCatalog,
+ tableIndent: Identifier,
+ partitionSpec: PartitionSpec): ResolvedPartitionSpec = {
+ val table = catalog.loadTable(tableIndent).asPartitionable
+ partitionSpec match {
+ case u: UnresolvedPartitionSpec =>
+ val partitionSchema = table.partitionSchema()
+ resolvePartitionSpec(table.name(), u, partitionSchema, allowPartitionSpec = false)
+ case o => o.asInstanceOf[ResolvedPartitionSpec]
+ }
+ }
+
+ private def resolvePartitionSpec(
+ tableName: String,
+ partSpec: UnresolvedPartitionSpec,
+ partSchema: StructType,
+ allowPartitionSpec: Boolean): ResolvedPartitionSpec = {
+ val normalizedSpec = normalizePartitionSpec(partSpec.spec, partSchema, tableName, conf.resolver)
+ if (!allowPartitionSpec) {
+ requireExactMatchedPartitionSpec(tableName, normalizedSpec, partSchema.fieldNames)
+ }
+ val partitionNames = normalizedSpec.keySet
+ val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
+ ResolvedPartitionSpec(
+ requestedFields.map(_.name),
+ convertToPartIdent(normalizedSpec, requestedFields),
+ partSpec.location)
+ }
+
+ def convertToPartIdent(
+ partitionSpec: TablePartitionSpec,
+ schema: Seq[StructField]): InternalRow = {
+ val partValues = schema.map {
+ part =>
+ val raw = partitionSpec.get(part.name).orNull
+ val dt = CharVarcharUtils.replaceCharVarcharWithString(part.dataType)
+ Cast(Literal.create(raw, StringType), dt, Some(conf.sessionLocalTimeZone)).eval()
+ }
+ InternalRow.fromSeq(partValues)
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index e8f75d394a..f73df64fb8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -40,6 +40,8 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule(spark => new PaimonAnalysis(spark))
extensions.injectResolutionRule(spark => PaimonProcedureResolver(spark))
extensions.injectResolutionRule(spark => PaimonViewResolver(spark))
+ extensions.injectResolutionRule(
+ spark => SparkShimLoader.getSparkShim.createCustomResolution(spark))
extensions.injectResolutionRule(spark => PaimonIncompatibleResolutionRules(spark))
extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 4492d856ad..cc49e787dc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -20,11 +20,15 @@ package org.apache.spark.sql
import org.apache.spark.executor.OutputMetrics
import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.PartitioningUtils
import org.apache.spark.util.{Utils => SparkUtils}
/**
@@ -87,4 +91,19 @@ object PaimonUtils {
outputMetrics.setBytesWritten(bytesWritten)
outputMetrics.setRecordsWritten(recordsWritten)
}
+
+ def normalizePartitionSpec[T](
+ partitionSpec: Map[String, T],
+ partCols: StructType,
+ tblName: String,
+ resolver: Resolver): Map[String, T] = {
+ PartitioningUtils.normalizePartitionSpec(partitionSpec, partCols, tblName, resolver)
+ }
+
+ def requireExactMatchedPartitionSpec(
+ tableName: String,
+ spec: TablePartitionSpec,
+ partitionColumnNames: Seq[String]): Unit = {
+ PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
similarity index 51%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
index 2ab3dc4945..f1f20fb6fb 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogImplicits.scala
@@ -18,26 +18,13 @@
package org.apache.spark.sql.connector.catalog
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.paimon.ReflectUtils
+object PaimonCatalogImplicits {
-object PaimonCatalogUtils {
+ import CatalogV2Implicits._
- def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
- val externalCatalogClassName =
- if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
- "org.apache.spark.sql.hive.HiveExternalCatalog"
- } else {
- "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
- }
- ReflectUtils.reflect[ExternalCatalog, SparkConf, Configuration](
- externalCatalogClassName,
- conf,
- hadoopConf)
- }
+ implicit class PaimonCatalogHelper(plugin: CatalogPlugin) extends CatalogHelper(plugin)
+ implicit class PaimonNamespaceHelper(namespace: Array[String]) extends NamespaceHelper(namespace)
+
+// implicit class PaimonTableHelper(table: Table) extends TableHelper(table)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
index 2ab3dc4945..5db6894ba0 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.paimon.ReflectUtils
@@ -40,4 +41,6 @@ object PaimonCatalogUtils {
hadoopConf)
}
+ val TABLE_RESERVED_PROPERTIES: Seq[String] = CatalogV2Util.TABLE_RESERVED_PROPERTIES
+
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index bd85282737..334bd6e931 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -24,6 +24,8 @@ import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -39,6 +41,8 @@ trait SparkShim {
def createSparkParser(delegate: ParserInterface): ParserInterface
+ def createCustomResolution(spark: SparkSession): Rule[LogicalPlan]
+
def createSparkInternalRow(rowType: RowType): SparkInternalRow
def createSparkArrayData(elementType: DataType): SparkArrayData
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
index 528dcd3cd1..ae538fa48c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTest.scala
@@ -27,6 +27,76 @@ import java.util.Objects
class DescribeTableTest extends PaimonSparkTestBase {
+ test("Paimon show: show table extended") {
+ val testDB = "test_show"
+ withDatabase(testDB) {
+ spark.sql("CREATE TABLE s1 (id INT)")
+
+ spark.sql(s"CREATE DATABASE $testDB")
+ spark.sql(s"USE $testDB")
+ spark.sql("CREATE TABLE s2 (id INT, pt STRING) PARTITIONED BY (pt)")
+ spark.sql("CREATE TABLE s3 (id INT, pt1 STRING, pt2 STRING) PARTITIONED BY (pt1, pt2)")
+
+ spark.sql("INSERT INTO s2 VALUES (1, '2024'), (2, '2024'), (3, '2025'), (4, '2026')")
+ spark.sql("""
+ |INSERT INTO s3
+ |VALUES
+ |(1, '2024', '11'), (2, '2024', '12'), (3, '2025', '11'), (4, '2025', '12')
+ |""".stripMargin)
+
+ // SHOW TABLE EXTENDED will give four columns: namespace, tableName, isTemporary, information.
+ checkAnswer(
+ sql(s"SHOW TABLE EXTENDED IN $dbName0 LIKE '*'")
+ .select("namespace", "tableName", "isTemporary"),
+ Row("test", "s1", false))
+ checkAnswer(
+ sql(s"SHOW TABLE EXTENDED IN $testDB LIKE '*'")
+ .select("namespace", "tableName", "isTemporary"),
+ Row(testDB, "s2", false) :: Row(testDB, "s3", false) :: Nil
+ )
+
+ // check table s2
+ val res1 = spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2'").select("information")
+ Assertions.assertEquals(1, res1.count())
+ val information1 = res1
+ .collect()
+ .head
+ .getString(0)
+ .split("\n")
+ .map {
+ line =>
+ val kv = line.split(": ", 2)
+ kv(0) -> kv(1)
+ }
+ .toMap
+ Assertions.assertEquals(information1("Catalog"), "paimon")
+ Assertions.assertEquals(information1("Namespace"), testDB)
+ Assertions.assertEquals(information1("Table"), "s2")
+ Assertions.assertEquals(information1("Provider"), "paimon")
+ Assertions.assertEquals(information1("Location"), loadTable(testDB, "s2").location().toString)
+
+ // check table s2 partition info
+ val error1 = intercept[Exception] {
+ spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt='2022')")
+ }.getMessage
+ assert(error1.contains("PARTITIONS_NOT_FOUND"))
+
+ val error2 = intercept[Exception] {
+ spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1='2024')")
+ }.getMessage
+ assert(error2.contains("Partition spec is invalid"))
+
+ val res2 =
+ spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's3' PARTITION(pt1 = '2024', pt2 = 11)")
+ checkAnswer(
+ res2.select("namespace", "tableName", "isTemporary"),
+ Row(testDB, "s3", false)
+ )
+ Assertions.assertTrue(
+ res2.select("information").collect().head.getString(0).contains("Partition Values"))
+ }
+ }
+
test(s"Paimon describe: describe table comment") {
var comment = "test comment"
spark.sql(s"""
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
new file mode 100644
index 0000000000..924df2d1e3
--- /dev/null
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark3ResolutionRules.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.commands.{PaimonShowTablePartitionCommand, PaimonShowTablesExtendedCommand}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedNamespace, UnresolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTableExtended}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.Identifier
+
+case class Spark3ResolutionRules(session: SparkSession)
+ extends Rule[LogicalPlan]
+ with SQLConfHelper {
+
+ import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+ case ShowTableExtended(
+ ResolvedNamespace(catalog, ns),
+ pattern,
+ partitionSpec @ (None | Some(UnresolvedPartitionSpec(_, _))),
+ output) =>
+ partitionSpec
+ .map {
+ spec: PartitionSpec =>
+ val table = Identifier.of(ns.toArray, pattern)
+ val resolvedSpec =
+ PaimonResolvePartitionSpec.resolve(catalog.asTableCatalog, table, spec)
+ PaimonShowTablePartitionCommand(output, catalog.asTableCatalog, table, resolvedSpec)
+ }
+ .getOrElse {
+ PaimonShowTablesExtendedCommand(catalog.asTableCatalog, ns, pattern, output)
+ }
+
+ }
+
+}
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
new file mode 100644
index 0000000000..32f9498585
--- /dev/null
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.expressions.{Attribute, ToPrettyString}
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class PaimonShowTablePartitionCommand(
+ override val output: Seq[Attribute],
+ catalog: TableCatalog,
+ tableIndent: Identifier,
+ partSpec: ResolvedPartitionSpec)
+ extends PaimonLeafRunnableCommand {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val rows = new mutable.ArrayBuffer[Row]()
+ val table = catalog.loadTable(tableIndent)
+ val information = getTablePartitionDetails(tableIndent, table.asPartitionable, partSpec)
+ rows += Row(tableIndent.namespace.quoted, tableIndent.name(), false, s"$information\n")
+
+ rows.toSeq
+ }
+
+ private def getTablePartitionDetails(
+ tableIdent: Identifier,
+ partitionTable: SupportsPartitionManagement,
+ partSpec: ResolvedPartitionSpec): String = {
+ val results = new mutable.LinkedHashMap[String, String]()
+
+ // "Partition Values"
+ val partitionSchema = partitionTable.partitionSchema()
+ val (names, ident) = (partSpec.names, partSpec.ident)
+ val partitionIdentifiers = partitionTable.listPartitionIdentifiers(names.toArray, ident)
+ if (partitionIdentifiers.isEmpty) {
+ val part = ident
+ .toSeq(partitionSchema)
+ .zip(partitionSchema.map(_.name))
+ .map(kv => s"${kv._2}" + s" = ${kv._1}")
+ .mkString(", ")
+ throw new RuntimeException(
+ s"""
+ |[PARTITIONS_NOT_FOUND] The partition(s) PARTITION ($part) cannot be found in table ${tableIdent.toString}.
+ |Verify the partition specification and table name.
+ |""".stripMargin)
+ }
+ assert(partitionIdentifiers.length == 1)
+ val row = partitionIdentifiers.head
+ val len = partitionSchema.length
+ val partitions = new Array[String](len)
+ val timeZoneId = conf.sessionLocalTimeZone
+ for (i <- 0 until len) {
+ val dataType = partitionSchema(i).dataType
+ val partValueUTF8String =
+ ToPrettyString(Literal(row.get(i, dataType), dataType), Some(timeZoneId)).eval(null)
+ val partValueStr = if (partValueUTF8String == null) "null" else partValueUTF8String.toString
+ partitions(i) = escapePathName(partitionSchema(i).name) + "=" + escapePathName(partValueStr)
+ }
+ val partitionValues = partitions.mkString("[", ", ", "]")
+ results.put("Partition Values", s"$partitionValues")
+
+ // TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics"
+
+ results
+ .map {
+ case (key, value) =>
+ if (value.isEmpty) key else s"$key: $value"
+ }
+ .mkString("", "\n", "")
+ }
+}
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala
new file mode 100644
index 0000000000..b393982e25
--- /dev/null
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablesExtendedCommand.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.util.{QuotingUtils, StringUtils}
+import org.apache.spark.sql.connector.catalog.{Identifier, PaimonCatalogUtils, SupportsPartitionManagement, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class PaimonShowTablesExtendedCommand(
+ catalog: TableCatalog,
+ namespace: Seq[String],
+ pattern: String,
+ override val output: Seq[Attribute],
+ isExtended: Boolean = false,
+ partitionSpec: Option[TablePartitionSpec] = None)
+ extends PaimonLeafRunnableCommand {
+
+ override def run(spark: SparkSession): Seq[Row] = {
+ val rows = new mutable.ArrayBuffer[Row]()
+
+ val tables = catalog.listTables(namespace.toArray)
+ tables.map {
+ tableIdent: Identifier =>
+ if (StringUtils.filterPattern(Seq(tableIdent.name()), pattern).nonEmpty) {
+ val table = catalog.loadTable(tableIdent)
+ val information = getTableDetails(catalog.name, tableIdent, table)
+ rows += Row(tableIdent.namespace().quoted, tableIdent.name(), false, s"$information\n")
+ }
+ }
+
+ // TODO: view
+
+ rows.toSeq
+ }
+
+ private def getTableDetails(catalogName: String, identifier: Identifier, table: Table): String = {
+ val results = new mutable.LinkedHashMap[String, String]()
+
+ results.put("Catalog", catalogName)
+ results.put("Namespace", identifier.namespace().quoted)
+ results.put("Table", identifier.name())
+ val tableType = if (table.properties().containsKey(TableCatalog.PROP_EXTERNAL)) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+ results.put("Type", tableType.name)
+
+ PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES
+ .filterNot(_ == TableCatalog.PROP_EXTERNAL)
+ .foreach(
+ propKey => {
+ if (table.properties.containsKey(propKey)) {
+ results.put(propKey.capitalize, table.properties.get(propKey))
+ }
+ })
+
+ val properties: Seq[String] =
+ conf
+ .redactOptions(table.properties.asScala.toMap)
+ .toList
+ .filter(kv => !PaimonCatalogUtils.TABLE_RESERVED_PROPERTIES.contains(kv._1))
+ .sortBy(_._1)
+ .map { case (key, value) => key + "=" + value }
+ if (!table.properties().isEmpty) {
+ results.put("Table Properties", properties.mkString("[", ", ", "]"))
+ }
+
+ // Partition Provider & Partition Columns
+ if (supportsPartitions(table) && table.asPartitionable.partitionSchema().nonEmpty) {
+ results.put("Partition Provider", "Catalog")
+ results.put(
+ "Partition Columns",
+ table.asPartitionable
+ .partitionSchema()
+ .map(field => QuotingUtils.quoteIdentifier(field.name))
+ .mkString("[", ", ", "]"))
+ }
+
+ if (table.schema().nonEmpty) {
+ results.put("Schema", table.schema().treeString)
+ }
+
+ results
+ .map {
+ case (key, value) =>
+ if (value.isEmpty) key else s"$key: $value"
+ }
+ .mkString("", "\n", "")
+ }
+
+ private def supportsPartitions(table: Table): Boolean = table match {
+ case _: SupportsPartitionManagement => true
+ case _ => false
+ }
+
+}
diff --git a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 57d79d6474..f508e2605c 100644
--- a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.paimon.shims
+import org.apache.paimon.spark.catalyst.analysis.Spark3ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark3SqlExtensionsParser
import org.apache.paimon.spark.data.{Spark3ArrayData, Spark3InternalRow, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
@@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
@@ -38,6 +40,10 @@ class Spark3Shim extends SparkShim {
new PaimonSpark3SqlExtensionsParser(delegate)
}
+ override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = {
+ Spark3ResolutionRules(spark)
+ }
+
override def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
new Spark3InternalRow(rowType)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
similarity index 50%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
copy to paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
index 2ab3dc4945..461cbd0c93 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/connector/catalog/PaimonCatalogUtils.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala
@@ -16,28 +16,12 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connector.catalog
+package org.apache.paimon.spark.catalyst.analysis
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalog
-import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.paimon.ReflectUtils
-
-object PaimonCatalogUtils {
-
- def buildExternalCatalog(conf: SparkConf, hadoopConf: Configuration): ExternalCatalog = {
- val externalCatalogClassName =
- if (SparkSession.active.conf.get(CATALOG_IMPLEMENTATION.key).equals("hive")) {
- "org.apache.spark.sql.hive.HiveExternalCatalog"
- } else {
- "org.apache.spark.sql.catalyst.catalog.InMemoryCatalog"
- }
- ReflectUtils.reflect[ExternalCatalog, SparkConf, Configuration](
- externalCatalogClassName,
- conf,
- hadoopConf)
- }
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan
}
diff --git a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index dfec4eb71f..eefddafdbf 100644
--- a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.paimon.shims
+import org.apache.paimon.spark.catalyst.analysis.Spark4ResolutionRules
import org.apache.paimon.spark.catalyst.parser.extensions.PaimonSpark4SqlExtensionsParser
import org.apache.paimon.spark.data.{Spark4ArrayData, Spark4InternalRow, SparkArrayData, SparkInternalRow}
import org.apache.paimon.types.{DataType, RowType}
@@ -25,7 +26,8 @@ import org.apache.paimon.types.{DataType, RowType}
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.Aggregate
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.internal.ExpressionUtils
@@ -38,6 +40,11 @@ class Spark4Shim extends SparkShim {
override def createSparkParser(delegate: ParserInterface): ParserInterface = {
new PaimonSpark4SqlExtensionsParser(delegate)
}
+
+ override def createCustomResolution(spark: SparkSession): Rule[LogicalPlan] = {
+ Spark4ResolutionRules(spark)
+ }
+
override def createSparkInternalRow(rowType: RowType): SparkInternalRow = {
new Spark4InternalRow(rowType)
}