Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12781#discussion_r62079118
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
    @@ -452,3 +455,241 @@ case class ShowTablePropertiesCommand(table: 
TableIdentifier, propertyKey: Optio
         }
       }
     }
    +
    +/**
    + * A command to list the column names for a table. This function creates a
    + * [[ShowColumnsCommand]] logical plan.
    + *
    + * The syntax of using this command in SQL is:
    + * {{{
    + *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
    + * }}}
    + */
    +case class ShowColumnsCommand(table: TableIdentifier) extends 
RunnableCommand {
    +  // The result of SHOW COLUMNS has one column called 'result'
    +  override val output: Seq[Attribute] = {
    +    AttributeReference("result", StringType, nullable = false)() :: Nil
    +  }
    +
    +  override def run(sparkSession: SparkSession): Seq[Row] = {
    +    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { 
c =>
    +      Row(c.name)
    +    }
    +  }
    +}
    +
    +/**
    + * A command to list the partition names of a table. If the partition spec 
is specified,
    + * partitions that match the spec are returned. [[AnalysisException]] 
exception is thrown under
    + * the following conditions:
    + *
    + * 1. If the command is called for a non partitioned table.
    + * 2. If the partition spec refers to the columns that are not defined as 
partitioning columns.
    + *
    + * This function creates a [[ShowPartitionsCommand]] logical plan
    + *
    + * The syntax of using this command in SQL is:
    + * {{{
    + *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
    + * }}}
    + */
    +case class ShowPartitionsCommand(
    +    table: TableIdentifier,
    +    spec: Option[TablePartitionSpec]) extends RunnableCommand {
    +  // The result of SHOW PARTITIONS has one column called 'result'
    +  override val output: Seq[Attribute] = {
    +    AttributeReference("result", StringType, nullable = false)() :: Nil
    +  }
    +
    +  private def getPartName(spec: TablePartitionSpec, partColNames: 
Seq[String]): String = {
    +    partColNames.map { name =>
    +      PartitioningUtils.escapePathName(name) + "=" + 
PartitioningUtils.escapePathName(spec(name))
    +    }.mkString(File.separator)
    +  }
    +
    +  override def run(sparkSession: SparkSession): Seq[Row] = {
    +    val catalog = sparkSession.sessionState.catalog
    +
    +    if (catalog.isTemporaryTable(table)) {
    +      throw new AnalysisException(
    +        s"SHOW PARTITIONS is not allowed on a temporary table: 
${table.unquotedString}")
    +    }
    +
    +    val tab = catalog.getTableMetadata(table)
    +
    +    /**
    +     * Validate and throws an [[AnalysisException]] exception under the 
following conditions:
    +     * 1. If the table is not partitioned.
    +     * 2. If it is a datasource table.
    +     * 3. If it is a view or index table.
    +     */
    +    if (tab.tableType == VIEW ||
    +      tab.tableType == INDEX) {
    +      throw new AnalysisException(
    +        s"SHOW PARTITIONS is not allowed on a view or index table: 
${tab.qualifiedName}")
    +    }
    +
    +    if (!DDLUtils.isTablePartitioned(tab)) {
    +      throw new AnalysisException(
    +        s"SHOW PARTITIONS is not allowed on a table that is not 
partitioned: ${tab.qualifiedName}")
    +    }
    +
    +    if (DDLUtils.isDatasourceTable(tab)) {
    +      throw new AnalysisException(
    +        s"SHOW PARTITIONS is not allowed on a datasource table: 
${tab.qualifiedName}")
    +    }
    +
    +    /**
    +     * Validate the partitioning spec by making sure all the referenced 
columns are
    +     * defined as partitioning columns in table definition. An 
AnalysisException exception is
    +     * thrown if the partitioning spec is invalid.
    +     */
    +    if (spec.isDefined) {
    +      val badColumns = 
spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
    +      if (badColumns.nonEmpty) {
    +        val badCols = badColumns.mkString("[", ", ", "]")
    +        throw new AnalysisException(
    +          s"Non-partitioning column(s) $badCols are specified for SHOW 
PARTITIONS")
    +      }
    +    }
    +
    +    val partNames = catalog.listPartitions(table, spec).map { p =>
    +      getPartName(p.spec, tab.partitionColumnNames)
    +    }
    +
    +    partNames.map(Row(_))
    +  }
    +}
    +
    +case class ShowCreateTableCommand(table: TableIdentifier) extends 
RunnableCommand {
    +  override val output: Seq[Attribute] = Seq(
    +    AttributeReference("createtab_stmt", StringType, nullable = false)()
    +  )
    +
    +  override def run(sparkSession: SparkSession): Seq[Row] = {
    +    val catalog = sparkSession.sessionState.catalog
    +
    +    if (catalog.isTemporaryTable(table)) {
    +      throw new AnalysisException(
    +        s"SHOW CREATE TABLE cannot be applied to temporary table")
    +    }
    +
    +    if (!catalog.tableExists(table)) {
    +      throw new AnalysisException(s"Table $table doesn't exist")
    +    }
    +
    +    val tableMetadata = catalog.getTableMetadata(table)
    +
    +    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
    +      showCreateDataSourceTable(tableMetadata)
    +    } else {
    +      throw new UnsupportedOperationException(
    +        "SHOW CREATE TABLE only supports Spark SQL data source tables.")
    +    }
    +
    +    Seq(Row(stmt))
    +  }
    +
    +  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
    +    val builder = StringBuilder.newBuilder
    +
    +    builder ++= s"CREATE TABLE ${table.quotedString} "
    +    showDataSourceTableDataCols(metadata, builder)
    +    showDataSourceTableOptions(metadata, builder)
    +    showDataSourceTableNonDataColumns(metadata, builder)
    +
    +    builder.toString()
    +  }
    +
    +  private def showDataSourceTableDataCols(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
    +    val props = metadata.properties
    +    val schemaParts = for {
    +      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
    +      index <- 0 until numParts.toInt
    +    } yield props.getOrElse(
    +      s"spark.sql.sources.schema.part.$index",
    +      throw new AnalysisException(
    +        s"Corrupted schema in catalog: $numParts parts expected, but part 
$index is missing."
    +      )
    +    )
    +
    +    if (schemaParts.nonEmpty) {
    +      val fields = 
DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
    +      val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} 
${f.dataType.sql}")
    +      builder ++= colTypeList.mkString("(", ", ", ")")
    +    }
    +
    +    builder ++= "\n"
    +  }
    +
    +  private def showDataSourceTableOptions(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
    +    val props = metadata.properties
    +
    +    builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
    +
    +    val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
    +      case (key, value) =>
    +        // If it's a managed table, omit PATH option. Spark SQL always 
creates external table
    +        // when the table creation DDL contains the PATH option.
    +        key.toLowerCase == "path" && metadata.tableType == MANAGED
    +    }.map {
    +      case (key, value) => s"${quoteIdentifier(key)} 
'${escapeSingleQuotedString(value)}'"
    +    }
    +
    +    if (dataSourceOptions.nonEmpty) {
    +      builder ++= "OPTIONS (\n"
    +      builder ++= dataSourceOptions.mkString("  ", ",\n  ", "\n")
    +      builder ++= ")\n"
    +    }
    +  }
    +
    +  private def showDataSourceTableNonDataColumns(
    +      metadata: CatalogTable, builder: StringBuilder): Unit = {
    +    val props = metadata.properties
    +
    +    def getColumnNamesByType(colType: String, typeName: String): 
Seq[String] = {
    +      (for {
    +        numCols <- 
props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
    +        index <- 0 until numCols.toInt
    +      } yield props.getOrElse(
    +        s"spark.sql.sources.schema.${colType}Col.$index",
    +        throw new AnalysisException(
    +          s"Corrupted $typeName in catalog: $numCols parts expected, but 
part $index is missing."
    +        )
    +      )).map(quoteIdentifier)
    +    }
    +
    +    val partCols = getColumnNamesByType("part", "partitioning columns")
    +    if (partCols.nonEmpty) {
    +      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
    +    }
    --- End diff --
    
    We should also have utility functions for reconstructing partition columns, 
bucketing columns, and sorting columns from strings.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to