Hi all. I am using Spark 3.0.2, and I have written an Aggregator function that I want to register in Spark SQL so I can call it through the ThriftServer. In Spark 2.4, I could extend `UserDefinedAggregateFunction` and register it in the Spark SQL shell with the following statement:

```
CREATE FUNCTION funnel AS 'org.example.udaf.FunnelUDAF' USING JAR '/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark2.jar';
```

My web application could then execute the following SQL through the ThriftServer to get the result:

```
select funnel(args1, args2, args3) from table;
```
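For reference, my Spark 2.4 UDAF looked roughly like this (the real funnel logic is simplified here to a plain count, just to show the shape of the class):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Spark 2.4-style UDAF; the funnel computation is replaced by a simple
// non-null count as a placeholder.
class FunnelUDAF extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType(StructField("event", StringType) :: Nil)
  override def bufferSchema: StructType =
    StructType(StructField("count", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  // start the aggregation buffer at zero
  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = 0L

  // fold one input row into the buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + 1L

  // combine two partial buffers
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}
```

This class, packed into a jar with its dependencies, is what the `CREATE FUNCTION ... USING JAR` statement above registers.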
In Spark 3.0.2, however, something has changed, so I rewrote my UDAF to satisfy Spark 3.0.2. First, I rewrote it to extend `Aggregator` instead of `UserDefinedAggregateFunction`. Then I registered it in the Spark SQL shell with the same statement as before, apart from the function name and class name:

```
CREATE FUNCTION funnel_spark_3 AS 'org.example.udaf.FunnelUDAFSpark3' USING JAR '/home/xxx/funnel-udaf-1.0-SNAPSHOT-jar-with-dependencies-spark3.jar';
```

When I call this function in the Spark SQL shell, I get the following exception:

```
Error: Error operating EXECUTE_STATEMENT: org.apache.spark.sql.AnalysisException: No handler for UDF/UDAF/UDTF 'org.example.udaf.FunnelUDAFSpark3'; line 9 pos 0
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$4(HiveSessionCatalog.scala:113)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$3(HiveSessionCatalog.scala:113)
    at scala.util.Failure.getOrElse(Try.scala:222)
    at org.apache.spark.sql.hive.HiveSessionCatalog.$anonfun$makeFunctionExpression$1(HiveSessionCatalog.scala:72)
    at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:221)
    at org.apache.spark.sql.hive.HiveSessionCatalog.makeFunctionExpression(HiveSessionCatalog.scala:72)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$makeFunctionBuilder$1(SessionCatalog.scala:1277)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:121)
    at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1470)
    at org.apache.spark.sql.hive.HiveSessionCatalog.super$lookupFunction(HiveSessionCatalog.scala:135)
```

I read the source code of `SessionCatalog.scala` and found that it only accepts `UserDefinedAggregateFunction`. So my question is: is there a way to register an Aggregator for use in the Thrift Server on Spark 3? Please help me. Thanks.
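For reference, my Spark 3 rewrite looks roughly like this (again with the real funnel logic simplified to a count, to show the shape of the `Aggregator` API):

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Spark 3-style typed aggregator: Aggregator[IN, BUF, OUT].
// The funnel computation is replaced by a non-null count as a placeholder.
class FunnelUDAFSpark3 extends Aggregator[String, Long, Long] {
  override def zero: Long = 0L                              // empty buffer

  // fold one input value into the buffer
  override def reduce(b: Long, a: String): Long =
    if (a != null) b + 1L else b

  // combine two partial buffers
  override def merge(b1: Long, b2: Long): Long = b1 + b2

  override def finish(reduction: Long): Long = reduction

  override def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  override def outputEncoder: Encoder[Long] = Encoders.scalaLong
}
```

Registering it programmatically inside a single session works, e.g. `spark.udf.register("funnel_spark_3", functions.udaf(new FunnelUDAFSpark3))`, but that is not what I need: I need the `CREATE FUNCTION ... USING JAR` route so that ThriftServer clients can see the function.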