Repository: spark
Updated Branches:
  refs/heads/master c4e19b381 -> d6cd3a18e


[SPARK-11599] [SQL] fix NPE when resolve Hive UDF in SQLParser

The DataFrame APIs that take a SQL expression always go through SQLParser, so 
the HiveFunctionRegistry is invoked outside of the Hive state, causing an NPE 
when there is no active SessionState for the current thread (as happens in 
PySpark).
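
A minimal sketch of the failure mode, assuming an already-constructed 
HiveContext named hiveCtx (the name is hypothetical): a freshly spawned 
thread has no thread-local Hive SessionState, so an expression that falls 
through to the Hive registry used to hit the NPE instead of the expected 
AnalysisException.

    // Hypothetical reproduction sketch; hiveCtx is an assumed HiveContext.
    // selectExpr() goes through SQLParser, whose function lookup reaches
    // HiveFunctionRegistry; a fresh thread has no SessionState installed.
    val t = new Thread("no-hive-state") {
      override def run(): Unit = {
        // Before the fix: NullPointerException; after: AnalysisException
        // ("undefined function not_a_udf").
        hiveCtx.range(1).selectExpr("not_a_udf()")
      }
    }
    t.start()
    t.join()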

cc rxin yhuai

Author: Davies Liu <dav...@databricks.com>

Closes #9576 from davies/hive_udf.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6cd3a18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6cd3a18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6cd3a18

Branch: refs/heads/master
Commit: d6cd3a18e720e8f6f1f307e0dffad3512952d997
Parents: c4e19b3
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Nov 9 23:27:36 2015 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Nov 9 23:27:36 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/HiveContext.scala | 10 +++++-
 .../sql/hive/execution/HiveQuerySuite.scala     | 33 +++++++++++++++-----
 2 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6cd3a18/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2d72b95..c5f6965 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -454,7 +454,15 @@ class HiveContext private[hive](
  // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
   override protected[sql] lazy val functionRegistry: FunctionRegistry =
-    new HiveFunctionRegistry(FunctionRegistry.builtin.copy())
+    new HiveFunctionRegistry(FunctionRegistry.builtin.copy()) {
+      override def lookupFunction(name: String, children: Seq[Expression]): Expression = {
+        // The Hive registry needs the current database to look up functions
+        // TODO: the current database of executionHive should be consistent with metadataHive
+        executionHive.withHiveState {
+          super.lookupFunction(name, children)
+        }
+      }
+    }
 
  // The Hive UDF current_database() is foldable, will be evaluated by optimizer, but the optimizer
   // can't access the SessionState of metadataHive.
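
The fix scopes the lookup inside executionHive.withHiveState, which installs 
the client's SessionState on the calling thread for the duration of the 
block. A minimal, self-contained sketch of that thread-local scoping pattern 
(the names below are illustrative, not Spark's actual API):

    object HiveStateSketch {
      // Stand-in for Hive's SessionState; only the piece a lookup needs.
      final class SessionState(val currentDatabase: String)

      private val current = new ThreadLocal[SessionState]

      // Install `state` on the calling thread, run `body`, then restore
      // whatever was there before (possibly null on a fresh thread).
      def withState[A](state: SessionState)(body: => A): A = {
        val previous = current.get()
        current.set(state)
        try body finally current.set(previous)
      }

      // Like HiveFunctionRegistry, resolution needs thread-local state.
      def lookupFunction(name: String): String = {
        val state = current.get()
        require(state != null, s"no active SessionState while resolving $name")
        s"${state.currentDatabase}.$name"
      }
    }

Wrapping every lookup this way means any thread, including ones spawned 
outside the Hive client (as in PySpark), sees a valid state for exactly the 
span of the call.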

http://git-wip-us.apache.org/repos/asf/spark/blob/d6cd3a18/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 78378c8..f0a7a6c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -20,22 +20,19 @@ package org.apache.spark.sql.hive.execution
 import java.io.File
 import java.util.{Locale, TimeZone}
 
-import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
-
 import scala.util.Try
 
-import org.scalatest.BeforeAndAfter
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkFiles, SparkException}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoin
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.TestHiveContext
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.{SparkException, SparkFiles}
 
 case class TestData(a: Int, b: String)
 
@@ -1237,6 +1234,26 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
 
   }
 
+  test("lookup hive UDF in another thread") {
+    val e = intercept[AnalysisException] {
+      range(1).selectExpr("not_a_udf()")
+    }
+    assert(e.getMessage.contains("undefined function not_a_udf"))
+    var success = false
+    val t = new Thread("test") {
+      override def run(): Unit = {
+        val e = intercept[AnalysisException] {
+          range(1).selectExpr("not_a_udf()")
+        }
+        assert(e.getMessage.contains("undefined function not_a_udf"))
+        success = true
+      }
+    }
+    t.start()
+    t.join()
+    assert(success)
+  }
+
   createQueryTest("select from thrift based table",
     "SELECT * from src_thrift")
 


