This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6e6e7a00f662 [SPARK-48369][SQL][PYTHON][CONNECT] Add function `timestamp_add` 6e6e7a00f662 is described below commit 6e6e7a00f662ae1dc7e081c9e8ec40c30ad8d3d4 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Tue May 21 19:35:24 2024 +0800 [SPARK-48369][SQL][PYTHON][CONNECT] Add function `timestamp_add` ### What changes were proposed in this pull request? Add function `timestamp_add` ### Why are the changes needed? this method is missing in dataframe API due to it is not in `FunctionRegistry` ### Does this PR introduce _any_ user-facing change? yes, new method ``` >>> import datetime >>> from pyspark.sql import functions as sf >>> df = spark.createDataFrame( ... [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2), ... (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", "quantity"]) >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show() +--------------------------------+ |timestampadd(year, quantity, ts)| +--------------------------------+ | 2018-03-11 09:00:07| | 2027-04-02 09:00:07| +--------------------------------+ ``` ### How was this patch tested? added tests ### Was this patch authored or co-authored using generative AI tooling? no Closes #46680 from zhengruifeng/func_ts_add. 
Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../scala/org/apache/spark/sql/functions.scala | 9 +++ .../apache/spark/sql/PlanGenerationTestSuite.scala | 4 ++ .../explain-results/function_timestamp_add.explain | 2 + .../queries/function_timestamp_add.json | 33 +++++++++++ .../queries/function_timestamp_add.proto.bin | Bin 0 -> 144 bytes .../sql/connect/planner/SparkConnectPlanner.scala | 5 ++ .../source/reference/pyspark.sql/functions.rst | 1 + python/pyspark/sql/connect/functions/builtin.py | 7 +++ python/pyspark/sql/functions/builtin.py | 63 +++++++++++++++++++++ .../scala/org/apache/spark/sql/functions.scala | 10 ++++ .../apache/spark/sql/DataFrameFunctionsSuite.scala | 1 + 11 files changed, 135 insertions(+) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala index e886c3998658..2f459d78362b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala @@ -5963,6 +5963,15 @@ object functions { def timestamp_diff(unit: String, start: Column, end: Column): Column = Column.fn("timestampdiff", lit(unit), start, end) + /** + * Adds the specified number of units to the given timestamp. + * + * @group datetime_funcs + * @since 4.0.0 + */ + def timestamp_add(unit: String, quantity: Column, ts: Column): Column = + Column.fn("timestampadd", lit(unit), quantity, ts) + /** * Parses the `timestamp` expression with the `format` expression to a timestamp without time * zone. Returns null with invalid input. 
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala index e6955805d38d..49b1a5312fda 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala @@ -2309,6 +2309,10 @@ class PlanGenerationTestSuite fn.timestamp_diff("year", fn.col("t"), fn.col("t")) } + temporalFunctionTest("timestamp_add") { + fn.timestamp_add("week", fn.col("x"), fn.col("t")) + } + // Array of Long // Array of Long // Array of Array of Long diff --git a/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain new file mode 100644 index 000000000000..36dde1393cdb --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain @@ -0,0 +1,2 @@ +Project [timestampadd(week, cast(x#0L as int), t#0, Some(America/Los_Angeles)) AS timestampadd(week, x, t)#0] ++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0] diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json new file mode 100644 index 000000000000..8fd71bb36d85 --- /dev/null +++ b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json @@ -0,0 +1,33 @@ +{ + "common": { + "planId": "1" + }, + "project": { + "input": { + "common": { + "planId": "0" + }, + "localRelation": { + "schema": "struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e" + } + }, + "expressions": [{ + "unresolvedFunction": { + "functionName": 
"timestampadd", + "arguments": [{ + "literal": { + "string": "week" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "x" + } + }, { + "unresolvedAttribute": { + "unparsedIdentifier": "t" + } + }] + } + }] + } +} \ No newline at end of file diff --git a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin new file mode 100644 index 000000000000..5ab8ec531e07 Binary files /dev/null and b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin differ diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 4e84cdebb545..cbc60d2873f9 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -1830,6 +1830,11 @@ class SparkConnectPlanner( val unit = extractString(children(0), "unit") Some(TimestampDiff(unit, children(1), children(2))) + case "timestampadd" if fun.getArgumentsCount == 3 => + val children = fun.getArgumentsList.asScala.map(transformExpression) + val unit = extractString(children(0), "unit") + Some(TimestampAdd(unit, children(1), children(2))) + case "window" if Seq(2, 3, 4).contains(fun.getArgumentsCount) => val children = fun.getArgumentsList.asScala.map(transformExpression) val timeCol = children.head diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst index 16cf7e1337bb..e0895959e893 100644 --- a/python/docs/source/reference/pyspark.sql/functions.rst +++ b/python/docs/source/reference/pyspark.sql/functions.rst @@ -281,6 +281,7 @@ Date and Timestamp 
Functions quarter second session_window + timestamp_add timestamp_diff timestamp_micros timestamp_millis diff --git a/python/pyspark/sql/connect/functions/builtin.py b/python/pyspark/sql/connect/functions/builtin.py index a063c1b30165..8d3442b6496f 100644 --- a/python/pyspark/sql/connect/functions/builtin.py +++ b/python/pyspark/sql/connect/functions/builtin.py @@ -3404,6 +3404,13 @@ def timestamp_diff(unit: str, start: "ColumnOrName", end: "ColumnOrName") -> Col timestamp_diff.__doc__ = pysparkfuncs.timestamp_diff.__doc__ +def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> Column: + return _invoke_function_over_columns("timestampadd", lit(unit), quantity, ts) + + +timestamp_add.__doc__ = pysparkfuncs.timestamp_add.__doc__ + + def window( timeColumn: "ColumnOrName", windowDuration: str, diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py index 060c835df2be..77ecb0cb920a 100644 --- a/python/pyspark/sql/functions/builtin.py +++ b/python/pyspark/sql/functions/builtin.py @@ -9461,6 +9461,69 @@ def timestamp_diff(unit: str, start: "ColumnOrName", end: "ColumnOrName") -> Col ) + +@_try_remote_functions +def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> Column: + """ + Adds the specified number of units to the given timestamp. + + .. versionadded:: 4.0.0 + + Parameters + ---------- + unit : str + This indicates the unit of time that you want to add. + Supported options are (case insensitive): "YEAR", "QUARTER", "MONTH", "WEEK", + "DAY", "HOUR", "MINUTE", "SECOND", "MILLISECOND" and "MICROSECOND". + quantity : :class:`~pyspark.sql.Column` or str + The number of units of time that you want to add. + ts : :class:`~pyspark.sql.Column` or str + A timestamp to which you want to add. + + Returns + ------- + :class:`~pyspark.sql.Column` + the timestamp that results from adding the specified number of units. 
+ + Examples + -------- + >>> import datetime + >>> from pyspark.sql import functions as sf + >>> df = spark.createDataFrame( + ... [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2), + ... (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", "quantity"]) + >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show() + +--------------------------------+ + |timestampadd(year, quantity, ts)| + +--------------------------------+ + | 2018-03-11 09:00:07| + | 2027-04-02 09:00:07| + +--------------------------------+ + >>> df.select(sf.timestamp_add("WEEK", sf.lit(5), "ts")).show() + +-------------------------+ + |timestampadd(WEEK, 5, ts)| + +-------------------------+ + | 2016-04-15 09:00:07| + | 2024-05-07 09:00:07| + +-------------------------+ + >>> df.select(sf.timestamp_add("day", sf.lit(-5), "ts")).show() + +-------------------------+ + |timestampadd(day, -5, ts)| + +-------------------------+ + | 2016-03-06 09:00:07| + | 2024-03-28 09:00:07| + +-------------------------+ + """ + from pyspark.sql.classic.column import _to_java_column + + return _invoke_function( + "timestamp_add", + unit, + _to_java_column(quantity), + _to_java_column(ts), + ) + + @_try_remote_functions def window( timeColumn: "ColumnOrName", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 628d4908f6b7..52733611e42a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -5752,6 +5752,16 @@ object functions { TimestampDiff(unit, start.expr, end.expr) } + /** + * Adds the specified number of units to the given timestamp. + * + * @group datetime_funcs + * @since 4.0.0 + */ + def timestamp_add(unit: String, quantity: Column, ts: Column): Column = withExpr { + TimestampAdd(unit, quantity.expr, ts.expr) + } + /** * Parses the `timestamp` expression with the `format` expression * to a timestamp without time zone. 
Returns null with invalid input. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala index f4b16190dcd2..df1bb39f1874 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala @@ -83,6 +83,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSparkSession { "product", // Discussed in https://github.com/apache/spark/pull/30745 "unwrap_udt", "collect_top_k", + "timestamp_add", "timestamp_diff" ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org