beliefer commented on a change in pull request #29604:
URL: https://github.com/apache/spark/pull/29604#discussion_r489145248
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala ##########

@@ -549,6 +549,96 @@
case class CumeDist() extends RowNumberLike with SizeBasedWindowFunction {
  override def prettyName: String = "cume_dist"
}

/**
 * The NthValue function returns the value of `input` at the `offset`th row from the beginning of
 * the window frame. Offsets start at 1. When the value of `input` is null at the `offset`th row,
 * or there is no such `offset`th row, null is returned.
 */
@ExpressionDescription(
  usage = """
    _FUNC_(input[, offset]) - Returns the value of `input` at the row that is the `offset`th row
      from the beginning of the window frame. Offsets start at 1. If the value of `input` at the
      `offset`th row is null, null is returned. If there is no such offset row (e.g., the offset
      is 10 but the size of the window frame is less than 10), null is returned.
  """,
  arguments = """
    Arguments:
      * input - the target column or expression that the function operates on.
      * offset - an int expression which determines the row number relative to the first row in
          the window for which to return the expression. The offset can be a constant or an
          expression and must be a positive integer that is greater than 0.
      * ignoreNulls - an optional specification that indicates the NthValue should skip null
          values in the determination of which row to use.
  """,
  since = "3.1.0",
  group = "window_funcs")
case class NthValue(input: Expression, offsetExpr: Expression, ignoreNulls: Boolean)
    extends AggregateWindowFunction with ImplicitCastInputTypes {

  def this(child: Expression, offset: Expression) = this(child, offset, false)

  override def children: Seq[Expression] = input :: Nil

  override def frame: WindowFrame = UnspecifiedFrame

  override def dataType: DataType = input.dataType

  override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, IntegerType, BooleanType)

  override def checkInputDataTypes(): TypeCheckResult = {
    val check = super.checkInputDataTypes()
    if (check.isFailure) {
      check
    } else if (!offsetExpr.foldable) {
      TypeCheckFailure(s"Offset expression '$offsetExpr' must be a literal.")
    } else {
      offsetExpr.dataType match {
        case IntegerType | ShortType | ByteType =>
          offsetExpr.eval().asInstanceOf[Int] match {
            case i: Int if i <= 0 => TypeCheckFailure(
              s"The 'offset' argument of nth_value must be greater than zero but it is $i.")
            case _ => TypeCheckSuccess
          }
        case _ => TypeCheckFailure(
          s"The 'offset' parameter must be an int literal but it is ${offsetExpr.dataType}.")
      }
    }
  }

  private lazy val offset = offsetExpr.eval().asInstanceOf[Int].toLong
  private lazy val result = AttributeReference("result", input.dataType)()
  private lazy val count = AttributeReference("count", LongType)()
  private lazy val valueSet = AttributeReference("valueSet", BooleanType)()
  override lazy val aggBufferAttributes: Seq[AttributeReference] =
    result :: count :: valueSet :: Nil

  override lazy val initialValues: Seq[Literal] = Seq(
    /* result = */ Literal.create(null, input.dataType),
    /* count = */ Literal(1L),
    /* valueSet = */ Literal.create(false, BooleanType)
  )

  override lazy val updateExpressions: Seq[Expression] = {
    if (ignoreNulls) {
      Seq(
        /* result = */ If(valueSet || input.isNull || count < offset, result, input),
        /* count = */ If(input.isNull, count, count + 1L),
        /* valueSet = */ valueSet || (input.isNotNull && count >= offset)
      )
    } else {
      Seq(
        /* result = */
        If(valueSet || count < offset, result, input),
        /* count = */ count + 1L,
        /* valueSet = */ valueSet || count >= offset

Review comment:
   `count >= offset` implies the needed value has already been set. But once `count >= offset`, there is no need to change `result` with a new `input`.
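To make the point concrete, here is a minimal plain-Scala sketch (not the Catalyst expressions themselves; the input values and the offset are made up) that simulates the non-ignoreNulls update expressions above and shows why `result` must not be overwritten once `count >= offset`:

    object NthValueUpdateSketch {
      def main(args: Array[String]): Unit = {
        val offset = 2L
        // Aggregation buffer slots, mirroring result / count / valueSet above.
        var result: Option[Int] = None
        var count = 1L
        var valueSet = false

        for (input <- Seq(10, 20, 30, 40)) {
          // The update expressions are all evaluated against the old buffer
          // values, so compute them into temporaries before assigning.
          val newResult   = if (valueSet || count < offset) result else Some(input)
          val newCount    = count + 1L
          val newValueSet = valueSet || count >= offset
          result = newResult; count = newCount; valueSet = newValueSet
          println(s"input=$input result=$result count=$count valueSet=$valueSet")
        }
        // From the second row onward, result stays Some(20): once count >= offset,
        // the valueSet flag guards result so later inputs never overwrite it.
      }
    }

If the result update were guarded only by `count < offset`, every row at or after the offset would overwrite `result`; the `valueSet` flag is what makes the assignment happen exactly once.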