Repository: spark
Updated Branches:
  refs/heads/master daa1964b6 -> 2e7f99a00
[SPARK-8195] [SPARK-8196] [SQL] udf next_day last_day

next_day returns the next date that falls on the given day of the week.
last_day returns the last day of the month to which the given date belongs.

Author: Daoyuan Wang <daoyuan.w...@intel.com>

Closes #6986 from adrian-wang/udfnlday and squashes the following commits:

ef7e3da [Daoyuan Wang] fix
02b3426 [Daoyuan Wang] address 2 comments
dc69630 [Daoyuan Wang] address comments from rxin
8846086 [Daoyuan Wang] address comments from rxin
d09bcce [Daoyuan Wang] multi fix
1a9de3d [Daoyuan Wang] function next_day and last_day


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e7f99a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e7f99a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e7f99a0

Branch: refs/heads/master
Commit: 2e7f99a004f08a42e86f6f603e4ba35cb52561c4
Parents: daa1964
Author: Daoyuan Wang <daoyuan.w...@intel.com>
Authored: Mon Jul 27 21:08:56 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Jul 27 21:08:56 2015 -0700

----------------------------------------------------------------------
 .../catalyst/analysis/FunctionRegistry.scala    |  4 +-
 .../expressions/datetimeFunctions.scala         | 72 ++++++++++++++++++++
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 46 +++++++++++++
 .../expressions/DateExpressionsSuite.scala      | 28 ++++++++
 .../scala/org/apache/spark/sql/functions.scala  | 17 +++++
 .../apache/spark/sql/DateFunctionsSuite.scala   | 22 ++++++
 6 files changed, 188 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index aa05f44..61ee6f6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -219,8 +219,10 @@ object FunctionRegistry {
     expression[DayOfYear]("dayofyear"),
     expression[DayOfMonth]("dayofmonth"),
     expression[Hour]("hour"),
-    expression[Month]("month"),
+    expression[LastDay]("last_day"),
     expression[Minute]("minute"),
+    expression[Month]("month"),
+    expression[NextDay]("next_day"),
     expression[Quarter]("quarter"),
     expression[Second]("second"),
     expression[WeekOfYear]("weekofyear"),
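The hunk above only registers the new names with the SQL function registry; the expressions themselves are added below. As a rough usage sketch of what the registration enables (hypothetical, not part of this patch; it assumes a SQLContext named sqlContext is in scope and that string literals are implicitly cast to dates, as the ImplicitCastInputTypes mixin below suggests):

    // Hypothetical sketch, not part of this commit.
    val rows = sqlContext.sql(
      "SELECT last_day('2015-07-27'), next_day('2015-07-27', 'Sunday')").collect()
    // Per the semantics described in the commit message and the docs below:
    //   last_day('2015-07-27')           -> 2015-07-31
    //   next_day('2015-07-27', 'Sunday') -> 2015-08-02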
http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
index 9e55f05..b00a1b2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeFunctions.scala
@@ -265,3 +265,75 @@ case class DateFormatClass(left: Expression, right: Expression) extends BinaryEx
     })
   }
 }
+
+/**
+ * Returns the last day of the month to which the date belongs.
+ */
+case class LastDay(startDate: Expression) extends UnaryExpression with ImplicitCastInputTypes {
+  override def child: Expression = startDate
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType)
+
+  override def dataType: DataType = DateType
+
+  override def prettyName: String = "last_day"
+
+  override def nullSafeEval(date: Any): Any = {
+    val days = date.asInstanceOf[Int]
+    DateTimeUtils.getLastDayOfMonth(days)
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+    defineCodeGen(ctx, ev, (sd) => {
+      s"$dtu.getLastDayOfMonth($sd)"
+    })
+  }
+}
+
+/**
+ * Returns the first date which is later than startDate and falls on the given dayOfWeek.
+ * For example, NextDay(2015-07-27, Sunday) would return 2015-08-02, which is the first
+ * Sunday later than 2015-07-27.
+ */
+case class NextDay(startDate: Expression, dayOfWeek: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  override def left: Expression = startDate
+  override def right: Expression = dayOfWeek
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, StringType)
+
+  override def dataType: DataType = DateType
+
+  override def nullSafeEval(start: Any, dayOfW: Any): Any = {
+    val dow = DateTimeUtils.getDayOfWeekFromString(dayOfW.asInstanceOf[UTF8String])
+    if (dow == -1) {
+      null
+    } else {
+      val sd = start.asInstanceOf[Int]
+      DateTimeUtils.getNextDateForDayOfWeek(sd, dow)
+    }
+  }
+
+  override protected def genCode(ctx: CodeGenContext, ev: GeneratedExpressionCode): String = {
+    nullSafeCodeGen(ctx, ev, (sd, dowS) => {
+      val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
+      val dow = ctx.freshName("dow")
+      val genDow = if (right.foldable) {
+        val dowVal = DateTimeUtils.getDayOfWeekFromString(
+          dayOfWeek.eval(InternalRow.empty).asInstanceOf[UTF8String])
+        s"int $dow = $dowVal;"
+      } else {
+        s"int $dow = $dtu.getDayOfWeekFromString($dowS);"
+      }
+      genDow + s"""
+       if ($dow == -1) {
+         ${ev.isNull} = true;
+       } else {
+         ${ev.primitive} = $dtu.getNextDateForDayOfWeek($sd, $dow);
+       }
+      """
+    })
+  }
+}


http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 07412e7..2e28fb9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -573,4 +573,50 @@ object DateTimeUtils {
       dayInYear - 334
     }
   }
+
+  /**
+   * Returns the day of the week from a String, with Thursday marked as 0
+   * (because 1970-01-01 was a Thursday).
+   */
+  def getDayOfWeekFromString(string: UTF8String): Int = {
+    val dowString = string.toString.toUpperCase
+    dowString match {
+      case "SU" | "SUN" | "SUNDAY" => 3
+      case "MO" | "MON" | "MONDAY" => 4
+      case "TU" | "TUE" | "TUESDAY" => 5
+      case "WE" | "WED" | "WEDNESDAY" => 6
+      case "TH" | "THU" | "THURSDAY" => 0
+      case "FR" | "FRI" | "FRIDAY" => 1
+      case "SA" | "SAT" | "SATURDAY" => 2
+      case _ => -1
+    }
+  }
+
+  /**
+   * Returns the first date which is later than startDate and is of the given dayOfWeek.
+   * dayOfWeek is an integer in the range [0, 6], where 0 is Thu, 1 is Fri, etc.
+   */
+  def getNextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int = {
+    startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7
+  }
+
+  /**
+   * Number of days in each month of a non-leap year.
+   */
+  private[this] val daysInNormalYear = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)
+
+  /**
+   * Returns the last day of the month for the given date. The date is expressed in days
+   * since 1970-01-01.
+   */
+  def getLastDayOfMonth(date: Int): Int = {
+    val dayOfMonth = getDayOfMonth(date)
+    val month = getMonth(date)
+    if (month == 2 && isLeapYear(getYear(date))) {
+      date + daysInNormalYear(month - 1) + 1 - dayOfMonth
+    } else {
+      date + daysInNormalYear(month - 1) - dayOfMonth
+    }
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
index bdba6ce..4d2d337 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala
@@ -22,6 +22,7 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types.{StringType, TimestampType, DateType}
 
 class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
@@ -246,4 +247,31 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     }
   }
 
+  test("last_day") {
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-02-28"))), Date.valueOf("2015-02-28"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-03-27"))), Date.valueOf("2015-03-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-04-26"))), Date.valueOf("2015-04-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-05-25"))), Date.valueOf("2015-05-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-06-24"))), Date.valueOf("2015-06-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-07-23"))), Date.valueOf("2015-07-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-08-01"))), Date.valueOf("2015-08-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-09-02"))), Date.valueOf("2015-09-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-10-03"))), Date.valueOf("2015-10-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-11-04"))), Date.valueOf("2015-11-30"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2015-12-05"))), Date.valueOf("2015-12-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2016-01-06"))), Date.valueOf("2016-01-31"))
+    checkEvaluation(LastDay(Literal(Date.valueOf("2016-02-07"))), Date.valueOf("2016-02-29"))
+  }
+
+  test("next_day") {
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("Thu")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("THURSDAY")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+    checkEvaluation(
+      NextDay(Literal(Date.valueOf("2015-07-23")), Literal("th")),
+      DateTimeUtils.fromJavaDate(Date.valueOf("2015-07-30")))
+  }
 }
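For readers tracing the modular arithmetic in DateTimeUtils above, here is a small standalone sketch (plain Scala, no Spark dependencies; the local helper only mirrors getNextDateForDayOfWeek and is not part of the patch) that works through the next_day case exercised by the suite:

    // Day-of-week convention from getDayOfWeekFromString: day 0 of the epoch,
    // 1970-01-01, was a Thursday, so Thursday maps to 0.
    val Thu = 0

    // Mirrors getNextDateForDayOfWeek: the first day strictly after `startDate`
    // whose day of week equals `dayOfWeek`; the double modulo keeps the offset
    // in [0, 6] even though Scala's % can return a negative value.
    def nextDateForDayOfWeek(startDate: Int, dayOfWeek: Int): Int =
      startDate + 1 + ((dayOfWeek - 1 - startDate) % 7 + 7) % 7

    // 2015-07-23 is 16639 days after 1970-01-01, and 16639 % 7 == 0, i.e. a Thursday.
    val start = 16639
    val next = nextDateForDayOfWeek(start, Thu)
    // ((0 - 1 - 16639) % 7 + 7) % 7 == 6, so next == 16646 == start + 7,
    // which is 2015-07-30, matching the NextDay(2015-07-23, "Thu") test above.
    assert(next == start + 7)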
http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index cab3db6..d18558b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -2033,6 +2033,13 @@ object functions {
   def hour(columnName: String): Column = hour(Column(columnName))
 
   /**
+   * Returns the last day of the month to which the given date belongs.
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def last_day(e: Column): Column = LastDay(e.expr)
+
+  /**
    * Extracts the minutes as an integer from a given date/timestamp/string.
    * @group datetime_funcs
    * @since 1.5.0
@@ -2047,6 +2054,16 @@ object functions {
   def minute(columnName: String): Column = minute(Column(columnName))
 
   /**
+   * Returns the first date which is later than the given date sd and falls on the day of
+   * week given by dow. For example, `next_day('2015-07-27', "Sunday")` returns 2015-08-02,
+   * the first Sunday later than 2015-07-27. The dayOfWeek parameter may be the 2-letter,
+   * 3-letter, or full name of the day of the week (e.g. Mo, tue, FRIDAY).
+   * @group datetime_funcs
+   * @since 1.5.0
+   */
+  def next_day(sd: Column, dayOfWeek: String): Column = NextDay(sd.expr, lit(dayOfWeek).expr)
+
+  /**
    * Extracts the seconds as an integer from a given date/timestamp/string.
    * @group datetime_funcs
    * @since 1.5.0


http://git-wip-us.apache.org/repos/asf/spark/blob/2e7f99a0/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 9e80ae8..ff1c756 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -184,4 +184,26 @@ class DateFunctionsSuite extends QueryTest {
       Row(15, 15, 15))
   }
 
+  test("function last_day") {
+    val df1 = Seq((1, "2015-07-23"), (2, "2015-07-24")).toDF("i", "d")
+    val df2 = Seq((1, "2015-07-23 00:11:22"), (2, "2015-07-24 11:22:33")).toDF("i", "t")
+    checkAnswer(
+      df1.select(last_day(col("d"))),
+      Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
+    checkAnswer(
+      df2.select(last_day(col("t"))),
+      Seq(Row(Date.valueOf("2015-07-31")), Row(Date.valueOf("2015-07-31"))))
+  }
+
+  test("function next_day") {
+    val df1 = Seq(("mon", "2015-07-23"), ("tuesday", "2015-07-20")).toDF("dow", "d")
+    val df2 = Seq(("th", "2015-07-23 00:11:22"), ("xx", "2015-07-24 11:22:33")).toDF("dow", "t")
+    checkAnswer(
+      df1.select(next_day(col("d"), "MONDAY")),
+      Seq(Row(Date.valueOf("2015-07-27")), Row(Date.valueOf("2015-07-27"))))
+    checkAnswer(
+      df2.select(next_day(col("t"), "th")),
+      Seq(Row(Date.valueOf("2015-07-30")), Row(null)))
+  }
+
 }
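As a companion to the tests above, here is a small standalone sketch (plain Scala, no Spark; isLeapYear and the month-length table are restated locally for illustration only) of the leap-year adjustment that getLastDayOfMonth applies:

    // Month lengths for a non-leap year, as in DateTimeUtils.daysInNormalYear.
    val daysInNormalYear = Array(31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31)

    // Standard Gregorian leap-year rule, restated to keep the example self-contained.
    def isLeapYear(year: Int): Boolean =
      (year % 4 == 0 && year % 100 != 0) || year % 400 == 0

    // February gains one day in a leap year; every other month uses the table,
    // which is the same branch getLastDayOfMonth takes above.
    def lengthOfMonth(year: Int, month: Int): Int =
      if (month == 2 && isLeapYear(year)) 29 else daysInNormalYear(month - 1)

    // Matches the DateExpressionsSuite cases: last_day('2016-02-07') is 2016-02-29,
    // while last_day('2015-02-28') stays 2015-02-28.
    assert(lengthOfMonth(2016, 2) == 29)
    assert(lengthOfMonth(2015, 2) == 28)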