Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14618#discussion_r74550719
  
    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---
    @@ -1,494 +0,0 @@
    -/*
    - * Licensed to the Apache Software Foundation (ASF) under one or more
    - * contributor license agreements.  See the NOTICE file distributed with
    - * this work for additional information regarding copyright ownership.
    - * The ASF licenses this file to You under the Apache License, Version 2.0
    - * (the "License"); you may not use this file except in compliance with
    - * the License.  You may obtain a copy of the License at
    - *
    - *    http://www.apache.org/licenses/LICENSE-2.0
    - *
    - * Unless required by applicable law or agreed to in writing, software
    - * distributed under the License is distributed on an "AS IS" BASIS,
    - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - * See the License for the specific language governing permissions and
    - * limitations under the License.
    - */
    -
    -package org.apache.spark.sql.hive
    -
    -import scala.collection.JavaConverters._
    -
    -import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
    -import org.apache.hadoop.fs.Path
    -
    -import org.apache.spark.internal.Logging
    -import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
    -import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
    -import org.apache.spark.sql.catalyst.catalog._
    -import org.apache.spark.sql.catalyst.expressions._
    -import org.apache.spark.sql.catalyst.plans.logical._
    -import org.apache.spark.sql.catalyst.rules._
    -import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
    -import org.apache.spark.sql.execution.datasources.{Partition => _, _}
    -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
    -import org.apache.spark.sql.hive.orc.OrcFileFormat
    -import org.apache.spark.sql.types._
    -
    -
    -/**
    - * Legacy catalog for interacting with the Hive metastore.
    - *
    - * This is still used for things like creating data source tables, but in the future will be
    - * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
    - */
    -private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
    -  private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
    -  private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
    -
    -  /** A fully qualified identifier for a table (i.e., database.tableName) */
    -  case class QualifiedTableName(database: String, name: String)
    -
    -  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
    -
    -  def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
    -    QualifiedTableName(
    -      tableIdent.database.getOrElse(getCurrentDatabase).toLowerCase,
    -      tableIdent.table.toLowerCase)
    -  }
    -
    -  private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = {
    -    QualifiedTableName(
    -      t.identifier.database.getOrElse(getCurrentDatabase).toLowerCase,
    -      t.identifier.table.toLowerCase)
    -  }
    -
    -  /** A cache of Spark SQL data source tables that have been accessed. */
    -  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, LogicalPlan] = {
    -    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
    -      override def load(in: QualifiedTableName): LogicalPlan = {
    -        logDebug(s"Creating new cached data source for $in")
    -        val table = client.getTable(in.database, in.name)
    -
    -        // TODO: the following code is duplicated with FindDataSourceTable.readDataSourceTable
    -
    -        def schemaStringFromParts: Option[String] = {
    -          table.properties.get(DATASOURCE_SCHEMA_NUMPARTS).map { numParts =>
    -            val parts = (0 until numParts.toInt).map { index =>
    -              val part = table.properties.get(s"$DATASOURCE_SCHEMA_PART_PREFIX$index").orNull
    -              if (part == null) {
    -                throw new AnalysisException(
    -                  "Could not read schema from the metastore because it is corrupted " +
    -                    s"(missing part $index of the schema, $numParts parts are expected).")
    -              }
    -
    -              part
    -            }
    -            // Stick all parts back to a single schema string.
    -            parts.mkString
    -          }
    -        }
    -
    -        def getColumnNames(colType: String): Seq[String] = {
    -          table.properties.get(s"$DATASOURCE_SCHEMA.num${colType.capitalize}Cols").map {
    -            numCols => (0 until numCols.toInt).map { index =>
    -              table.properties.getOrElse(s"$DATASOURCE_SCHEMA_PREFIX${colType}Col.$index",
    -                throw new AnalysisException(
    -                  s"Could not read $colType columns from the metastore because it is corrupted " +
    -                    s"(missing part $index of it, $numCols parts are expected)."))
    -            }
    -          }.getOrElse(Nil)
    -        }
    -
    -        // Originally, we used spark.sql.sources.schema to store the schema of a data source table.
    -        // After SPARK-6024, we removed this flag.
    -        // Although we are not using spark.sql.sources.schema any more, we need to still support.
    -        val schemaString = table.properties.get(DATASOURCE_SCHEMA).orElse(schemaStringFromParts)
    -
    -        val userSpecifiedSchema =
    -          schemaString.map(s => DataType.fromJson(s).asInstanceOf[StructType])
    -
    -        // We only need names at here since userSpecifiedSchema we loaded from the metastore
    -        // contains partition columns. We can always get data types of partitioning columns
    -        // from userSpecifiedSchema.
    -        val partitionColumns = getColumnNames("part")
    -
    -        val bucketSpec = table.properties.get(DATASOURCE_SCHEMA_NUMBUCKETS).map { n =>
    -          BucketSpec(n.toInt, getColumnNames("bucket"), getColumnNames("sort"))
    -        }
    -
    -        val options = table.storage.properties
    -        val dataSource =
    -          DataSource(
    -            sparkSession,
    -            userSpecifiedSchema = userSpecifiedSchema,
    -            partitionColumns = partitionColumns,
    -            bucketSpec = bucketSpec,
    -            className = table.properties(DATASOURCE_PROVIDER),
    -            options = options)
    -
    -        LogicalRelation(
    -          dataSource.resolveRelation(checkPathExist = true),
    -          metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
    -      }
    -    }
    -
    -    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
    -  }
    -
    -  def refreshTable(tableIdent: TableIdentifier): Unit = {
    -    // refreshTable does not eagerly reload the cache. It just invalidate the cache.
    -    // Next time when we use the table, it will be populated in the cache.
    -    // Since we also cache ParquetRelations converted from Hive Parquet tables and
    -    // adding converted ParquetRelations into the cache is not defined in the load function
    -    // of the cache (instead, we add the cache entry in convertToParquetRelation),
    -    // it is better at here to invalidate the cache to avoid confusing waring logs from the
    -    // cache loader (e.g. cannot find data source provider, which is only defined for
    -    // data source table.).
    -    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
    -  }
    -
    -  def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
    -    // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
    -    val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
    -    new Path(new Path(client.getDatabase(dbName).locationUri), tblName).toString
    -  }
    -
    -  def lookupRelation(
    -      tableIdent: TableIdentifier,
    -      alias: Option[String]): LogicalPlan = {
    -    val qualifiedTableName = getQualifiedTableName(tableIdent)
    -    val table = client.getTable(qualifiedTableName.database, qualifiedTableName.name)
    -
    -    if (table.properties.get(DATASOURCE_PROVIDER).isDefined) {
    -      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
    -      val qualifiedTable = SubqueryAlias(qualifiedTableName.name, dataSourceTable)
    -      // Then, if alias is specified, wrap the table with a Subquery using the alias.
    -      // Otherwise, wrap the table with a Subquery using the table name.
    -      alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
    -    } else if (table.tableType == CatalogTableType.VIEW) {
    -      val viewText = table.viewText.getOrElse(sys.error("Invalid view without text."))
    -      alias match {
    -        case None =>
    -          SubqueryAlias(table.identifier.table,
    -            sparkSession.sessionState.sqlParser.parsePlan(viewText))
    -        case Some(aliasText) =>
    -          SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText))
    -      }
    -    } else {
    -      val qualifiedTable =
    -        MetastoreRelation(
    -          qualifiedTableName.database, qualifiedTableName.name)(table, client, sparkSession)
    -      alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable)
    -    }
    -  }
    -
    -  private def getCached(
    --- End diff --
    
    This is a utility function for the rules `OrcConversions` and `ParquetConversions`. Thus, it has been moved to `HiveStrategies`. See the [new location](https://github.com/gatorsmile/spark/blob/9fe620567aa7d61038ef497bf2358e6fff374d38/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala#L323-L374).
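    
    As context for the removed cache loader above: it rebuilds the table schema from numbered table properties before constructing the `DataSource`. Below is a minimal, standalone sketch of that reassembly step, not the actual Spark helper; it operates on a plain `Map` instead of a `CatalogTable`, and the literal property keys are assumed to match the `DATASOURCE_SCHEMA_NUMPARTS` / `DATASOURCE_SCHEMA_PART_PREFIX` constants referenced in the diff.
    
    ```scala
    // Illustrative sketch only (hypothetical SchemaPartsExample object), mirroring the
    // removed schemaStringFromParts logic on a plain Map rather than a CatalogTable.
    object SchemaPartsExample {
      // Assumed values of the DATASOURCE_SCHEMA_NUMPARTS / DATASOURCE_SCHEMA_PART_PREFIX constants.
      private val NumPartsKey = "spark.sql.sources.schema.numParts"
      private val PartPrefix = "spark.sql.sources.schema.part."
    
      // Reassemble the JSON schema string that was split across numbered properties.
      def schemaFromParts(props: Map[String, String]): Option[String] = {
        props.get(NumPartsKey).map { numParts =>
          val parts = (0 until numParts.toInt).map { index =>
            props.getOrElse(
              s"$PartPrefix$index",
              throw new IllegalStateException(
                s"Missing part $index of the schema, $numParts parts are expected."))
          }
          parts.mkString // stick all parts back into a single schema string
        }
      }
    
      def main(args: Array[String]): Unit = {
        val json =
          """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"""
        // Pretend the schema was stored in two chunks to stay under the metastore's
        // per-property size limit, as the cache loader in the diff expects.
        val (first, second) = json.splitAt(json.length / 2)
        val props = Map(
          NumPartsKey -> "2",
          s"${PartPrefix}0" -> first,
          s"${PartPrefix}1" -> second)
        assert(schemaFromParts(props).contains(json))
        println(schemaFromParts(props).get) // prints the reassembled JSON schema
      }
    }
    ```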

