Github user mn-mikke commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21986#discussion_r207908454

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala ---
@@ -205,29 +230,85 @@ case class ArrayTransform(
     (elementVar, indexVar)
   }
 
-  override def eval(input: InternalRow): Any = {
-    val arr = this.input.eval(input).asInstanceOf[ArrayData]
-    if (arr == null) {
-      null
-    } else {
-      val f = functionForEval
-      val result = new GenericArrayData(new Array[Any](arr.numElements))
-      var i = 0
-      while (i < arr.numElements) {
-        elementVar.value.set(arr.get(i, elementVar.dataType))
-        if (indexVar.isDefined) {
-          indexVar.get.value.set(i)
-        }
-        result.update(i, f.eval(input))
-        i += 1
+  override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = {
+    val arr = inputValue.asInstanceOf[ArrayData]
+    val f = functionForEval
+    val result = new GenericArrayData(new Array[Any](arr.numElements))
+    var i = 0
+    while (i < arr.numElements) {
+      elementVar.value.set(arr.get(i, elementVar.dataType))
+      if (indexVar.isDefined) {
+        indexVar.get.value.set(i)
       }
-      result
+      result.update(i, f.eval(inputRow))
+      i += 1
     }
+    result
   }
 
   override def prettyName: String = "transform"
 }
 
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the function.",
+examples = """
+    Examples:
+      > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+       [1 -> 0, 3 -> -1]
+  """,
+since = "2.4.0")
+case class MapFilter(
+    input: Expression,
+    function: Expression)
+  extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+  @transient val (keyType, valueType, valueContainsNull) = input.dataType match {
+    case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull)
+    case _ =>
+      val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType
+      (kType, vType, vContainsNull)
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+    val args = function.asInstanceOf[LambdaFunction].arguments
+    (args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable])
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = {
+    function match {
+      case LambdaFunction(_, _, _) =>
--- End diff --

Is this pattern matching necessary? If so, shouldn't ```ArrayFilter``` use it as well?
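To make the question concrete, here is a minimal, self-contained sketch of the situation it points at. `Expr`, `Lambda`, and `Literal` are hypothetical stand-ins for Spark's `Expression`/`LambdaFunction` hierarchy, not code from this PR: when the caller guarantees the scrutinee is already a `LambdaFunction` by the time `bind` runs, a match arm that binds nothing acts only as a runtime type guard, and the same result is reachable without it.

```scala
// Hypothetical toy model -- Expr/Lambda stand in for Spark's
// Expression/LambdaFunction; none of this is code from the PR.
sealed trait Expr
case class Lambda(body: String, arguments: Seq[String]) extends Expr
case class Literal(value: Int) extends Expr

object MatchDemo {
  // With the pattern match: the `case l: Lambda` arm extracts nothing
  // beyond the downcast, so the match only guards against non-Lambda
  // inputs by throwing a MatchError.
  def bindWithMatch(fn: Expr): Lambda = fn match {
    case l: Lambda => l.copy(arguments = Seq("k", "v"))
  }

  // Without the match: if the analyzer guarantees `fn` is always a
  // Lambda at this point, a direct cast yields the same result.
  def bindDirect(fn: Expr): Lambda =
    fn.asInstanceOf[Lambda].copy(arguments = Seq("k", "v"))

  def main(args: Array[String]): Unit = {
    val f: Expr = Lambda("k > v", Nil)
    assert(bindWithMatch(f) == bindDirect(f))
    println(bindDirect(f)) // Lambda(k > v,List(k, v))
  }
}
```

The trade-off the question raises is consistency: either the guard adds safety and both ```MapFilter``` and ```ArrayFilter``` should have it, or it is redundant and neither needs it.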