Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/14618#discussion_r74550719 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala --- @@ -1,494 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import scala.collection.JavaConverters._ - -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache} -import org.apache.hadoop.fs.Path - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._ -import org.apache.spark.sql.execution.datasources.{Partition => _, _} -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} -import org.apache.spark.sql.hive.orc.OrcFileFormat -import org.apache.spark.sql.types._ - - -/** - * Legacy catalog for interacting with the Hive metastore. - * - * This is still used for things like creating data source tables, but in the future will be - * cleaned up to integrate more nicely with [[HiveExternalCatalog]]. - */ -private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging { - private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState] - private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive - - /** A fully qualified identifier for a table (i.e., database.tableName) */ - case class QualifiedTableName(database: String, name: String) - - private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase - - def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = { - QualifiedTableName( - tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase, - tableIdent.table.toLowerCase) - } - - private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { - QualifiedTableName( - t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase, - t.identifier.table.toLowerCase) - } - - /** A cache of Spark SQL data source tables that have been accessed. */ - protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = { - val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() { - override def load(in: QualifiedTableName): LogicalPlan = { - logDebug(s"Creating new cached data source for $in") - val table = client.getTable(in.database, in.name) - - // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable - - def schemaStringFromParts: Option[String] = { - table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts => - val parts = (0 until numParts.toInt).map { index => - val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull - if (part == null) { - throw new AnalysisException( - "Could not read schema from the metastore because it is corrupted " + - s"(missing part $index of the schema, $numParts parts are expected).") - } - - part - } - // Stick all parts back to a single schema string. - parts.mkString - } - } - - def getColumnNames(colType: String): Seq[String] = { - table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map { - numCols => (0 until numCols.toInt).map { index => - table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index", - throw new AnalysisException( - s"Could not read $colType columns from the metastore because it is corrupted " + - s"(missing part $index of it, $numCols parts are expected).")) - } - }.getOrElse(Nil) - } - - // Originally, we used spark.sql.sources.schema to store the schema of a data source table. - // After SPARK-6024, we removed this flag. - // Although we are not using spark.sql.sources.schema any more, we need to still support. - val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts) - - val userSpecifiedSchema = - schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType]) - - // We only need names at here since userSpecifiedSchema we loaded from the metastore - // contains partition columns. We can always get data types of partitioning columns - // from userSpecifiedSchema. - val partitionColumns = getColumnNames("part") - - val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n => - BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort")) - } - - val options = table.storage.properties - val dataSource = - DataSource( - sparkSession, - userSpecifiedSchema = userSpecifiedSchema, - partitionColumns = partitionColumns, - bucketSpec = bucketSpec, - className = table.properties(DATASOURCE_PROVIDER), - options = options) - - LogicalRelation( - dataSource.resolveRelation(checkPathExist = true), - metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database)))) - } - } - - CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) - } - - def refreshTable(tableIdent: TableIdentifier): Unit = { - // refreshTable does not eagerly reload the cache. It just invalidate the cache. - // Next time when we use the table, it will be populated in the cache. - // Since we also cache ParquetRelations converted from Hive Parquet tables and - // adding converted ParquetRelations into the cache is not defined in the load function - // of the cache (instead, we add the cache entry in convertToParquetRelation), - // it is better at here to invalidate the cache to avoid confusing waring logs from the - // cache loader (e.g. cannot find data source provider, which is only defined for - // data source table.). - cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent)) - } - - def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = { - // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName) - val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent) - new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString - } - - def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String]): LogicalPlan = { - val qualifiedTableName = getQualifiedTableName(tableIdent) - val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name) - - if (table.properties.get(DATASOURCE_PROVIDER).isDefined) { - val dataSourceTable = cachedDataSourceTables(qualifiedTableName) - val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable) - // Then, if alias is specified, wrap the table with a Subquery using the alias. - // Otherwise, wrap the table with a Subquery using the table name. - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - } else if (table.tableType == CatalogTableType.VIEW) { - val viewText = table.viewText.getOrElse(sys.error("Invalid view without text.")) - alias match { - case None => - SubqueryAlias(table.identifier.table, - sparkSession.sessionState.sqlParser.parsePlan(viewText)) - case Some(aliasText) => - SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText)) - } - } else { - val qualifiedTable = - MetastoreRelation( - qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession) - alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - } - } - - private def getCached( --- End diff -- This is a utility function for rule `OrcConversions` and `ParquetConversions`. Thus, it is moved to `HiveStrategies` See the [new location](https://github.com/gatorsmile/spark/blob/9fe620567aa7d61038ef497bf2358e6fff374d38/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L323-L374).
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org