This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 1f15f2c6ad7  [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace
1f15f2c6ad7 is described below

commit 1f15f2c6ad7ff8e593d39dd264b4a6efa89d67af
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Wed Jun 29 10:00:59 2022 +0800

    [SPARK-39615][SQL] Make listColumns be compatible with 3 layer namespace

    ### What changes were proposed in this pull request?
    Make `listColumns` compatible with the 3-layer namespace
    (catalog.database.table).

    ### Why are the changes needed?
    For 3-layer namespace compatibility.

    ### Does this PR introduce _any_ user-facing change?
    Yes.

    ### How was this patch tested?
    Added a unit test.

    Closes #37000 from zhengruifeng/sql_3L_list_cols.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 59 ++++++++++++++++++++--
 .../apache/spark/sql/internal/CatalogSuite.scala   | 46 +++++++++++++++++
 2 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 49cb9a3e897..97226736691 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
@@ -208,8 +208,23 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("table does not exist")
   override def listColumns(tableName: String): Dataset[Column] = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    listColumns(tableIdent)
+    // Call `sqlParser.parseTableIdentifier` to parse tableName. If it contains only a table name
+    // and optionally a database name (thus a TableIdentifier), look up the table in the session
+    // catalog. Otherwise, try `sqlParser.parseMultipartIdentifier` to get a sequence of strings
+    // as the qualified identifier, and resolve the table through the SQL analyzer.
+    try {
+      val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+      if (tableExists(ident.database.orNull, ident.table)) {
+        listColumns(ident)
+      } else {
+        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+        listColumns(ident)
+      }
+    } catch {
+      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
+        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
+        listColumns(ident)
+    }
   }
 
   /**
@@ -238,6 +253,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(columns, sparkSession)
   }
 
+  private def listColumns(ident: Seq[String]): Dataset[Column] = {
+    val plan = UnresolvedTableOrView(ident, "Catalog.listColumns", true)
+
+    val columns = sparkSession.sessionState.executePlan(plan).analyzed match {
+      case ResolvedTable(_, _, table, _) =>
+        val (partitionColumnNames, bucketSpecOpt) = table.partitioning.toSeq.convertTransforms
+        val bucketColumnNames = bucketSpecOpt.map(_.bucketColumnNames).getOrElse(Nil)
+        table.schema.map { field =>
+          new Column(
+            name = field.name,
+            description = field.getComment().orNull,
+            dataType = field.dataType.simpleString,
+            nullable = field.nullable,
+            isPartition = partitionColumnNames.contains(field.name),
+            isBucket = bucketColumnNames.contains(field.name))
+        }
+
+      case ResolvedView(identifier, _) =>
+        val catalog = sparkSession.sessionState.catalog
+        val table = identifier.asTableIdentifier
+        val schema = catalog.getTempViewOrPermanentTableMetadata(table).schema
+        schema.map { field =>
+          new Column(
+            name = field.name,
+            description = field.getComment().orNull,
+            dataType = field.dataType.simpleString,
+            nullable = field.nullable,
+            isPartition = false,
+            isBucket = false)
+        }
+
+      case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
+    }
+
+    CatalogImpl.makeDataset(columns, sparkSession)
+  }
+
+
   /**
    * Gets the database with the specified name. This throws an `AnalysisException` when no
    * `Database` can be found.
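For illustration, a minimal sketch of the resolution order this change gives
`Catalog.listColumns` from the caller's side. The session, database, table, and
catalog names below are hypothetical (`testcat` stands for any registered V2
catalog); this is not part of the diff:

// Hypothetical spark-shell session; assumes a SparkSession `spark` and a V2
// catalog registered under the name `testcat` (illustrative names only).

// One part: parsed as a TableIdentifier and looked up in the session catalog.
spark.catalog.listColumns("my_table")

// Two parts: database.table, still a TableIdentifier.
spark.catalog.listColumns("default.my_table")

// Three parts: parseTableIdentifier throws a ParseException (or the session
// catalog lookup misses), so the name is re-parsed with
// parseMultipartIdentifier and resolved by the analyzer as
// catalog.database.table.
spark.catalog.listColumns("testcat.my_db.my_table")

Each call returns a Dataset[Column], as before; only the accepted name forms
change.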
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index a1a946ddd71..13f6965a8e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -273,6 +273,52 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     testListColumns("tab1", dbName = Some("db1"))
   }
 
+  test("SPARK-39615: three layer namespace compatibility - listColumns") {
+    val answers = Map(
+      "col1" -> ("int", true, false, true),
+      "col2" -> ("string", true, false, false),
+      "a" -> ("int", true, true, false),
+      "b" -> ("string", true, true, false)
+    )
+
+    assert(spark.catalog.currentCatalog() === "spark_catalog")
+    createTable("my_table1")
+
+    val columns1 = spark.catalog.listColumns("my_table1").collect()
+    assert(answers ===
+      columns1.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns2 = spark.catalog.listColumns("default.my_table1").collect()
+    assert(answers ===
+      columns2.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns3 = spark.catalog.listColumns("spark_catalog.default.my_table1").collect()
+    assert(answers ===
+      columns3.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    createDatabase("my_db1")
+    createTable("my_table2", Some("my_db1"))
+
+    val columns4 = spark.catalog.listColumns("my_db1.my_table2").collect()
+    assert(answers ===
+      columns4.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val columns5 = spark.catalog.listColumns("spark_catalog.my_db1.my_table2").collect()
+    assert(answers ===
+      columns5.map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket)).toMap)
+
+    val catalogName = "testcat"
+    val dbName = "my_db2"
+    val tableName = "my_table2"
+    val tableSchema = new StructType().add("i", "int").add("j", "string")
+    val description = "this is a test managed table"
+    createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
+      Map.empty[String, String], description)
+
+    val columns6 = spark.catalog.listColumns("testcat.my_db2.my_table2").collect()
+    assert(Map("i" -> "int", "j" -> "string") === columns6.map(c => c.name -> c.dataType).toMap)
+  }
+
   test("Database.toString") {
     assert(new Database("cool_db", "cool_desc", "cool_path").toString ==
       "Database[name='cool_db', description='cool_desc', path='cool_path']")
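For reference, a small sketch of inspecting the same result outside the suite,
mirroring the assertions above. Judging from the `answers` map, the suite's
`createTable` helper creates `col1` as a bucket column and `a`/`b` as partition
columns; that helper is not shown in this diff, so its behavior is an
assumption here:

// Hypothetical session against the same table as the test above.
val m = spark.catalog.listColumns("spark_catalog.default.my_table1")
  .collect()
  .map(c => c.name -> (c.dataType, c.nullable, c.isPartition, c.isBucket))
  .toMap

// Per the answers map in the test:
// m("col1") == ("int", true, false, true)    // bucketed column
// m("a")    == ("int", true, true, false)    // partition column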