Repository: spark
Updated Branches:
  refs/heads/master e0f4f206b -> 0ce11d0e3


[SPARK-23486] cache the function name from the external catalog for lookupFunctions

## What changes were proposed in this pull request?

This PR caches the function names looked up in the external catalog by LookupFunctions in the analyzer; the cache is scoped to a single query plan, so repeated references to the same function trigger only one external lookup. The original problem was reported in [SPARK-19737](https://issues.apache.org/jira/browse/SPARK-19737).
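
For illustration only, here is a minimal standalone sketch of the per-plan memoization idea; `CountingCatalog` and every other name in it are hypothetical stand-ins, not Spark classes:

    import scala.collection.mutable

    // Hypothetical stand-in for the external catalog; counts round-trips.
    class CountingCatalog {
      var lookups = 0
      def functionExists(db: String, fn: String): Boolean = { lookups += 1; true }
    }

    object PerPlanCacheSketch extends App {
      val catalog = new CountingCatalog
      val seen = mutable.HashSet.empty[(String, String)] // one cache per analyzed plan
      val refs = Seq(("default", "func"), ("default", "func"), ("default", "func"))
      for ((db, fn) <- refs if !seen((db, fn))) {
        if (catalog.functionExists(db, fn)) seen += ((db, fn))
      }
      assert(catalog.lookups == 1) // three references, one external lookup
    }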

## How was this patch tested?

Added a new test suite, LookupFunctionsSuite, and new test cases in SessionCatalogSuite.

Author: Kevin Yu <q...@us.ibm.com>

Closes #20795 from kevinyu98/spark-23486.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ce11d0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ce11d0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ce11d0e

Branch: refs/heads/master
Commit: 0ce11d0e3a7c8c48d9f7305d2dd39c7b281b2a53
Parents: e0f4f20
Author: Kevin Yu <q...@us.ibm.com>
Authored: Thu Jul 12 22:20:06 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Jul 12 22:20:06 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  45 ++++++--
 .../sql/catalyst/catalog/SessionCatalog.scala   |  16 +++
 .../analysis/LookupFunctionsSuite.scala         | 104 +++++++++++++++++++
 .../catalyst/catalog/SessionCatalogSuite.scala  |  36 +++++++
 .../spark/sql/hive/HiveSessionCatalog.scala     |   4 +
 5 files changed, 199 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ce11d0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9749893..960ee27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import java.util.Locale
+
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.Random
 
@@ -1208,16 +1211,46 @@ class Analyzer(
    * only performs a simple existence check according to the function identifier to quickly
    * identify undefined functions without triggering relation resolution, which may incur a
    * potentially expensive partition/schema discovery process in some cases.
-   *
+   * To avoid duplicate external function lookups, each looked-up external function identifier
+   * is stored in the local hash set externalFunctionNameSet.
    * @see [[ResolveFunctions]]
    * @see https://issues.apache.org/jira/browse/SPARK-19737
    */
   object LookupFunctions extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
-      case f: UnresolvedFunction if !catalog.functionExists(f.name) =>
-        withPosition(f) {
-          throw new NoSuchFunctionException(f.name.database.getOrElse("default"), f.name.funcName)
-        }
+    override def apply(plan: LogicalPlan): LogicalPlan = {
+      val externalFunctionNameSet = new mutable.HashSet[FunctionIdentifier]()
+      plan.transformAllExpressions {
+        case f: UnresolvedFunction
+          if externalFunctionNameSet.contains(normalizeFuncName(f.name)) => f
+        case f: UnresolvedFunction if catalog.isRegisteredFunction(f.name) => f
+        case f: UnresolvedFunction if catalog.isPersistentFunction(f.name) =>
+          externalFunctionNameSet.add(normalizeFuncName(f.name))
+          f
+        case f: UnresolvedFunction =>
+          withPosition(f) {
+            throw new NoSuchFunctionException(f.name.database.getOrElse(catalog.getCurrentDatabase),
+              f.name.funcName)
+          }
+      }
+    }
+
+    def normalizeFuncName(name: FunctionIdentifier): FunctionIdentifier = {
+      val funcName = if (conf.caseSensitiveAnalysis) {
+        name.funcName
+      } else {
+        name.funcName.toLowerCase(Locale.ROOT)
+      }
+
+      val databaseName = name.database match {
+        case Some(a) => formatDatabaseName(a)
+        case None => catalog.getCurrentDatabase
+      }
+
+      FunctionIdentifier(funcName, Some(databaseName))
+    }
+
+    protected def formatDatabaseName(name: String): String = {
+      if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
     }
   }
 

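A side note on the normalizeFuncName helper above: under the default case-insensitive analysis it lowercases both the database and function name, so differently-cased references collapse to one cache key. A hedged standalone sketch that mirrors this logic rather than calling Spark's API:

    import java.util.Locale

    object NormalizeSketch extends App {
      // Mirrors the case-insensitive branch of normalizeFuncName (illustrative only).
      def normalize(db: Option[String], fn: String, currentDb: String): (String, String) =
        (db.getOrElse(currentDb).toLowerCase(Locale.ROOT), fn.toLowerCase(Locale.ROOT))

      // "MyDB.MyFunc" and "mydb.myfunc" yield the same key, so the second
      // reference hits externalFunctionNameSet instead of the external catalog.
      assert(normalize(Some("MyDB"), "MyFunc", "default") ==
        normalize(Some("mydb"), "myfunc", "default"))
    }
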
http://git-wip-us.apache.org/repos/asf/spark/blob/0ce11d0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index c26a345..b09b81e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -1193,6 +1193,22 @@ class SessionCatalog(
       !hiveFunctions.contains(name.funcName.toLowerCase(Locale.ROOT))
   }
 
+  /**
+   * Returns whether this function has been registered in the function registry of the
+   * current session; returns false if it has not.
+   */
+  def isRegisteredFunction(name: FunctionIdentifier): Boolean = {
+    functionRegistry.functionExists(name)
+  }
+
+  /**
+   * Returns whether this is a persistent function; returns false if it does not exist.
+   */
+  def isPersistentFunction(name: FunctionIdentifier): Boolean = {
+    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+    databaseExists(db) && externalCatalog.functionExists(db, name.funcName)
+  }
+
   protected def failFunctionLookup(name: FunctionIdentifier): Nothing = {
     throw new NoSuchFunctionException(
       db = name.database.getOrElse(getCurrentDatabase), func = name.funcName)

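The two new predicates split the old functionExists check into its halves: the session-local function registry versus the external catalog. A usage sketch, assuming a SessionCatalog populated as in the SessionCatalogSuite tests further below ("iff" registered only in the session, "sum" created persistently in db2):

    import org.apache.spark.sql.catalyst.FunctionIdentifier
    import org.apache.spark.sql.catalyst.catalog.SessionCatalog

    // Assumes `catalog` was populated as described in the lead-in above.
    def checkFunctionKinds(catalog: SessionCatalog): Unit = {
      assert(catalog.isRegisteredFunction(FunctionIdentifier("iff")))               // session registry
      assert(!catalog.isPersistentFunction(FunctionIdentifier("iff")))              // not in metastore
      assert(!catalog.isRegisteredFunction(FunctionIdentifier("sum", Some("db2")))) // not registered
      assert(catalog.isPersistentFunction(FunctionIdentifier("sum", Some("db2"))))  // external catalog
    }
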
http://git-wip-us.apache.org/repos/asf/spark/blob/0ce11d0e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
new file mode 100644
index 0000000..cea0f2a
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.net.URI
+
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+
+class LookupFunctionsSuite extends PlanTest {
+
+  test("SPARK-23486: the functionExists for the Persistent function check") {
+    val externalCatalog = new CustomInMemoryCatalog
+    val conf = new SQLConf()
+    val catalog = new SessionCatalog(externalCatalog, FunctionRegistry.builtin, conf)
+    val analyzer = {
+      catalog.createDatabase(
+        CatalogDatabase("default", "", new URI("loc"), Map.empty),
+        ignoreIfExists = false)
+      new Analyzer(catalog, conf)
+    }
+
+    def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
+    val unresolvedPersistentFunc = UnresolvedFunction("func", Seq.empty, false)
+    val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false)
+    val plan = Project(
+      Seq(Alias(unresolvedPersistentFunc, "call1")(), Alias(unresolvedPersistentFunc, "call2")(),
+        Alias(unresolvedPersistentFunc, "call3")(), Alias(unresolvedRegisteredFunc, "call4")(),
+        Alias(unresolvedRegisteredFunc, "call5")()),
+      table("TaBlE"))
+    analyzer.LookupFunctions.apply(plan)
+
+    assert(externalCatalog.getFunctionExistsCalledTimes == 1)
+    assert(analyzer.LookupFunctions.normalizeFuncName
+      (unresolvedPersistentFunc.name).database == Some("default"))
+  }
+
+  test("SPARK-23486: the functionExists for the Registered function check") {
+    val externalCatalog = new InMemoryCatalog
+    val conf = new SQLConf()
+    val customerFunctionReg = new CustomerFunctionRegistry
+    val catalog = new SessionCatalog(externalCatalog, customerFunctionReg, conf)
+    val analyzer = {
+      catalog.createDatabase(
+        CatalogDatabase("default", "", new URI("loc"), Map.empty),
+        ignoreIfExists = false)
+      new Analyzer(catalog, conf)
+    }
+
+    def table(ref: String): LogicalPlan = UnresolvedRelation(TableIdentifier(ref))
+    val unresolvedRegisteredFunc = UnresolvedFunction("max", Seq.empty, false)
+    val plan = Project(
+      Seq(Alias(unresolvedRegisteredFunc, "call1")(), Alias(unresolvedRegisteredFunc, "call2")()),
+      table("TaBlE"))
+    analyzer.LookupFunctions.apply(plan)
+
+    assert(customerFunctionReg.getIsRegisteredFunctionCalledTimes == 2)
+    assert(analyzer.LookupFunctions.normalizeFuncName
+      (unresolvedRegisteredFunc.name).database == Some("default"))
+  }
+}
+
+class CustomerFunctionRegistry extends SimpleFunctionRegistry {
+
+  private var isRegisteredFunctionCalledTimes: Int = 0
+
+  override def functionExists(funcN: FunctionIdentifier): Boolean = synchronized {
+    isRegisteredFunctionCalledTimes = isRegisteredFunctionCalledTimes + 1
+    true
+  }
+
+  def getIsRegisteredFunctionCalledTimes: Int = isRegisteredFunctionCalledTimes
+}
+
+class CustomInMemoryCatalog extends InMemoryCatalog {
+
+  private var functionExistsCalledTimes: Int = 0
+
+  override def functionExists(db: String, funcName: String): Boolean = synchronized {
+    functionExistsCalledTimes = functionExistsCalledTimes + 1
+    true
+  }
+
+  def getFunctionExistsCalledTimes: Int = functionExistsCalledTimes
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0ce11d0e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 6a7375e..50496a0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -1217,6 +1217,42 @@ abstract class SessionCatalogSuite extends AnalysisTest {
     }
   }
 
+  test("isRegisteredFunction") {
+    withBasicCatalog { catalog =>
+      // Returns false when the function is not registered
+      assert(!catalog.isRegisteredFunction(FunctionIdentifier("temp1")))
+
+      // Returns true when the function is registered
+      val tempFunc1 = (e: Seq[Expression]) => e.head
+      catalog.registerFunction(newFunc("iff", None), overrideIfExists = false,
+        functionBuilder = Some(tempFunc1))
+      assert(catalog.isRegisteredFunction(FunctionIdentifier("iff")))
+
+      // Returns false for a function created with createFunction (persistent only)
+      catalog.createFunction(newFunc("sum", Some("db2")), ignoreIfExists = false)
+      assert(!catalog.isRegisteredFunction(FunctionIdentifier("sum")))
+      assert(!catalog.isRegisteredFunction(FunctionIdentifier("sum", Some("db2"))))
+    }
+  }
+
+  test("isPersistentFunction") {
+    withBasicCatalog { catalog =>
+      // Returns false when the function does not exist
+      assert(!catalog.isPersistentFunction(FunctionIdentifier("temp2")))
+
+      // Returns false when the function is only registered in the session
+      val tempFunc2 = (e: Seq[Expression]) => e.head
+      catalog.registerFunction(newFunc("iff", None), overrideIfExists = false,
+        functionBuilder = Some(tempFunc2))
+      assert(!catalog.isPersistentFunction(FunctionIdentifier("iff")))
+
+      // Returns true for a function created with createFunction
+      catalog.createFunction(newFunc("sum", Some("db2")), ignoreIfExists = false)
+      assert(catalog.isPersistentFunction(FunctionIdentifier("sum", Some("db2"))))
+      assert(!catalog.isPersistentFunction(FunctionIdentifier("db2.sum")))
+    }
+  }
+
   test("drop function") {
     withBasicCatalog { catalog =>
       assert(catalog.externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))

http://git-wip-us.apache.org/repos/asf/spark/blob/0ce11d0e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 94ddeae..de41bb4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -175,6 +175,10 @@ private[sql] class HiveSessionCatalog(
     super.functionExists(name) || hiveFunctions.contains(name.funcName)
   }
 
+  override def isPersistentFunction(name: FunctionIdentifier): Boolean = {
+    super.isPersistentFunction(name) || hiveFunctions.contains(name.funcName)
+  }
+
   /** List of functions we pass over to Hive. Note that over time this list should go to 0. */
   // We have a list of Hive built-in functions that we do not support. So, we will check
   // Hive's function registry and lazily load needed functions into our own function registry.

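The override above widens the persistence check with the Hive pass-through list, since those function names resolve even though they live in neither the session registry nor the metastore. The same shape as a self-contained sketch; the class names and the pass-through entry are hypothetical:

    // Illustrative only: widening an existence check with a pass-through set.
    class BaseCatalog {
      def isPersistentFunction(name: String): Boolean = false // stub base check
    }

    class PassThroughCatalog(passThrough: Set[String]) extends BaseCatalog {
      override def isPersistentFunction(name: String): Boolean =
        super.isPersistentFunction(name) || passThrough.contains(name)
    }

    object PassThroughSketch extends App {
      val cat = new PassThroughCatalog(Set("histogram_numeric"))
      assert(cat.isPersistentFunction("histogram_numeric")) // via pass-through
      assert(!cat.isPersistentFunction("unknown"))          // rejected by both
    }
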

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
