This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e6e7a00f662 [SPARK-48369][SQL][PYTHON][CONNECT] Add function 
`timestamp_add`
6e6e7a00f662 is described below

commit 6e6e7a00f662ae1dc7e081c9e8ec40c30ad8d3d4
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue May 21 19:35:24 2024 +0800

    [SPARK-48369][SQL][PYTHON][CONNECT] Add function `timestamp_add`
    
    ### What changes were proposed in this pull request?
    Add function `timestamp_add`
    
    ### Why are the changes needed?
    this method is missing in the DataFrame API because it is not in 
`FunctionRegistry`
    
    ### Does this PR introduce _any_ user-facing change?
    yes, new method
    
    ```
        >>> import datetime
        >>> from pyspark.sql import functions as sf
        >>> df = spark.createDataFrame(
        ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2),
        ...      (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", 
"quantity"])
        >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show()
        +--------------------------------+
        |timestampadd(year, quantity, ts)|
        +--------------------------------+
        |             2018-03-11 09:00:07|
        |             2027-04-02 09:00:07|
        +--------------------------------+
    ```
    
    ### How was this patch tested?
    added tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #46680 from zhengruifeng/func_ts_add.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../scala/org/apache/spark/sql/functions.scala     |   9 +++
 .../apache/spark/sql/PlanGenerationTestSuite.scala |   4 ++
 .../explain-results/function_timestamp_add.explain |   2 +
 .../queries/function_timestamp_add.json            |  33 +++++++++++
 .../queries/function_timestamp_add.proto.bin       | Bin 0 -> 144 bytes
 .../sql/connect/planner/SparkConnectPlanner.scala  |   5 ++
 .../source/reference/pyspark.sql/functions.rst     |   1 +
 python/pyspark/sql/connect/functions/builtin.py    |   7 +++
 python/pyspark/sql/functions/builtin.py            |  63 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/functions.scala     |  10 ++++
 .../apache/spark/sql/DataFrameFunctionsSuite.scala |   1 +
 11 files changed, 135 insertions(+)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
index e886c3998658..2f459d78362b 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/functions.scala
@@ -5963,6 +5963,15 @@ object functions {
   def timestamp_diff(unit: String, start: Column, end: Column): Column =
     Column.fn("timestampdiff", lit(unit), start, end)
 
+  /**
+   * Adds the specified number of units to the given timestamp.
+   *
+   * @group datetime_funcs
+   * @since 4.0.0
+   */
+  def timestamp_add(unit: String, quantity: Column, ts: Column): Column =
+    Column.fn("timestampadd", lit(unit), quantity, ts)
+
   /**
    * Parses the `timestamp` expression with the `format` expression to a 
timestamp without time
    * zone. Returns null with invalid input.
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index e6955805d38d..49b1a5312fda 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -2309,6 +2309,10 @@ class PlanGenerationTestSuite
     fn.timestamp_diff("year", fn.col("t"), fn.col("t"))
   }
 
+  temporalFunctionTest("timestamp_add") {
+    fn.timestamp_add("week", fn.col("x"), fn.col("t"))
+  }
+
   // Array of Long
   // Array of Long
   // Array of Array of Long
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain
new file mode 100644
index 000000000000..36dde1393cdb
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/function_timestamp_add.explain
@@ -0,0 +1,2 @@
+Project [timestampadd(week, cast(x#0L as int), t#0, Some(America/Los_Angeles)) 
AS timestampadd(week, x, t)#0]
++- LocalRelation <empty>, [d#0, t#0, s#0, x#0L, wt#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json
new file mode 100644
index 000000000000..8fd71bb36d85
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.json
@@ -0,0 +1,33 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": 
"struct\u003cd:date,t:timestamp,s:string,x:bigint,wt:struct\u003cstart:timestamp,end:timestamp\u003e\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "timestampadd",
+        "arguments": [{
+          "literal": {
+            "string": "week"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "x"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "t"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin
new file mode 100644
index 000000000000..5ab8ec531e07
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/function_timestamp_add.proto.bin
 differ
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4e84cdebb545..cbc60d2873f9 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -1830,6 +1830,11 @@ class SparkConnectPlanner(
         val unit = extractString(children(0), "unit")
         Some(TimestampDiff(unit, children(1), children(2)))
 
+      case "timestampadd" if fun.getArgumentsCount == 3 =>
+        val children = fun.getArgumentsList.asScala.map(transformExpression)
+        val unit = extractString(children(0), "unit")
+        Some(TimestampAdd(unit, children(1), children(2)))
+
       case "window" if Seq(2, 3, 4).contains(fun.getArgumentsCount) =>
         val children = fun.getArgumentsList.asScala.map(transformExpression)
         val timeCol = children.head
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst 
b/python/docs/source/reference/pyspark.sql/functions.rst
index 16cf7e1337bb..e0895959e893 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -281,6 +281,7 @@ Date and Timestamp Functions
     quarter
     second
     session_window
+    timestamp_add
     timestamp_diff
     timestamp_micros
     timestamp_millis
diff --git a/python/pyspark/sql/connect/functions/builtin.py 
b/python/pyspark/sql/connect/functions/builtin.py
index a063c1b30165..8d3442b6496f 100644
--- a/python/pyspark/sql/connect/functions/builtin.py
+++ b/python/pyspark/sql/connect/functions/builtin.py
@@ -3404,6 +3404,13 @@ def timestamp_diff(unit: str, start: "ColumnOrName", 
end: "ColumnOrName") -> Col
 timestamp_diff.__doc__ = pysparkfuncs.timestamp_diff.__doc__
 
 
+def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> 
Column:
+    return _invoke_function_over_columns("timestampadd", lit(unit), quantity, 
ts)
+
+
+timestamp_add.__doc__ = pysparkfuncs.timestamp_add.__doc__
+
+
 def window(
     timeColumn: "ColumnOrName",
     windowDuration: str,
diff --git a/python/pyspark/sql/functions/builtin.py 
b/python/pyspark/sql/functions/builtin.py
index 060c835df2be..77ecb0cb920a 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -9461,6 +9461,69 @@ def timestamp_diff(unit: str, start: "ColumnOrName", 
end: "ColumnOrName") -> Col
     )
 
 
+@_try_remote_functions
+def timestamp_add(unit: str, quantity: "ColumnOrName", ts: "ColumnOrName") -> 
Column:
+    """
+    Adds the specified number of units to the given timestamp.
+
+    .. versionadded:: 4.0.0
+
+    Parameters
+    ----------
+    unit : str
+        This indicates the unit of time to add to the given timestamp.
+        Supported options are (case insensitive): "YEAR", "QUARTER", "MONTH", 
"WEEK",
+        "DAY", "HOUR", "MINUTE", "SECOND", "MILLISECOND" and "MICROSECOND".
+    quantity : :class:`~pyspark.sql.Column` or str
+        The number of units of time that you want to add.
+    ts : :class:`~pyspark.sql.Column` or str
+        A timestamp to which you want to add units.
+
+    Returns
+    -------
+    :class:`~pyspark.sql.Column`
+        the resulting timestamp after adding the specified number of units.
+
+    Examples
+    --------
+    >>> import datetime
+    >>> from pyspark.sql import functions as sf
+    >>> df = spark.createDataFrame(
+    ...     [(datetime.datetime(2016, 3, 11, 9, 0, 7), 2),
+    ...      (datetime.datetime(2024, 4, 2, 9, 0, 7), 3)], ["ts", "quantity"])
+    >>> df.select(sf.timestamp_add("year", "quantity", "ts")).show()
+    +--------------------------------+
+    |timestampadd(year, quantity, ts)|
+    +--------------------------------+
+    |             2018-03-11 09:00:07|
+    |             2027-04-02 09:00:07|
+    +--------------------------------+
+    >>> df.select(sf.timestamp_add("WEEK", sf.lit(5), "ts")).show()
+    +-------------------------+
+    |timestampadd(WEEK, 5, ts)|
+    +-------------------------+
+    |      2016-04-15 09:00:07|
+    |      2024-05-07 09:00:07|
+    +-------------------------+
+    >>> df.select(sf.timestamp_add("day", sf.lit(-5), "ts")).show()
+    +-------------------------+
+    |timestampadd(day, -5, ts)|
+    +-------------------------+
+    |      2016-03-06 09:00:07|
+    |      2024-03-28 09:00:07|
+    +-------------------------+
+    """
+    from pyspark.sql.classic.column import _to_java_column
+
+    return _invoke_function(
+        "timestamp_add",
+        unit,
+        _to_java_column(quantity),
+        _to_java_column(ts),
+    )
+
+
 @_try_remote_functions
 def window(
     timeColumn: "ColumnOrName",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 628d4908f6b7..52733611e42a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -5752,6 +5752,16 @@ object functions {
     TimestampDiff(unit, start.expr, end.expr)
   }
 
+  /**
+   * Adds the specified number of units to the given timestamp.
+   *
+   * @group datetime_funcs
+   * @since 4.0.0
+   */
+  def timestamp_add(unit: String, quantity: Column, ts: Column): Column = 
withExpr {
+    TimestampAdd(unit, quantity.expr, ts.expr)
+  }
+
   /**
    * Parses the `timestamp` expression with the `format` expression
    * to a timestamp without time zone. Returns null with invalid input.
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
index f4b16190dcd2..df1bb39f1874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala
@@ -83,6 +83,7 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSparkSession {
       "product", // Discussed in https://github.com/apache/spark/pull/30745
       "unwrap_udt",
       "collect_top_k",
+      "timestamp_add",
       "timestamp_diff"
     )
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to