[ https://issues.apache.org/jira/browse/SPARK-57133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikolina Vraneš updated SPARK-57133:
------------------------------------
Description:
h2. Summary

Add a new SQL relation operator {{BIN BY (...)}} that aligns range-typed rows to fixed-width bin boundaries. Any row whose {{[range_start, range_end)}} crosses a boundary is split into sub-rows, and user-selected numeric or day-time interval values are redistributed proportionally across the resulting sub-ranges. Target use case: telemetry and observability data in which each row carries its own measurement window (for example, OpenTelemetry or Prometheus exports).

The operator occupies the same grammar position as {{PIVOT}} / {{UNPIVOT}} (a post-{{FROM}} relation extension), composes with downstream {{GROUP BY}} / {{WHERE}} / {{JOIN}}, and produces an ordinary relation.

This initial PR series covers one cell of a broader design matrix: aligned (fixed-grid) bins, range samples, uniform (proportional) distribution, and TIMESTAMP / TIMESTAMP_NTZ with DAY-TIME INTERVAL types. The other cells ({{EQUAL BINS}}, point samples, {{MAX_OVERLAP}}, non-time domains) are explicit non-goals. They would arrive as additive grammar extensions inside the same {{BIN BY (...)}} shell.

h2. Syntax
{code:sql}
SELECT * FROM relation BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  [ALIGN TO originExpr]
  DISTRIBUTE UNIFORM (distributeCol [, distributeCol ...])
  [BIN_START AS aliasName]
  [BIN_END AS aliasName]
  [BIN_DISTRIBUTE_RATIO AS aliasName]
) [AS resultAlias];
{code}
{{BIN BY}} also works as a SQL pipe-operator stage, like {{PIVOT}} / {{UNPIVOT}}:
{code:sql}
FROM relation |> BIN BY (
  RANGE rangeStartCol TO rangeEndCol
  BIN WIDTH widthExpr
  DISTRIBUTE UNIFORM (distributeCol)
);
{code}
h2. Clauses
* {{RANGE rangeStartCol TO rangeEndCol}}: two TIMESTAMP or TIMESTAMP_NTZ columns from the input relation that define each row's measurement window {{[range_start, range_end)}}. Both columns must have the same type. Column references accept qualified names (e.g., {{m.time_start}}). A future point-sample variant would land as an alternative {{VALUE <expr>}} branch parallel to {{RANGE}}.
* {{BIN WIDTH widthExpr}}: a day-time interval expression defining the bin size. It must be positive and foldable. Year-month intervals are rejected because variable-length months would make proportional splitting ambiguous.
* {{ALIGN TO originExpr}}: optional anchor for the bin grid. It must have the same type as the range columns and must be foldable. When omitted, the origin defaults to the Unix epoch ({{1970-01-01 00:00:00}}), anchored in the session time zone for TIMESTAMP (LTZ) and taken as wall-clock time for TIMESTAMP_NTZ.
* {{DISTRIBUTE UNIFORM (col ...)}}: one or more numeric or DAY-TIME INTERVAL columns whose values are redistributed proportionally across sub-rows. The clause is required and must list at least one column. Column references accept qualified names.
* {{BIN_START AS}} / {{BIN_END AS}} / {{BIN_DISTRIBUTE_RATIO AS}}: optional rename clauses for the three appended output columns. When omitted, the default names ({{bin_start}}, {{bin_end}}, {{bin_distribute_ratio}}) are used.
* {{AS resultAlias}}: optional table alias for the whole {{BIN BY}} relation. This matches the {{PIVOT}} / {{UNPIVOT}} convention: the resolver wraps the operator in a {{SubqueryAlias}}.
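The bin grid implied by {{BIN WIDTH}} and {{ALIGN TO}} can be modeled as floor division on microsecond offsets from the origin. The following is a minimal Python reference sketch, not the {{BinByExec}} code. It assumes TIMESTAMP_NTZ-style arithmetic on naive datetimes, with no session zone or DST handling. {{bin_containing}} is a hypothetical helper name.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # default origin when ALIGN TO is omitted
US = timedelta(microseconds=1)


def bin_containing(ts: datetime, width: timedelta, origin: datetime = EPOCH):
    """Return (bin_start, bin_end) for the fixed-width bin containing ts.

    Hypothetical helper: models BIN WIDTH / ALIGN TO with fixed-length
    microsecond arithmetic on naive datetimes (no session zone, no DST).
    """
    width_us = width // US
    if width_us <= 0:
        # Stand-in for BIN_BY_INVALID_BIN_WIDTH (zero or negative width).
        raise ValueError("BIN WIDTH must be a positive interval")
    offset_us = (ts - origin) // US
    # Floor division (not truncation toward zero) keeps timestamps that are
    # earlier than the origin in the bin whose start is <= ts.
    index = offset_us // width_us
    bin_start = origin + index * width
    return bin_start, bin_start + width


# Hourly bins anchored at 00:30, as in the ALIGN TO example.
origin = datetime(2024, 1, 1, 0, 30)
print(bin_containing(datetime(2024, 1, 1, 1, 10), timedelta(hours=1), origin))
# bin [2024-01-01 00:30, 2024-01-01 01:30)
```

Bins are half-open, so a timestamp exactly on a boundary such as 01:30 falls into the next bin, {{[01:30, 02:30)}}.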
h2. Output schema
Input columns, plus three appended columns. Default names are shown below; each column can be renamed with the optional {{AS}} clauses.

* {{bin_start}} ({{TIMESTAMP}} or {{TIMESTAMP_NTZ}}, matching the input range column type)
* {{bin_end}} (same type as {{bin_start}})
* {{bin_distribute_ratio}} ({{DOUBLE}}): the fraction of the original {{[range_start, range_end)}} duration that falls into this bin; {{1.0}} for unsplit rows.
h2. Behavior
* For each input row, the operator emits one output row per bin that overlaps {{[range_start, range_end)}}.
* In each sub-row, the appended {{bin_start}} / {{bin_end}} columns hold the bin's boundaries. The input range columns are clipped to the intersection of the bin with {{[range_start, range_end)}}. {{DISTRIBUTE UNIFORM}} columns are scaled by the fraction of the original range that falls inside the bin. All other input columns are replicated unchanged.
* Scaling math: integer, {{DECIMAL}}, and DAY-TIME INTERVAL columns use exact rational scaling, {{value * numer / denom}} with {{numer = bin_overlap_micros}} and {{denom = range_total_micros}}, rounding to nearest only at the end. {{FLOAT}} / {{DOUBLE}} columns use ordinary IEEE-754 multiplication by the ratio as a {{DOUBLE}}. Exact scaling keeps the engines (JVM and Photon) bit-for-bit equivalent on the exact types.
* {{bin_distribute_ratio}} reports the per-sub-row fraction.
* TIMESTAMP (LTZ) bin boundaries align to wall-clock times in the session time zone. Multi-day bins use civil-time arithmetic, so boundaries land at consistent local times across DST transitions.
* TIMESTAMP_NTZ bin boundaries use UTC arithmetic regardless of the session time zone.
* Sub-day bins use UTC microsecond arithmetic regardless of the time zone.
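Here is a minimal Python model of the splitting and scaling rules above. It rests on several assumptions. First, bins have a fixed length, matching the sub-day or TIMESTAMP_NTZ behavior; LTZ civil-time arithmetic is not modeled. Second, Python {{int}} values stand in for the exact types (integer, {{DECIMAL}}, and DAY-TIME INTERVAL microsecond counts). Third, ties in round-to-nearest go half away from zero, a rule this ticket does not specify. {{split_row}} and {{scale_exact}} are hypothetical names.

```python
from datetime import datetime, timedelta

US = timedelta(microseconds=1)


def scale_exact(value: int, numer: int, denom: int) -> int:
    """Exact value * numer / denom, rounded to nearest once at the end.

    Assumption: ties round half away from zero (not specified in the ticket).
    Python ints are arbitrary precision, so the product cannot overflow here.
    """
    n = abs(value) * numer
    rounded = (2 * n + denom) // (2 * denom)  # floor(n / denom + 1/2)
    return rounded if value >= 0 else -rounded


def split_row(start: datetime, end: datetime, width: timedelta,
              origin: datetime, value):
    """Return one (bin_start, bin_end, clipped_start, clipped_end, ratio, scaled)
    tuple per bin overlapping [start, end).

    Assumes start < end and fixed-length bins (no LTZ civil-time arithmetic).
    """
    denom = (end - start) // US                      # range_total_micros
    bin_start = origin + ((start - origin) // width) * width
    rows = []
    while bin_start < end:
        bin_end = bin_start + width
        lo, hi = max(start, bin_start), min(end, bin_end)  # clipped range
        numer = (hi - lo) // US                      # bin_overlap_micros
        ratio = numer / denom                        # bin_distribute_ratio
        if isinstance(value, float):
            scaled = value * ratio                   # FLOAT/DOUBLE: IEEE-754 multiply
        else:
            scaled = scale_exact(value, numer, denom)  # exact types
        rows.append((bin_start, bin_end, lo, hi, ratio, scaled))
        bin_start = bin_end
    return rows


for row in split_row(datetime(2024, 1, 1, 12, 3), datetime(2024, 1, 1, 12, 12),
                     timedelta(minutes=5), datetime(1970, 1, 1), 90):
    print(row)
```

Take a row covering {{[12:03, 12:12)}} with 5-minute bins and {{value = 90}}. The sketch yields three sub-rows, for bins {{[12:00, 12:05)}}, {{[12:05, 12:10)}}, and {{[12:10, 12:15)}}. Their ratios are 2/9, 5/9, and 2/9, and their scaled values are 20, 50, and 20.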
h2. Per-row edge cases
* {{range_start == range_end}} (zero-length input): emits one output row with {{bin_distribute_ratio = 1.0}} and {{bin_start}} / {{bin_end}} set to the bin containing {{range_start}}. {{DISTRIBUTE UNIFORM}} columns pass through unchanged.
* {{range_start > range_end}} (inverted range): raises {{BIN_BY_INVALID_RANGE}} at runtime.
* {{NULL}} in {{range_start}} or {{range_end}}: emits one output row with all three appended columns set to {{NULL}}. {{DISTRIBUTE UNIFORM}} columns pass through unchanged.
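The per-row edge cases can be sketched as a guard that runs before normal splitting. This Python sketch is illustrative only. {{edge_case_row}} and {{BinByInvalidRange}} are hypothetical names; the latter stands in for the {{BIN_BY_INVALID_RANGE}} error. Bins are assumed to be fixed-length, as in the NTZ case.

```python
from datetime import datetime, timedelta
from typing import Optional

US = timedelta(microseconds=1)


class BinByInvalidRange(ValueError):
    """Hypothetical stand-in for the BIN_BY_INVALID_RANGE runtime error."""


def edge_case_row(start: Optional[datetime], end: Optional[datetime],
                  width: timedelta, origin: datetime, value):
    """Return the single (bin_start, bin_end, ratio, value) row for an edge-case
    input, or None when the row needs normal splitting (start < end)."""
    if start is None or end is None:
        # NULL range: all three appended columns are NULL; value passes through.
        return (None, None, None, value)
    if start > end:
        raise BinByInvalidRange(f"range_start {start} > range_end {end}")
    if start == end:
        # Zero-length range: the bin containing range_start, ratio 1.0,
        # value unchanged.
        index = ((start - origin) // US) // (width // US)
        bin_start = origin + index * width
        return (bin_start, bin_start + width, 1.0, value)
    return None  # start < end: split normally


t = datetime(2024, 1, 1, 12, 7)
print(edge_case_row(t, t, timedelta(minutes=5), datetime(1970, 1, 1), 7))
```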
h2. Examples
{code:sql}
-- 5-minute alignment, single value column, default output names, default origin
SELECT * FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 5 MINUTES
  DISTRIBUTE UNIFORM (value)
);

-- Custom origin, hourly bins, renamed boundary columns
SELECT * FROM metrics BIN BY (
  RANGE ts_begin TO ts_end
  BIN WIDTH INTERVAL 1 HOUR
  ALIGN TO TIMESTAMP '2024-01-01 00:30:00'
  DISTRIBUTE UNIFORM (bytes_sent, requests)
  BIN_START AS window_start
  BIN_END AS window_end
);

-- Composed with GROUP BY for downstream aggregation
SELECT bin_start, SUM(value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
)
GROUP BY bin_start
ORDER BY bin_start;

-- Trailing alias on the relation result
SELECT b.bin_start, SUM(b.value) AS total
FROM metrics BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
) AS b
GROUP BY b.bin_start;

-- SQL pipe-operator form
FROM metrics |> BIN BY (
  RANGE time_start TO time_end
  BIN WIDTH INTERVAL 1 MINUTE
  DISTRIBUTE UNIFORM (value)
);
{code}
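The third example (1-minute bins, {{SUM(value)}} per {{bin_start}}) shows why composing with {{GROUP BY}} works. Every row is split on the same grid, so sub-rows from different measurement windows share {{bin_start}} values and aggregate cleanly. The Python model below runs this over two overlapping rows. It is a sketch, not Spark output. The input rows are made up for illustration, {{split}} is a hypothetical helper, and the model assumes non-negative integer values with round-half-up ties.

```python
from collections import defaultdict
from datetime import datetime, timedelta

US = timedelta(microseconds=1)
EPOCH = datetime(1970, 1, 1)  # default ALIGN TO origin


def split(start: datetime, end: datetime, width: timedelta, value: int):
    """Hypothetical model of BIN BY on one row: yields (bin_start, scaled_value).

    Exact integer scaling, rounded half up (assumes value >= 0)."""
    denom = (end - start) // US
    bin_start = EPOCH + ((start - EPOCH) // width) * width
    while bin_start < end:
        overlap = (min(end, bin_start + width) - max(start, bin_start)) // US
        yield bin_start, (2 * value * overlap + denom) // (2 * denom)
        bin_start += width


metrics = [
    # (time_start, time_end, value): windows that straddle minute boundaries
    (datetime(2024, 1, 1, 12, 0, 30), datetime(2024, 1, 1, 12, 2, 0), 90),
    (datetime(2024, 1, 1, 12, 1, 0), datetime(2024, 1, 1, 12, 1, 40), 10),
]

totals = defaultdict(int)  # GROUP BY bin_start with SUM(value)
for start, end, value in metrics:
    for bin_start, scaled in split(start, end, timedelta(minutes=1), value):
        totals[bin_start] += scaled

for bin_start in sorted(totals):  # ORDER BY bin_start
    print(bin_start, totals[bin_start])
```

The first row contributes 30 to the 12:00 bin and 60 to the 12:01 bin. The second row lies entirely inside the 12:01 bin, so the totals are 30 for 12:00 and 70 for 12:01.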
h2. Errors
* {{BIN_BY_COLUMN_NOT_FOUND}}: a referenced column is not in the input relation.
* {{BIN_BY_RANGE_TYPE_MISMATCH}}: a range column is not TIMESTAMP or TIMESTAMP_NTZ, or the two range columns have different types.
* {{BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN}}: the same column appears more than once in {{DISTRIBUTE UNIFORM}}.
* {{BIN_BY_INVALID_BIN_WIDTH}}: the {{BIN WIDTH}} expression is not a positive day-time interval (it is zero, negative, or a year-month interval).
* {{BIN_BY_INVALID_RANGE}}: a row at runtime has {{range_start > range_end}}.
* {{BIN_BY_MISSING_DISTRIBUTE}}: the {{DISTRIBUTE UNIFORM}} clause is missing or empty.
* {{BIN_BY_ALIGN_TO_TYPE_MISMATCH}}: the {{ALIGN TO}} expression type does not match the range column type.
* {{BIN_BY_DISTRIBUTE_TYPE_MISMATCH}}: a {{DISTRIBUTE UNIFORM}} column is not a numeric or DAY-TIME INTERVAL type.
* {{DATATYPE_MISMATCH.NON_FOLDABLE_INPUT}}: {{BIN WIDTH}} or {{ALIGN TO}} is not foldable.
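The analysis-time checks can be sketched as a single validation function that maps a resolved {{BIN BY}} clause to the error classes listed above. This is a hypothetical Python sketch. Types are simplified to strings, and the check order is an assumption. Foldability ({{DATATYPE_MISMATCH.NON_FOLDABLE_INPUT}}) and the runtime-only {{BIN_BY_INVALID_RANGE}} are not modeled.

```python
from typing import Dict, List, Optional

RANGE_TYPES = {"timestamp", "timestamp_ntz"}
# Assumed, simplified type names; a real analyzer works on Catalyst DataTypes.
DISTRIBUTE_TYPES = {"tinyint", "smallint", "int", "bigint", "float",
                    "double", "decimal", "interval day to second"}


def check_bin_by(schema: Dict[str, str], range_start: str, range_end: str,
                 width_kind: str, width_us: int, align_type: Optional[str],
                 distribute: List[str]) -> Optional[str]:
    """Return the first applicable error class, or None if the clause is valid.

    Hypothetical helper; the order of checks is an assumption.
    """
    for col in [range_start, range_end, *distribute]:
        if col not in schema:
            return "BIN_BY_COLUMN_NOT_FOUND"
    start_type, end_type = schema[range_start], schema[range_end]
    if start_type not in RANGE_TYPES or start_type != end_type:
        return "BIN_BY_RANGE_TYPE_MISMATCH"
    if width_kind != "day-time" or width_us <= 0:
        return "BIN_BY_INVALID_BIN_WIDTH"
    if align_type is not None and align_type != start_type:
        return "BIN_BY_ALIGN_TO_TYPE_MISMATCH"
    if not distribute:
        return "BIN_BY_MISSING_DISTRIBUTE"
    if len(set(distribute)) != len(distribute):
        return "BIN_BY_DUPLICATE_DISTRIBUTE_COLUMN"
    for col in distribute:
        if schema[col] not in DISTRIBUTE_TYPES:
            return "BIN_BY_DISTRIBUTE_TYPE_MISMATCH"
    return None
```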
h2. Implementation
Delivered as three sequential PRs against {{master}}, all referencing this ticket:

# Parser, analyzer, and error classes (parses and resolves; execution stubbed)
# Physical execution ({{BinByExec}} with day-time-interval bucketing, LTZ civil-time arithmetic, and per-row edge-case handling)
# DataFrame and PySpark API (classic and Connect)
> Add BIN BY relation operator for aligning range-typed rows to fixed bin
> boundaries
> ----------------------------------------------------------------------------------
>
> Key: SPARK-57133
> URL: https://issues.apache.org/jira/browse/SPARK-57133
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 5.0.0
> Reporter: Nikolina Vraneš
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)