Github user yhuai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11750#discussion_r56424506
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
    @@ -0,0 +1,469 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.catalog
    +
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.AnalysisException
    +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
    +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
SubqueryAlias}
    +
    +
    +/**
    + * An internal catalog that is used by a Spark Session. This internal 
catalog serves as a
    + * proxy to the underlying metastore (e.g. Hive Metastore) and it also 
manages temporary
    + * tables and functions of the Spark Session that it belongs to.
    + */
    +class SessionCatalog(externalCatalog: ExternalCatalog) {
    +  import ExternalCatalog._
    +
    +  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
    +  private[this] val tempFunctions = new ConcurrentHashMap[String, 
CatalogFunction]
    +
    +  // Note: we track current database here because certain operations do 
not explicitly
    +  // specify the database (e.g. DROP TABLE my_table). In these cases we 
must first
    +  // check whether the temporary table or function exists, then, if not, 
operate on
    +  // the corresponding item in the current database.
    +  private[this] var currentDb = "default"
    +
    +  // 
----------------------------------------------------------------------------
    +  // Databases
    +  // 
----------------------------------------------------------------------------
    +  // All methods in this category interact directly with the underlying 
catalog.
    +  // 
----------------------------------------------------------------------------
    +
    +  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: 
Boolean): Unit = {
    +    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
    +  }
    +
    +  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: 
Boolean): Unit = {
    +    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
    +  }
    +
    +  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
    +    externalCatalog.alterDatabase(dbDefinition)
    +  }
    +
    +  def getDatabase(db: String): CatalogDatabase = {
    +    externalCatalog.getDatabase(db)
    +  }
    +
    +  def databaseExists(db: String): Boolean = {
    +    externalCatalog.databaseExists(db)
    +  }
    +
    +  def listDatabases(): Seq[String] = {
    +    externalCatalog.listDatabases()
    +  }
    +
    +  def listDatabases(pattern: String): Seq[String] = {
    +    externalCatalog.listDatabases(pattern)
    +  }
    +
    +  def getCurrentDatabase: String = currentDb
    +
    +  def setCurrentDatabase(db: String): Unit = {
    +    if (!databaseExists(db)) {
    +      throw new AnalysisException(s"cannot set current database to 
non-existent '$db'")
    +    }
    +    currentDb = db
    +  }
    +
    +  // 
----------------------------------------------------------------------------
    +  // Tables
    +  // 
----------------------------------------------------------------------------
    +  // There are two kinds of tables, temporary tables and metastore tables.
    +  // Temporary tables are isolated across sessions and do not belong to any
    +  // particular database. Metastore tables can be used across multiple
    +  // sessions as their metadata is persisted in the underlying catalog.
    +  // 
----------------------------------------------------------------------------
    +
    +  // ----------------------------------------------------
    +  // | Methods that interact with metastore tables only |
    +  // ----------------------------------------------------
    +
    +  /**
    +   * Create a metastore table in the database specified in 
`tableDefinition`.
    +   * If no such database is specified, create it in the current database.
    +   */
    +  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): 
Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
    +  }
    +
    +  /**
    +   * Alter the metadata of an existing metastore table identified by 
`tableDefinition`.
    +   *
    +   * If no database is specified in `tableDefinition`, assume the table is 
in the
    +   * current database.
    +   *
    +   * Note: If the underlying implementation does not support altering a 
certain field,
    +   * this becomes a no-op.
    +   */
    +  def alterTable(tableDefinition: CatalogTable): Unit = {
    +    val db = tableDefinition.name.database.getOrElse(currentDb)
    +    val newTableDefinition = tableDefinition.copy(
    +      name = TableIdentifier(tableDefinition.name.table, Some(db)))
    +    externalCatalog.alterTable(db, newTableDefinition)
    +  }
    +
    +  /**
    +   * Retrieve the metadata of an existing metastore table.
    +   * If no database is specified, assume the table is in the current 
database.
    +   */
    +  def getTable(name: TableIdentifier): CatalogTable = {
    +    val db = name.database.getOrElse(currentDb)
    +    externalCatalog.getTable(db, name.table)
    +  }
    +
    +  // -------------------------------------------------------------
    +  // | Methods that interact with temporary and metastore tables |
    +  // -------------------------------------------------------------
    +
    +  /**
    +   * Create a temporary table.
    +   */
    +  def createTempTable(
    +      name: String,
    +      tableDefinition: LogicalPlan,
    +      ignoreIfExists: Boolean): Unit = {
    +    if (tempTables.containsKey(name) && !ignoreIfExists) {
    +      throw new AnalysisException(s"Temporary table '$name' already 
exists.")
    +    }
    +    tempTables.put(name, tableDefinition)
    +  }
    +
    +  /**
    +   * Rename a table.
    +   *
    +   * If a database is specified in `oldName`, this will rename the table 
in that database.
    +   * If no database is specified, this will first attempt to rename a 
temporary table with
    +   * the same name, then, if that does not exist, rename the table in the 
current database.
    +   *
    +   * This assumes the database specified in `oldName` matches the one 
specified in `newName`.
    +   */
    +  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): 
Unit = {
    +    if (oldName.database != newName.database) {
    +      throw new AnalysisException("rename does not support moving tables 
across databases")
    +    }
    +    val db = oldName.database.getOrElse(currentDb)
    +    if (oldName.database.isDefined || 
!tempTables.containsKey(oldName.table)) {
    +      externalCatalog.renameTable(db, oldName.table, newName.table)
    +    } else {
    +      val table = tempTables.remove(oldName.table)
    +      tempTables.put(newName.table, table)
    +    }
    +  }
    +
    +  /**
    +   * Drop a table.
    +   *
    +   * If a database is specified in `name`, this will drop the table from 
that database.
    +   * If no database is specified, this will first attempt to drop a 
temporary table with
    +   * the same name, then, if that does not exist, drop the table from the 
current database.
    +   */
    +  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = 
{
    --- End diff --
    
    This is the semantic of postgres and Hive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to