cloud-fan commented on code in PR #55535:
URL: https://github.com/apache/spark/pull/55535#discussion_r3162470658


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala:
##########
@@ -3897,3 +3899,197 @@ case class TimestampDiff(
     copy(startTimestamp = newLeft, endTimestamp = newRight)
   }
 }
+
+/**
+ * Aligns a timestamp to the start of a fixed-size interval bucket.
+ *
+ * Returns the start of the half-open bucket [start, start + bucketSize) 
containing ts.
+ * For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP, buckets 
align to
+ * the session time zone.

Review Comment:
   The `@ExpressionDescription` below (line 4032) and the Python docstring 
carry the qualifier "year-month interval buckets and calendar-day components of 
day-time interval buckets align to the session time zone." A user reading just 
this Scaladoc would expect a 15-min DT bucket on LTZ to align to LA-local 
quarter-hours, but sub-day DT bucketing is UTC. Suggest matching the more 
accurate wording so the four user-facing docs (Scaladoc, 
`@ExpressionDescription`, Python, Scala API) agree.
   
   ```suggestion
    * Returns the start of the half-open bucket [start, start + bucketSize) 
containing ts.
    * For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP, 
year-month
    * interval buckets and calendar-day components of day-time interval buckets 
align
    * to the session time zone.
   ```



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala:
##########
@@ -3897,3 +3899,197 @@ case class TimestampDiff(
     copy(startTimestamp = newLeft, endTimestamp = newRight)
   }
 }
+
+/**
+ * Aligns a timestamp to the start of a fixed-size interval bucket.
+ *
+ * Returns the start of the half-open bucket [start, start + bucketSize) 
containing ts.
+ * For TIMESTAMP_NTZ, bucketing is performed in UTC. For TIMESTAMP, buckets 
align to
+ * the session time zone.
+ */
+case class TimeBucket(
+    bucketSize: Expression,
+    ts: Expression,
+    originTs: Expression,
+    timeZoneId: Option[String] = None)
+  extends TernaryExpression with ExpectsInputTypes with 
TimeZoneAwareExpression {
+
+  override def nullIntolerant: Boolean = true
+
+  override def first: Expression = bucketSize
+  override def second: Expression = ts
+  override def third: Expression = originTs
+
+  override def withTimeZone(timeZoneId: String): TimeBucket =
+    copy(timeZoneId = Option(timeZoneId))
+
+  def this(bucketSize: Expression, ts: Expression, originTs: Expression) =
+    this(bucketSize, ts, originTs, None)
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(
+    TypeCollection(DayTimeIntervalType, YearMonthIntervalType),
+    AnyTimestampType,
+    AnyTimestampType)
+
+  override def dataType: DataType = ts.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    val defaultCheck = super.checkInputDataTypes()
+    if (defaultCheck.isFailure) return defaultCheck
+
+    if (!bucketSize.foldable) {
+      return DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("bucketSize"),
+          "inputType" -> toSQLType(bucketSize.dataType),
+          "inputExpr" -> toSQLExpr(bucketSize)))
+    }
+
+    val bucketSizeValue = bucketSize.eval()
+    if (bucketSizeValue != null) {
+      val isNonPositive = bucketSize.dataType match {
+        case _: DayTimeIntervalType => bucketSizeValue.asInstanceOf[Long] <= 0
+        case _: YearMonthIntervalType => bucketSizeValue.asInstanceOf[Int] <= 0
+        case other => throw SparkException.internalError(
+          s"Unexpected bucketSize type: $other")
+      }
+      if (isNonPositive) {
+        return DataTypeMismatch(
+          errorSubClass = "VALUE_OUT_OF_RANGE",
+          messageParameters = Map(
+            "exprName" -> toSQLId("bucketSize"),
+            "valueRange" -> "(0, inf)",
+            "currentValue" -> toSQLValue(bucketSizeValue, 
bucketSize.dataType)))
+      }
+    }
+
+    if (!originTs.foldable) {
+      return DataTypeMismatch(
+        errorSubClass = "NON_FOLDABLE_INPUT",
+        messageParameters = Map(
+          "inputName" -> toSQLId("origin"),
+          "inputType" -> toSQLType(originTs.dataType),
+          "inputExpr" -> toSQLExpr(originTs)))
+    }
+
+    if (ts.dataType != originTs.dataType) {
+      return DataTypeMismatch(
+        errorSubClass = "UNEXPECTED_INPUT_TYPE",
+        messageParameters = Map(
+          "paramIndex" -> ordinalNumber(2),
+          "requiredType" -> toSQLType(ts.dataType),
+          "inputSql" -> toSQLExpr(originTs),
+          "inputType" -> toSQLType(originTs.dataType)))
+    }
+
+    TypeCheckSuccess
+  }
+
+  override def nullSafeEval(bucketSizeVal: Any, tsVal: Any, originVal: Any): 
Any = {
+    first.dataType match {
+      case _: DayTimeIntervalType =>
+        DateTimeUtils.timeBucketDTInterval(
+          bucketSizeVal.asInstanceOf[Long], tsVal.asInstanceOf[Long],
+          originVal.asInstanceOf[Long], zoneIdForType(ts.dataType))

Review Comment:
   `zoneIdForType(ts.dataType)` is recomputed for every row in `nullSafeEval` 
(3994 and 3998) and again in `doGenCode` (4007). Peer expressions cache it: 
`TimestampAddInterval` (line 1680) and `TimestampAddYMInterval` (line 1996) 
both use `@transient private lazy val zoneIdInEval = zoneIdForType(...)`. Worth 
matching the convention.



##########
sql/api/src/main/scala/org/apache/spark/sql/functions.scala:
##########
@@ -8486,6 +8486,38 @@ object functions {
   def timestamp_add(unit: String, quantity: Column, ts: Column): Column =
     Column.internalFn("timestampadd", lit(unit), quantity, ts)
 
+  /**
+   * Returns the start of the fixed-size bucket of `bucketSize` that contains 
`ts`, with buckets
+   * aligned to the default origin (1970-01-01 00:00:00). For `TIMESTAMP_NTZ`, 
bucketing is
+   * performed in UTC. For `TIMESTAMP`, buckets align to the session time zone.

Review Comment:
   Same oversimplification as the `case class TimeBucket` Scaladoc — "buckets 
align to the session time zone" misses that sub-day DT buckets are UTC-aligned 
for LTZ. Either match the Python docstring's wording or shorten to point at the 
`TimeBucket` Scaladoc / `@ExpressionDescription` for the precise rule.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to