[ 
https://issues.apache.org/jira/browse/SPARK-57133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikolina Vraneš updated SPARK-57133:
------------------------------------
    Description: 
h2. Summary

 

Add a new SQL relation operator {{BIN BY (...)}} that aligns range-typed rows to fixed-width bin boundaries. It splits any row whose {{[range_start, range_end)}} crosses a bin boundary and proportionally redistributes user-selected numeric or day-time interval values across the resulting sub-ranges. Target use case: telemetry and observability data where each row carries its own measurement window (OpenTelemetry, Prometheus exports).

The operator occupies the same grammar position as {{PIVOT}} / {{UNPIVOT}} (a post-{{FROM}} relation extension), composes with {{GROUP BY}} / {{WHERE}} / {{JOIN}} downstream, and produces an ordinary relation.

  
h2. Syntax

 

 
{code:sql}
SELECT * FROM relation BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  [ALIGN TO originExpr]
  DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])
  [BIN_START AS aliasName]
  [BIN_END AS aliasName]
  [BIN_DISTRIBUTE_RATIO AS aliasName]
) [AS resultAlias];
{code}
 

{{BIN BY}} also works as a SQL pipe-operator stage, like {{PIVOT}} / {{UNPIVOT}}:

{code:sql}
FROM relation |> BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  DISTRIBUTE UNIFORM (distributeCol)
);
{code}
  
h2. Clauses

 

* {{RANGE rangeStartCol TO rangeEndCol}}: two TIMESTAMP or TIMESTAMP_NTZ columns from the input relation that define each row's measurement window {{[range_start, range_end)}}. Both columns must have the same type. Column references accept qualified names (e.g., {{m.time_start}}). A future point-sample variant is planned as an alternative {{VALUE <expr>}} branch parallel to {{RANGE}}.
* {{BIN WIDTH widthExpr}}: a day-time interval expression defining the bin size. Must be positive and foldable. Year-month intervals are rejected because variable-length months would make proportional splitting ambiguous.
* {{ALIGN TO originExpr}}: optional alignment anchor for the bin grid. Must be foldable and have the same type as the range columns. When omitted, the default origin is the Unix epoch ({{1970-01-01 00:00:00}}), session-zone-anchored for TIMESTAMP (LTZ) and wall-clock for TIMESTAMP_NTZ.
* {{DISTRIBUTE UNIFORM (col ...)}}: one or more numeric or DAY-TIME INTERVAL columns whose values are proportionally redistributed across sub-rows. Required; at least one column must be listed. Column references accept qualified names.
* {{BIN_START AS}} / {{BIN_END AS}} / {{BIN_DISTRIBUTE_RATIO AS}}: optional rename clauses for the three appended output columns. When omitted, the default names ({{bin_start}}, {{bin_end}}, {{bin_distribute_ratio}}) are used.
* {{AS resultAlias}}: optional table alias on the whole {{BIN BY}} relation (matches the {{PIVOT}} / {{UNPIVOT}} convention; the resolver wraps the operator in a {{SubqueryAlias}}).
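
To illustrate how {{BIN WIDTH}} and {{ALIGN TO}} together define the bin grid, here is a minimal Python sketch of the fixed-width case. It is not the proposed implementation: the helper name {{bin_bounds}} is hypothetical, timestamps are modeled as naive {{datetime}} values (closest to TIMESTAMP_NTZ), and session-zone or civil-time handling is not modeled.

```python
from datetime import datetime, timedelta

# Default origin named in the ticket (Unix epoch), as a naive wall-clock value.
EPOCH = datetime(1970, 1, 1)


def bin_bounds(ts, width, origin=EPOCH):
    """Return (bin_start, bin_end) of the fixed-width bin containing ts.

    Hypothetical helper: the grid consists of the points origin + k * width
    for every integer k, and each bin is half-open: [bin_start, bin_end).
    """
    if width <= timedelta(0):
        raise ValueError("bin width must be positive")
    # timedelta // timedelta floors toward negative infinity, so timestamps
    # earlier than the origin still fall into the correct bin.
    k = (ts - origin) // width
    bin_start = origin + k * width
    return bin_start, bin_start + width


# 5-minute bins on the default grid: 10:07 falls into [10:05, 10:10).
print(bin_bounds(datetime(2024, 1, 1, 10, 7), timedelta(minutes=5)))

# Hourly bins anchored at 00:30: 00:10 falls into [23:30 the day before, 00:30).
print(bin_bounds(datetime(2024, 1, 1, 0, 10), timedelta(hours=1),
                 origin=datetime(2024, 1, 1, 0, 30)))
```

The origin only shifts the grid, so any timestamp on the grid gives the same bins as any other.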

 
h2. Output schema

  

Input columns, plus three appended columns (default names shown; renameable via the optional {{AS}} clauses):

* {{bin_start}} ({{TIMESTAMP}} or {{TIMESTAMP_NTZ}}, matches the input range column type)
* {{bin_end}} (same type as {{bin_start}})
* {{bin_distribute_ratio}} ({{DOUBLE}}): fraction of the original {{[range_start, range_end)}} duration that falls into this bin. {{1.0}} for unsplit rows.

 
h2. Behavior

 

* For each input row, the operator emits one output row per bin that overlaps {{[range_start, range_end)}}.
* For each sub-row: the appended {{bin_start}} / {{bin_end}} hold the bin's boundaries; the input range columns are truncated to the sub-range (i.e., clipped to the bin's intersection with {{[range_start, range_end)}}); {{DISTRIBUTE UNIFORM}} columns are scaled by the fraction of the original range that falls inside the bin; all other input columns are replicated unchanged.
* Scaling math: integer, {{DECIMAL}}, and DAY-TIME INTERVAL columns use exact rational scaling ({{value * numer / denom}}, where {{numer = bin_overlap_micros}} and {{denom = range_total_micros}}), rounded to nearest as the final step. {{FLOAT}} / {{DOUBLE}} columns use ordinary IEEE-754 multiplication by the ratio as a {{DOUBLE}}. This keeps engines (JVM and Photon) bit-for-bit equivalent on the exact types.
* {{bin_distribute_ratio}} reports the per-sub-row fraction.
* TIMESTAMP (LTZ) bin boundaries align to wall-clock times in the session zone. Multi-day bins use civil-time arithmetic so that boundaries land at consistent local times across DST transitions.
* TIMESTAMP_NTZ bin boundaries use UTC arithmetic regardless of session zone.
* Sub-day bins stay on UTC microsecond arithmetic regardless of zone.
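
The splitting and exact-scaling rules above can be sketched in Python for a single integer column. This is an illustration under stated assumptions, not the {{BinByExec}} implementation: {{split_row}} and {{scale_exact}} are hypothetical names, timestamps are naive {{datetime}} values on plain microsecond arithmetic (no session zone or DST handling), the tie-breaking rule for round-to-nearest is assumed to be half away from zero because the ticket does not specify it, and only rows with {{range_start < range_end}} are handled.

```python
from datetime import datetime, timedelta
from fractions import Fraction

MICRO = timedelta(microseconds=1)
EPOCH = datetime(1970, 1, 1)  # default origin named in the ticket


def scale_exact(value, numer, denom):
    """Compute value * numer / denom exactly, rounding once at the end.

    Assumption: ties round half away from zero (not specified in the ticket).
    """
    q = Fraction(value * numer, denom)
    sign = -1 if q < 0 else 1
    return sign * int(abs(q) + Fraction(1, 2))


def split_row(start, end, value, width, origin=EPOCH):
    """Split [start, end) at bin boundaries and scale an integer value.

    Returns one tuple per overlapping bin:
    (bin_start, bin_end, sub_start, sub_end, scaled_value, ratio).
    """
    if not start < end:
        raise ValueError("this sketch only handles start < end")
    total = (end - start) // MICRO              # range_total_micros
    bin_start = origin + ((start - origin) // width) * width
    rows = []
    while bin_start < end:
        bin_end = bin_start + width
        # Clip the input range to the bin's intersection with [start, end).
        sub_start, sub_end = max(start, bin_start), min(end, bin_end)
        overlap = (sub_end - sub_start) // MICRO  # bin_overlap_micros
        rows.append((bin_start, bin_end, sub_start, sub_end,
                     scale_exact(value, overlap, total), overlap / total))
        bin_start = bin_end
    return rows


# A 10-minute window starting at 10:03 crosses two 5-minute boundaries.
for row in split_row(datetime(2024, 1, 1, 10, 3), datetime(2024, 1, 1, 10, 13),
                     100, timedelta(minutes=5)):
    print(row[0].time(), row[1].time(), row[4], row[5])
```

In this example the overlaps are 2, 5, and 3 minutes, so the value 100 is split into 20, 50, and 30. Because each sub-row is rounded independently in this sketch, the parts need not always sum to the original value (10 spread over three equal bins gives 3 + 3 + 3); the ticket does not say whether the implementation compensates for that.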

 
h2. Per-row edge cases

* {{range_start == range_end}} (zero-length input): emits one output row with {{bin_distribute_ratio = 1.0}} and {{bin_start}} / {{bin_end}} set to the bin containing {{range_start}}. DISTRIBUTE columns pass through unchanged.
* {{range_start > range_end}} (inverted range): raises {{BIN_BY_INVALID_RANGE}} at runtime.
* {{NULL}} in {{range_start}} or {{range_end}}: emits one output row with all three appended columns {{NULL}}. DISTRIBUTE columns pass through unchanged.
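
The three edge cases can be expressed as a small pre-check that runs before ordinary splitting. The Python sketch below is only an illustration: {{edge_case}} and the exception class are hypothetical stand-ins, {{None}} models SQL {{NULL}}, and timestamps are naive {{datetime}} values with no session-zone handling.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # default origin named in the ticket


class BinByInvalidRange(ValueError):
    """Hypothetical stand-in for the BIN_BY_INVALID_RANGE error class."""


def edge_case(range_start, range_end, width, origin=EPOCH):
    """Return (bin_start, bin_end, bin_distribute_ratio) for an edge-case row.

    Returns None for an ordinary row (range_start < range_end), which should
    go through normal splitting. In both edge cases that emit a row, the
    DISTRIBUTE columns are passed through unscaled by the caller.
    """
    # NULL in either range column: one row, all appended columns NULL.
    if range_start is None or range_end is None:
        return (None, None, None)
    # Inverted range: runtime error.
    if range_start > range_end:
        raise BinByInvalidRange(
            f"range_start {range_start} is after range_end {range_end}")
    # Zero-length range: one row in the bin containing range_start, ratio 1.0.
    if range_start == range_end:
        bin_start = origin + ((range_start - origin) // width) * width
        return (bin_start, bin_start + width, 1.0)
    return None


t = datetime(2024, 1, 1, 10, 7)
print(edge_case(t, t, timedelta(minutes=5)))     # zero-length row
print(edge_case(None, t, timedelta(minutes=5)))  # NULL range column
```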

 
h2. Examples

 

 
{code:sql}
-- 5-minute alignment, single value column, default output names, default origin
SELECT * FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 5 MINUTES
  DISTRIBUTE UNIFORM (value)
);

-- Custom origin, hourly bins, renamed boundary columns
SELECT * FROM metrics BIN BY (
  RANGE ts_begin TO ts_end
  BIN WIDTH INTERVAL 1 HOUR
  ALIGN TO TIMESTAMP '2024-01-01 00:30:00'
  DISTRIBUTE UNIFORM (bytes_sent, requests)
  BIN_START AS window_start
  BIN_END AS window_end
);

-- Composed with GROUP BY for downstream aggregation
SELECT bin_start, SUM(value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
)
GROUP BY bin_start
ORDER BY bin_start;

-- Trailing alias on the relation result
SELECT b.bin_start, SUM(b.value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
) AS b
GROUP BY b.bin_start;

-- SQL pipe-operator form
FROM metrics |> BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
);
{code}
  
h2. Errors

 

* {{BIN_BY_COLUMN_NOT_FOUND}}: a referenced column is not in the input relation.
* {{BIN_BY_RANGE_TYPE_MISMATCH}}: a range column is not TIMESTAMP or TIMESTAMP_NTZ, or the two range columns have different types.
* {{BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN}}: the same column appears more than once in {{DISTRIBUTE UNIFORM}}.
* {{BIN_BY_INVALID_BIN_WIDTH}}: the {{BIN WIDTH}} expression is not a positive day-time interval (zero, negative, or year-month).
* {{BIN_BY_INVALID_RANGE}}: a runtime row has {{range_start > range_end}}.
* {{BIN_BY_MISSING_DISTRIBUTE}}: the {{DISTRIBUTE UNIFORM}} clause is missing or empty.
* {{BIN_BY_ALIGN_TO_TYPE_MISMATCH}}: the {{ALIGN TO}} expression type does not match the range column type.
* {{BIN_BY_DISTRIBUTE_TYPE_MISMATCH}}: a {{DISTRIBUTE UNIFORM}} column is not a numeric type or DAY-TIME INTERVAL.
* {{DATATYPE_MISMATCH.NON_FOLDABLE_INPUT}}: {{BIN WIDTH}} or {{ALIGN TO}} is not foldable.

 
h2. Implementation

 

Delivered as three sequential PRs against {{master}}, all referencing this ticket:

# Parser, analyzer, and error classes (parses + resolves; execution stubbed)
# Physical execution ({{BinByExec}} with day-time-interval bucketing, LTZ civil-time arithmetic, per-row edge case handling)
# DataFrame and PySpark API (classic + Connect)



> Add BIN BY relation operator for aligning range-typed rows to fixed bin 
> boundaries
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-57133
>                 URL: https://issues.apache.org/jira/browse/SPARK-57133
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 5.0.0
>            Reporter: Nikolina Vraneš
>            Priority: Major
>


