vranes commented on code in PR #55535:
URL: https://github.com/apache/spark/pull/55535#discussion_r3161750693
##########
python/pyspark/sql/functions/builtin.py:
##########
@@ -13125,6 +13125,74 @@ def timestamp_add(unit: str, quantity: "ColumnOrName",
ts: "ColumnOrName") -> Co
)
+@_try_remote_functions
+def time_bucket(
+ bucket_size: "ColumnOrName",
+ ts: "ColumnOrName",
+ origin: Optional["ColumnOrName"] = None,
+) -> Column:
+ """
+ Aligns a timestamp to the start of a fixed-size interval bucket.
+
+ Returns the start of the bucket that ``ts`` falls into, where buckets are
defined by
+ the given ``bucket_size`` interval aligned to ``origin``. All bucketing is
performed on
+ UTC micros, the session time zone does not affect bucket alignment. For
local wall-clock
+ alignment in a DST zone, cast the TIMESTAMP to TIMESTAMP_NTZ.
+
+ .. versionadded:: 4.2.0
+
+ Parameters
+ ----------
+ bucket_size : :class:`~pyspark.sql.Column` or column name
+ A day-time or year-month interval defining the bucket size. Must be
positive
+ and foldable.
+ ts : :class:`~pyspark.sql.Column` or column name
+ A TIMESTAMP or TIMESTAMP_NTZ value to bucket.
+ origin : :class:`~pyspark.sql.Column` or column name, optional
+ Alignment anchor. Defaults to 1970-01-01 00:00:00 (UTC for TIMESTAMP).
Must be
+ the same type as ``ts`` and must be foldable.
+
+ Returns
+ -------
+ :class:`~pyspark.sql.Column`
+ The start of the bucket containing ``ts``, as the same type as ``ts``.
+
+ Examples
+ --------
+ >>> spark.conf.set("spark.sql.session.timeZone", "UTC")
+ >>> import datetime
+ >>> from pyspark.sql import functions as sf
+ >>> df = spark.createDataFrame(
+ ... [(datetime.datetime(2024, 1, 1, 11, 27, 0),)], ['ts'])
+ >>> df.select(
+ ... sf.time_bucket(sf.expr("INTERVAL '15' MINUTE"),
'ts').alias("bucket")
+ ... ).collect()
+ [Row(bucket=datetime.datetime(2024, 1, 1, 11, 15))]
+
+ Shift the grid with an explicit origin: buckets run at :05, :20, :35, :50:
+
+ >>> df.select(
+ ... sf.time_bucket(
+ ... sf.expr("INTERVAL '15' MINUTE"),
+ ... 'ts',
+ ... sf.expr("TIMESTAMP '1970-01-01 00:05:00'")
+ ... ).alias("bucket")
+ ... ).collect()
+ [Row(bucket=datetime.datetime(2024, 1, 1, 11, 20))]
+ >>> spark.conf.unset("spark.sql.session.timeZone")
+ """
+ from pyspark.sql.classic.column import _to_java_column
+
+ if origin is None:
+ return _invoke_function("time_bucket", _to_java_column(bucket_size),
_to_java_column(ts))
+ return _invoke_function(
+ "time_bucket",
+ _to_java_column(bucket_size),
+ _to_java_column(ts),
+ _to_java_column(origin),
+ )
Review Comment:
Switched to `_invoke_function_over_columns` in both
python/pyspark/sql/functions/builtin.py and
python/pyspark/sql/connect/functions/builtin.py. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]