Repository: spark Updated Branches: refs/heads/master 3ed1ae100 -> 43d71d965
[SPARK-21499][SQL] Support creating persistent function for Spark UDAF(UserDefinedAggregateFunction) ## What changes were proposed in this pull request? This PR is to enable users to create persistent Scala UDAF (that extends UserDefinedAggregateFunction). ```SQL CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg' ``` Before this PR, Spark UDAF only can be registered through the API `spark.udf.register(...)` ## How was this patch tested? Added test cases Author: gatorsmile <gatorsm...@gmail.com> Closes #18700 from gatorsmile/javaUDFinScala. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/43d71d96 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/43d71d96 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/43d71d96 Branch: refs/heads/master Commit: 43d71d96596baa8d2111a4b20bf21c1c668ad793 Parents: 3ed1ae1 Author: gatorsmile <gatorsm...@gmail.com> Authored: Tue Aug 22 13:01:35 2017 -0700 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Tue Aug 22 13:01:35 2017 -0700 ---------------------------------------------------------------------- .../sql/catalyst/catalog/SessionCatalog.scala | 41 +++++++- .../test/resources/sql-tests/inputs/udaf.sql | 13 +++ .../resources/sql-tests/results/udaf.sql.out | 54 ++++++++++ .../spark/sql/hive/HiveSessionCatalog.scala | 62 ++++++------ .../sql/hive/execution/HiveUDAFSuite.scala | 13 +++ .../spark/sql/hive/execution/HiveUDFSuite.scala | 101 +++++++++++-------- .../sql/hive/execution/SQLQuerySuite.scala | 42 +------- 7 files changed, 204 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 6030d90..0908d68 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.catalog +import java.lang.reflect.InvocationTargetException import java.net.URI import java.util.Locale import java.util.concurrent.Callable @@ -24,6 +25,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import scala.util.{Failure, Success, Try} +import scala.util.control.NonFatal import com.google.common.cache.{Cache, CacheBuilder} import org.apache.hadoop.conf.Configuration @@ -39,7 +41,9 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParserInterface} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias, View} import org.apache.spark.sql.catalyst.util.StringUtils import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.types.StructType +import org.apache.spark.util.Utils object SessionCatalog { val DEFAULT_DATABASE = "default" @@ -1075,13 +1079,33 @@ class SessionCatalog( // ---------------------------------------------------------------- /** - * Construct a [[FunctionBuilder]] based on the provided class that represents a function. + * Constructs a [[FunctionBuilder]] based on the provided class that represents a function. + */ + private def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = { + val clazz = Utils.classForName(functionClassName) + (input: Seq[Expression]) => makeFunctionExpression(name, clazz, input) + } + + /** + * Constructs a [[Expression]] based on the provided class that represents a function. * * This performs reflection to decide what type of [[Expression]] to return in the builder. */ - protected def makeFunctionBuilder(name: String, functionClassName: String): FunctionBuilder = { - // TODO: at least support UDAFs here - throw new UnsupportedOperationException("Use sqlContext.udf.register(...) instead.") + protected def makeFunctionExpression( + name: String, + clazz: Class[_], + input: Seq[Expression]): Expression = { + val clsForUDAF = + Utils.classForName("org.apache.spark.sql.expressions.UserDefinedAggregateFunction") + if (clsForUDAF.isAssignableFrom(clazz)) { + val cls = Utils.classForName("org.apache.spark.sql.execution.aggregate.ScalaUDAF") + cls.getConstructor(classOf[Seq[Expression]], clsForUDAF, classOf[Int], classOf[Int]) + .newInstance(input, clazz.newInstance().asInstanceOf[Object], Int.box(1), Int.box(1)) + .asInstanceOf[Expression] + } else { + throw new AnalysisException(s"No handler for UDAF '${clazz.getCanonicalName}'. " + + s"Use sparkSession.udf.register(...) instead.") + } } /** @@ -1105,7 +1129,14 @@ class SessionCatalog( } val info = new ExpressionInfo(funcDefinition.className, func.database.orNull, func.funcName) val builder = - functionBuilder.getOrElse(makeFunctionBuilder(func.unquotedString, funcDefinition.className)) + functionBuilder.getOrElse { + val className = funcDefinition.className + if (!Utils.classIsLoadable(className)) { + throw new AnalysisException(s"Can not load class '$className' when registering " + + s"the function '$func', please make sure it is on the classpath") + } + makeFunctionBuilder(func.unquotedString, className) + } functionRegistry.registerFunction(func, info, builder) } http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/inputs/udaf.sql ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/inputs/udaf.sql b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql new file mode 100644 index 0000000..2183ba2 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/udaf.sql @@ -0,0 +1,13 @@ +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +(1), (2), (3), (4) +as t1(int_col1); + +CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg'; + +SELECT default.myDoubleAvg(int_col1) as my_avg from t1; + +SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1; + +CREATE FUNCTION udaf1 AS 'test.non.existent.udaf'; + +SELECT default.udaf1(int_col1) as udaf1 from t1; http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/core/src/test/resources/sql-tests/results/udaf.sql.out ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out new file mode 100644 index 0000000..4815a57 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out @@ -0,0 +1,54 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 6 + + +-- !query 0 +CREATE OR REPLACE TEMPORARY VIEW t1 AS SELECT * FROM VALUES +(1), (2), (3), (4) +as t1(int_col1) +-- !query 0 schema +struct<> +-- !query 0 output + + + +-- !query 1 +CREATE FUNCTION myDoubleAvg AS 'test.org.apache.spark.sql.MyDoubleAvg' +-- !query 1 schema +struct<> +-- !query 1 output + + + +-- !query 2 +SELECT default.myDoubleAvg(int_col1) as my_avg from t1 +-- !query 2 schema +struct<my_avg:double> +-- !query 2 output +102.5 + + +-- !query 3 +SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1 +-- !query 3 schema +struct<> +-- !query 3 output +java.lang.AssertionError +assertion failed: Incorrect number of children + + +-- !query 4 +CREATE FUNCTION udaf1 AS 'test.non.existent.udaf' +-- !query 4 schema +struct<> +-- !query 4 output + + + +-- !query 5 +SELECT default.udaf1(int_col1) as udaf1 from t1 +-- !query 5 schema +struct<> +-- !query 5 output +org.apache.spark.sql.AnalysisException +Can not load class 'test.non.existent.udaf' when registering the function 'default.udaf1', please make sure it is on the classpath; line 1 pos 7 http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index 0d0269f..b352bf6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -37,7 +37,6 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DecimalType, DoubleType} -import org.apache.spark.util.Utils private[sql] class HiveSessionCatalog( @@ -58,55 +57,52 @@ private[sql] class HiveSessionCatalog( parser, functionResourceLoader) { - override def makeFunctionBuilder(funcName: String, className: String): FunctionBuilder = { - makeFunctionBuilder(funcName, Utils.classForName(className)) - } - /** - * Construct a [[FunctionBuilder]] based on the provided class that represents a function. + * Constructs a [[Expression]] based on the provided class that represents a function. + * + * This performs reflection to decide what type of [[Expression]] to return in the builder. */ - private def makeFunctionBuilder(name: String, clazz: Class[_]): FunctionBuilder = { - // When we instantiate hive UDF wrapper class, we may throw exception if the input - // expressions don't satisfy the hive UDF, such as type mismatch, input number - // mismatch, etc. Here we catch the exception and throw AnalysisException instead. - (children: Seq[Expression]) => { + override def makeFunctionExpression( + name: String, + clazz: Class[_], + input: Seq[Expression]): Expression = { + + Try(super.makeFunctionExpression(name, clazz, input)).getOrElse { + var udfExpr: Option[Expression] = None try { + // When we instantiate hive UDF wrapper class, we may throw exception if the input + // expressions don't satisfy the hive UDF, such as type mismatch, input number + // mismatch, etc. Here we catch the exception and throw AnalysisException instead. if (classOf[UDF].isAssignableFrom(clazz)) { - val udf = HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), children) - udf.dataType // Force it to check input data types. - udf + udfExpr = Some(HiveSimpleUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. } else if (classOf[GenericUDF].isAssignableFrom(clazz)) { - val udf = HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), children) - udf.dataType // Force it to check input data types. - udf + udfExpr = Some(HiveGenericUDF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. } else if (classOf[AbstractGenericUDAFResolver].isAssignableFrom(clazz)) { - val udaf = HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), children) - udaf.dataType // Force it to check input data types. - udaf + udfExpr = Some(HiveUDAFFunction(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.dataType // Force it to check input data types. } else if (classOf[UDAF].isAssignableFrom(clazz)) { - val udaf = HiveUDAFFunction( + udfExpr = Some(HiveUDAFFunction( name, new HiveFunctionWrapper(clazz.getName), - children, - isUDAFBridgeRequired = true) - udaf.dataType // Force it to check input data types. - udaf + input, + isUDAFBridgeRequired = true)) + udfExpr.get.dataType // Force it to check input data types. } else if (classOf[GenericUDTF].isAssignableFrom(clazz)) { - val udtf = HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), children) - udtf.elementSchema // Force it to check input data types. - udtf - } else { - throw new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}'") + udfExpr = Some(HiveGenericUDTF(name, new HiveFunctionWrapper(clazz.getName), input)) + udfExpr.get.asInstanceOf[HiveGenericUDTF].elementSchema // Force it to check data types. } } catch { - case ae: AnalysisException => - throw ae case NonFatal(e) => val analysisException = - new AnalysisException(s"No handler for Hive UDF '${clazz.getCanonicalName}': $e") + new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}': $e") analysisException.setStackTrace(e.getStackTrace) throw analysisException } + udfExpr.getOrElse { + throw new AnalysisException(s"No handler for UDF/UDAF/UDTF '${clazz.getCanonicalName}'") + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala index 479ca1e..8986fb5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDAFSuite.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.util.JavaDataModel import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo +import test.org.apache.spark.sql.MyDoubleAvg import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec @@ -86,6 +87,18 @@ class HiveUDAFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { )) } + test("call JAVA UDAF") { + withTempView("temp") { + withUserDefinedFunction("myDoubleAvg" -> false) { + spark.range(1, 10).toDF("value").createOrReplaceTempView("temp") + sql(s"CREATE FUNCTION myDoubleAvg AS '${classOf[MyDoubleAvg].getName}'") + checkAnswer( + spark.sql("SELECT default.myDoubleAvg(value) as my_avg from temp"), + Row(105.0)) + } + } + } + test("non-deterministic children expressions of UDAF") { withTempView("view1") { spark.range(1).selectExpr("id as x", "id as y").createTempView("view1") http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index cae338c..383d41f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -404,59 +404,34 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("Hive UDFs with insufficient number of input arguments should trigger an analysis error") { - Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF") + withTempView("testUDF") { + Seq((1, 2)).toDF("a", "b").createOrReplaceTempView("testUDF") + + def testErrorMsgForFunc(funcName: String, className: String): Unit = { + withUserDefinedFunction(funcName -> true) { + sql(s"CREATE TEMPORARY FUNCTION $funcName AS '$className'") + val message = intercept[AnalysisException] { + sql(s"SELECT $funcName() FROM testUDF") + }.getMessage + assert(message.contains(s"No handler for UDF/UDAF/UDTF '$className'")) + } + } - { // HiveSimpleUDF - sql(s"CREATE TEMPORARY FUNCTION testUDFTwoListList AS '${classOf[UDFTwoListList].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDFTwoListList() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive UDF")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFTwoListList") - } + testErrorMsgForFunc("testUDFTwoListList", classOf[UDFTwoListList].getName) - { // HiveGenericUDF - sql(s"CREATE TEMPORARY FUNCTION testUDFAnd AS '${classOf[GenericUDFOPAnd].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDFAnd() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive UDF")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFAnd") - } + testErrorMsgForFunc("testUDFAnd", classOf[GenericUDFOPAnd].getName) - { // Hive UDAF - sql(s"CREATE TEMPORARY FUNCTION testUDAFPercentile AS '${classOf[UDAFPercentile].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDAFPercentile(a) FROM testUDF GROUP BY b") - }.getMessage - assert(message.contains("No handler for Hive UDF")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFPercentile") - } + testErrorMsgForFunc("testUDAFPercentile", classOf[UDAFPercentile].getName) - { // AbstractGenericUDAFResolver - sql(s"CREATE TEMPORARY FUNCTION testUDAFAverage AS '${classOf[GenericUDAFAverage].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDAFAverage() FROM testUDF GROUP BY b") - }.getMessage - assert(message.contains("No handler for Hive UDF")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDAFAverage") - } + testErrorMsgForFunc("testUDAFAverage", classOf[GenericUDAFAverage].getName) - { - // Hive UDTF - sql(s"CREATE TEMPORARY FUNCTION testUDTFExplode AS '${classOf[GenericUDTFExplode].getName}'") - val message = intercept[AnalysisException] { - sql("SELECT testUDTFExplode() FROM testUDF") - }.getMessage - assert(message.contains("No handler for Hive UDF")) - sql("DROP TEMPORARY FUNCTION IF EXISTS testUDTFExplode") + // AbstractGenericUDAFResolver + testErrorMsgForFunc("testUDTFExplode", classOf[GenericUDTFExplode].getName) } - - spark.catalog.dropTempView("testUDF") } test("Hive UDF in group by") { @@ -621,6 +596,46 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + + test("UDTF") { + withUserDefinedFunction("udtf_count2" -> true) { + sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath}") + // The function source code can be found at: + // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF + sql( + """ + |CREATE TEMPORARY FUNCTION udtf_count2 + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + """.stripMargin) + + checkAnswer( + sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"), + Row(97, 500) :: Row(97, 500) :: Nil) + + checkAnswer( + sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Nil) + } + } + + test("permanent UDTF") { + withUserDefinedFunction("udtf_count_temp" -> false) { + sql( + s""" + |CREATE FUNCTION udtf_count_temp + |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' + |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}' + """.stripMargin) + + checkAnswer( + sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"), + Row(97, 500) :: Row(97, 500) :: Nil) + + checkAnswer( + sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), + Row(3) :: Row(3) :: Nil) + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { http://git-wip-us.apache.org/repos/asf/spark/blob/43d71d96/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index d0e0d20..02cfa02 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.TestUtils import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException} +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -98,46 +98,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil) } - test("UDTF") { - withUserDefinedFunction("udtf_count2" -> true) { - sql(s"ADD JAR ${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}") - // The function source code can be found at: - // https://cwiki.apache.org/confluence/display/Hive/DeveloperGuide+UDTF - sql( - """ - |CREATE TEMPORARY FUNCTION udtf_count2 - |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - """.stripMargin) - - checkAnswer( - sql("SELECT key, cc FROM src LATERAL VIEW udtf_count2(value) dd AS cc"), - Row(97, 500) :: Row(97, 500) :: Nil) - - checkAnswer( - sql("SELECT udtf_count2(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Nil) - } - } - - test("permanent UDTF") { - withUserDefinedFunction("udtf_count_temp" -> false) { - sql( - s""" - |CREATE FUNCTION udtf_count_temp - |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2' - |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}' - """.stripMargin) - - checkAnswer( - sql("SELECT key, cc FROM src LATERAL VIEW udtf_count_temp(value) dd AS cc"), - Row(97, 500) :: Row(97, 500) :: Nil) - - checkAnswer( - sql("SELECT udtf_count_temp(a) FROM (SELECT 1 AS a FROM src LIMIT 3) t"), - Row(3) :: Row(3) :: Nil) - } - } - test("SPARK-6835: udtf in lateral view") { val df = Seq((1, 1)).toDF("c1", "c2") df.createOrReplaceTempView("table1") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org