cloud-fan commented on code in PR #55535:
URL: https://github.com/apache/spark/pull/55535#discussion_r3162470658
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala:
##########
@@ -3897,3 +3899,197 @@ case class TimestampDiff(
copy(startTimestamp = newLeft, endTimestamp = newRight)
}
}
+
+/**
+ * Aligns a timestamp to the start of a fixed-size interval bucket.
+ *
+ * Returns the start of the half-open bucket [start, start + bucketSize)
containing ts.
+ * For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP, buckets
align to
+ * the session time zone.
Review Comment:
The `@ExpressionDescription` below (line 4032) and the Python docstring
carry the qualifier "year-month interval buckets and calendar-day components of
day-time interval buckets align to the session time zone." A user reading just
this Scaladoc would expect a 15-min DT bucket on LTZ to align to LA-local
quarter-hours, but sub-day DT bucketing is UTC. Suggest matching the more
accurate wording so the four user-facing docs (Scaladoc,
`@ExpressionDescription`, Python, Scala API) agree.
```suggestion
* Returns the start of the half-open bucket [start, start + bucketSize)
containing ts.
* For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP,
year-month
* interval buckets and calendar-day components of day-time interval buckets
align
* to the session time zone.
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala:
##########
@@ -3897,3 +3899,197 @@ case class TimestampDiff(
copy(startTimestamp = newLeft, endTimestamp = newRight)
}
}
+
+/**
+ * Aligns a timestamp to the start of a fixed-size interval bucket.
+ *
+ * Returns the start of the half-open bucket [start, start + bucketSize)
containing ts.
+ * For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP, buckets
align to
+ * the session time zone.
+ */
+case class TimeBucket(
+ bucketSize: Expression,
+ ts: Expression,
+ originTs: Expression,
+ timeZoneId: Option[String] = None)
+ extends TernaryExpression with ExpectsInputTypes with
TimeZoneAwareExpression {
+
+ override def nullIntolerant: Boolean = true
+
+ override def first: Expression = bucketSize
+ override def second: Expression = ts
+ override def third: Expression = originTs
+
+ override def withTimeZone(timeZoneId: String): TimeBucket =
+ copy(timeZoneId = Option(timeZoneId))
+
+ def this(bucketSize: Expression, ts: Expression, originTs: Expression) =
+ this(bucketSize, ts, originTs, None)
+
+ override def inputTypes: Seq[AbstractDataType] = Seq(
+ TypeCollection(DayTimeIntervalType, YearMonthIntervalType),
+ AnyTimestampType,
+ AnyTimestampType)
+
+ override def dataType: DataType = ts.dataType
+
+ override def checkInputDataTypes(): TypeCheckResult = {
+ val defaultCheck = super.checkInputDataTypes()
+ if (defaultCheck.isFailure) return defaultCheck
+
+ if (!bucketSize.foldable) {
+ return DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> toSQLId("bucketSize"),
+ "inputType" -> toSQLType(bucketSize.dataType),
+ "inputExpr" -> toSQLExpr(bucketSize)))
+ }
+
+ val bucketSizeValue = bucketSize.eval()
+ if (bucketSizeValue != null) {
+ val isNonPositive = bucketSize.dataType match {
+ case _: DayTimeIntervalType => bucketSizeValue.asInstanceOf[Long] <= 0
+ case _: YearMonthIntervalType => bucketSizeValue.asInstanceOf[Int] <= 0
+ case other => throw SparkException.internalError(
+ s"Unexpected bucketSize type: $other")
+ }
+ if (isNonPositive) {
+ return DataTypeMismatch(
+ errorSubClass = "VALUE_OUT_OF_RANGE",
+ messageParameters = Map(
+ "exprName" -> toSQLId("bucketSize"),
+ "valueRange" -> "(0, inf)",
+ "currentValue" -> toSQLValue(bucketSizeValue,
bucketSize.dataType)))
+ }
+ }
+
+ if (!originTs.foldable) {
+ return DataTypeMismatch(
+ errorSubClass = "NON_FOLDABLE_INPUT",
+ messageParameters = Map(
+ "inputName" -> toSQLId("origin"),
+ "inputType" -> toSQLType(originTs.dataType),
+ "inputExpr" -> toSQLExpr(originTs)))
+ }
+
+ if (ts.dataType != originTs.dataType) {
+ return DataTypeMismatch(
+ errorSubClass = "UNEXPECTED_INPUT_TYPE",
+ messageParameters = Map(
+ "paramIndex" -> ordinalNumber(2),
+ "requiredType" -> toSQLType(ts.dataType),
+ "inputSql" -> toSQLExpr(originTs),
+ "inputType" -> toSQLType(originTs.dataType)))
+ }
+
+ TypeCheckSuccess
+ }
+
+ override def nullSafeEval(bucketSizeVal: Any, tsVal: Any, originVal: Any):
Any = {
+ first.dataType match {
+ case _: DayTimeIntervalType =>
+ DateTimeUtils.timeBucketDTInterval(
+ bucketSizeVal.asInstanceOf[Long], tsVal.asInstanceOf[Long],
+ originVal.asInstanceOf[Long], zoneIdForType(ts.dataType))
Review Comment:
`zoneIdForType(ts.dataType)` is recomputed for every row in `nullSafeEval`
(3994 and 3998) and again in `doGenCode` (4007). Peer expressions cache it:
`TimestampAddInterval` (line 1680) and `TimestampAddYMInterval` (line 1996)
both use `@transient private lazy val zoneIdInEval = zoneIdForType(...)`. Worth
matching the convention.
##########
sql/api/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -8486,6 +8486,38 @@ object functions {
def timestamp_add(unit: String, quantity: Column, ts: Column): Column =
Column.internalFn("timestampadd", lit(unit), quantity, ts)
+ /**
+ * Returns the start of the fixed-size bucket of `bucketSize` that contains
`ts`, with buckets
+ * aligned to the default origin (1970-01-01 00:00:00). For `TIMESTAMP_NTZ`,
bucketing is
+ * performed in UTC. For `TIMESTAMP`, buckets align to the session time zone.
Review Comment:
Same oversimplification as the `case class TimeBucket` Scaladoc — "buckets
align to the session time zone" misses that sub-day DT buckets are UTC-aligned
for LTZ. Either match the Python docstring's wording or shorten to point at the
`TimeBucket` Scaladoc / `@ExpressionDescription` for the precise rule.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]