This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new dfd7cde91c5d [SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
dfd7cde91c5d is described below

commit dfd7cde91c5d6f034a11ea492be83afaf771ceb6
Author: Yihong He <yihong...@databricks.com>
AuthorDate: Wed Nov 8 18:22:40 2023 -0800

    [SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer

    ### What changes were proposed in this pull request?

    - Refactor the Catalog Function APIs to resolve functions through the analyzer

    ### Why are the changes needed?

    - Less duplicated logic. We should not invoke catalog APIs directly, but should go through the analyzer.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43720 from heyihong/SPARK-45842.

    Authored-by: Yihong He <yihong...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 59 +++++++++++++---------
 1 file changed, 35 insertions(+), 24 deletions(-)
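Before the diff itself, a minimal sketch (not part of the commit) of the public `Catalog` API surface this change reroutes through the analyzer. The local session setup and the `my_db`/`my_func` identifiers are hypothetical, for illustration only; the documented behavior (qualification of two-part names, `AnalysisException` from `getFunction`) comes from the diff below.

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch, assuming a local Spark build that includes this commit.
// "my_db" and "my_func" are made-up names for illustration.
object CatalogFunctionApiSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Both overloads now resolve the name through the analyzer instead of
    // calling SessionCatalog directly.
    println(spark.catalog.functionExists("my_db.my_func"))   // parsed, then analyzed
    // The two-argument overload is qualified as
    // spark_catalog.<dbName>.<functionName> before analysis.
    println(spark.catalog.functionExists("my_db", "my_func"))

    // getFunction still throws an AnalysisException when nothing resolves;
    // built-in functions resolve as non-persistent functions.
    println(spark.catalog.getFunction("abs").name)

    spark.stop()
  }
}
```

Note the design choice visible in the new private `functionExists(ident: Seq[String])` helper: an `AnalysisException` with error class `UNRESOLVED_ROUTINE` is treated as "function does not exist" rather than being propagated to the caller.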
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5650e9d2399c..b1ad454fc041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -22,14 +22,14 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -284,6 +284,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(functions.result(), sparkSession)
   }
 
+  private def toFunctionIdent(functionName: String): Seq[String] = {
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
+    }
+  }
+
+  private def functionExists(ident: Seq[String]): Boolean = {
+    val plan =
+      UnresolvedFunctionName(ident, CatalogImpl.FUNCTION_EXISTS_COMMAND_NAME, false, None)
+    try {
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case _: ResolvedPersistentFunc => true
+        case _: ResolvedNonPersistentFunc => true
+        case _ => false
+      }
+    } catch {
+      case e: AnalysisException if e.getErrorClass == "UNRESOLVED_ROUTINE" => false
+    }
+  }
+
   private def makeFunction(ident: Seq[String]): Function = {
     val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction", false, None)
     sparkSession.sessionState.executePlan(plan).analyzed match {
@@ -465,17 +492,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * function. This throws an `AnalysisException` when no `Function` can be found.
    */
   override def getFunction(functionName: String): Function = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first.
-    val nameParts = if (parsed.length <= 2 &&
-      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
-      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
-      qualifyV1Ident(parsed)
-    } else {
-      parsed
-    }
-    makeFunction(nameParts)
+    makeFunction(toFunctionIdent(functionName))
   }
 
   /**
@@ -540,23 +557,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * or a function.
    */
   override def functionExists(functionName: String): Boolean = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first. This also checks if it's a built-in/temp function.
-    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
-      val plan = UnresolvedIdentifier(parsed)
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
-        case _ => false
-      }
-    }
+    functionExists(toFunctionIdent(functionName))
   }
 
   /**
    * Checks if the function with the specified name exists in the specified database.
    */
   override def functionExists(dbName: String, functionName: String): Boolean = {
-    sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
   }
 
   /**
@@ -942,4 +952,5 @@ private[sql] object CatalogImpl {
 
     new Dataset[T](queryExecution, enc)
   }
+  private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org