[SPARK-14123][SPARK-14384][SQL] Handle CreateFunction/DropFunction

## What changes were proposed in this pull request?
This PR implements the CreateFunction and DropFunction commands. Besides 
implementing these two commands, we also change how functions are managed. Here 
are the main changes.
* `FunctionRegistry` will be a container that stores all function builders, and 
it will not actively load any functions. Because of this change, we do not need 
to maintain a separate registry for HiveContext. So, `HiveFunctionRegistry` is 
deleted.
* SessionCatalog takes care of loading a function if this function is 
not in the `FunctionRegistry` but its metadata is stored in the external 
catalog. For this case, SessionCatalog will (1) load the metadata from the 
external catalog, (2) load all needed resources (i.e. jars and files), (3) 
create a function builder based on the function definition, and (4) register 
the function builder in the `FunctionRegistry`.
* An `UnresolvedGenerator` is created. So, the parser will not need to call 
`FunctionRegistry` directly during parsing, which is not a good time to create 
a Hive UDTF. In the analysis phase, we will resolve `UnresolvedGenerator`.

This PR is based on viirya's https://github.com/apache/spark/pull/12036/

## How was this patch tested?
Existing tests and new tests.

## TODOs
[x] Self-review
[x] Cleanup
[x] More tests for create/drop functions (we need more tests for permanent 
functions).
[ ] File JIRAs for all TODOs
[x] Standardize the error message when a function does not exist.

Author: Yin Huai <yh...@databricks.com>
Author: Liang-Chi Hsieh <sim...@tw.ibm.com>

Closes #12117 from yhuai/function.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72544d6f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72544d6f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72544d6f

Branch: refs/heads/master
Commit: 72544d6f2a72b9e44e0a32de1fb379e3342be5c3
Parents: bc36df1
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Apr 5 12:27:06 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Apr 5 12:27:06 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  17 +-
 .../catalyst/analysis/FunctionRegistry.scala    |   2 +
 .../sql/catalyst/analysis/unresolved.scala      |  31 +++-
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |   5 -
 .../sql/catalyst/catalog/SessionCatalog.scala   | 171 ++++++++++++-------
 .../catalyst/catalog/functionResources.scala    |  61 +++++++
 .../spark/sql/catalyst/catalog/interface.scala  |  16 +-
 .../apache/spark/sql/catalyst/identifiers.scala |  16 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  14 +-
 .../sql/catalyst/analysis/AnalysisTest.scala    |   2 +-
 .../analysis/DecimalPrecisionSuite.scala        |   2 +-
 .../sql/catalyst/catalog/CatalogTestCases.scala |  20 +--
 .../catalyst/catalog/SessionCatalogSuite.scala  | 132 ++++----------
 .../optimizer/BooleanSimplificationSuite.scala  |   1 -
 .../optimizer/EliminateSortsSuite.scala         |   2 +-
 .../sql/catalyst/parser/PlanParserSuite.scala   |  15 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  18 +-
 .../spark/sql/execution/SparkSqlParser.scala    |   7 +-
 .../spark/sql/execution/command/commands.scala  |  29 ++--
 .../spark/sql/execution/command/ddl.scala       |  20 ---
 .../spark/sql/execution/command/functions.scala | 114 +++++++++++++
 .../spark/sql/internal/SessionState.scala       |  11 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |   8 +-
 .../scala/org/apache/spark/sql/UDFSuite.scala   |   3 +-
 .../sql/execution/command/DDLCommandSuite.scala |  12 +-
 .../apache/spark/sql/test/SQLTestUtils.scala    |  22 +++
 .../thriftserver/HiveThriftServer2Suites.scala  |  94 +++++-----
 .../spark/sql/hive/HiveExternalCatalog.scala    |  11 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     | 143 +++++++++++++++-
 .../spark/sql/hive/HiveSessionState.scala       |  18 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  20 ++-
 .../sql/hive/execution/HiveSqlParser.scala      |  17 +-
 .../org/apache/spark/sql/hive/hiveUDFs.scala    | 119 +------------
 .../apache/spark/sql/hive/test/TestHive.scala   |  16 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   | 165 ++++++++++++++++++
 .../org/apache/spark/sql/hive/UDFSuite.scala    | 167 +++++++++++++++++-
 .../sql/hive/execution/HiveQuerySuite.scala     |   8 +-
 .../spark/sql/hive/execution/HiveUDFSuite.scala |  10 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  74 +++++---
 39 files changed, 1100 insertions(+), 513 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3e0a6d2..473c91e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -45,10 +45,7 @@ object SimpleAnalyzer
     new SimpleCatalystConf(caseSensitiveAnalysis = true))
 
 class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
-  extends Analyzer(
-    new SessionCatalog(new InMemoryCatalog, functionRegistry, conf),
-    functionRegistry,
-    conf)
+  extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, 
conf), conf)
 
 /**
  * Provides a logical query plan analyzer, which translates 
[[UnresolvedAttribute]]s and
@@ -57,7 +54,6 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, 
conf: CatalystConf)
  */
 class Analyzer(
     catalog: SessionCatalog,
-    registry: FunctionRegistry,
     conf: CatalystConf,
     maxIterations: Int = 100)
   extends RuleExecutor[LogicalPlan] with CheckAnalysis {
@@ -756,9 +752,18 @@ class Analyzer(
       case q: LogicalPlan =>
         q transformExpressions {
           case u if !u.childrenResolved => u // Skip until children are 
resolved.
+          case u @ UnresolvedGenerator(name, children) =>
+            withPosition(u) {
+              catalog.lookupFunction(name, children) match {
+                case generator: Generator => generator
+                case other =>
+                  failAnalysis(s"$name is expected to be a generator. However, 
" +
+                    s"its class is ${other.getClass.getCanonicalName}, which 
is not a generator.")
+              }
+            }
           case u @ UnresolvedFunction(name, children, isDistinct) =>
             withPosition(u) {
-              registry.lookupFunction(name, children) match {
+              catalog.lookupFunction(name, children) match {
                 // DISTINCT is not meaningful for a Max or a Min.
                 case max: Max if isDistinct =>
                   AggregateExpression(max, Complete, isDistinct = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index ca8db3c..7af5ffb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -52,6 +52,8 @@ trait FunctionRegistry {
   /** Drop a function and return whether the function existed. */
   def dropFunction(name: String): Boolean
 
+  /** Checks if a function with a given name exists. */
+  def functionExists(name: String): Boolean = lookupFunction(name).isDefined
 }
 
 class SimpleFunctionRegistry extends FunctionRegistry {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index fbbf630..b2f362b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -18,9 +18,9 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{errors, TableIdentifier}
+import org.apache.spark.sql.catalyst.{errors, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
CodegenFallback, ExprCode}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
@@ -133,6 +133,33 @@ object UnresolvedAttribute {
   }
 }
 
+/**
+ * Represents an unresolved generator, which will be created by the parser for
+ * the [[org.apache.spark.sql.catalyst.plans.logical.Generate]] operator.
+ * The analyzer will resolve this generator.
+ */
+case class UnresolvedGenerator(name: String, children: Seq[Expression]) 
extends Generator {
+
+  override def elementTypes: Seq[(DataType, Boolean, String)] =
+    throw new UnresolvedException(this, "elementTypes")
+  override def dataType: DataType = throw new UnresolvedException(this, 
"dataType")
+  override def foldable: Boolean = throw new UnresolvedException(this, 
"foldable")
+  override def nullable: Boolean = throw new UnresolvedException(this, 
"nullable")
+  override lazy val resolved = false
+
+  override def prettyName: String = name
+  override def toString: String = s"'$name(${children.mkString(", ")})"
+
+  override def eval(input: InternalRow = null): TraversableOnce[InternalRow] =
+    throw new UnsupportedOperationException(s"Cannot evaluate expression: 
$this")
+
+  override protected def genCode(ctx: CodegenContext, ev: ExprCode): String =
+    throw new UnsupportedOperationException(s"Cannot evaluate expression: 
$this")
+
+  override def terminate(): TraversableOnce[InternalRow] =
+    throw new UnsupportedOperationException(s"Cannot evaluate expression: 
$this")
+}
+
 case class UnresolvedFunction(
     name: String,
     children: Seq[Expression],

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 2bbb970..2af0107 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -315,11 +315,6 @@ class InMemoryCatalog extends ExternalCatalog {
     catalog(db).functions.put(newName, newFunc)
   }
 
-  override def alterFunction(db: String, funcDefinition: CatalogFunction): 
Unit = synchronized {
-    requireFunctionExists(db, funcDefinition.identifier.funcName)
-    catalog(db).functions.put(funcDefinition.identifier.funcName, 
funcDefinition)
-  }
-
   override def getFunction(db: String, funcName: String): CatalogFunction = 
synchronized {
     requireFunctionExists(db, funcName)
     catalog(db).functions(funcName)

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 3b8ce63..c08ffbb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
SimpleFunctionRegistry}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
NoSuchFunctionException, SimpleFunctionRegistry}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -39,17 +39,21 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
  */
 class SessionCatalog(
     externalCatalog: ExternalCatalog,
+    functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf) {
   import ExternalCatalog._
 
-  def this(externalCatalog: ExternalCatalog, functionRegistry: 
FunctionRegistry) {
-    this(externalCatalog, functionRegistry, new SimpleCatalystConf(true))
+  def this(
+      externalCatalog: ExternalCatalog,
+      functionRegistry: FunctionRegistry,
+      conf: CatalystConf) {
+    this(externalCatalog, DummyFunctionResourceLoader, functionRegistry, conf)
   }
 
   // For testing only.
   def this(externalCatalog: ExternalCatalog) {
-    this(externalCatalog, new SimpleFunctionRegistry)
+    this(externalCatalog, new SimpleFunctionRegistry, new 
SimpleCatalystConf(true))
   }
 
   protected[this] val tempTables = new mutable.HashMap[String, LogicalPlan]
@@ -439,53 +443,88 @@ class SessionCatalog(
    */
   def dropFunction(name: FunctionIdentifier): Unit = {
     val db = name.database.getOrElse(currentDb)
+    val qualified = name.copy(database = Some(db)).unquotedString
+    if (functionRegistry.functionExists(qualified)) {
+      // If we have loaded this function into the FunctionRegistry,
+      // also drop it from there.
+      // For a permanent function, because we loaded it to the FunctionRegistry
+      // when it's first used, we also need to drop it from the 
FunctionRegistry.
+      functionRegistry.dropFunction(qualified)
+    }
     externalCatalog.dropFunction(db, name.funcName)
   }
 
   /**
-   * Alter a metastore function whose name that matches the one specified in 
`funcDefinition`.
-   *
-   * If no database is specified in `funcDefinition`, assume the function is 
in the
-   * current database.
-   *
-   * Note: If the underlying implementation does not support altering a 
certain field,
-   * this becomes a no-op.
-   */
-  def alterFunction(funcDefinition: CatalogFunction): Unit = {
-    val db = funcDefinition.identifier.database.getOrElse(currentDb)
-    val newFuncDefinition = funcDefinition.copy(
-      identifier = FunctionIdentifier(funcDefinition.identifier.funcName, 
Some(db)))
-    externalCatalog.alterFunction(db, newFuncDefinition)
-  }
-
-  /**
    * Retrieve the metadata of a metastore function.
    *
    * If a database is specified in `name`, this will return the function in 
that database.
    * If no database is specified, this will return the function in the current 
database.
    */
+  // TODO: have a better name. This method is actually for fetching the 
metadata of a function.
   def getFunction(name: FunctionIdentifier): CatalogFunction = {
     val db = name.database.getOrElse(currentDb)
     externalCatalog.getFunction(db, name.funcName)
   }
 
+  /**
+   * Check if the specified function exists.
+   */
+  def functionExists(name: FunctionIdentifier): Boolean = {
+    if (functionRegistry.functionExists(name.unquotedString)) {
+      // This function exists in the FunctionRegistry.
+      true
+    } else {
+      // Need to check if this function exists in the metastore.
+      try {
+        // TODO: It's better to ask external catalog if this function exists.
+        // So, we can avoid of having this hacky try/catch block.
+        getFunction(name) != null
+      } catch {
+        case _: NoSuchFunctionException => false
+        case _: AnalysisException => false // HiveExternalCatalog wraps all 
exceptions with it.
+      }
+    }
+  }
 
   // ----------------------------------------------------------------
   // | Methods that interact with temporary and metastore functions |
   // ----------------------------------------------------------------
 
   /**
+   * Construct a [[FunctionBuilder]] based on the provided class that 
represents a function.
+   *
+   * This performs reflection to decide what type of [[Expression]] to return 
in the builder.
+   */
+  private[sql] def makeFunctionBuilder(name: String, functionClassName: 
String): FunctionBuilder = {
+    // TODO: at least support UDAFs here
+    throw new UnsupportedOperationException("Use sqlContext.udf.register(...) 
instead.")
+  }
+
+  /**
+   * Loads resources such as JARs and Files for a function. Every resource is 
represented
+   * by a tuple (resource type, resource uri).
+   */
+  def loadFunctionResources(resources: Seq[(String, String)]): Unit = {
+    resources.foreach { case (resourceType, uri) =>
+      val functionResource =
+        
FunctionResource(FunctionResourceType.fromString(resourceType.toLowerCase), uri)
+      functionResourceLoader.loadResource(functionResource)
+    }
+  }
+
+  /**
    * Create a temporary function.
    * This assumes no database is specified in `funcDefinition`.
    */
   def createTempFunction(
       name: String,
+      info: ExpressionInfo,
       funcDefinition: FunctionBuilder,
       ignoreIfExists: Boolean): Unit = {
     if (functionRegistry.lookupFunctionBuilder(name).isDefined && 
!ignoreIfExists) {
       throw new AnalysisException(s"Temporary function '$name' already 
exists.")
     }
-    functionRegistry.registerFunction(name, funcDefinition)
+    functionRegistry.registerFunction(name, info, funcDefinition)
   }
 
   /**
@@ -501,41 +540,59 @@ class SessionCatalog(
     }
   }
 
-  /**
-   * Rename a function.
-   *
-   * If a database is specified in `oldName`, this will rename the function in 
that database.
-   * If no database is specified, this will first attempt to rename a 
temporary function with
-   * the same name, then, if that does not exist, rename the function in the 
current database.
-   *
-   * This assumes the database specified in `oldName` matches the one 
specified in `newName`.
-   */
-  def renameFunction(oldName: FunctionIdentifier, newName: 
FunctionIdentifier): Unit = {
-    if (oldName.database != newName.database) {
-      throw new AnalysisException("rename does not support moving functions 
across databases")
-    }
-    val db = oldName.database.getOrElse(currentDb)
-    val oldBuilder = functionRegistry.lookupFunctionBuilder(oldName.funcName)
-    if (oldName.database.isDefined || oldBuilder.isEmpty) {
-      externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
-    } else {
-      val oldExpressionInfo = 
functionRegistry.lookupFunction(oldName.funcName).get
-      val newExpressionInfo = new ExpressionInfo(
-        oldExpressionInfo.getClassName,
-        newName.funcName,
-        oldExpressionInfo.getUsage,
-        oldExpressionInfo.getExtended)
-      functionRegistry.dropFunction(oldName.funcName)
-      functionRegistry.registerFunction(newName.funcName, newExpressionInfo, 
oldBuilder.get)
-    }
+  protected def failFunctionLookup(name: String): Nothing = {
+    throw new AnalysisException(s"Undefined function: $name. This function is 
" +
+      s"neither a registered temporary function nor " +
+      s"a permanent function registered in the database $currentDb.")
   }
 
   /**
    * Return an [[Expression]] that represents the specified function, assuming 
it exists.
-   * Note: This is currently only used for temporary functions.
+   *
+   * For a temporary function or a permanent function that has been loaded,
+   * this method will simply lookup the function through the
+   * FunctionRegistry and create an expression based on the builder.
+   *
+   * For a permanent function that has not been loaded, we will first fetch 
its metadata
+   * from the underlying external catalog. Then, we will load all resources 
associated
+   * with this function (i.e. jars and files). Finally, we create a function 
builder
+   * based on the function class and put the builder into the FunctionRegistry.
+   * The name of this function in the FunctionRegistry will be 
`databaseName.functionName`.
    */
   def lookupFunction(name: String, children: Seq[Expression]): Expression = {
-    functionRegistry.lookupFunction(name, children)
+    // TODO: Right now, the name can be qualified or not qualified.
+    // It will be better to get a FunctionIdentifier.
+    // TODO: Right now, we assume that name is not qualified!
+    val qualifiedName = FunctionIdentifier(name, 
Some(currentDb)).unquotedString
+    if (functionRegistry.functionExists(name)) {
+      // This function has been already loaded into the function registry.
+      functionRegistry.lookupFunction(name, children)
+    } else if (functionRegistry.functionExists(qualifiedName)) {
+      // This function has been already loaded into the function registry.
+      // Unlike the above block, we find this function by using the qualified 
name.
+      functionRegistry.lookupFunction(qualifiedName, children)
+    } else {
+      // The function has not been loaded to the function registry, which means
+      // that the function is a permanent function (if it actually has been 
registered
+      // in the metastore). We need to first put the function in the 
FunctionRegistry.
+      val catalogFunction = try {
+        externalCatalog.getFunction(currentDb, name)
+      } catch {
+        case e: AnalysisException => failFunctionLookup(name)
+        case e: NoSuchFunctionException => failFunctionLookup(name)
+      }
+      loadFunctionResources(catalogFunction.resources)
+      // Please note that qualifiedName is provided by the user. However,
+      // catalogFunction.identifier.unquotedString is returned by the 
underlying
+      // catalog. So, it is possible that qualifiedName is not exactly the 
same as
+      // catalogFunction.identifier.unquotedString (difference is on 
case-sensitivity).
+      // At here, we preserve the input from the user.
+      val info = new ExpressionInfo(catalogFunction.className, qualifiedName)
+      val builder = makeFunctionBuilder(qualifiedName, 
catalogFunction.className)
+      createTempFunction(qualifiedName, info, builder, ignoreIfExists = false)
+      // Now, we need to create the Expression.
+      functionRegistry.lookupFunction(qualifiedName, children)
+    }
   }
 
   /**
@@ -545,17 +602,11 @@ class SessionCatalog(
     val dbFunctions =
       externalCatalog.listFunctions(db, pattern).map { f => 
FunctionIdentifier(f, Some(db)) }
     val regex = pattern.replaceAll("\\*", ".*").r
-    val _tempFunctions = functionRegistry.listFunction()
+    val loadedFunctions = functionRegistry.listFunction()
       .filter { f => regex.pattern.matcher(f).matches() }
       .map { f => FunctionIdentifier(f) }
-    dbFunctions ++ _tempFunctions
-  }
-
-  /**
-   * Return a temporary function. For testing only.
-   */
-  private[catalog] def getTempFunction(name: String): Option[FunctionBuilder] 
= {
-    functionRegistry.lookupFunctionBuilder(name)
+    // TODO: Actually, there will be dbFunctions that have been loaded into 
the FunctionRegistry.
+    // So, the returned list may have two entries for the same function.
+    dbFunctions ++ loadedFunctions
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
new file mode 100644
index 0000000..5adcc89
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/functionResources.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import org.apache.spark.sql.AnalysisException
+
+/** An trait that represents the type of a resourced needed by a function. */
+sealed trait FunctionResourceType
+
+object JarResource extends FunctionResourceType
+
+object FileResource extends FunctionResourceType
+
+// We do not allow users to specify a archive because it is YARN specific.
+// When loading resources, we will throw an exception and ask users to
+// use --archive with spark submit.
+object ArchiveResource extends FunctionResourceType
+
+object FunctionResourceType {
+  def fromString(resourceType: String): FunctionResourceType = {
+    resourceType.toLowerCase match {
+      case "jar" => JarResource
+      case "file" => FileResource
+      case "archive" => ArchiveResource
+      case other =>
+        throw new AnalysisException(s"Resource Type '$resourceType' is not 
supported.")
+    }
+  }
+}
+
+case class FunctionResource(resourceType: FunctionResourceType, uri: String)
+
+/**
+ * A simple trait representing a class that can be used to load resources used 
by
+ * a function. Because only a SQLContext can load resources, we create this 
trait
+ * to avoid of explicitly passing SQLContext around.
+ */
+trait FunctionResourceLoader {
+  def loadResource(resource: FunctionResource): Unit
+}
+
+object DummyFunctionResourceLoader extends FunctionResourceLoader {
+  override def loadResource(resource: FunctionResource): Unit = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 303846d..97b9946 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -150,15 +150,6 @@ abstract class ExternalCatalog {
 
   def renameFunction(db: String, oldName: String, newName: String): Unit
 
-  /**
-   * Alter a function whose name that matches the one specified in 
`funcDefinition`,
-   * assuming the function exists.
-   *
-   * Note: If the underlying implementation does not support altering a 
certain field,
-   * this becomes a no-op.
-   */
-  def alterFunction(db: String, funcDefinition: CatalogFunction): Unit
-
   def getFunction(db: String, funcName: String): CatalogFunction
 
   def listFunctions(db: String, pattern: String): Seq[String]
@@ -171,8 +162,13 @@ abstract class ExternalCatalog {
  *
  * @param identifier name of the function
  * @param className fully qualified class name, e.g. 
"org.apache.spark.util.MyFunc"
+ * @param resources resource types and Uris used by the function
  */
-case class CatalogFunction(identifier: FunctionIdentifier, className: String)
+// TODO: Use FunctionResource instead of (String, String) as the element type 
of resources.
+case class CatalogFunction(
+    identifier: FunctionIdentifier,
+    className: String,
+    resources: Seq[(String, String)])
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 87f4d1b..aae7595 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -25,10 +25,10 @@ package org.apache.spark.sql.catalyst
  * Format (quoted): "`name`" or "`db`.`name`"
  */
 sealed trait IdentifierWithDatabase {
-  val name: String
+  val identifier: String
   def database: Option[String]
-  def quotedString: String = database.map(db => 
s"`$db`.`$name`").getOrElse(s"`$name`")
-  def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name)
+  def quotedString: String = database.map(db => 
s"`$db`.`$identifier`").getOrElse(s"`$identifier`")
+  def unquotedString: String = database.map(db => 
s"$db.$identifier").getOrElse(identifier)
   override def toString: String = quotedString
 }
 
@@ -36,13 +36,15 @@ sealed trait IdentifierWithDatabase {
 /**
  * Identifies a table in a database.
  * If `database` is not defined, the current database is used.
+ * When we register a permenent function in the FunctionRegistry, we use
+ * unquotedString as the function name.
  */
 case class TableIdentifier(table: String, database: Option[String])
   extends IdentifierWithDatabase {
 
-  override val name: String = table
+  override val identifier: String = table
 
-  def this(name: String) = this(name, None)
+  def this(table: String) = this(table, None)
 
 }
 
@@ -58,9 +60,9 @@ object TableIdentifier {
 case class FunctionIdentifier(funcName: String, database: Option[String])
   extends IdentifierWithDatabase {
 
-  override val name: String = funcName
+  override val identifier: String = funcName
 
-  def this(name: String) = this(name, None)
+  def this(funcName: String) = this(funcName, None)
 }
 
 object FunctionIdentifier {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 14c9091..5a3aebf 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -549,8 +549,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
         Explode(expressions.head)
       case "json_tuple" =>
         JsonTuple(expressions)
-      case other =>
-        withGenerator(other, expressions, ctx)
+      case name =>
+        UnresolvedGenerator(name, expressions)
     }
 
     Generate(
@@ -563,16 +563,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
   }
 
   /**
-   * Create a [[Generator]]. Override this method in order to support custom 
Generators.
-   */
-  protected def withGenerator(
-      name: String,
-      expressions: Seq[Expression],
-      ctx: LateralViewContext): Generator = {
-    throw new ParseException(s"Generator function '$name' is not supported", 
ctx)
-  }
-
-  /**
    * Create a joins between two or more logical plans.
    */
   override def visitJoinRelation(ctx: JoinRelationContext): LogicalPlan = 
withOrigin(ctx) {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
index 3ec95ef..b1fcf01 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala
@@ -32,7 +32,7 @@ trait AnalysisTest extends PlanTest {
     val conf = new SimpleCatalystConf(caseSensitive)
     val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
     catalog.createTempTable("TaBlE", TestRelations.testRelation, 
overrideIfExists = true)
-    new Analyzer(catalog, EmptyFunctionRegistry, conf) {
+    new Analyzer(catalog, conf) {
       override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index 1a350bf..b3b1f5b 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types._
 class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter {
   private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true)
   private val catalog = new SessionCatalog(new InMemoryCatalog, 
EmptyFunctionRegistry, conf)
-  private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+  private val analyzer = new Analyzer(catalog, conf)
 
   private val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index 959bd56..fbcac09 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -433,7 +433,8 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
   test("get function") {
     val catalog = newBasicCatalog()
     assert(catalog.getFunction("db2", "func1") ==
-      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+        Seq.empty[(String, String)]))
     intercept[AnalysisException] {
       catalog.getFunction("db2", "does_not_exist")
     }
@@ -464,21 +465,6 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
     }
   }
 
-  test("alter function") {
-    val catalog = newBasicCatalog()
-    assert(catalog.getFunction("db2", "func1").className == funcClass)
-    catalog.alterFunction("db2", newFunc("func1").copy(className = "muhaha"))
-    assert(catalog.getFunction("db2", "func1").className == "muhaha")
-    intercept[AnalysisException] { catalog.alterFunction("db2", 
newFunc("funcky")) }
-  }
-
-  test("alter function when database does not exist") {
-    val catalog = newBasicCatalog()
-    intercept[AnalysisException] {
-      catalog.alterFunction("does_not_exist", newFunc())
-    }
-  }
-
   test("list functions") {
     val catalog = newBasicCatalog()
     catalog.createFunction("db2", newFunc("func2"))
@@ -557,7 +543,7 @@ abstract class CatalogTestUtils {
   }
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction 
= {
-    CatalogFunction(FunctionIdentifier(name, database), funcClass)
+    CatalogFunction(FunctionIdentifier(name, database), funcClass, 
Seq.empty[(String, String)])
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index acd9759..4d56d00 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, 
Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias}
 
 
@@ -685,19 +685,26 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempFunc1 = (e: Seq[Expression]) => e.head
     val tempFunc2 = (e: Seq[Expression]) => e.last
-    catalog.createTempFunction("temp1", tempFunc1, ignoreIfExists = false)
-    catalog.createTempFunction("temp2", tempFunc2, ignoreIfExists = false)
-    assert(catalog.getTempFunction("temp1") == Some(tempFunc1))
-    assert(catalog.getTempFunction("temp2") == Some(tempFunc2))
-    assert(catalog.getTempFunction("temp3") == None)
+    val info1 = new ExpressionInfo("tempFunc1", "temp1")
+    val info2 = new ExpressionInfo("tempFunc2", "temp2")
+    catalog.createTempFunction("temp1", info1, tempFunc1, ignoreIfExists = 
false)
+    catalog.createTempFunction("temp2", info2, tempFunc2, ignoreIfExists = 
false)
+    val arguments = Seq(Literal(1), Literal(2), Literal(3))
+    assert(catalog.lookupFunction("temp1", arguments) === Literal(1))
+    assert(catalog.lookupFunction("temp2", arguments) === Literal(3))
+    // Temporary function does not exist.
+    intercept[AnalysisException] {
+      catalog.lookupFunction("temp3", arguments)
+    }
     val tempFunc3 = (e: Seq[Expression]) => Literal(e.size)
+    val info3 = new ExpressionInfo("tempFunc3", "temp1")
     // Temporary function already exists
     intercept[AnalysisException] {
-      catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = false)
+      catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = 
false)
     }
     // Temporary function is overridden
-    catalog.createTempFunction("temp1", tempFunc3, ignoreIfExists = true)
-    assert(catalog.getTempFunction("temp1") == Some(tempFunc3))
+    catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = 
true)
+    assert(catalog.lookupFunction("temp1", arguments) === 
Literal(arguments.length))
   }
 
   test("drop function") {
@@ -726,11 +733,15 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("drop temp function") {
     val catalog = new SessionCatalog(newBasicCatalog())
+    val info = new ExpressionInfo("tempFunc", "func1")
     val tempFunc = (e: Seq[Expression]) => e.head
-    catalog.createTempFunction("func1", tempFunc, ignoreIfExists = false)
-    assert(catalog.getTempFunction("func1") == Some(tempFunc))
+    catalog.createTempFunction("func1", info, tempFunc, ignoreIfExists = false)
+    val arguments = Seq(Literal(1), Literal(2), Literal(3))
+    assert(catalog.lookupFunction("func1", arguments) === Literal(1))
     catalog.dropTempFunction("func1", ignoreIfNotExists = false)
-    assert(catalog.getTempFunction("func1") == None)
+    intercept[AnalysisException] {
+      catalog.lookupFunction("func1", arguments)
+    }
     intercept[AnalysisException] {
       catalog.dropTempFunction("func1", ignoreIfNotExists = false)
     }
@@ -739,7 +750,9 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("get function") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), 
funcClass)
+    val expected =
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass,
+      Seq.empty[(String, String)])
     assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == 
expected)
     // Get function without explicitly specifying database
     catalog.setCurrentDatabase("db2")
@@ -758,8 +771,9 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("lookup temp function") {
     val catalog = new SessionCatalog(newBasicCatalog())
+    val info1 = new ExpressionInfo("tempFunc1", "func1")
     val tempFunc1 = (e: Seq[Expression]) => e.head
-    catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
+    catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = 
false)
     assert(catalog.lookupFunction("func1", Seq(Literal(1), Literal(2), 
Literal(3))) == Literal(1))
     catalog.dropTempFunction("func1", ignoreIfNotExists = false)
     intercept[AnalysisException] {
@@ -767,98 +781,16 @@ class SessionCatalogSuite extends SparkFunSuite {
     }
   }
 
-  test("rename function") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val newName = "funcky"
-    assert(sessionCatalog.getFunction(
-      FunctionIdentifier("func1", Some("db2"))) == newFunc("func1", 
Some("db2")))
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
-    sessionCatalog.renameFunction(
-      FunctionIdentifier("func1", Some("db2")), FunctionIdentifier(newName, 
Some("db2")))
-    assert(sessionCatalog.getFunction(
-      FunctionIdentifier(newName, Some("db2"))) == newFunc(newName, 
Some("db2")))
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set(newName))
-    // Rename function without explicitly specifying database
-    sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.renameFunction(FunctionIdentifier(newName), 
FunctionIdentifier("func1"))
-    assert(sessionCatalog.getFunction(
-      FunctionIdentifier("func1")) == newFunc("func1", Some("db2")))
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1"))
-    // Renaming "db2.func1" to "db1.func2" should fail because databases don't 
match
-    intercept[AnalysisException] {
-      sessionCatalog.renameFunction(
-        FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func2", 
Some("db1")))
-    }
-  }
-
-  test("rename function when database/function does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.renameFunction(
-        FunctionIdentifier("func1", Some("does_not_exist")),
-        FunctionIdentifier("func5", Some("does_not_exist")))
-    }
-    intercept[AnalysisException] {
-      catalog.renameFunction(
-        FunctionIdentifier("does_not_exist", Some("db2")),
-        FunctionIdentifier("x", Some("db2")))
-    }
-  }
-
-  test("rename temp function") {
-    val externalCatalog = newBasicCatalog()
-    val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempFunc = (e: Seq[Expression]) => e.head
-    sessionCatalog.createTempFunction("func1", tempFunc, ignoreIfExists = 
false)
-    sessionCatalog.setCurrentDatabase("db2")
-    // If a database is specified, we'll always rename the function in that 
database
-    sessionCatalog.renameFunction(
-      FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func3", 
Some("db2")))
-    assert(sessionCatalog.getTempFunction("func1") == Some(tempFunc))
-    assert(sessionCatalog.getTempFunction("func3") == None)
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3"))
-    // If no database is specified, we'll first rename temporary functions
-    sessionCatalog.createFunction(newFunc("func1", Some("db2")))
-    sessionCatalog.renameFunction(FunctionIdentifier("func1"), 
FunctionIdentifier("func4"))
-    assert(sessionCatalog.getTempFunction("func4") == Some(tempFunc))
-    assert(sessionCatalog.getTempFunction("func1") == None)
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", 
"func3"))
-    // Then, if no such temporary function exist, rename the function in the 
current database
-    sessionCatalog.renameFunction(FunctionIdentifier("func1"), 
FunctionIdentifier("func5"))
-    assert(sessionCatalog.getTempFunction("func5") == None)
-    assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3", 
"func5"))
-  }
-
-  test("alter function") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    assert(catalog.getFunction(FunctionIdentifier("func1", 
Some("db2"))).className == funcClass)
-    catalog.alterFunction(newFunc("func1", Some("db2")).copy(className = 
"muhaha"))
-    assert(catalog.getFunction(FunctionIdentifier("func1", 
Some("db2"))).className == "muhaha")
-    // Alter function without explicitly specifying database
-    catalog.setCurrentDatabase("db2")
-    catalog.alterFunction(newFunc("func1").copy(className = "derpy"))
-    assert(catalog.getFunction(FunctionIdentifier("func1")).className == 
"derpy")
-  }
-
-  test("alter function when database/function does not exist") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    intercept[AnalysisException] {
-      catalog.alterFunction(newFunc("func5", Some("does_not_exist")))
-    }
-    intercept[AnalysisException] {
-      catalog.alterFunction(newFunc("funcky", Some("db2")))
-    }
-  }
-
   test("list functions") {
     val catalog = new SessionCatalog(newBasicCatalog())
+    val info1 = new ExpressionInfo("tempFunc1", "func1")
+    val info2 = new ExpressionInfo("tempFunc2", "yes_me")
     val tempFunc1 = (e: Seq[Expression]) => e.head
     val tempFunc2 = (e: Seq[Expression]) => e.last
     catalog.createFunction(newFunc("func2", Some("db2")))
     catalog.createFunction(newFunc("not_me", Some("db2")))
-    catalog.createTempFunction("func1", tempFunc1, ignoreIfExists = false)
-    catalog.createTempFunction("yes_me", tempFunc2, ignoreIfExists = false)
+    catalog.createTempFunction("func1", info1, tempFunc1, ignoreIfExists = 
false)
+    catalog.createTempFunction("yes_me", info2, tempFunc2, ignoreIfExists = 
false)
     assert(catalog.listFunctions("db1", "*").toSet ==
       Set(FunctionIdentifier("func1"),
         FunctionIdentifier("yes_me")))

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
index dd6b5ca..8147d06 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala
@@ -141,7 +141,6 @@ class BooleanSimplificationSuite extends PlanTest with 
PredicateHelper {
   private val caseInsensitiveConf = new SimpleCatalystConf(false)
   private val caseInsensitiveAnalyzer = new Analyzer(
     new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
caseInsensitiveConf),
-    EmptyFunctionRegistry,
     caseInsensitiveConf)
 
   test("(a && b) || (a && c) => a && (b || c) when case insensitive") {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 009889d..8c92ad8 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules._
 class EliminateSortsSuite extends PlanTest {
   val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, 
orderByOrdinal = false)
   val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
conf)
-  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
+  val analyzer = new Analyzer(catalog, conf)
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 9e1660d..262537d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.parser
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -296,10 +297,18 @@ class PlanParserSuite extends PlanTest {
         .insertInto("t2"),
         from.where('s < 10).select(star()).insertInto("t3")))
 
-    // Unsupported generator.
-    intercept(
+    // Unresolved generator.
+    val expected = table("t")
+      .generate(
+        UnresolvedGenerator("posexplode", Seq('x)),
+        join = true,
+        outer = false,
+        Some("posexpl"),
+        Seq("x", "y"))
+      .select(star())
+    assertEqual(
       "select * from t lateral view posexplode(x) posexpl as x, y",
-      "Generator function 'posexplode' is not supported")
+      expected)
   }
 
   test("joins") {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d4290fe..587ba1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, 
LogicalPlan, Range}
@@ -208,6 +208,22 @@ class SQLContext private[sql](
     sparkContext.addJar(path)
   }
 
+  /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
+  @transient protected[sql] lazy val functionResourceLoader: 
FunctionResourceLoader = {
+    new FunctionResourceLoader {
+      override def loadResource(resource: FunctionResource): Unit = {
+        resource.resourceType match {
+          case JarResource => addJar(resource.uri)
+          case FileResource => sparkContext.addFile(resource.uri)
+          case ArchiveResource =>
+            throw new AnalysisException(
+              "Archive is not allowed to be loaded. If YARN mode is used, " +
+                "please use --archives options while calling spark-submit.")
+        }
+      }
+    }
+  }
+
   /**
    * :: Experimental ::
    * A collection of methods that are considered experimental, but can be used 
to hook into

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fb106d1..382cc61 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -337,10 +337,9 @@ class SparkSqlAstBuilder extends AstBuilder {
     CreateFunction(
       database,
       function,
-      string(ctx.className), // TODO this is not an alias.
+      string(ctx.className),
       resources,
-      ctx.TEMPORARY != null)(
-      command(ctx))
+      ctx.TEMPORARY != null)
   }
 
   /**
@@ -353,7 +352,7 @@ class SparkSqlAstBuilder extends AstBuilder {
    */
   override def visitDropFunction(ctx: DropFunctionContext): LogicalPlan = 
withOrigin(ctx) {
     val (database, function) = visitFunctionName(ctx.qualifiedName)
-    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != 
null)(command(ctx))
+    DropFunction(database, function, ctx.EXISTS != null, ctx.TEMPORARY != null)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index a4be3bc..faa7a2c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -426,8 +426,12 @@ case class ShowTablePropertiesCommand(
  * A command for users to list all of the registered functions.
  * The syntax of using this command in SQL is:
  * {{{
- *    SHOW FUNCTIONS
+ *    SHOW FUNCTIONS [LIKE pattern]
  * }}}
+ * For the pattern, '*' matches any sequence of characters (including no 
characters) and
+ * '|' is for alternation.
+ * For example, "show functions like 'yea*|windo*'" will return "window" and 
"year".
+ *
  * TODO currently we are simply ignore the db
  */
 case class ShowFunctions(db: Option[String], pattern: Option[String]) extends 
RunnableCommand {
@@ -438,18 +442,17 @@ case class ShowFunctions(db: Option[String], pattern: 
Option[String]) extends Ru
     schema.toAttributes
   }
 
-  override def run(sqlContext: SQLContext): Seq[Row] = pattern match {
-    case Some(p) =>
-      try {
-        val regex = java.util.regex.Pattern.compile(p)
-        sqlContext.sessionState.functionRegistry.listFunction()
-          .filter(regex.matcher(_).matches()).map(Row(_))
-      } catch {
-        // probably will failed in the regex that user provided, then returns 
empty row.
-        case _: Throwable => Seq.empty[Row]
-      }
-    case None =>
-      sqlContext.sessionState.functionRegistry.listFunction().map(Row(_))
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val dbName = 
db.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+    // If pattern is not specified, we use '*', which is used to
+    // match any sequence of characters (including no characters).
+    val functionNames =
+      sqlContext.sessionState.catalog
+        .listFunctions(dbName, pattern.getOrElse("*"))
+        .map(_.unquotedString)
+    // The session catalog caches some persistent functions in the 
FunctionRegistry
+    // so there can be duplicates.
+    functionNames.distinct.sorted.map(Row(_))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index cd7e0ee..6896881 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -175,26 +175,6 @@ case class DescribeDatabase(
   }
 }
 
-case class CreateFunction(
-    databaseName: Option[String],
-    functionName: String,
-    alias: String,
-    resources: Seq[(String, String)],
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
-/**
- * The DDL command that drops a function.
- * ifExists: returns an error if the function doesn't exist, unless this is 
true.
- * isTemp: indicates if it is a temporary function.
- */
-case class DropFunction(
-    databaseName: Option[String],
-    functionName: String,
-    ifExists: Boolean,
-    isTemp: Boolean)(sql: String)
-  extends NativeDDLCommand(sql) with Logging
-
 /** Rename in ALTER TABLE/VIEW: change the name of a table/view to a different 
name. */
 case class AlterTableRename(
     oldName: TableIdentifier,

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
new file mode 100644
index 0000000..66d17e3
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogFunction
+import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+
+
+/**
+ * The DDL command that creates a function.
+ * To create a temporary function, the syntax of using this command in SQL is:
+ * {{{
+ *    CREATE TEMPORARY FUNCTION functionName
+ *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ *
+ * To create a permanent function, the syntax in SQL is:
+ * {{{
+ *    CREATE FUNCTION [databaseName.]functionName
+ *    AS className [USING JAR|FILE 'uri' [, JAR|FILE 'uri']]
+ * }}}
+ */
+// TODO: Use Seq[FunctionResource] instead of Seq[(String, String)] for 
resources.
+case class CreateFunction(
+    databaseName: Option[String],
+    functionName: String,
+    className: String,
+    resources: Seq[(String, String)],
+    isTemp: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    if (isTemp) {
+      if (databaseName.isDefined) {
+        throw new AnalysisException(
+          s"It is not allowed to provide database name when defining a 
temporary function. " +
+            s"However, database name ${databaseName.get} is provided.")
+      }
+      // We first load resources and then put the builder in the function 
registry.
+      // Please note that it is allowed to overwrite an existing temp function.
+      sqlContext.sessionState.catalog.loadFunctionResources(resources)
+      val info = new ExpressionInfo(className, functionName)
+      val builder =
+        sqlContext.sessionState.catalog.makeFunctionBuilder(functionName, 
className)
+      sqlContext.sessionState.catalog.createTempFunction(
+        functionName, info, builder, ignoreIfExists = false)
+    } else {
+      // For a permanent function, we will store the metadata in the underlying external 
catalog.
+      // This function will be loaded into the FunctionRegistry when a query 
uses it.
+      // We do not load it into FunctionRegistry right now.
+      val dbName = 
databaseName.getOrElse(sqlContext.sessionState.catalog.getCurrentDatabase)
+      val func = FunctionIdentifier(functionName, Some(dbName))
+      val catalogFunc = CatalogFunction(func, className, resources)
+      if (sqlContext.sessionState.catalog.functionExists(func)) {
+        throw new AnalysisException(
+          s"Function '$functionName' already exists in database '$dbName'.")
+      }
+      sqlContext.sessionState.catalog.createFunction(catalogFunc)
+    }
+    Seq.empty[Row]
+  }
+}
+
+/**
+ * The DDL command that drops a function.
+ * ifExists: returns an error if the function doesn't exist, unless this is 
true.
+ * isTemp: indicates if it is a temporary function.
+ */
+case class DropFunction(
+    databaseName: Option[String],
+    functionName: String,
+    ifExists: Boolean,
+    isTemp: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val catalog = sqlContext.sessionState.catalog
+    if (isTemp) {
+      if (databaseName.isDefined) {
+        throw new AnalysisException(
+          s"It is not allowed to provide database name when dropping a 
temporary function. " +
+            s"However, database name ${databaseName.get} is provided.")
+      }
+      catalog.dropTempFunction(functionName, ifExists)
+    } else {
+      // We are dropping a permanent function.
+      val dbName = databaseName.getOrElse(catalog.getCurrentDatabase)
+      val func = FunctionIdentifier(functionName, Some(dbName))
+      if (!ifExists && !catalog.functionExists(func)) {
+        throw new AnalysisException(
+          s"Function '$functionName' does not exist in database '$dbName'.")
+      }
+      catalog.dropFunction(func)
+    }
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cd29def..69e3358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -51,7 +51,12 @@ private[sql] class SessionState(ctx: SQLContext) {
   /**
    * Internal catalog for managing table and database states.
    */
-  lazy val catalog = new SessionCatalog(ctx.externalCatalog, functionRegistry, 
conf)
+  lazy val catalog =
+    new SessionCatalog(
+      ctx.externalCatalog,
+      ctx.functionResourceLoader,
+      functionRegistry,
+      conf)
 
   /**
    * Interface exposed to the user for registering user-defined functions.
@@ -62,7 +67,7 @@ private[sql] class SessionState(ctx: SQLContext) {
    * Logical query plan analyzer for resolving unresolved attributes and 
relations.
    */
   lazy val analyzer: Analyzer = {
-    new Analyzer(catalog, functionRegistry, conf) {
+    new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
@@ -98,5 +103,5 @@ private[sql] class SessionState(ctx: SQLContext) {
    * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
    */
   lazy val continuousQueryManager: ContinuousQueryManager = new 
ContinuousQueryManager(ctx)
-
 }
+

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index b727e88..5a851b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -61,8 +61,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         .filter(regex.matcher(_).matches()).map(Row(_))
     }
     checkAnswer(sql("SHOW functions"), getFunctions(".*"))
-    Seq("^c.*", ".*e$", "log.*", ".*date.*").foreach { pattern =>
-      checkAnswer(sql(s"SHOW FUNCTIONS '$pattern'"), getFunctions(pattern))
+    Seq("^c*", "*e$", "log*", "*date*").foreach { pattern =>
+      // For the pattern part, only '*' and '|' are allowed as wildcards.
+      // For '*', we need to replace it to '.*'.
+      checkAnswer(
+        sql(s"SHOW FUNCTIONS '$pattern'"),
+        getFunctions(pattern.replaceAll("\\*", ".*")))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index fd73671..ec95033 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -83,7 +83,8 @@ class UDFSuite extends QueryTest with SharedSQLContext {
     val e = intercept[AnalysisException] {
       df.selectExpr("a_function_that_does_not_exist()")
     }
-    assert(e.getMessage.contains("undefined function"))
+    assert(e.getMessage.contains("Undefined function"))
+    assert(e.getMessage.contains("a_function_that_does_not_exist"))
   }
 
   test("Simple UDF") {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 47e295a..c42e8e7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -147,13 +147,13 @@ class DDLCommandSuite extends PlanTest {
       "helloworld",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(("jar", "/path/to/jar1"), ("jar", "/path/to/jar2")),
-      isTemp = true)(sql1)
+      isTemp = true)
     val expected2 = CreateFunction(
       Some("hello"),
       "world",
       "com.matthewrathbone.example.SimpleUDFExample",
       Seq(("archive", "/path/to/archive"), ("file", "/path/to/file")),
-      isTemp = false)(sql2)
+      isTemp = false)
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
   }
@@ -173,22 +173,22 @@ class DDLCommandSuite extends PlanTest {
       None,
       "helloworld",
       ifExists = false,
-      isTemp = true)(sql1)
+      isTemp = true)
     val expected2 = DropFunction(
       None,
       "helloworld",
       ifExists = true,
-      isTemp = true)(sql2)
+      isTemp = true)
     val expected3 = DropFunction(
       Some("hello"),
       "world",
       ifExists = false,
-      isTemp = false)(sql3)
+      isTemp = false)
     val expected4 = DropFunction(
       Some("hello"),
       "world",
       ifExists = true,
-      isTemp = false)(sql4)
+      isTemp = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
index 80a85a6..7844d1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.Filter
@@ -132,6 +133,27 @@ private[sql] trait SQLTestUtils
   }
 
   /**
+   * Runs `f` and then drops the given functions, whether or not `f` threw.
+   *
+   * @param functions functions to drop, each represented by (functionName, isTemporary)
+   * @param f the body to run before the functions are dropped
+   */
+  protected def withUserDefinedFunction(functions: (String, Boolean)*)(f: => Unit): Unit = {
+    try f finally {
+      // If the test failed part way, some functions may never have been created.
+      // DROP ... IF EXISTS is used so that cleaning those up is not meant to fail.
+      functions.foreach { case (functionName, isTemporary) =>
+        val withTemporary = if (isTemporary) "TEMPORARY" else ""
+        sqlContext.sql(s"DROP $withTemporary FUNCTION IF EXISTS $functionName")
+        assert(
+          !sqlContext.sessionState.catalog.functionExists(FunctionIdentifier(functionName)),
+          s"Function $functionName should have been dropped. But, it still exists.")
+      }
+    }
+  }
+
+  /**
    * Drops temporary table `tableName` after calling `f`.
    */
   protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 2c7358e..a1268b8 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -491,46 +491,50 @@ class HiveThriftBinaryServerSuite extends 
HiveThriftJdbcTest {
 
   test("SPARK-11595 ADD JAR with input path having URL scheme") {
     withJdbcStatement { statement =>
-      val jarPath = "../hive/src/test/resources/TestUDTF.jar"
-      val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
+      try {
+        val jarPath = "../hive/src/test/resources/TestUDTF.jar"
+        val jarURL = s"file://${System.getProperty("user.dir")}/$jarPath"
 
-      Seq(
-        s"ADD JAR $jarURL",
-        s"""CREATE TEMPORARY FUNCTION udtf_count2
-           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-         """.stripMargin
-      ).foreach(statement.execute)
+        Seq(
+          s"ADD JAR $jarURL",
+          s"""CREATE TEMPORARY FUNCTION udtf_count2
+             |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
+           """.stripMargin
+        ).foreach(statement.execute)
 
-      val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+        val rs1 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
 
-      assert(rs1.next())
-      assert(rs1.getString(1) === "Function: udtf_count2")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "Function: udtf_count2")
 
-      assert(rs1.next())
-      assertResult("Class: 
org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
-        rs1.getString(1)
-      }
+        assert(rs1.next())
+        assertResult("Class: 
org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+          rs1.getString(1)
+        }
 
-      assert(rs1.next())
-      assert(rs1.getString(1) === "Usage: To be added.")
+        assert(rs1.next())
+        assert(rs1.getString(1) === "Usage: To be added.")
 
-      val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
+        val dataPath = "../hive/src/test/resources/data/files/kv1.txt"
 
-      Seq(
-        s"CREATE TABLE test_udtf(key INT, value STRING)",
-        s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
-      ).foreach(statement.execute)
+        Seq(
+          s"CREATE TABLE test_udtf(key INT, value STRING)",
+          s"LOAD DATA LOCAL INPATH '$dataPath' OVERWRITE INTO TABLE test_udtf"
+        ).foreach(statement.execute)
 
-      val rs2 = statement.executeQuery(
-        "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS 
cc")
+        val rs2 = statement.executeQuery(
+          "SELECT key, cc FROM test_udtf LATERAL VIEW udtf_count2(value) dd AS 
cc")
 
-      assert(rs2.next())
-      assert(rs2.getInt(1) === 97)
-      assert(rs2.getInt(2) === 500)
+        assert(rs2.next())
+        assert(rs2.getInt(1) === 97)
+        assert(rs2.getInt(2) === 500)
 
-      assert(rs2.next())
-      assert(rs2.getInt(1) === 97)
-      assert(rs2.getInt(2) === 500)
+        assert(rs2.next())
+        assert(rs2.getInt(1) === 97)
+        assert(rs2.getInt(2) === 500)
+      } finally {
+        statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+      }
     }
   }
 
@@ -565,24 +569,28 @@ class SingleSessionSuite extends HiveThriftJdbcTest {
       },
 
       { statement =>
-        val rs1 = statement.executeQuery("SET foo")
+        try {
+          val rs1 = statement.executeQuery("SET foo")
 
-        assert(rs1.next())
-        assert(rs1.getString(1) === "foo")
-        assert(rs1.getString(2) === "bar")
+          assert(rs1.next())
+          assert(rs1.getString(1) === "foo")
+          assert(rs1.getString(2) === "bar")
 
-        val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
+          val rs2 = statement.executeQuery("DESCRIBE FUNCTION udtf_count2")
 
-        assert(rs2.next())
-        assert(rs2.getString(1) === "Function: udtf_count2")
+          assert(rs2.next())
+          assert(rs2.getString(1) === "Function: udtf_count2")
 
-        assert(rs2.next())
-        assertResult("Class: 
org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
-          rs2.getString(1)
-        }
+          assert(rs2.next())
+          assertResult("Class: 
org.apache.spark.sql.hive.execution.GenericUDTFCount2") {
+            rs2.getString(1)
+          }
 
-        assert(rs2.next())
-        assert(rs2.getString(1) === "Usage: To be added.")
+          assert(rs2.next())
+          assert(rs2.getString(1) === "Usage: To be added.")
+        } finally {
+          statement.executeQuery("DROP TEMPORARY FUNCTION udtf_count2")
+        }
       }
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 11205ae..98a5998 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -272,7 +272,12 @@ private[spark] class HiveExternalCatalog(client: 
HiveClient) extends ExternalCat
   override def createFunction(
       db: String,
       funcDefinition: CatalogFunction): Unit = withClient {
-    client.createFunction(db, funcDefinition)
+    // Hive's metastore is case insensitive. However, Hive's createFunction 
does
+    // not normalize the function name (unlike the getFunction part). So,
+    // we are normalizing the function name.
+    val functionName = funcDefinition.identifier.funcName.toLowerCase
+    val functionIdentifier = funcDefinition.identifier.copy(funcName = 
functionName)
+    client.createFunction(db, funcDefinition.copy(identifier = 
functionIdentifier))
   }
 
   override def dropFunction(db: String, name: String): Unit = withClient {
@@ -283,10 +288,6 @@ private[spark] class HiveExternalCatalog(client: 
HiveClient) extends ExternalCat
     client.renameFunction(db, oldName, newName)
   }
 
-  override def alterFunction(db: String, funcDefinition: CatalogFunction): 
Unit = withClient {
-    client.alterFunction(db, funcDefinition)
-  }
-
   override def getFunction(db: String, funcName: String): CatalogFunction = 
withClient {
     client.getFunction(db, funcName)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index dfbf22c..d315f39 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -17,27 +17,39 @@
 
 package org.apache.spark.sql.hive
 
+import scala.util.{Failure, Success, Try}
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
+import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => 
HiveFunctionRegistry}
+import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, 
GenericUDF, GenericUDTF}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
+import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, 
SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.datasources.BucketSpec
+import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.hive.client.HiveClient
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 
-class HiveSessionCatalog(
+private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     client: HiveClient,
     context: HiveContext,
+    functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: SQLConf)
-  extends SessionCatalog(externalCatalog, functionRegistry, conf) {
+  extends SessionCatalog(externalCatalog, functionResourceLoader, 
functionRegistry, conf) {
 
   override def setCurrentDatabase(db: String): Unit = {
     super.setCurrentDatabase(db)
@@ -112,4 +124,129 @@ class HiveSessionCatalog(
     metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
   }
 
+  /**
+   * Construct a [[FunctionBuilder]] for `funcName` from the name of the class that
+   * implements it. The class is loaded with `Utils.classForName` and then handed to the
+   * class-based overload of this method.
+   */
+  override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = {
+    makeFunctionBuilder(funcName, Utils.classForName(className))
+  }
+
+  /**
+   * Construct a [[FunctionBuilder]] based on the provided class that represents a function.
+   *
+   * When invoked, the returned builder wraps `clazz` according to the first Hive function
+   * type it is assignable to, checked in this order: UDF, GenericUDF,
+   * AbstractGenericUDAFResolver, UDAF, GenericUDTF. A class matching none of these is
+   * rejected with an [[AnalysisException]].
+   */
+  private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = {
+    // When we instantiate hive UDF wrapper class, we may throw exception if the input
+    // expressions don't satisfy the hive UDF, such as type mismatch, input number
+    // mismatch, etc. Here we catch the exception and throw AnalysisException instead.
+    (children: Seq[Expression]) => {
+      try {
+        if (classOf[UDF].isAssignableFrom(clazz)) {
+          val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+          udf.dataType // Force it to check input data types.
+          udf
+        } else if (classOf[GenericUDF].isAssignableFrom(clazz)) {
+          val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children)
+          udf.dataType // Force it to check input data types.
+          udf
+        } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) {
+          val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children)
+          udaf.dataType // Force it to check input data types.
+          udaf
+        } else if (classOf[UDAF].isAssignableFrom(clazz)) {
+          val udaf = HiveUDAFFunction(
+            name,
+            new HiveFunctionWrapper(clazz.getName),
+            children,
+            isUDAFBridgeRequired = true)
+          udaf.dataType  // Force it to check input data types.
+          udaf
+        } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) {
+          val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children)
+          udtf.elementTypes // Force it to check input data types.
+          udtf
+        } else {
+          throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'")
+        }
+      } catch {
+        // Already the exception type we want to surface; pass it through unchanged.
+        case ae: AnalysisException =>
+          throw ae
+        case NonFatal(e) =>
+          val analysisException =
+            new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e")
+          // Carry over the stack trace of the original failure.
+          analysisException.setStackTrace(e.getStackTrace)
+          throw analysisException
+      }
+    }
+  }
+
+  // We have a list of Hive built-in functions that we do not support. So, we will check
+  // Hive's function registry and lazily load needed functions into our own function registry.
+  // Those Hive built-in functions are
+  // assert_true, collect_list, collect_set, compute_stats, context_ngrams, create_union,
+  // current_user, elt, ewah_bitmap, ewah_bitmap_and, ewah_bitmap_empty, ewah_bitmap_or, field,
+  // histogram_numeric, in_file, index, inline, java_method, map_keys, map_values,
+  // matchpath, ngrams, noop, noopstreaming, noopwithmap, noopwithmapstreaming,
+  // parse_url, parse_url_tuple, percentile, percentile_approx, posexplode, reflect, reflect2,
+  // regexp, sentences, stack, std, str_to_map, windowingtablefunction, xpath, xpath_boolean,
+  // xpath_double, xpath_float, xpath_int, xpath_long, xpath_number,
+  // xpath_short, and xpath_string.
+  override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+    // TODO: Once lookupFunction accepts a FunctionIdentifier, we should refactor this method to
+    // if (super.functionExists(name)) {
+    //   super.lookupFunction(name, children)
+    // } else {
+    //   // This function is a Hive builtin function.
+    //   ...
+    // }
+    Try(super.lookupFunction(name, children)) match {
+      case Success(expr) => expr
+      case Failure(error) =>
+        if (functionRegistry.functionExists(name)) {
+          // If the function actually exists in functionRegistry, it means that there is an
+          // error when we create the Expression using the given children.
+          // We need to throw the original exception.
+          throw error
+        } else {
+          // This function is not in functionRegistry, let's try to load it as a Hive's
+          // built-in function.
+          // Hive is case insensitive.
+          val functionName = name.toLowerCase
+          // TODO: This may not really work for current_user because current_user is not evaluated
+          // with session info.
+          // We do not need to use executionHive at here because we only load
+          // Hive's builtin functions, which do not need current db.
+          val functionInfo = {
+            try {
+              // getFunctionInfo may return null; treat that as a failed lookup as well.
+              Option(HiveFunctionRegistry.getFunctionInfo(functionName)).getOrElse(
+                failFunctionLookup(name))
+            } catch {
+              // If HiveFunctionRegistry.getFunctionInfo throws an exception,
+              // we are failing to load a Hive builtin function, which means that
+              // the given function is not a Hive builtin function.
+              case NonFatal(e) => failFunctionLookup(name)
+            }
+          }
+          val className = functionInfo.getFunctionClass.getName
+          val builder = makeFunctionBuilder(functionName, className)
+          // Put this Hive built-in function to our function registry.
+          val info = new ExpressionInfo(className, functionName)
+          createTempFunction(functionName, info, builder, ignoreIfExists = false)
+          // Now, we need to create the Expression.
+          functionRegistry.lookupFunction(functionName, children)
+        }
+    }
+  }
+
+  // Register each commonly used Hive built-in function as a temporary function when
+  // this catalog is constructed, so it is available without a lazy lookup.
+  for ((functionName, clazz) <- HiveSessionCatalog.preloadedHiveBuiltinFunctions) {
+    val info = new ExpressionInfo(clazz.getCanonicalName, functionName)
+    val builder = makeFunctionBuilder(functionName, clazz)
+    createTempFunction(functionName, info, builder, ignoreIfExists = false)
+  }
+}
+
+private[sql] object HiveSessionCatalog {
+  // This is the list of Hive's built-in functions that are commonly used and we want to
+  // pre-load when we create the FunctionRegistry.
+  // Each entry is (function name, Hive class implementing the function).
+  val preloadedHiveBuiltinFunctions =
+    ("collect_set", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectSet]) ::
+    ("collect_list", classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDAFCollectList]) :: Nil
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72544d6f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 829afa8..cff24e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -36,25 +36,23 @@ private[hive] class HiveSessionState(ctx: HiveContext) 
extends SessionState(ctx)
   }
 
   /**
-   * Internal catalog for managing functions registered by the user.
-   * Note that HiveUDFs will be overridden by functions registered in this 
context.
-   */
-  override lazy val functionRegistry: FunctionRegistry = {
-    new HiveFunctionRegistry(FunctionRegistry.builtin.copy(), 
ctx.executionHive)
-  }
-
-  /**
    * Internal catalog for managing table and database states.
    */
   override lazy val catalog = {
-    new HiveSessionCatalog(ctx.hiveCatalog, ctx.metadataHive, ctx, 
functionRegistry, conf)
+    new HiveSessionCatalog(
+      ctx.hiveCatalog,
+      ctx.metadataHive,
+      ctx,
+      ctx.functionResourceLoader,
+      functionRegistry,
+      conf)
   }
 
   /**
    * An analyzer that uses the Hive metastore.
    */
   override lazy val analyzer: Analyzer = {
-    new Analyzer(catalog, functionRegistry, conf) {
+    new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to