Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r208282782
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/higherOrderFunctions.scala ---
    @@ -0,0 +1,166 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.analysis
    +
    +import org.apache.spark.sql.catalyst.catalog.SessionCatalog
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.DataType
    +
    +/**
    + * Resolve higher order functions from the catalog. This is different from regular function
    + * resolution because lambda functions can only be resolved after the function has been
    + * resolved; so we need to resolve a higher order function when all of its children are
    + * either resolved or lambda functions.
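    + *
    + * For example (an illustrative query; `transform` stands in for any higher order function
    + * registered in the catalog), in
    + * {{{
    + *   transform(array(1, 2, 3), x -> x + 1)
    + * }}}
    + * the lambda `x -> x + 1` cannot be resolved on its own, so the rule fires only once
    + * `array(1, 2, 3)` is resolved, and then looks up `transform` with the lambda still unbound.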
    + */
    +case class ResolveHigherOrderFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
    +    case q: LogicalPlan =>
    +      q.transformExpressions {
    +        case u @ UnresolvedFunction(fn, children, false)
    +            if hasLambdaAndResolvedArguments(children) =>
    +          withPosition(u) {
    +            catalog.lookupFunction(fn, children) match {
    +              case func: HigherOrderFunction => func
    +              case other => other.failAnalysis(
    +                "A lambda function should only be used in a higher order function. However, " +
    +                  s"its class is ${other.getClass.getCanonicalName}, which is not a " +
    +                  s"higher order function.")
    +            }
    +          }
    +      }
    +  }
    +
    +  /**
    +   * Check that the arguments of a function include at least one lambda function and that all
    +   * other arguments are resolved.
    +   */
    +  private def hasLambdaAndResolvedArguments(expressions: Seq[Expression]): Boolean = {
    +    val (lambdas, others) = expressions.partition(_.isInstanceOf[LambdaFunction])
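    +    // At least one argument must be a lambda, and every non-lambda argument must already
    +    // be resolved.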
    +    lambdas.nonEmpty && others.forall(_.resolved)
    +  }
    +}
    +
    +/**
    + * Resolve the lambda variables exposed by a higher order function.
    + *
    + * This rule works in two steps:
    + * [1]. Bind the anonymous variables exposed by the higher order function to the lambda
    + *      function's arguments; this creates named and typed lambda variables. During this step
    + *      the argument names are checked for duplicates and the number of arguments is checked.
    + * [2]. Resolve the lambda variables used in the lambda function's function expression tree.
    + *      Note that we allow the use of variables from outside the current lambda; this can
    + *      either be a lambda function defined in an outer scope, or an attribute produced by
    + *      the plan's child. If names are duplicated, the name defined in the innermost scope
    + *      is used.
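    + *
    + * For example (illustrative), in
    + * {{{
    + *   transform(xs, x -> transform(x, x -> x + 1))
    + * }}}
    + * the inner lambda's `x` shadows the outer one, so `x + 1` refers to an element of the
    + * nested array, while `xs` resolves to an attribute from the plan's child.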
    + */
    +case class ResolveLambdaVariables(conf: SQLConf) extends Rule[LogicalPlan] {
    +
    +  type LambdaVariableMap = Map[String, NamedExpression]
    +
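    +  // Canonicalize lambda variable names before they are used as keys in the scope map:
    +  // lower-case them when the analysis is case insensitive, keep them as-is otherwise.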
    +  private val canonicalizer = {
    +    if (!conf.caseSensitiveAnalysis) {
    +      s: String => s.toLowerCase
    +    } else {
    +      s: String => s
    +    }
    +  }
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    plan.resolveOperators {
    +      case q: LogicalPlan =>
    +        q.mapExpressions(resolve(_, Map.empty))
    +    }
    +  }
    +
    +  /**
    +   * Create a bound lambda function by binding the arguments of a lambda function to the given
    +   * partial arguments (dataType and nullability only). If the expression happens to be an
    +   * already bound lambda function then we assume it has been bound to the correct arguments
    +   * and do nothing. This function will produce a lambda function with hidden arguments when
    +   * it is passed an arbitrary expression.
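    +   *
    +   * For example (illustrative), binding the lambda `x -> x + 1` against the partial arguments
    +   * `Seq((IntegerType, false))` produces a lambda function whose single argument is a named,
    +   * non-nullable integer lambda variable.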
    +   */
    +  private def createLambda(
    +      e: Expression,
    +      partialArguments: Seq[(DataType, Boolean)]): LambdaFunction = e match {
    --- End diff ---
    
    how about `argInfo`?

