[ https://issues.apache.org/jira/browse/SPARK-57133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nikolina Vraneš updated SPARK-57133:
------------------------------------
    Description: 
h2. Summary

Add a new SQL relation operator {{BIN BY (...)}} that aligns range-typed rows to fixed-width bin boundaries. Any row whose {{[range_start, range_end)}} crosses a boundary is split, and user-selected numeric or day-time interval values are redistributed proportionally across the resulting sub-ranges. The target use case is telemetry and observability data in which each row carries its own measurement window (OpenTelemetry, Prometheus exports).

The operator occupies the same grammar position as {{PIVOT}} / {{UNPIVOT}} (a post-{{FROM}} relation extension), composes with downstream {{GROUP BY}} / {{WHERE}} / {{JOIN}}, and produces an ordinary relation.

This initial PR series ships one cell of a broader matrix: aligned (fixed-grid) bins, range samples, uniform (proportional) distribution, and the TIMESTAMP / TIMESTAMP_NTZ and day-time interval types. Other cells ({{EQUAL BINS}}, point samples, {{MAX_OVERLAP}}, non-time domains) are explicit non-goals; they would arrive as additive grammar extensions inside the same {{BIN BY (...)}} shell.

  

h2. Syntax

{code:sql}
SELECT * FROM relation BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  [ALIGN TO originExpr]
  DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])
  [BIN_START AS aliasName]
  [BIN_END AS aliasName]
  [BIN_DISTRIBUTE_RATIO AS aliasName]
) [AS resultAlias];
{code}

{{BIN BY}} also works as a SQL pipe-operator stage, on par with {{PIVOT}} / {{UNPIVOT}}:

{code:sql}
FROM relation |> BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  DISTRIBUTE UNIFORM (distributeCol)
);
{code}

  

h2. Clauses

* {{RANGE rangeStartCol TO rangeEndCol}}: two TIMESTAMP or TIMESTAMP_NTZ columns from the input relation that define each row's measurement window {{[range_start, range_end)}}. Both columns must have the same type. Column references accept qualified names (e.g., {{m.time_start}}). The future point-sample variant will land as an alternative {{VALUE <expr>}} branch parallel to {{RANGE}}.
* {{BIN WIDTH widthExpr}}: a day-time interval expression defining the bin size. It must be positive and foldable. Year-month intervals are rejected because variable-length months would make proportional splitting ambiguous.
* {{ALIGN TO originExpr}}: optional alignment anchor for the bin grid. It must have the same type as the range columns and must be foldable. When omitted, the origin defaults to the Unix epoch ({{1970-01-01 00:00:00}}), anchored in the session time zone for TIMESTAMP (LTZ) and taken as wall-clock time for TIMESTAMP_NTZ.
* {{DISTRIBUTE UNIFORM (col ...)}}: one or more numeric or DAY-TIME INTERVAL columns whose values are redistributed proportionally across sub-rows. The clause is required and must list at least one column. Column references accept qualified names.
* {{BIN_START AS}} / {{BIN_END AS}} / {{BIN_DISTRIBUTE_RATIO AS}}: optional rename clauses for the three appended output columns. When omitted, the default names ({{bin_start}}, {{bin_end}}, {{bin_distribute_ratio}}) are used.
* {{AS resultAlias}}: optional table alias for the whole {{BIN BY}} relation. This matches the {{PIVOT}} / {{UNPIVOT}} convention; the resolver wraps the operator in a {{SubqueryAlias}}.
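
A minimal Python sketch of how the bin containing a given timestamp could be located from {{BIN WIDTH}} and {{ALIGN TO}}. It assumes the timestamp, origin, and width have already been converted to integer microseconds, and it models only plain UTC microsecond arithmetic (the TIMESTAMP_NTZ and sub-day cases); LTZ civil-time handling for multi-day bins is not modeled. {{containing_bin}} and the constants are hypothetical names for illustration, not part of the proposed API.

{code:python}
MICROS_PER_MINUTE = 60 * 1_000_000
MICROS_PER_HOUR = 60 * MICROS_PER_MINUTE


def containing_bin(ts_micros: int, width_micros: int, origin_micros: int = 0) -> tuple[int, int]:
    """Return [bin_start, bin_end) in microseconds for the bin containing ts_micros.

    Hypothetical helper: bins are laid out every width_micros, starting at
    origin_micros (default 0, i.e. the Unix epoch).
    """
    if width_micros <= 0:
        # Mirrors the rule that BIN WIDTH must be positive.
        raise ValueError("BIN WIDTH must be a positive interval")
    # Python's // floors toward negative infinity, so timestamps before the
    # origin still land in the correct bin.
    start = origin_micros + (ts_micros - origin_micros) // width_micros * width_micros
    return start, start + width_micros


# 5-minute bins from the epoch: 00:07 falls into [00:05, 00:10).
print(containing_bin(7 * MICROS_PER_MINUTE, 5 * MICROS_PER_MINUTE))  # (300000000, 600000000)
{code}

With the origin at 00:30 and one-hour bins, a timestamp at 00:10 (600000000 microseconds) maps to {{(-1800000000, 1800000000)}}, i.e. the bin that starts at 23:30 on the previous day.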

 

h2. Output schema

The output contains all input columns plus three appended columns (default names shown; each can be renamed with its optional {{AS}} clause):

* {{bin_start}} ({{TIMESTAMP}} or {{TIMESTAMP_NTZ}}, matching the input range column type)
* {{bin_end}} (same type as {{bin_start}})
* {{bin_distribute_ratio}} ({{DOUBLE}}): fraction of the original {{[range_start, range_end)}} duration that falls into this bin; {{1.0}} for unsplit rows.
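
One way to derive the appended columns for a row with {{range_start < range_end}} is sketched below in Python: start from the bin containing {{range_start}}, clip the range to each bin until {{range_end}} is reached, and report the overlap fraction. Times are plain integers in a single unit (microseconds in this ticket; small integers in the usage line for readability) with UTC arithmetic only, and {{split_range}} / {{SubRow}} are hypothetical names.

{code:python}
from typing import NamedTuple


class SubRow(NamedTuple):
    range_start: int  # input range start, clipped to the bin
    range_end: int    # input range end, clipped to the bin
    bin_start: int
    bin_end: int
    bin_distribute_ratio: float


def split_range(start: int, end: int, width: int, origin: int = 0) -> list[SubRow]:
    """Split [start, end) at every grid boundary origin + k * width (requires start < end)."""
    total = end - start
    bin_start = origin + (start - origin) // width * width
    rows = []
    while bin_start < end:
        bin_end = bin_start + width
        sub_start, sub_end = max(start, bin_start), min(end, bin_end)
        # Fraction of the original duration that falls inside this bin.
        rows.append(SubRow(sub_start, sub_end, bin_start, bin_end, (sub_end - sub_start) / total))
        bin_start = bin_end
    return rows


# Window [4, 7) with 5-unit bins crosses the boundary at 5, so it yields two sub-rows.
for row in split_range(4, 7, 5):
    print(row)
{code}

For the window {{[4, 7)}} this produces {{[4, 5)}} in bin {{[0, 5)}} with ratio 1/3 and {{[5, 7)}} in bin {{[5, 10)}} with ratio 2/3; a window that stays inside one bin yields a single sub-row with ratio {{1.0}}.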

 

h2. Behavior

* For each input row, the operator emits one output row per bin that overlaps {{[range_start, range_end)}}.
* In each sub-row, the appended {{bin_start}} / {{bin_end}} hold the bin's boundaries; the input range columns are truncated to the sub-range (i.e., clipped to the bin's intersection with {{[range_start, range_end)}}); {{DISTRIBUTE UNIFORM}} columns are scaled by the fraction of the original range that falls inside the bin; all other input columns are replicated unchanged.
* Scaling math: integer, {{DECIMAL}}, and DAY-TIME INTERVAL columns use exact rational scaling ({{value * numer / denom}}, where {{numer = bin_overlap_micros}} and {{denom = range_total_micros}}), rounding to nearest only at the end. {{FLOAT}} / {{DOUBLE}} columns use ordinary IEEE-754 multiplication by the ratio as a {{DOUBLE}}. Exact scaling keeps results bit-for-bit identical across engines (JVM and Photon) for the exact types.
* {{bin_distribute_ratio}} reports the per-sub-row fraction ({{numer / denom}}).
* TIMESTAMP (LTZ) bin boundaries align to wall-clock times in the session time zone. Multi-day bins use civil-time arithmetic so that boundaries land at consistent local times across DST transitions.
* TIMESTAMP_NTZ bin boundaries use UTC arithmetic regardless of the session time zone.
* Sub-day bins use UTC microsecond arithmetic regardless of time zone.
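
A minimal Python sketch of the two scaling paths described above. Exact types are scaled as {{value * numer / denom}} with a single rounding step on integers (for {{DECIMAL}} this assumes the unscaled integer value at a fixed scale; for DAY-TIME INTERVAL, the value in microseconds). The ticket only says round-to-nearest, so the half-away-from-zero tie-breaking used here is an assumption. {{FLOAT}} / {{DOUBLE}} values are multiplied by the ratio as an ordinary double. All function names are hypothetical.

{code:python}
def round_div_half_away(n: int, d: int) -> int:
    """Return n / d rounded to the nearest integer, ties away from zero (requires d > 0).

    The tie-breaking rule is an assumption; the ticket only specifies round-to-nearest.
    """
    q, r = divmod(abs(n), d)
    if 2 * r >= d:
        q += 1
    return q if n >= 0 else -q


def scale_exact(value: int, numer: int, denom: int) -> int:
    """Exact rational scaling for integers, unscaled DECIMALs, or interval microseconds."""
    # Multiply first, divide once: no intermediate rounding.
    return round_div_half_away(value * numer, denom)


def scale_double(value: float, numer: int, denom: int) -> float:
    """FLOAT/DOUBLE path: plain IEEE-754 multiplication by the ratio."""
    ratio = numer / denom  # the bin_distribute_ratio, as a double
    return value * ratio


# A value of 10 over a 3-minute range; 1 minute overlaps the first bin, 2 the second.
print(scale_exact(10, 60_000_000, 180_000_000))   # 10 * 1/3 rounds to 3
print(scale_exact(10, 120_000_000, 180_000_000))  # 10 * 2/3 rounds to 7
print(scale_double(10.0, 60_000_000, 180_000_000))
{code}

Because each sub-row is rounded independently in this sketch, the scaled parts need not add back up to the original value: {{scale_exact(10, 1, 3)}} is 3, so three equal thirds sum to 9.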

 

h2. Per-row edge cases

* {{range_start == range_end}} (zero-length input): emits one output row with {{bin_distribute_ratio = 1.0}} and {{bin_start}} / {{bin_end}} set to the bin containing {{range_start}}. {{DISTRIBUTE UNIFORM}} columns pass through unchanged.
* {{range_start > range_end}} (inverted range): raises {{BIN_BY_INVALID_RANGE}} at runtime.
* {{NULL}} in {{range_start}} or {{range_end}}: emits one output row with all three appended columns set to {{NULL}}. {{DISTRIBUTE UNIFORM}} columns pass through unchanged.
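
The per-row edge cases can be layered onto the grid walk as in the Python sketch below. It returns only the three appended values per output row ({{DISTRIBUTE UNIFORM}} pass-through is left out), assumes integer time values with UTC arithmetic, and uses the hypothetical names {{appended_columns}} and {{BinByInvalidRange}}.

{code:python}
from typing import Optional


class BinByInvalidRange(ValueError):
    """Hypothetical stand-in for the BIN_BY_INVALID_RANGE runtime error."""


def appended_columns(start: Optional[int], end: Optional[int], width: int, origin: int = 0):
    """Return (bin_start, bin_end, bin_distribute_ratio) tuples, one per output row."""
    if start is None or end is None:
        # NULL range bound: one row, all three appended columns NULL.
        return [(None, None, None)]
    if start > end:
        # Inverted range: runtime error.
        raise BinByInvalidRange(f"range_start {start} > range_end {end}")
    first = origin + (start - origin) // width * width
    if start == end:
        # Zero-length range: one row in the bin containing range_start, ratio 1.0.
        return [(first, first + width, 1.0)]
    out, b = [], first
    while b < end:
        overlap = min(end, b + width) - max(start, b)
        out.append((b, b + width, overlap / (end - start)))
        b += width
    return out
{code}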

 

h2. Examples

{code:sql}
-- 5-minute alignment, single value column, default output names, default origin
SELECT * FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 5 MINUTES
  DISTRIBUTE UNIFORM (value)
);

-- Custom origin, hourly bins, renamed boundary columns
SELECT * FROM metrics BIN BY (
  RANGE ts_begin TO ts_end
  BIN WIDTH INTERVAL 1 HOUR
  ALIGN TO TIMESTAMP '2024-01-01 00:30:00'
  DISTRIBUTE UNIFORM (bytes_sent, requests)
  BIN_START AS window_start
  BIN_END AS window_end
);

-- Composed with GROUP BY for downstream aggregation
SELECT bin_start, SUM(value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
)
GROUP BY bin_start
ORDER BY bin_start;

-- Trailing alias on the relation result
SELECT b.bin_start, SUM(b.value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
) AS b
GROUP BY b.bin_start;

-- SQL pipe-operator form
FROM metrics |> BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
);
{code}

  

h2. Errors

* {{BIN_BY_COLUMN_NOT_FOUND}}: a referenced column is not in the input relation.
* {{BIN_BY_RANGE_TYPE_MISMATCH}}: a range column is not TIMESTAMP or TIMESTAMP_NTZ, or the two range columns have different types.
* {{BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN}}: the same column appears more than once in {{DISTRIBUTE UNIFORM}}.
* {{BIN_BY_INVALID_BIN_WIDTH}}: the {{BIN WIDTH}} expression is not a positive day-time interval (it is zero, negative, or year-month).
* {{BIN_BY_INVALID_RANGE}}: a row has {{range_start > range_end}} at runtime.
* {{BIN_BY_MISSING_DISTRIBUTE}}: the {{DISTRIBUTE UNIFORM}} clause is missing or empty.
* {{BIN_BY_ALIGN_TO_TYPE_MISMATCH}}: the {{ALIGN TO}} expression type does not match the range column type.
* {{BIN_BY_DISTRIBUTE_TYPE_MISMATCH}}: a {{DISTRIBUTE UNIFORM}} column is neither numeric nor DAY-TIME INTERVAL.
* {{DATATYPE_MISMATCH.NON_FOLDABLE_INPUT}}: {{BIN WIDTH}} or {{ALIGN TO}} is not foldable.
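
As a rough model of the analysis-time checks, the Python sketch below maps a simplified description of a {{BIN BY}} clause onto the error classes listed above. Types are plain strings, the bin width is a hypothetical {{Width}} record, foldability is not modeled, and the order in which checks fire is an assumption; the real analyzer may report a different error first when several rules are violated. {{check_bin_by}} and {{BinByAnalysisError}} are hypothetical names.

{code:python}
from dataclasses import dataclass
from typing import Optional


class BinByAnalysisError(Exception):
    """Hypothetical analysis error tagged with one of the error classes above."""

    def __init__(self, error_class: str, detail: str):
        super().__init__(f"[{error_class}] {detail}")
        self.error_class = error_class


TIME_TYPES = {"TIMESTAMP", "TIMESTAMP_NTZ"}
# Simplified type names; DECIMAL precision/scale and interval subtypes are ignored.
DISTRIBUTABLE = {"TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE",
                 "DECIMAL", "INTERVAL DAY TO SECOND"}


@dataclass
class Width:
    kind: str    # "DAY_TIME" or "YEAR_MONTH" (hypothetical encoding)
    amount: int  # microseconds for DAY_TIME, months for YEAR_MONTH


def check_bin_by(schema: dict[str, str], start: str, end: str, width: Width,
                 align_type: Optional[str], distribute: list[str]) -> None:
    """Raise BinByAnalysisError for the first violated rule (check order assumed)."""
    if not distribute:
        raise BinByAnalysisError("BIN_BY_MISSING_DISTRIBUTE", "no DISTRIBUTE UNIFORM columns")
    for col in (start, end, *distribute):
        if col not in schema:
            raise BinByAnalysisError("BIN_BY_COLUMN_NOT_FOUND", col)
    range_type = schema[start]
    if range_type not in TIME_TYPES or schema[end] != range_type:
        raise BinByAnalysisError("BIN_BY_RANGE_TYPE_MISMATCH", f"{schema[start]} / {schema[end]}")
    if width.kind != "DAY_TIME" or width.amount <= 0:
        raise BinByAnalysisError("BIN_BY_INVALID_BIN_WIDTH", str(width))
    if align_type is not None and align_type != range_type:
        raise BinByAnalysisError("BIN_BY_ALIGN_TO_TYPE_MISMATCH", align_type)
    if len(set(distribute)) != len(distribute):
        raise BinByAnalysisError("BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN", ", ".join(distribute))
    for col in distribute:
        if schema[col] not in DISTRIBUTABLE:
            raise BinByAnalysisError("BIN_BY_DISTRIBUTE_TYPE_MISMATCH", f"{col}: {schema[col]}")
{code}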

 

h2. Implementation

Delivered as three sequential PRs against {{master}}, all referencing this ticket:

# Parser, analyzer, and error classes (parses and resolves; execution stubbed)
# Physical execution ({{BinByExec}} with day-time-interval bucketing, LTZ civil-time arithmetic, and per-row edge-case handling)
# DataFrame and PySpark API (classic and Connect)

> Add BIN BY relation operator for aligning range-typed rows to fixed bin boundaries
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-57133
>                 URL: https://issues.apache.org/jira/browse/SPARK-57133
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 5.0.0
>            Reporter: Nikolina Vraneš
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
