Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21954#discussion_r208272808 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.catalog.SessionCatalog +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType + +/** + * Resolve a higher order functions from the catalog. This is different from regular function + * resolution because lambda functions can only be resolved after the function has been resolved; + * so we need to resolve higher order function when all children are either resolved or a lambda + * function. + */ +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators { + case q: LogicalPlan => + q.transformExpressions { + case u @ UnresolvedFunction(fn, children, false) + if hasLambdaAndResolvedArguments(children) => + withPosition(u) { + catalog.lookupFunction(fn, children) match { + case func: HigherOrderFunction => func + case other => other.failAnalysis( + "A lambda function should only be used in a higher order function. However, " + + s"its class is ${other.getClass.getCanonicalName}, which is not a " + + s"higher order function.") + } + } + } + } + + /** + * Check if the arguments of a function are either resolved or a lambda function. + */ + private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = { + val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction]) + lambdas.nonEmpty && others.forall(_.resolved) + } +} + +/** + * Resolve the lambda variables exposed by a higher order functions. + * + * This rule works in two steps: + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda function's + * arguments; this creates named and typed lambda variables. The argument names are checked + * for duplicates and the number of arguments are checked during this step. + * [2]. Resolve the used lambda variables used in the lambda function's function expression tree. + * Note that we allow the use of variables from outside the current lambda, this can either + * be a lambda function defined in an outer scope, or a attribute in produced by the plan's + * child. If names are duplicate, the name defined in the most inner scope is used. + */ +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] { + + type LambdaVariableMap = Map[String, NamedExpression] + + private val canonicalizer = { + if (!conf.caseSensitiveAnalysis) { + s: String => s.toLowerCase + } else { + s: String => s + } + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperators { + case q: LogicalPlan => + q.mapExpressions(resolve(_, Map.empty)) + } + } + + /** + * Create a bound lambda function by binding the arguments of a lambda function to the given + * partial arguments (dataType and nullability only). If the expression happens to be an already + * bound lambda function then we assume it has been bound to the correct arguments and do + * nothing. This function will produce a lambda function with hidden arguments when it is passed + * an arbitrary expression. + */ + private def createLambda( + e: Expression, + partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match { --- End diff -- why call it "partial"?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org