[SPARK-19678][SQL] remove MetastoreRelation

## What changes were proposed in this pull request?
`MetastoreRelation` is used to represent table relation for hive tables, and provides some hive related information. We will resolve `SimpleCatalogRelation` to `MetastoreRelation` for hive tables, which is unnecessary as these 2 are the same essentially. This PR merges `SimpleCatalogRelation` and `MetastoreRelation` ## How was this patch tested? existing tests Author: Wenchen Fan <wenc...@databricks.com> Closes #17015 from cloud-fan/table-relation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c7fc30b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c7fc30b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c7fc30b Branch: refs/heads/master Commit: 7c7fc30b4ae8e4ebd4ededf92240fed10481f2dd Parents: b405466 Author: Wenchen Fan <wenc...@databricks.com> Authored: Tue Feb 28 09:24:36 2017 -0800 Committer: Xiao Li <gatorsm...@gmail.com> Committed: Tue Feb 28 09:24:36 2017 -0800 ---------------------------------------------------------------------- .../sql/catalyst/analysis/CheckAnalysis.scala | 2 - .../sql/catalyst/catalog/SessionCatalog.scala | 7 +- .../spark/sql/catalyst/catalog/interface.scala | 68 ++++--- .../apache/spark/sql/catalyst/identifiers.scala | 4 +- .../catalyst/catalog/SessionCatalogSuite.scala | 8 +- .../org/apache/spark/sql/DataFrameWriter.scala | 8 +- .../execution/OptimizeMetadataOnlyQuery.scala | 8 +- .../command/AnalyzeColumnCommand.scala | 49 ++--- .../execution/command/AnalyzeTableCommand.scala | 78 ++++---- .../datasources/DataSourceStrategy.scala | 29 +-- .../spark/sql/execution/datasources/rules.scala | 2 +- .../spark/sql/StatisticsCollectionSuite.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 130 ++++++------- .../spark/sql/hive/HiveSessionState.scala | 2 +- .../apache/spark/sql/hive/HiveStrategies.scala | 82 ++++++--- .../spark/sql/hive/MetastoreRelation.scala | 179 ------------------ .../org/apache/spark/sql/hive/TableReader.scala | 27 +-- .../sql/hive/execution/HiveTableScanExec.scala | 87 +++++---- .../hive/execution/InsertIntoHiveTable.scala | 47 +++-- .../spark/sql/hive/MetastoreRelationSuite.scala | 55 ------ .../apache/spark/sql/hive/StatisticsSuite.scala | 183 +++++++------------ .../sql/hive/execution/HiveComparisonTest.scala | 4 +- .../sql/hive/execution/HiveTableScanSuite.scala | 12 +- .../spark/sql/hive/execution/PruningSuite.scala | 4 +- .../sql/hive/execution/SQLQuerySuite.scala | 28 ++- .../spark/sql/hive/orc/OrcQuerySuite.scala | 5 +- .../apache/spark/sql/hive/parquetSuites.scala | 4 +- 27 files changed, 442 insertions(+), 672 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 36ab8b8..7529f90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -18,10 +18,8 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.catalog.SimpleCatalogRelation import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression -import org.apache.spark.sql.catalyst.plans.UsingJoin import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0230626..0673489 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -592,7 +592,12 @@ class SessionCatalog( child = parser.parsePlan(viewText)) SubqueryAlias(table, child, Some(name.copy(table = table, database = Some(db)))) } else { - SubqueryAlias(table, SimpleCatalogRelation(metadata), None) + val tableRelation = CatalogRelation( + metadata, + // we assume all the columns are nullable. + metadata.dataSchema.asNullable.toAttributes, + metadata.partitionSchema.asNullable.toAttributes) + SubqueryAlias(table, tableRelation, None) } } else { SubqueryAlias(table, tempTables(table), None) http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 2b3b575..cb93902 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -19,10 +19,11 @@ package org.apache.spark.sql.catalyst.catalog import java.util.Date -import scala.collection.mutable +import com.google.common.base.Objects import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.{CatalystConf, FunctionIdentifier, InternalRow, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -349,36 +350,43 @@ object CatalogTypes { /** - * An interface that is implemented by logical plans to return the underlying catalog table. - * If we can in the future consolidate SimpleCatalogRelation and MetastoreRelation, we should - * probably remove this interface. + * A [[LogicalPlan]] that represents a table. */ -trait CatalogRelation { - def catalogTable: CatalogTable - def output: Seq[Attribute] -} +case class CatalogRelation( + tableMeta: CatalogTable, + dataCols: Seq[Attribute], + partitionCols: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { + assert(tableMeta.identifier.database.isDefined) + assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) + assert(tableMeta.dataSchema.sameType(dataCols.toStructType)) + + // The partition column should always appear after data columns. 
+ override def output: Seq[Attribute] = dataCols ++ partitionCols + + def isPartitioned: Boolean = partitionCols.nonEmpty + + override def equals(relation: Any): Boolean = relation match { + case other: CatalogRelation => tableMeta == other.tableMeta && output == other.output + case _ => false + } + override def hashCode(): Int = { + Objects.hashCode(tableMeta.identifier, output) + } -/** - * A [[LogicalPlan]] that wraps [[CatalogTable]]. - * - * Note that in the future we should consolidate this and HiveCatalogRelation. - */ -case class SimpleCatalogRelation( - metadata: CatalogTable) - extends LeafNode with CatalogRelation { - - override def catalogTable: CatalogTable = metadata - - override lazy val resolved: Boolean = false - - override val output: Seq[Attribute] = { - val (partCols, dataCols) = metadata.schema.toAttributes - // Since data can be dumped in randomly with no validation, everything is nullable. - .map(_.withNullability(true).withQualifier(Some(metadata.identifier.table))) - .partition { a => - metadata.partitionColumnNames.contains(a.name) - } - dataCols ++ partCols + /** Only compare table identifier. */ + override lazy val cleanArgs: Seq[Any] = Seq(tableMeta.identifier) + + override def computeStats(conf: CatalystConf): Statistics = { + // For data source tables, we will create a `LogicalRelation` and won't call this method, for + // hive serde tables, we will always generate a statistics. + // TODO: unify the table stats generation. + tableMeta.stats.map(_.toPlanStats(output)).getOrElse { + throw new IllegalStateException("table stats must be specified.") + } } + + override def newInstance(): LogicalPlan = copy( + dataCols = dataCols.map(_.newInstance()), + partitionCols = partitionCols.map(_.newInstance())) } http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala index 26697e9..a3cc452 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -63,7 +63,9 @@ case class TableIdentifier(table: String, database: Option[String]) } /** A fully qualified identifier for a table (i.e., database.tableName) */ -case class QualifiedTableName(database: String, name: String) +case class QualifiedTableName(database: String, name: String) { + override def toString: String = s"$database.$name" +} object TableIdentifier { def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 4443432..a755231 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -433,15 +433,15 @@ class SessionCatalogSuite extends PlanTest { sessionCatalog.createTempView("tbl1", tempTable1, 
overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") // If we explicitly specify the database, we'll look up the relation in that database - assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) - == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None)) + assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))).children.head + .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) // Otherwise, we'll first look up a temporary table with the same name assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) == SubqueryAlias("tbl1", tempTable1, None)) // Then, if that does not exist, look up the relation in the current database sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false) - assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) - == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None)) + assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")).children.head + .asInstanceOf[CatalogRelation].tableMeta == metastoreTable1) } test("look up view relation") { http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 3939251..49e85dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -349,8 +349,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { // Get all input data source or hive relations of the query. 
val srcRelations = df.logicalPlan.collect { case LogicalRelation(src: BaseRelation, _, _) => src - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) => - relation.catalogTable.identifier + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) => + relation.tableMeta.identifier } val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed @@ -360,8 +360,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") // check hive table relation when overwrite mode - case relation: CatalogRelation if DDLUtils.isHiveTable(relation.catalogTable) - && srcRelations.contains(relation.catalogTable.identifier) => + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) + && srcRelations.contains(relation.tableMeta.identifier) => throw new AnalysisException( s"Cannot overwrite table $tableName that is also being read from") case _ => // OK http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index b8ac070..b02edd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -102,8 +102,8 @@ case class OptimizeMetadataOnlyQuery( LocalRelation(partAttrs, partitionData.map(_.values)) case relation: CatalogRelation => - val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation) - val partitionData = catalog.listPartitions(relation.catalogTable.identifier).map { p => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) + val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p => InternalRow.fromSeq(partAttrs.map { attr => // TODO: use correct timezone for partition values. 
Cast(Literal(p.spec(attr.name)), attr.dataType, @@ -135,8 +135,8 @@ case class OptimizeMetadataOnlyQuery( val partAttrs = getPartitionAttrs(fsRelation.partitionSchema.map(_.name), l) Some(AttributeSet(partAttrs), l) - case relation: CatalogRelation if relation.catalogTable.partitionColumnNames.nonEmpty => - val partAttrs = getPartitionAttrs(relation.catalogTable.partitionColumnNames, relation) + case relation: CatalogRelation if relation.tableMeta.partitionColumnNames.nonEmpty => + val partAttrs = getPartitionAttrs(relation.tableMeta.partitionColumnNames, relation) Some(AttributeSet(partAttrs), relation) case p @ Project(projectList, child) if projectList.forall(_.deterministic) => http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala index d024a36..b89014e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala @@ -17,15 +17,12 @@ package org.apache.spark.sql.execution.command -import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable} +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.datasources.LogicalRelation /** @@ -40,60 +37,40 @@ case class AnalyzeColumnCommand( val sessionState = sparkSession.sessionState val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val relation = - EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed) - - // Compute total size - val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match { - case catalogRel: CatalogRelation => - // This is a Hive serde format table - (catalogRel.catalogTable, - AnalyzeTableCommand.calculateTotalSize(sessionState, catalogRel.catalogTable)) - - case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => - // This is a data source format table - (logicalRel.catalogTable.get, - AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get)) - - case otherRelation => - throw new AnalysisException("ANALYZE TABLE is not supported for " + - s"${otherRelation.nodeName}.") + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") } + val sizeInBytes = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta) // Compute stats for each column - val (rowCount, newColStats) = - AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames) + val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames) 
// We also update table-level stats in order to keep them consistent with column-level stats. val statistics = CatalogStatistics( sizeInBytes = sizeInBytes, rowCount = Some(rowCount), // Newly computed column stats should override the existing ones. - colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) + colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats) - sessionState.catalog.alterTable(catalogTable.copy(stats = Some(statistics))) + sessionState.catalog.alterTable(tableMeta.copy(stats = Some(statistics))) // Refresh the cached data source table in the catalog. sessionState.catalog.refreshTable(tableIdentWithDB) Seq.empty[Row] } -} - -object AnalyzeColumnCommand extends Logging { /** * Compute stats for the given columns. * @return (row count, map from column name to ColumnStats) - * - * This is visible for testing. */ - def computeColumnStats( + private def computeColumnStats( sparkSession: SparkSession, - tableName: String, - relation: LogicalPlan, + tableIdent: TableIdentifier, columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = { + val relation = sparkSession.table(tableIdent).logicalPlan // Resolve the column names and dedup using AttributeSet val resolver = sparkSession.sessionState.conf.resolver val attributesToAnalyze = AttributeSet(columnNames.map { col => @@ -105,7 +82,7 @@ object AnalyzeColumnCommand extends Logging { attributesToAnalyze.foreach { attr => if (!ColumnStat.supportsType(attr.dataType)) { throw new AnalysisException( - s"Column ${attr.name} in table $tableName is of type ${attr.dataType}, " + + s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " + "and Spark does not support statistics collection on this column type.") } } @@ -116,7 +93,7 @@ object AnalyzeColumnCommand extends Logging { // The layout of each struct follows the layout of the ColumnStats. 
val ndvMaxErr = sparkSession.sessionState.conf.ndvMaxError val expressions = Count(Literal(1)).toAggregateExpression() +: - attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr)) + attributesToAnalyze.map(ColumnStat.statExprs(_, ndvMaxErr)) val namedExpressions = expressions.map(e => Alias(e, e.toString)()) val statsRow = Dataset.ofRows(sparkSession, Aggregate(Nil, namedExpressions, relation)).head() http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala index 30b6cc7..d2ea0cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala @@ -22,11 +22,9 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases -import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable} -import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTableType} import org.apache.spark.sql.internal.SessionState @@ -41,53 +39,39 @@ case class AnalyzeTableCommand( val sessionState = sparkSession.sessionState val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db)) - val relation = - EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed) - - relation match { - case relation: CatalogRelation => - updateTableStats(relation.catalogTable, - AnalyzeTableCommand.calculateTotalSize(sessionState, relation.catalogTable)) - - // data source tables have been converted into LogicalRelations - case logicalRel: LogicalRelation if logicalRel.catalogTable.isDefined => - updateTableStats(logicalRel.catalogTable.get, - AnalyzeTableCommand.calculateTotalSize(sessionState, logicalRel.catalogTable.get)) - - case otherRelation => - throw new AnalysisException("ANALYZE TABLE is not supported for " + - s"${otherRelation.nodeName}.") + val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB) + if (tableMeta.tableType == CatalogTableType.VIEW) { + throw new AnalysisException("ANALYZE TABLE is not supported on views.") } + val newTotalSize = AnalyzeTableCommand.calculateTotalSize(sessionState, tableMeta) - def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = { - val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L) - val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) - var newStats: Option[CatalogStatistics] = None - if (newTotalSize > 0 && newTotalSize != oldTotalSize) { - newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) - } - // We only set rowCount when noscan is false, because otherwise: - // 1. 
when total size is not changed, we don't need to alter the table; - // 2. when total size is changed, `oldRowCount` becomes invalid. - // This is to make sure that we only record the right statistics. - if (!noscan) { - val newRowCount = Dataset.ofRows(sparkSession, relation).count() - if (newRowCount >= 0 && newRowCount != oldRowCount) { - newStats = if (newStats.isDefined) { - newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) - } else { - Some(CatalogStatistics( - sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))) - } + val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L) + val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L) + var newStats: Option[CatalogStatistics] = None + if (newTotalSize > 0 && newTotalSize != oldTotalSize) { + newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize)) + } + // We only set rowCount when noscan is false, because otherwise: + // 1. when total size is not changed, we don't need to alter the table; + // 2. when total size is changed, `oldRowCount` becomes invalid. + // This is to make sure that we only record the right statistics. + if (!noscan) { + val newRowCount = sparkSession.table(tableIdentWithDB).count() + if (newRowCount >= 0 && newRowCount != oldRowCount) { + newStats = if (newStats.isDefined) { + newStats.map(_.copy(rowCount = Some(BigInt(newRowCount)))) + } else { + Some(CatalogStatistics( + sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount)))) } } - // Update the metastore if the above statistics of the table are different from those - // recorded in the metastore. - if (newStats.isDefined) { - sessionState.catalog.alterTable(catalogTable.copy(stats = newStats)) - // Refresh the cached data source table in the catalog. - sessionState.catalog.refreshTable(tableIdentWithDB) - } + } + // Update the metastore if the above statistics of the table are different from those + // recorded in the metastore. + if (newStats.isDefined) { + sessionState.catalog.alterTable(tableMeta.copy(stats = newStats)) + // Refresh the cached data source table in the catalog. 
+ sessionState.catalog.refreshTable(tableIdentWithDB) } Seq.empty[Row] http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index f429232..f694a0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.catalog.CatalogRelation import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation @@ -208,16 +208,17 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { /** - * Replaces [[SimpleCatalogRelation]] with data source table if its table provider is not hive. + * Replaces [[CatalogRelation]] with data source table if its table provider is not hive. */ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(table: CatalogTable): LogicalPlan = { + private def readDataSourceTable(r: CatalogRelation): LogicalPlan = { + val table = r.tableMeta val qualifiedTableName = QualifiedTableName(table.database, table.identifier.table) val cache = sparkSession.sessionState.catalog.tableRelationCache val withHiveSupport = sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive" - cache.get(qualifiedTableName, new Callable[LogicalPlan]() { + val plan = cache.get(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { val pathOption = table.storage.locationUri.map("path" -> _) val dataSource = @@ -233,19 +234,25 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] // TODO: improve `InMemoryCatalog` and remove this limitation. catalogTable = if (withHiveSupport) Some(table) else None) - LogicalRelation(dataSource.resolveRelation(checkFilesExist = false), + LogicalRelation( + dataSource.resolveRelation(checkFilesExist = false), catalogTable = Some(table)) } - }) + }).asInstanceOf[LogicalRelation] + + // It's possible that the table schema is empty and need to be inferred at runtime. We should + // not specify expected outputs for this case. 
+ val expectedOutputs = if (r.output.isEmpty) None else Some(r.output) + plan.copy(expectedOutputAttributes = expectedOutputs) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) - if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(s.metadata)) + case i @ InsertIntoTable(r: CatalogRelation, _, _, _, _) + if DDLUtils.isDatasourceTable(r.tableMeta) => + i.copy(table = readDataSourceTable(r)) - case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(s.metadata) + case r: CatalogRelation if DDLUtils.isDatasourceTable(r.tableMeta) => + readDataSourceTable(r) } } http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index e7a59d4..4d781b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -379,7 +379,7 @@ case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { case i @ InsertIntoTable(table, _, query, _, _) if table.resolved && query.resolved => table match { case relation: CatalogRelation => - val metadata = relation.catalogTable + val metadata = relation.tableMeta preprocess(i, metadata.identifier.quotedString, metadata.partitionColumnNames) case LogicalRelation(h: HadoopFsRelation, _, catalogTable) => val tblName = catalogTable.map(_.identifier.quotedString).getOrElse("unknown") http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index b38bbd8..bbb31db 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -306,7 +306,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils // Analyze only one column. 
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1") val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect { - case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable) + case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta) case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get) }.head val emptyColStat = ColumnStat(0, None, None, 0, 4, 4) http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 677da0d..151a69a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import java.net.URI + import com.google.common.util.concurrent.Striped import org.apache.hadoop.fs.Path @@ -26,6 +28,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ +import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.orc.OrcFileFormat @@ -71,10 +74,8 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def getCached( tableIdentifier: QualifiedTableName, pathsInMetastore: Seq[Path], - metastoreRelation: MetastoreRelation, schemaInMetastore: StructType, expectedFileFormat: Class[_ <: FileFormat], - expectedBucketSpec: Option[BucketSpec], partitionSchema: Option[StructType]): Option[LogicalRelation] = { tableRelationCache.getIfPresent(tableIdentifier) match { @@ -89,7 +90,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val useCached = relation.location.rootPaths.toSet == pathsInMetastore.toSet && logical.schema.sameType(schemaInMetastore) && - relation.bucketSpec == expectedBucketSpec && + // We don't support hive bucketed tables. This function `getCached` is only used for + // converting supported Hive tables to data source tables. + relation.bucketSpec.isEmpty && relation.partitionSchema == partitionSchema.getOrElse(StructType(Nil)) if (useCached) { @@ -100,52 +103,48 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log None } case _ => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " + - s"should be stored as $expectedFileFormat. However, we are getting " + - s"a ${relation.fileFormat} from the metastore cache. This cached " + - s"entry will be invalidated.") + logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " + + s"However, we are getting a ${relation.fileFormat} from the metastore cache. " + + "This cached entry will be invalidated.") tableRelationCache.invalidate(tableIdentifier) None } case other => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. 
" + - s"This cached entry will be invalidated.") + logWarning(s"Table $tableIdentifier should be stored as $expectedFileFormat. " + + s"However, we are getting a $other from the metastore cache. " + + "This cached entry will be invalidated.") tableRelationCache.invalidate(tableIdentifier) None } } private def convertToLogicalRelation( - metastoreRelation: MetastoreRelation, + relation: CatalogRelation, options: Map[String, String], - defaultSource: FileFormat, fileFormatClass: Class[_ <: FileFormat], fileType: String): LogicalRelation = { - val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + val metastoreSchema = relation.tableMeta.schema val tableIdentifier = - QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) - val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. + QualifiedTableName(relation.tableMeta.database, relation.tableMeta.identifier.table) val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions - val result = if (metastoreRelation.hiveQlTable.isPartitioned) { - val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - + val tablePath = new Path(new URI(relation.tableMeta.location)) + val result = if (relation.isPartitioned) { + val partitionSchema = relation.tableMeta.partitionSchema val rootPaths: Seq[Path] = if (lazyPruningEnabled) { - Seq(metastoreRelation.hiveQlTable.getDataLocation) + Seq(tablePath) } else { // By convention (for example, see CatalogFileIndex), the definition of a // partitioned table's paths depends on whether that table has any actual partitions. // Partitioned tables without partitions use the location of the table's base path. // Partitioned tables with partitions use the locations of those partitions' data // locations,_omitting_ the table's base path. 
- val paths = metastoreRelation.getHiveQlPartitions().map { p => - new Path(p.getLocation) - } + val paths = sparkSession.sharedState.externalCatalog + .listPartitions(tableIdentifier.database, tableIdentifier.name) + .map(p => new Path(new URI(p.storage.locationUri.get))) + if (paths.isEmpty) { - Seq(metastoreRelation.hiveQlTable.getDataLocation) + Seq(tablePath) } else { paths } @@ -155,39 +154,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log val cached = getCached( tableIdentifier, rootPaths, - metastoreRelation, metastoreSchema, fileFormatClass, - bucketSpec, Some(partitionSchema)) val logicalRelation = cached.getOrElse { - val sizeInBytes = - metastoreRelation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong + val sizeInBytes = relation.stats(sparkSession.sessionState.conf).sizeInBytes.toLong val fileIndex = { - val index = new CatalogFileIndex( - sparkSession, metastoreRelation.catalogTable, sizeInBytes) + val index = new CatalogFileIndex(sparkSession, relation.tableMeta, sizeInBytes) if (lazyPruningEnabled) { index } else { index.filterPartitions(Nil) // materialize all the partitions in memory } } - val partitionSchemaColumnNames = partitionSchema.map(_.name.toLowerCase).toSet - val dataSchema = - StructType(metastoreSchema - .filterNot(field => partitionSchemaColumnNames.contains(field.name.toLowerCase))) - val relation = HadoopFsRelation( + val fsRelation = HadoopFsRelation( location = fileIndex, partitionSchema = partitionSchema, - dataSchema = dataSchema, - bucketSpec = bucketSpec, - fileFormat = defaultSource, + dataSchema = relation.tableMeta.dataSchema, + // We don't support hive bucketed tables, only ones we write out. + bucketSpec = None, + fileFormat = fileFormatClass.newInstance(), options = options)(sparkSession = sparkSession) - val created = LogicalRelation(relation, - catalogTable = Some(metastoreRelation.catalogTable)) + val created = LogicalRelation(fsRelation, catalogTable = Some(relation.tableMeta)) tableRelationCache.put(tableIdentifier, created) created } @@ -195,14 +186,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } else { - val rootPath = metastoreRelation.hiveQlTable.getDataLocation + val rootPath = tablePath withTableCreationLock(tableIdentifier, { - val cached = getCached(tableIdentifier, + val cached = getCached( + tableIdentifier, Seq(rootPath), - metastoreRelation, metastoreSchema, fileFormatClass, - bucketSpec, None) val logicalRelation = cached.getOrElse { val created = @@ -210,11 +200,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log DataSource( sparkSession = sparkSession, paths = rootPath.toString :: Nil, - userSpecifiedSchema = Some(metastoreRelation.schema), - bucketSpec = bucketSpec, + userSpecifiedSchema = Some(metastoreSchema), + // We don't support hive bucketed tables, only ones we write out. 
+ bucketSpec = None, options = options, className = fileType).resolveRelation(), - catalogTable = Some(metastoreRelation.catalogTable)) + catalogTable = Some(relation.tableMeta)) tableRelationCache.put(tableIdentifier, created) created @@ -223,7 +214,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logicalRelation }) } - result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) + result.copy(expectedOutputAttributes = Some(relation.output)) } /** @@ -231,33 +222,32 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log * data source relations for better performance. */ object ParquetConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") && + private def shouldConvertMetastoreParquet(relation: CatalogRelation): Boolean = { + relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("parquet") && sessionState.convertMetastoreParquet } - private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new ParquetFileFormat() + private def convertToParquetRelation(relation: CatalogRelation): LogicalRelation = { val fileFormatClass = classOf[ParquetFileFormat] - val mergeSchema = sessionState.convertMetastoreParquetWithSchemaMerging val options = Map(ParquetOptions.MERGE_SCHEMA -> mergeSchema.toString) - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet") + convertToLogicalRelation(relation, options, fileFormatClass, "parquet") } override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Parquet data source (yet). - if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && + !r.isPartitioned && shouldConvertMetastoreParquet(r) => InsertIntoTable(convertToParquetRelation(r), partition, query, overwrite, ifNotExists) // Read path - case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => - val parquetRelation = convertToParquetRelation(relation) - SubqueryAlias(relation.tableName, parquetRelation, None) + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) && + shouldConvertMetastoreParquet(relation) => + convertToParquetRelation(relation) } } } @@ -267,31 +257,31 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log * for better performance. 
*/ object OrcConversions extends Rule[LogicalPlan] { - private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { - relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && + private def shouldConvertMetastoreOrc(relation: CatalogRelation): Boolean = { + relation.tableMeta.storage.serde.getOrElse("").toLowerCase.contains("orc") && sessionState.convertMetastoreOrc } - private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { - val defaultSource = new OrcFileFormat() + private def convertToOrcRelation(relation: CatalogRelation): LogicalRelation = { val fileFormatClass = classOf[OrcFileFormat] val options = Map[String, String]() - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") + convertToLogicalRelation(relation, options, fileFormatClass, "orc") } override def apply(plan: LogicalPlan): LogicalPlan = { plan transformUp { // Write path - case InsertIntoTable(r: MetastoreRelation, partition, query, overwrite, ifNotExists) + case InsertIntoTable(r: CatalogRelation, partition, query, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Orc data source (yet). - if query.resolved && !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && + !r.isPartitioned && shouldConvertMetastoreOrc(r) => InsertIntoTable(convertToOrcRelation(r), partition, query, overwrite, ifNotExists) // Read path - case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => - val orcRelation = convertToOrcRelation(relation) - SubqueryAlias(relation.tableName, orcRelation, None) + case relation: CatalogRelation if DDLUtils.isHiveTable(relation.tableMeta) && + shouldConvertMetastoreOrc(relation) => + convertToOrcRelation(relation) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 273cf85..5a08a6b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -62,10 +62,10 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val extendedResolutionRules = new ResolveHiveSerdeTable(sparkSession) :: new FindDataSourceTable(sparkSession) :: - new FindHiveSerdeTable(sparkSession) :: new ResolveSQLOnFile(sparkSession) :: Nil override val postHocResolutionRules = + new DetermineTableStats(sparkSession) :: catalog.ParquetConversions :: catalog.OrcConversions :: PreprocessTableCreation(sparkSession) :: http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index f45532c..624cfa2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -17,8 +17,14 @@ package org.apache.spark.sql.hive +import java.io.IOException +import java.net.URI + +import org.apache.hadoop.fs.{FileSystem, Path} +import 
org.apache.hadoop.hive.common.StatsSetupConst + import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, SimpleCatalogRelation} +import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogStorageFormat, CatalogTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation} @@ -91,18 +97,56 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { // Infers the schema, if empty, because the schema could be determined by Hive // serde. - val catalogTable = if (query.isEmpty) { - val withSchema = HiveUtils.inferSchema(withStorage) - if (withSchema.schema.length <= 0) { + val withSchema = if (query.isEmpty) { + val inferred = HiveUtils.inferSchema(withStorage) + if (inferred.schema.length <= 0) { throw new AnalysisException("Unable to infer the schema. " + - s"The schema specification is required to create the table ${withSchema.identifier}.") + s"The schema specification is required to create the table ${inferred.identifier}.") } - withSchema + inferred } else { withStorage } - c.copy(tableDesc = catalogTable) + c.copy(tableDesc = withSchema) + } +} + +class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case relation: CatalogRelation + if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => + val table = relation.tableMeta + // TODO: check if this estimate is valid for tables after partition pruning. + // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be + // relatively cheap if parameters for the table are populated into the metastore. + // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys + // (see StatsSetupConst in Hive) that we can look at in the future. + // When table is external,`totalSize` is always zero, which will influence join strategy + // so when `totalSize` is zero, use `rawDataSize` instead. 
+ val totalSize = table.properties.get(StatsSetupConst.TOTAL_SIZE).map(_.toLong) + val rawDataSize = table.properties.get(StatsSetupConst.RAW_DATA_SIZE).map(_.toLong) + val sizeInBytes = if (totalSize.isDefined && totalSize.get > 0) { + totalSize.get + } else if (rawDataSize.isDefined && rawDataSize.get > 0) { + rawDataSize.get + } else if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) { + try { + val hadoopConf = session.sessionState.newHadoopConf() + val tablePath = new Path(new URI(table.location)) + val fs: FileSystem = tablePath.getFileSystem(hadoopConf) + fs.getContentSummary(tablePath).getLength + } catch { + case e: IOException => + logWarning("Failed to get table size from hdfs.", e) + session.sessionState.conf.defaultSizeInBytes + } + } else { + session.sessionState.conf.defaultSizeInBytes + } + + val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes)))) + relation.copy(tableMeta = withStats) } } @@ -114,8 +158,9 @@ class ResolveHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { */ object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists) => - InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists) + case InsertIntoTable(relation: CatalogRelation, partSpec, query, overwrite, ifNotExists) + if DDLUtils.isHiveTable(relation.tableMeta) => + InsertIntoHiveTable(relation.tableMeta, partSpec, query, overwrite, ifNotExists) case CreateTable(tableDesc, mode, None) if DDLUtils.isHiveTable(tableDesc) => CreateTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore) @@ -125,21 +170,6 @@ object HiveAnalysis extends Rule[LogicalPlan] { } } -/** - * Replaces `SimpleCatalogRelation` with [[MetastoreRelation]] if its table provider is hive. - */ -class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan transform { - case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) - if DDLUtils.isHiveTable(s.metadata) => - i.copy(table = - MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)) - - case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) => - MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session) - } -} - private[hive] trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. self: SparkPlanner => @@ -161,10 +191,10 @@ private[hive] trait HiveStrategies { */ object HiveTableScans extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => + case PhysicalOperation(projectList, predicates, relation: CatalogRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. 
- val partitionKeyIds = AttributeSet(relation.partitionKeys) + val partitionKeyIds = AttributeSet(relation.partitionCols) val (pruningPredicates, otherPredicates) = predicates.partition { predicate => !predicate.references.isEmpty && predicate.references.subsetOf(partitionKeyIds) http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala deleted file mode 100644 index 97b1207..0000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import java.io.IOException - -import com.google.common.base.Objects -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.hive.common.StatsSetupConst -import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable} -import org.apache.hadoop.hive.ql.plan.TableDesc - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.CatalystConf -import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression} -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} -import org.apache.spark.sql.execution.FileRelation -import org.apache.spark.sql.hive.client.HiveClientImpl -import org.apache.spark.sql.types.StructField - - -private[hive] case class MetastoreRelation( - databaseName: String, - tableName: String) - (val catalogTable: CatalogTable, - @transient private val sparkSession: SparkSession) - extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation { - - override def equals(other: Any): Boolean = other match { - case relation: MetastoreRelation => - databaseName == relation.databaseName && - tableName == relation.tableName && - output == relation.output - case _ => false - } - - override def hashCode(): Int = { - Objects.hashCode(databaseName, tableName, output) - } - - override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil - - @transient val hiveQlTable: HiveTable = HiveClientImpl.toHiveTable(catalogTable) - - @transient override def computeStats(conf: CatalystConf): Statistics = { - catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics( - sizeInBytes = { - val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE) - val rawDataSize = 
hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE) - // TODO: check if this estimate is valid for tables after partition pruning. - // NOTE: getting `totalSize` directly from params is kind of hacky, but this should be - // relatively cheap if parameters for the table are populated into the metastore. - // Besides `totalSize`, there are also `numFiles`, `numRows`, `rawDataSize` keys - // (see StatsSetupConst in Hive) that we can look at in the future. - BigInt( - // When table is external,`totalSize` is always zero, which will influence join strategy - // so when `totalSize` is zero, use `rawDataSize` instead - // when `rawDataSize` is also zero, use `HiveExternalCatalog.STATISTICS_TOTAL_SIZE`, - // which is generated by analyze command. - if (totalSize != null && totalSize.toLong > 0L) { - totalSize.toLong - } else if (rawDataSize != null && rawDataSize.toLong > 0) { - rawDataSize.toLong - } else if (sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled) { - try { - val hadoopConf = sparkSession.sessionState.newHadoopConf() - val fs: FileSystem = hiveQlTable.getPath.getFileSystem(hadoopConf) - fs.getContentSummary(hiveQlTable.getPath).getLength - } catch { - case e: IOException => - logWarning("Failed to get table size from hdfs.", e) - sparkSession.sessionState.conf.defaultSizeInBytes - } - } else { - sparkSession.sessionState.conf.defaultSizeInBytes - }) - } - )) - } - - // When metastore partition pruning is turned off, we cache the list of all partitions to - // mimic the behavior of Spark < 1.5 - private lazy val allPartitions: Seq[CatalogTablePartition] = { - sparkSession.sharedState.externalCatalog.listPartitions( - catalogTable.database, - catalogTable.identifier.table) - } - - def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = { - val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { - sparkSession.sharedState.externalCatalog.listPartitionsByFilter( - catalogTable.database, - catalogTable.identifier.table, - predicates) - } else { - allPartitions - } - - rawPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) - } - - /** Only compare database and tablename, not alias. */ - override def sameResult(plan: LogicalPlan): Boolean = { - plan.canonicalized match { - case mr: MetastoreRelation => - mr.databaseName == databaseName && mr.tableName == tableName - case _ => false - } - } - - val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata - ) - - implicit class SchemaAttribute(f: StructField) { - def toAttribute: AttributeReference = AttributeReference( - f.name, - f.dataType, - // Since data can be dumped in randomly with no validation, everything is nullable. - nullable = true - )(qualifier = Some(tableName)) - } - - /** PartitionKey attributes */ - val partitionKeys = catalogTable.partitionSchema.map(_.toAttribute) - - /** Non-partitionKey attributes */ - val dataColKeys = catalogTable.schema - .filter { c => !catalogTable.partitionColumnNames.contains(c.name) } - .map(_.toAttribute) - - val output = dataColKeys ++ partitionKeys - - /** An attribute map that can be used to lookup original attributes based on expression id. 
*/ - val attributeMap = AttributeMap(output.map(o => (o, o))) - - /** An attribute map for determining the ordinal for non-partition columns. */ - val columnOrdinals = AttributeMap(dataColKeys.zipWithIndex) - - override def inputFiles: Array[String] = { - val partLocations = allPartitions - .flatMap(_.storage.locationUri) - .toArray - if (partLocations.nonEmpty) { - partLocations - } else { - Array( - catalogTable.storage.locationUri.getOrElse( - sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) - } - } - - override def newInstance(): MetastoreRelation = { - MetastoreRelation(databaseName, tableName)(catalogTable, sparkSession) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d48702b..16c1103 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -61,7 +61,8 @@ private[hive] sealed trait TableReader { private[hive] class HadoopTableReader( @transient private val attributes: Seq[Attribute], - @transient private val relation: MetastoreRelation, + @transient private val partitionKeys: Seq[Attribute], + @transient private val tableDesc: TableDesc, @transient private val sparkSession: SparkSession, hadoopConf: Configuration) extends TableReader with Logging { @@ -88,7 +89,7 @@ class HadoopTableReader( override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] = makeRDDForTable( hiveTable, - Utils.classForName(relation.tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]], + Utils.classForName(tableDesc.getSerdeClassName).asInstanceOf[Class[Deserializer]], filterOpt = None) /** @@ -110,7 +111,7 @@ class HadoopTableReader( // Create local references to member variables, so that the entire `this` object won't be // serialized in the closure below. - val tableDesc = relation.tableDesc + val localTableDesc = tableDesc val broadcastedHadoopConf = _broadcastedHadoopConf val tablePath = hiveTable.getPath @@ -119,7 +120,7 @@ class HadoopTableReader( // logDebug("Table input: %s".format(tablePath)) val ifc = hiveTable.getInputFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] - val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc) + val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc) val attrsWithIndex = attributes.zipWithIndex val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) @@ -127,7 +128,7 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value val deserializer = deserializerClass.newInstance() - deserializer.initialize(hconf, tableDesc.getProperties) + deserializer.initialize(hconf, localTableDesc.getProperties) HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow, deserializer) } @@ -212,8 +213,6 @@ class HadoopTableReader( partCols.map(col => new String(partSpec.get(col))).toArray } - // Create local references so that the outer object isn't serialized. 
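For reference, the computeStats method deleted above estimates table size by falling back through several sources: Hive's totalSize parameter, then rawDataSize, then (when fallBackToHdfsForStatsEnabled is set) the size reported by the file system, and finally spark.sql.defaultSizeInBytes. A minimal sketch of that fallback order follows; the helper estimateSizeInBytes and its parameter list are hypothetical, written only to illustrate the order, and are not part of this commit.

import java.io.IOException

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper illustrating the removed size-estimation fallback:
// 1. Hive's totalSize parameter, if positive
// 2. otherwise Hive's rawDataSize parameter, if positive
// 3. otherwise, if fallBackToHdfs is enabled, the size of the table location on the file system
// 4. otherwise the configured default size
def estimateSizeInBytes(
    totalSize: Option[Long],
    rawDataSize: Option[Long],
    tablePath: Path,
    hadoopConf: Configuration,
    fallBackToHdfs: Boolean,
    defaultSizeInBytes: Long): BigInt = {
  if (totalSize.exists(_ > 0L)) {
    BigInt(totalSize.get)
  } else if (rawDataSize.exists(_ > 0L)) {
    BigInt(rawDataSize.get)
  } else if (fallBackToHdfs) {
    try {
      val fs = tablePath.getFileSystem(hadoopConf)
      BigInt(fs.getContentSummary(tablePath).getLength)
    } catch {
      case _: IOException => BigInt(defaultSizeInBytes)
    }
  } else {
    BigInt(defaultSizeInBytes)
  }
}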
- val tableDesc = relation.tableDesc val broadcastedHiveConf = _broadcastedHadoopConf val localDeserializer = partDeserializer val mutableRow = new SpecificInternalRow(attributes.map(_.dataType)) @@ -222,12 +221,12 @@ class HadoopTableReader( // Attached indices indicate the position of each attribute in the output schema. val (partitionKeyAttrs, nonPartitionKeyAttrs) = attributes.zipWithIndex.partition { case (attr, _) => - relation.partitionKeys.contains(attr) + partitionKeys.contains(attr) } def fillPartitionKeys(rawPartValues: Array[String], row: InternalRow): Unit = { partitionKeyAttrs.foreach { case (attr, ordinal) => - val partOrdinal = relation.partitionKeys.indexOf(attr) + val partOrdinal = partitionKeys.indexOf(attr) row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null) } } @@ -235,9 +234,11 @@ class HadoopTableReader( // Fill all partition keys to the given MutableRow object fillPartitionKeys(partValues, mutableRow) - val tableProperties = relation.tableDesc.getProperties + val tableProperties = tableDesc.getProperties - createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter => + // Create local references so that the outer object isn't serialized. + val localTableDesc = tableDesc + createHadoopRdd(localTableDesc, inputPathStr, ifc).mapPartitions { iter => val hconf = broadcastedHiveConf.value.value val deserializer = localDeserializer.newInstance() // SPARK-13709: For SerDes like AvroSerDe, some essential information (e.g. Avro schema @@ -251,8 +252,8 @@ class HadoopTableReader( } deserializer.initialize(hconf, props) // get the table deserializer - val tableSerDe = tableDesc.getDeserializerClass.newInstance() - tableSerDe.initialize(hconf, tableDesc.getProperties) + val tableSerDe = localTableDesc.getDeserializerClass.newInstance() + tableSerDe.initialize(hconf, localTableDesc.getProperties) // fill the non partition key attributes HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index 140c352..14b9565 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde.serdeConstants import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption @@ -29,10 +30,12 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils import org.apache.spark.rdd.RDD import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.catalog.CatalogRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.client.HiveClientImpl import 
org.apache.spark.sql.types.{BooleanType, DataType} import org.apache.spark.util.Utils @@ -46,12 +49,12 @@ import org.apache.spark.util.Utils private[hive] case class HiveTableScanExec( requestedAttributes: Seq[Attribute], - relation: MetastoreRelation, + relation: CatalogRelation, partitionPruningPred: Seq[Expression])( @transient private val sparkSession: SparkSession) extends LeafExecNode { - require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, + require(partitionPruningPred.isEmpty || relation.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") override lazy val metrics = Map( @@ -60,42 +63,54 @@ case class HiveTableScanExec( override def producedAttributes: AttributeSet = outputSet ++ AttributeSet(partitionPruningPred.flatMap(_.references)) - // Retrieve the original attributes based on expression ID so that capitalization matches. - val attributes = requestedAttributes.map(relation.attributeMap) + private val originalAttributes = AttributeMap(relation.output.map(a => a -> a)) + + override val output: Seq[Attribute] = { + // Retrieve the original attributes based on expression ID so that capitalization matches. + requestedAttributes.map(originalAttributes) + } // Bind all partition key attribute references in the partition pruning predicate for later // evaluation. - private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => + private val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred => require( pred.dataType == BooleanType, s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - BindReferences.bindReference(pred, relation.partitionKeys) + BindReferences.bindReference(pred, relation.partitionCols) } // Create a local copy of hadoopConf,so that scan specific modifications should not impact // other queries - @transient - private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf() + @transient private val hadoopConf = sparkSession.sessionState.newHadoopConf() + + @transient private val hiveQlTable = HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata) // append columns ids and names before broadcast addColumnMetadataToConf(hadoopConf) - @transient - private[this] val hadoopReader = - new HadoopTableReader(attributes, relation, sparkSession, hadoopConf) + @transient private val hadoopReader = new HadoopTableReader( + output, + relation.partitionCols, + tableDesc, + sparkSession, + hadoopConf) - private[this] def castFromString(value: String, dataType: DataType) = { + private def castFromString(value: String, dataType: DataType) = { Cast(Literal(value), dataType).eval(null) } private def addColumnMetadataToConf(hiveConf: Configuration) { // Specifies needed column IDs for those non-partitioning columns. 
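With MetastoreRelation's columnOrdinals gone, the scan now derives the needed column ordinals from the relation's dataCols, as the hunk above shows. A small sketch of that derivation; the helper neededColumnIds is a hypothetical name used only for illustration and is not part of this commit.

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}

// Map each non-partition column to its ordinal in the table schema, then keep only
// the ordinals of the attributes the scan actually requests. Partition columns have
// no entry in the map, so flatMap drops them.
def neededColumnIds(dataCols: Seq[Attribute], requested: Seq[Attribute]): Seq[Integer] = {
  val ordinals = AttributeMap(dataCols.zipWithIndex)
  requested.flatMap(ordinals.get).map(i => i: Integer)
}

These are the IDs that HiveShim.appendReadColumns writes into the Hadoop configuration before the scan is executed.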
- val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer) + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) - HiveShim.appendReadColumns(hiveConf, neededColumnIDs, attributes.map(_.name)) + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, output.map(_.name)) - val tableDesc = relation.tableDesc val deserializer = tableDesc.getDeserializerClass.newInstance deserializer.initialize(hiveConf, tableDesc.getProperties) @@ -113,7 +128,7 @@ case class HiveTableScanExec( .mkString(",") hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataColKeys.map(_.name).mkString(",")) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) } /** @@ -126,7 +141,7 @@ case class HiveTableScanExec( boundPruningPred match { case None => partitions case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) + val dataTypes = relation.partitionCols.map(_.dataType) val castedValues = part.getValues.asScala.zip(dataTypes) .map { case (value, dataType) => castFromString(value, dataType) } @@ -138,27 +153,35 @@ case class HiveTableScanExec( } } + // exposed for tests + @transient lazy val rawPartitions = { + val prunedPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) { + // Retrieve the original attributes based on expression ID so that capitalization matches. + val normalizedFilters = partitionPruningPred.map(_.transform { + case a: AttributeReference => originalAttributes(a) + }) + sparkSession.sharedState.externalCatalog.listPartitionsByFilter( + relation.tableMeta.database, + relation.tableMeta.identifier.table, + normalizedFilters) + } else { + sparkSession.sharedState.externalCatalog.listPartitions( + relation.tableMeta.database, + relation.tableMeta.identifier.table) + } + prunedPartitions.map(HiveClientImpl.toHivePartition(_, hiveQlTable)) + } + protected override def doExecute(): RDD[InternalRow] = { // Using dummyCallSite, as getCallSite can turn out to be expensive with // with multiple partitions. - val rdd = if (!relation.hiveQlTable.isPartitioned) { + val rdd = if (!relation.isPartitioned) { Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) + hadoopReader.makeRDDForTable(hiveQlTable) } } else { - // The attribute name of predicate could be different than the one in schema in case of - // case insensitive, we should change them to match the one in schema, so we do not need to - // worry about case sensitivity anymore. 
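The rawPartitions logic above either pushes the normalized partition predicates down to the external catalog or lists every partition and prunes on the driver via prunePartitions. The switch is the existing spark.sql.hive.metastorePartitionPruning setting; a short usage sketch follows, with a made-up application name and table.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("metastore-pruning-demo")  // hypothetical app name
  .enableHiveSupport()
  .getOrCreate()

// true  -> HiveTableScanExec asks the external catalog to list only the partitions
//          matching the normalized predicates (listPartitionsByFilter)
// false -> all partitions are listed and pruned on the driver by prunePartitions
spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

spark.sql("SELECT * FROM src_part WHERE ds = '2017-02-28'").explain()  // hypothetical partitioned table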
- val normalizedFilters = partitionPruningPred.map { e => - e transform { - case a: AttributeReference => - a.withName(relation.output.find(_.semanticEquals(a)).get.name) - } - } - Utils.withDummyCallSite(sqlContext.sparkContext) { - hadoopReader.makeRDDForPartitionedTable( - prunePartitions(relation.getHiveQlPartitions(normalizedFilters))) + hadoopReader.makeRDDForPartitionedTable(prunePartitions(rawPartitions)) } } val numOutputRows = longMetric("numOutputRows") @@ -174,8 +197,6 @@ case class HiveTableScanExec( } } - override def output: Seq[Attribute] = attributes - override def sameResult(plan: SparkPlan): Boolean = plan match { case other: HiveTableScanExec => val thisPredicates = partitionPruningPred.map(cleanExpression) http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 142f25d..f107149 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -29,16 +29,18 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.hive.common.FileUtils import org.apache.hadoop.hive.ql.exec.TaskRunner import org.apache.hadoop.hive.ql.ErrorMsg +import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command.RunnableCommand import org.apache.spark.sql.execution.datasources.FileFormatWriter import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} -import org.apache.spark.sql.hive.client.HiveVersion +import org.apache.spark.sql.hive.client.{HiveClientImpl, HiveVersion} import org.apache.spark.SparkException @@ -52,9 +54,7 @@ import org.apache.spark.SparkException * In the future we should converge the write path for Hive with the normal data source write path, * as defined in `org.apache.spark.sql.execution.datasources.FileFormatWriter`. * - * @param table the logical plan representing the table. In the future this should be a - * `org.apache.spark.sql.catalyst.catalog.CatalogTable` once we converge Hive tables - * and data source tables. + * @param table the metadata of the table. * @param partition a map from the partition key to the partition value (optional). If the partition * value is optional, dynamic partition insert will be performed. * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS ...` would have @@ -74,7 +74,7 @@ import org.apache.spark.SparkException * @param ifNotExists If true, only write if the table or partition does not exist. 
*/ case class InsertIntoHiveTable( - table: MetastoreRelation, + table: CatalogTable, partition: Map[String, Option[String]], query: LogicalPlan, overwrite: Boolean, @@ -218,10 +218,19 @@ case class InsertIntoHiveTable( val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + val hiveQlTable = HiveClientImpl.toHiveTable(table) // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = table.tableDesc - val tableLocation = table.hiveQlTable.getDataLocation + val tableDesc = new TableDesc( + hiveQlTable.getInputFormatClass, + // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because + // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to + // substitute some output formats, e.g. substituting SequenceFileOutputFormat to + // HiveSequenceFileOutputFormat. + hiveQlTable.getOutputFormatClass, + hiveQlTable.getMetadata + ) + val tableLocation = hiveQlTable.getDataLocation val tmpLocation = getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) @@ -254,9 +263,9 @@ case class InsertIntoHiveTable( // By this time, the partition map must match the table's partition columns if (partitionColumnNames.toSet != partition.keySet) { throw new SparkException( - s"""Requested partitioning does not match the ${table.tableName} table: + s"""Requested partitioning does not match the ${table.identifier.table} table: |Requested partitions: ${partition.keys.mkString(",")} - |Table partitions: ${table.partitionKeys.map(_.name).mkString(",")}""".stripMargin) + |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin) } // Validate partition spec if there exist any dynamic partitions @@ -307,8 +316,8 @@ case class InsertIntoHiveTable( if (partition.nonEmpty) { if (numDynamicPartitions > 0) { externalCatalog.loadDynamicPartitions( - db = table.catalogTable.database, - table = table.catalogTable.identifier.table, + db = table.database, + table = table.identifier.table, tmpLocation.toString, partitionSpec, overwrite, @@ -320,8 +329,8 @@ case class InsertIntoHiveTable( // scalastyle:on val oldPart = externalCatalog.getPartitionOption( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, partitionSpec) var doHiveOverwrite = overwrite @@ -350,8 +359,8 @@ case class InsertIntoHiveTable( // which is currently considered as a Hive native command. val inheritTableSpecs = true externalCatalog.loadPartition( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, tmpLocation.toString, partitionSpec, isOverwrite = doHiveOverwrite, @@ -361,8 +370,8 @@ case class InsertIntoHiveTable( } } else { externalCatalog.loadTable( - table.catalogTable.database, - table.catalogTable.identifier.table, + table.database, + table.identifier.table, tmpLocation.toString, // TODO: URI overwrite, isSrcLocal = false) @@ -378,8 +387,8 @@ case class InsertIntoHiveTable( } // Invalidate the cache. 
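Among the changes above, the partition-spec check now reads its metadata from CatalogTable rather than MetastoreRelation: the INSERT's PARTITION clause must name exactly the table's partition columns. A self-contained sketch of that check; the wrapper validatePartitionSpec is a hypothetical name used only for illustration and is not part of this commit.

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Each partition column must appear in the PARTITION clause, either with a static
// value (Some(v)) or as a dynamic partition (None), and no extra keys are allowed.
def validatePartitionSpec(table: CatalogTable, partition: Map[String, Option[String]]): Unit = {
  if (table.partitionColumnNames.toSet != partition.keySet) {
    throw new SparkException(
      s"""Requested partitioning does not match the ${table.identifier.table} table:
         |Requested partitions: ${partition.keys.mkString(",")}
         |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
  }
}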
- sparkSession.sharedState.cacheManager.invalidateCache(table) - sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier) + sparkSession.catalog.uncacheTable(table.qualifiedName) + sparkSession.sessionState.catalog.refreshTable(table.identifier) // It would be nice to just return the childRdd unchanged so insert operations could be chained, // however for now we return an empty list to simplify compatibility checks with hive, which http://git-wip-us.apache.org/repos/asf/spark/blob/7c7fc30b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala deleted file mode 100644 index 91ff711..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreRelationSuite.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* -* Licensed to the Apache Software Foundation (ASF) under one or more -* contributor license agreements. See the NOTICE file distributed with -* this work for additional information regarding copyright ownership. -* The ASF licenses this file to You under the Apache License, Version 2.0 -* (the "License"); you may not use this file except in compliance with -* the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.{IntegerType, StructField, StructType} - -class MetastoreRelationSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - test("makeCopy and toJSON should work") { - val table = CatalogTable( - identifier = TableIdentifier("test", Some("db")), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = StructType(StructField("a", IntegerType, true) :: Nil)) - val relation = MetastoreRelation("db", "test")(table, null) - - // No exception should be thrown - relation.makeCopy(Array("db", "test")) - // No exception should be thrown - relation.toJSON - } - - test("SPARK-17409: Do Not Optimize Query in CTAS (Hive Serde Table) More Than Once") { - withTable("bar") { - withTempView("foo") { - sql("select 0 as id").createOrReplaceTempView("foo") - // If we optimize the query in CTAS more than once, the following saveAsTable will fail - // with the error: `GROUP BY position 0 is not in select list (valid range is [1, 1])` - sql("CREATE TABLE bar AS SELECT * FROM foo group by id") - checkAnswer(spark.table("bar"), Row(0) :: Nil) - val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier("bar")) - assert(tableMetadata.provider == Some("hive"), "the expected table is a Hive serde table") - } - } - } -}