Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12781#discussion_r62162145

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -452,3 +455,241 @@ case class ShowTablePropertiesCommand(table: TableIdentifier, propertyKey: Optio
     }
   }
 }
+
+/**
+ * A command to list the column names for a table. This function creates a
+ * [[ShowColumnsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW COLUMNS (FROM | IN) table_identifier [(FROM | IN) database];
+ * }}}
+ */
+case class ShowColumnsCommand(table: TableIdentifier) extends RunnableCommand {
+  // The result of SHOW COLUMNS has one column called 'result'
+  override val output: Seq[Attribute] = {
+    AttributeReference("result", StringType, nullable = false)() :: Nil
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalog.getTableMetadata(table).schema.map { c =>
+      Row(c.name)
+    }
+  }
+}
+
+/**
+ * A command to list the partition names of a table. If the partition spec is specified,
+ * partitions that match the spec are returned. An [[AnalysisException]] is thrown under
+ * the following conditions:
+ *
+ * 1. If the command is called for a non-partitioned table.
+ * 2. If the partition spec refers to columns that are not defined as partitioning columns.
+ *
+ * This function creates a [[ShowPartitionsCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   SHOW PARTITIONS [db_name.]table_name [PARTITION(partition_spec)]
+ * }}}
+ */
+case class ShowPartitionsCommand(
+    table: TableIdentifier,
+    spec: Option[TablePartitionSpec]) extends RunnableCommand {
+  // The result of SHOW PARTITIONS has one column called 'result'
+  override val output: Seq[Attribute] = {
+    AttributeReference("result", StringType, nullable = false)() :: Nil
+  }
+
+  private def getPartName(spec: TablePartitionSpec, partColNames: Seq[String]): String = {
+    partColNames.map { name =>
+      PartitioningUtils.escapePathName(name) + "=" + PartitioningUtils.escapePathName(spec(name))
+    }.mkString(File.separator)
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a temporary table: ${table.unquotedString}")
+    }
+
+    val tab = catalog.getTableMetadata(table)
+
+    /**
+     * Validate and throw an [[AnalysisException]] under the following conditions:
+     * 1. If the table is not partitioned.
+     * 2. If it is a datasource table.
+     * 3. If it is a view or index table.
+     */
+    if (tab.tableType == VIEW ||
+      tab.tableType == INDEX) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a view or index table: ${tab.qualifiedName}")
+    }
+
+    if (!DDLUtils.isTablePartitioned(tab)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${tab.qualifiedName}")
+    }
+
+    if (DDLUtils.isDatasourceTable(tab)) {
+      throw new AnalysisException(
+        s"SHOW PARTITIONS is not allowed on a datasource table: ${tab.qualifiedName}")
+    }
+
+    /**
+     * Validate the partitioning spec by making sure all the referenced columns are
+     * defined as partitioning columns in the table definition. An [[AnalysisException]]
+     * is thrown if the partitioning spec is invalid.
+     */
+    if (spec.isDefined) {
+      val badColumns = spec.get.keySet.filterNot(tab.partitionColumns.map(_.name).contains)
+      if (badColumns.nonEmpty) {
+        val badCols = badColumns.mkString("[", ", ", "]")
+        throw new AnalysisException(
+          s"Non-partitioning column(s) $badCols are specified for SHOW PARTITIONS")
+      }
+    }
+
+    val partNames = catalog.listPartitions(table, spec).map { p =>
+      getPartName(p.spec, tab.partitionColumnNames)
+    }
+
+    partNames.map(Row(_))
+  }
+}
+
+case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("createtab_stmt", StringType, nullable = false)()
+  )
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+
+    if (catalog.isTemporaryTable(table)) {
+      throw new AnalysisException(
+        s"SHOW CREATE TABLE cannot be applied to a temporary table")
+    }
+
+    if (!catalog.tableExists(table)) {
+      throw new AnalysisException(s"Table $table doesn't exist")
+    }
+
+    val tableMetadata = catalog.getTableMetadata(table)
+
+    val stmt = if (DDLUtils.isDatasourceTable(tableMetadata)) {
+      showCreateDataSourceTable(tableMetadata)
+    } else {
+      throw new UnsupportedOperationException(
+        "SHOW CREATE TABLE only supports Spark SQL data source tables.")
+    }
+
+    Seq(Row(stmt))
+  }
+
+  private def showCreateDataSourceTable(metadata: CatalogTable): String = {
+    val builder = StringBuilder.newBuilder
+
+    builder ++= s"CREATE TABLE ${table.quotedString} "
+    showDataSourceTableDataCols(metadata, builder)
+    showDataSourceTableOptions(metadata, builder)
+    showDataSourceTableNonDataColumns(metadata, builder)
+
+    builder.toString()
+  }
+
+  private def showDataSourceTableDataCols(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val props = metadata.properties
+    val schemaParts = for {
+      numParts <- props.get("spark.sql.sources.schema.numParts").toSeq
+      index <- 0 until numParts.toInt
+    } yield props.getOrElse(
+      s"spark.sql.sources.schema.part.$index",
+      throw new AnalysisException(
+        s"Corrupted schema in catalog: $numParts parts expected, but part $index is missing."
+      )
+    )
+
+    if (schemaParts.nonEmpty) {
+      val fields = DataType.fromJson(schemaParts.mkString).asInstanceOf[StructType].fields
+      val colTypeList = fields.map(f => s"${quoteIdentifier(f.name)} ${f.dataType.sql}")
+      builder ++= colTypeList.mkString("(", ", ", ")")
+    }
+
+    builder ++= "\n"
+  }
+
+  private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val props = metadata.properties
+
+    builder ++= s"USING ${props("spark.sql.sources.provider")}\n"
+
+    val dataSourceOptions = metadata.storage.serdeProperties.filterNot {
+      case (key, value) =>
+        // If it's a managed table, omit the PATH option. Spark SQL always creates an external
+        // table when the table creation DDL contains the PATH option.
+        key.toLowerCase == "path" && metadata.tableType == MANAGED
+    }.map {
+      case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+    }
+
+    if (dataSourceOptions.nonEmpty) {
+      builder ++= "OPTIONS (\n"
+      builder ++= dataSourceOptions.mkString(" ", ",\n ", "\n")
+      builder ++= ")\n"
+    }
+  }
+
+  private def showDataSourceTableNonDataColumns(
+      metadata: CatalogTable, builder: StringBuilder): Unit = {
+    val props = metadata.properties
+
+    def getColumnNamesByType(colType: String, typeName: String): Seq[String] = {
+      (for {
+        numCols <- props.get(s"spark.sql.sources.schema.num${colType.capitalize}Cols").toSeq
+        index <- 0 until numCols.toInt
+      } yield props.getOrElse(
+        s"spark.sql.sources.schema.${colType}Col.$index",
+        throw new AnalysisException(
+          s"Corrupted $typeName in catalog: $numCols parts expected, but part $index is missing."
+        )
+      )).map(quoteIdentifier)
+    }
+
+    val partCols = getColumnNamesByType("part", "partitioning columns")
+    if (partCols.nonEmpty) {
+      builder ++= s"PARTITIONED BY ${partCols.mkString("(", ", ", ")")}\n"
+    }
--- End diff --

I think we should also add a TODO here. Putting this information in table properties is a workaround for Hive. Once we have better isolation of Hive, we should be able to hide this logic in the Hive external catalog and no longer need this trick here.
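
To make the table-property trick above concrete, here is a minimal sketch (not the actual catalog code) of how a data source table's schema ends up split across `spark.sql.sources.schema.part.N` entries and reassembled; the `props` map and its contents are made up for illustration:

```scala
import org.apache.spark.sql.types.{DataType, StructType}

object SchemaFromPropsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical table properties: the schema JSON is split into numbered parts
    // because a single table property value can be length-limited in the Hive metastore.
    val props = Map(
      "spark.sql.sources.schema.numParts" -> "2",
      "spark.sql.sources.schema.part.0" ->
        """{"type":"struct","fields":[{"name":"id","type":"integer",""",
      "spark.sql.sources.schema.part.1" ->
        """"nullable":false,"metadata":{}}]}"""
    )

    // Reassemble the parts in index order and parse the JSON back into a StructType,
    // mirroring what showDataSourceTableDataCols does with the catalog properties.
    val numParts = props("spark.sql.sources.schema.numParts").toInt
    val schemaJson = (0 until numParts)
      .map(i => props(s"spark.sql.sources.schema.part.$i"))
      .mkString

    val schema = DataType.fromJson(schemaJson).asInstanceOf[StructType]
    println(schema.fields.map(f => s"${f.name} ${f.dataType.sql}").mkString(", "))  // id INT
  }
}
```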
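Similarly, a tiny sketch of the Hive-style partition names that `getPartName` produces (path escaping via `PartitioningUtils.escapePathName` is left out for brevity, and the spec values are invented):

```scala
import java.io.File

// Hypothetical partition spec and the table's partition-column order.
val spec = Map("year" -> "2016", "month" -> "05")
val partColNames = Seq("year", "month")

// Join each column as name=value with the platform path separator, as getPartName does.
val partName = partColNames
  .map(name => s"$name=${spec(name)}")
  .mkString(File.separator)

println(partName)  // year=2016/month=05 on Linux/macOS
```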
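And, for reference, the new commands would be exercised roughly like this from a SparkSession (the database and table names are made up; per the checks above, SHOW PARTITIONS targets a partitioned non-datasource table, while SHOW CREATE TABLE currently only supports data source tables):

```scala
// Assumes an existing SparkSession `spark`.
spark.sql("SHOW COLUMNS IN db1.logs").show()
spark.sql("SHOW PARTITIONS db1.logs PARTITION (year='2016')").show()
spark.sql("SHOW CREATE TABLE db1.ds_table").show(truncate = false)
```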