cloud-fan commented on a change in pull request #31791: URL: https://github.com/apache/spark/pull/31791#discussion_r600755787
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala ########## @@ -707,3 +721,101 @@ object FunctionRegistry { (name, (info, outerBuilder)) } } + +trait TableFunctionRegistry extends FunctionRegistryBase[LogicalPlan] { + + /** Create a copy of this registry with identical functions as this registry. */ + override def clone(): TableFunctionRegistry = throw new CloneNotSupportedException() +} + +class SimpleTableFunctionRegistry + extends SimpleFunctionRegistryBase[LogicalPlan] + with TableFunctionRegistry { + + override def clone(): SimpleTableFunctionRegistry = synchronized { + val registry = new SimpleTableFunctionRegistry + functionBuilders.iterator.foreach { case (name, (info, builder)) => + registry.registerFunction(name, info, builder) + } + registry + } +} + +object EmptyTableFunctionRegistry + extends EmptyFunctionRegistryBase[LogicalPlan] + with TableFunctionRegistry { + + override def clone(): TableFunctionRegistry = this +} + +object TableFunctionRegistry { + + type TableFunctionBuilder = Seq[Expression] => LogicalPlan + + private def logicalPlan[T <: LogicalPlan](name: String) + (implicit tag: ClassTag[T]): (String, (ExpressionInfo, TableFunctionBuilder)) = { + val constructors = tag.runtimeClass.getConstructors + val info = expressionInfo[T](name) + val builder = (expressions: Seq[Expression]) => { + val argTypes = expressions.map(_.dataType.typeName).mkString(", ") + val params = Seq.fill(expressions.size)(classOf[Expression]) + val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse { + val validParametersCount = constructors + .filter(_.getParameterTypes.forall(_ == classOf[Expression])) + .map(_.getParameterCount).distinct.sorted + throw QueryCompilationErrors.invalidFunctionArgumentNumberError( + validParametersCount, name, params) + } + try { + f.newInstance(expressions : _*).asInstanceOf[LogicalPlan] + } catch { + // the exception is an invocation exception. 
To get a meaningful message, we need the + // cause. + case e: Exception => + val details = if (e.getCause != null) e.getCause.getMessage else e.toString + throw QueryCompilationErrors.cannotApplyTableValuedFunctionError( + name, argTypes, info.getUsage, details) + } + } + (name, (info, builder)) + } + + /** + * Creates an [[ExpressionInfo]] for the function as defined by LogicalPlan T + * using the given name. + */ + private def expressionInfo[T <: LogicalPlan : ClassTag](name: String): ExpressionInfo = { Review comment: Shall we move this `expressionInfo` helper to the base class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org