Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21954#discussion_r207102738
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
    @@ -0,0 +1,325 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.expressions
    +
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.codegen._
    +import org.apache.spark.sql.catalyst.expressions.codegen.Block._
    +import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A named lambda variable.
    + */
    +case class NamedLambdaVariable(
    +    name: String,
    +    dataType: DataType,
    +    nullable: Boolean,
    +    value: AtomicReference[Any] = new AtomicReference(),
    +    exprId: ExprId = NamedExpression.newExprId)
    +  extends LeafExpression
    +  with NamedExpression {
    +
    +  override def qualifier: Option[String] = None
    +
    +  override def newInstance(): NamedExpression =
    +    copy(value = new AtomicReference(), exprId = NamedExpression.newExprId)
    +
    +  override def toAttribute: Attribute = {
    +    AttributeReference(name, dataType, nullable, Metadata.empty)(exprId, 
None)
    +  }
    +
    +  override def eval(input: InternalRow): Any = value.get
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    val suffix = "_lambda_variable_" + exprId.id
    +    ExprCode(
    +      if (nullable) JavaCode.isNullVariable(s"isNull_${name}$suffix") else 
FalseLiteral,
    +      JavaCode.variable(s"value_${name}$suffix", dataType))
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
    +    throw new IllegalStateException("NamedLambdaVariable.doGenCode should 
not be called.")
    +  }
    +
    +  override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"
    +
    +  override def simpleString: String = s"lambda $name#${exprId.id}: 
${dataType.simpleString}"
    +}
    +
    +/**
    + * A lambda function and its arguments. A lambda function can be hidden 
when a user wants to
    + * process an completely independent expression in a 
[[HigherOrderFunction]], the lambda function
    + * and its variables are then only used for internal bookkeeping within 
the higher order function.
    + */
    +case class LambdaFunction(
    +    function: Expression,
    +    arguments: Seq[NamedExpression],
    +    hidden: Boolean = false)
    +  extends Expression {
    +
    +  override def children: Seq[Expression] = function +: arguments
    +  override def dataType: DataType = function.dataType
    +  override def nullable: Boolean = function.nullable
    +
    +  lazy val bound: Boolean = arguments.forall(_.resolved)
    +
    +  override def eval(input: InternalRow): Any = function.eval(input)
    +
    +  override def genCode(ctx: CodegenContext): ExprCode = {
    +    function.genCode(ctx)
    +  }
    +
    +  override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
    +    throw new IllegalStateException("LambdaFunction.doGenCode should not 
be called.")
    +  }
    +}
    +
    +/**
    + * A higher order function takes one or more (lambda) functions and 
applies these to some objects.
    + * The function produces a number of variables which can be consumed by 
some lambda function.
    + */
    +trait HigherOrderFunction extends Expression {
    +
    +  override def children: Seq[Expression] = inputs ++ functions
    +
    +  /**
    +   * Inputs to the higher ordered function.
    +   */
    +  def inputs: Seq[Expression]
    +
    +  /**
    +   * All inputs have been resolved. This means that the types and 
nullabilty of (most of) the
    +   * lambda function arguments is known, and that we can start binding the 
lambda functions.
    +   */
    +  lazy val inputResolved: Boolean = inputs.forall(_.resolved)
    +
    +  /**
    +   * Functions applied by the higher order function.
    +   */
    +  def functions: Seq[Expression]
    +
    +  /**
    +   * All inputs must be resolved and all functions must be resolved lambda 
functions.
    +   */
    +  override lazy val resolved: Boolean = inputResolved && functions.forall {
    +    case l: LambdaFunction => l.resolved
    +    case _ => false
    +  }
    +
    +  /**
    +   * Bind the lambda functions to the [[HigherOrderFunction]] using the 
given bind function. The
    +   * bind function takes the potential lambda and it's (partial) arguments 
and converts this into
    +   * a bound lambda function.
    +   */
    +  def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): 
HigherOrderFunction
    +
    +  @transient lazy val functionsForEval: Seq[Expression] = functions.map {
    +    case LambdaFunction(function, arguments, hidden) =>
    +      val argumentMap = arguments.map { arg => arg.exprId -> arg }.toMap
    +      function.transformUp {
    --- End diff --
    
    I'm worried that the `NamedLambdaVariable` is instantiated separately 
during serialization or something. In that case, we might not be able to refer 
the same instance and set the argument values correctly.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to