This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd7cde91c5d [SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
dfd7cde91c5d is described below

commit dfd7cde91c5d6f034a11ea492be83afaf771ceb6
Author: Yihong He <yihong...@databricks.com>
AuthorDate: Wed Nov 8 18:22:40 2023 -0800

    [SPARK-45842][SQL] Refactor Catalog Function APIs to use analyzer
    
    ### What changes were proposed in this pull request?
    
    - Refactor Catalog Function APIs to use analyzer
    
    ### Why are the changes needed?
    
    - Less duplicated logic. We should not invoke catalog APIs directly, but go through the analyzer.
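
    For illustration only (not part of this patch; `my_db.my_udf` is a hypothetical function name), the public `Catalog` entry points affected by this refactor are exercised like this:

    ```scala
    // After this change, both calls resolve the function through the analyzer
    // (via UnresolvedFunctionName), sharing the new toFunctionIdent/functionExists
    // helpers in CatalogImpl instead of hitting the session catalog directly.
    spark.catalog.functionExists("my_db.my_udf")   // returns Boolean
    spark.catalog.getFunction("my_db.my_udf")      // throws AnalysisException if not found
    ```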
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43720 from heyihong/SPARK-45842.
    
    Authored-by: Yihong He <yihong...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 59 +++++++++++++---------
 1 file changed, 35 insertions(+), 24 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 5650e9d2399c..b1ad454fc041 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -22,14 +22,14 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
-import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -284,6 +284,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(functions.result(), sparkSession)
   }
 
+  private def toFunctionIdent(functionName: String): Seq[String] = {
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
+    }
+  }
+
+  private def functionExists(ident: Seq[String]): Boolean = {
+    val plan =
+      UnresolvedFunctionName(ident, CatalogImpl.FUNCTION_EXISTS_COMMAND_NAME, false, None)
+    try {
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case _: ResolvedPersistentFunc => true
+        case _: ResolvedNonPersistentFunc => true
+        case _ => false
+      }
+    } catch {
+      case e: AnalysisException if e.getErrorClass == "UNRESOLVED_ROUTINE" => false
+    }
+  }
+
   private def makeFunction(ident: Seq[String]): Function = {
     val plan = UnresolvedFunctionName(ident, "Catalog.makeFunction", false, None)
     sparkSession.sessionState.executePlan(plan).analyzed match {
@@ -465,17 +492,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * function. This throws an `AnalysisException` when no `Function` can be found.
    */
   override def getFunction(functionName: String): Function = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first.
-    val nameParts = if (parsed.length <= 2 &&
-      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
-      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
-      qualifyV1Ident(parsed)
-    } else {
-      parsed
-    }
-    makeFunction(nameParts)
+    makeFunction(toFunctionIdent(functionName))
   }
 
   /**
@@ -540,23 +557,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * or a function.
    */
   override def functionExists(functionName: String): Boolean = {
-    val parsed = parseIdent(functionName)
-    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
-    // the Hive Metastore first. This also checks if it's a built-in/temp function.
-    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
-      val plan = UnresolvedIdentifier(parsed)
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-      case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
-        case _ => false
-      }
-    }
+    functionExists(toFunctionIdent(functionName))
   }
 
   /**
    * Checks if the function with the specified name exists in the specified database.
    */
   override def functionExists(dbName: String, functionName: String): Boolean = {
-    sessionCatalog.functionExists(FunctionIdentifier(functionName, 
Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    functionExists(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
   }
 
   /**
@@ -942,4 +952,5 @@ private[sql] object CatalogImpl {
     new Dataset[T](queryExecution, enc)
   }
 
+  private val FUNCTION_EXISTS_COMMAND_NAME = "Catalog.functionExists"
 }

