[
https://issues.apache.org/jira/browse/SPARK-55348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruno Messias updated SPARK-55348:
----------------------------------
Description:
This proposal introduces a *family of stateful expressions* (`async_track_min`,
`async_track_max`, `async_track_...`) that prevent exponential row explosion in
recursive CTEs, for example in graph algorithms. These expressions filter
redundant rows during iteration by tracking the "best" value seen for each key.
{*}Why it works{*}: For operators like `min`, `max`, and `first-seen`, the
result is the same regardless of processing order. Spark's distributed task
execution is actually an advantage here: tasks do not need to coordinate their
processing order, and correctness is guaranteed by the mathematical properties
of these operators.
{*}Complexity improvement{*}: Without filtering, recursive CTEs enumerate all
paths (exponential in graph density). With `async_track`, each key is accepted
at most once per iteration, reducing worst-case complexity.
h3. Motivation: Need for Efficient Recursive CTEs
The introduction of recursive CTEs in Spark was a significant step forward. I
work in financial services, where we are using recursive CTEs.
However, there's a hard ceiling: {*}row explosion{*}. Once graphs exceed a
certain density or depth, queries become impractical due to exponential
intermediate row growth. This forces teams to either:
- Limit query scope to small subgraphs
- Accept long runtimes and high compute costs
- Move data to external graph systems (breaking the SQL-native workflow and
leaving Databricks)
{*}Impact{*}:
* Data analysts can write graph queries without learning new frameworks
* No data movement to external graph databases
* Cost reduction and energy efficiency
I believe Databricks alone must be executing millions of SQL queries per hour
across its warehouses. Even a small percentage of those hitting row explosion
in recursive CTEs represents significant wasted compute and cost.
*The Problem today:*
Today, to implement SSSP in pure SQL you must generate all candidate paths and
{*}apply {{MIN(dist)}} per node at the end{*}.
Maybe I'm missing something obvious, but I haven't found a pure SQL solution
that avoids this explosion. If one exists, I'd love to learn about it.
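To make the explosion concrete, here is a minimal Java sketch (not Spark code) that simulates the iterations of a recursive CTE on a small layered graph and counts the intermediate rows, first without filtering and then with a per-node minimum filter of the kind proposed in this ticket. The graph shape, the class name `RowExplosionSketch`, and the helpers `layeredGraph` and `countRows` are assumptions made only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of recursive-CTE row growth; not Spark code.
class RowExplosionSketch {
    // Node 0 feeds layer 1; every node in layer k feeds every node in layer k + 1.
    // Each edge is stored as {dst, weight} with weight 1.
    static Map<Integer, List<int[]>> layeredGraph(int layers, int width) {
        Map<Integer, List<int[]>> adj = new HashMap<>();
        for (int k = 0; k < layers; k++) {
            int srcCount = (k == 0) ? 1 : width;
            for (int i = 0; i < srcCount; i++) {
                int src = (k == 0) ? 0 : 1 + (k - 1) * width + i;
                for (int j = 0; j < width; j++) {
                    int dst = 1 + k * width + j;
                    adj.computeIfAbsent(src, s -> new ArrayList<>()).add(new int[] {dst, 1});
                }
            }
        }
        return adj;
    }

    // Counts the rows produced by the recursive step across all iterations.
    // Rows are {node, dist}; with filter == true a row is kept only if it
    // strictly improves the best distance recorded for its node.
    static long countRows(Map<Integer, List<int[]>> adj, boolean filter) {
        Map<Integer, Integer> best = new HashMap<>();
        best.put(0, 0);
        List<int[]> frontier = new ArrayList<>();
        frontier.add(new int[] {0, 0});
        long total = 0;
        while (!frontier.isEmpty()) {
            List<int[]> next = new ArrayList<>();
            for (int[] row : frontier) {
                for (int[] edge : adj.getOrDefault(row[0], List.of())) {
                    int dst = edge[0];
                    int dist = row[1] + edge[1];
                    if (filter) {
                        Integer seen = best.get(dst);
                        if (seen != null && seen <= dist) {
                            continue; // redundant row: no improvement for this node
                        }
                        best.put(dst, dist);
                    }
                    next.add(new int[] {dst, dist});
                }
            }
            total += next.size();
            frontier = next;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<Integer, List<int[]>> graph = layeredGraph(8, 3);
        System.out.println("without filter: " + countRows(graph, false)); // 9840
        System.out.println("with filter: " + countRows(graph, true));     // 24
    }
}
```

With 8 layers of 3 nodes, the unfiltered run produces 3 + 9 + ... + 3^8 = 9840 rows, while the filtered run keeps one row per node (24). The final `MIN(dist)` per node is the same in both cases.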
*Proposed Solution: Key-Value Memoization with Monotonic Operators*
Introduce expressions that implement key-value memoization with provable
convergence for monotonic operators.
{code:java}
async_track_min(key ANY, value NUMERIC) → BOOLEAN  -- keep row if key is new or value < stored min
async_track_max(key ANY, value NUMERIC) → BOOLEAN  -- keep row if key is new or value > stored max
async_track_first(key ANY) → BOOLEAN               -- keep row only the first time the key is seen
{code}
* KEY identifies the entity (e.g., graph node)
* VALUE is the metric to optimize (e.g., distance)
* OPERATOR determines convergence behavior (min, max, first, blend, ...)
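The intended return values can be illustrated with a small single-threaded model. This is only a sketch of the semantics listed above, assuming string keys and double values; the class `AsyncTrackSemanticsSketch` and its method names are hypothetical and are not part of Spark or of the PoC.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, single-threaded model of the three expressions; not Spark code.
class AsyncTrackSemanticsSketch {
    private final Map<String, Double> minByKey = new HashMap<>();
    private final Map<String, Double> maxByKey = new HashMap<>();
    private final Set<String> seenKeys = new HashSet<>();

    // Keep the row if the key is new or the value is strictly below the stored minimum.
    boolean trackMin(String key, double value) {
        Double stored = minByKey.get(key);
        if (stored == null || value < stored) {
            minByKey.put(key, value);
            return true;
        }
        return false;
    }

    // Keep the row if the key is new or the value is strictly above the stored maximum.
    boolean trackMax(String key, double value) {
        Double stored = maxByKey.get(key);
        if (stored == null || value > stored) {
            maxByKey.put(key, value);
            return true;
        }
        return false;
    }

    // Keep the row only the first time the key is seen.
    boolean trackFirst(String key) {
        return seenKeys.add(key);
    }

    public static void main(String[] args) {
        AsyncTrackSemanticsSketch state = new AsyncTrackSemanticsSketch();
        System.out.println(state.trackMin("n1", 5.0)); // true: key is new
        System.out.println(state.trackMin("n1", 7.0)); // false: 7.0 is not below 5.0
        System.out.println(state.trackMin("n1", 3.0)); // true: 3.0 improves on 5.0
        System.out.println(state.trackFirst("n1"));    // true: first time seen
        System.out.println(state.trackFirst("n1"));    // false: already seen
    }
}
```

In this model each operator keeps its own state, and ties are rejected because the comparison is strict, matching the `<` and `>` in the signatures above.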
*Proof of Concept with Redis and a UDF*
I implemented a proof-of-concept using Redis as an external state store to
validate the feasibility of this approach and demonstrate that row explosion in
recursive CTEs can be effectively avoided.
* *Repository:*
[https://github.com/devmessias/spark/tree/poc_async_track_op_expr]
* *Approach:* Redis-backed key-value state with Lua scripts for atomic
check-and-update
* *Goal:* Validate correctness and asymptotic behavior, not provide a
production-ready solution
The PoC intentionally relied on an external system to keep the implementation
minimal and focused on semantics. Despite the per-row network overhead, it
successfully demonstrated that key-based, monotonic filtering during iteration
prevents exponential row growth.
*Working proposal*
In the diagram below, the approach I implemented and tested is the global
option: an AsyncTrackCoordinator on the Driver that holds a
ConcurrentHashMap<Key, Value> and exposes an atomic compute()-style
check-and-update. It is extremely simple to implement and reason about. The
trade-offs are straightforward (per-update RPC latency and Driver memory/heap
limits), but it was the fastest way to validate semantics and prove we can
eliminate row explosion.
!mermaid-diagram-2026-02-03-223513.png|width=570,height=380!
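The atomic check-and-update at the heart of the global option can be sketched as follows. This is a minimal illustration under stated assumptions, not the code from the PoC branch: the class `AsyncTrackCoordinatorSketch` and the methods `checkAndUpdateMin`, `current`, and `raceForMinimum` are hypothetical names, local threads stand in for executor tasks, and the RPC layer is omitted. Only `ConcurrentHashMap.compute` is the real JDK call.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical driver-side state holder; threads stand in for executor tasks.
class AsyncTrackCoordinatorSketch {
    private final ConcurrentHashMap<Long, Double> best = new ConcurrentHashMap<>();

    // Atomically stores value if the key is new or value is below the stored
    // minimum. Returns true when the row should be kept.
    boolean checkAndUpdateMin(long key, double value) {
        boolean[] accepted = {false};
        best.compute(key, (k, stored) -> {
            if (stored == null || value < stored) {
                accepted[0] = true;
                return value;
            }
            return stored;
        });
        return accepted[0];
    }

    Double current(long key) {
        return best.get(key);
    }

    // Several threads race to report candidate values for the same key; the
    // candidates are 1.0, 2.0, ..., threads * valuesPerThread.
    static double raceForMinimum(int threads, int valuesPerThread) {
        AsyncTrackCoordinatorSketch coordinator = new AsyncTrackCoordinatorSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int offset = t;
            pool.submit(() -> {
                for (int i = 0; i < valuesPerThread; i++) {
                    coordinator.checkAndUpdateMin(1L, 1.0 + offset + (double) i * threads);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return coordinator.current(1L);
    }

    public static void main(String[] args) {
        // The stored minimum is the same whatever the thread interleaving.
        System.out.println(raceForMinimum(4, 1000)); // 1.0
    }
}
```

The number of accepted rows can vary with the interleaving, but the stored value always converges to the same minimum, which is the order-independence property the proposal relies on.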
I also considered a local RocksDB-based backend, but did not investigate it
deeply. At this stage, the focus is on validating the proposal and its
semantics with the community; backend optimizations can be explored later if
there is agreement on the feature.
*Benchmark: Spark async_track vs igraph (C/Python)*
Here are preliminary benchmark results comparing SSSP execution time using
igraph (C backend with Python bindings) and Spark with the proposed async_track
expressions:
!REPORT.png|width=734,height=306!
*Alternative Approaches Considered*
I also considered embedding the filtering semantics directly into the CTE
column list syntax, e.g.:
```sql
-- Hypothetical: lattice semantics in the CTE column list
WITH RECURSIVE paths(node KEY, dist MIN) AS (
SELECT 0, 0
UNION ALL
SELECT e.dst, p.dist + e.weight
FROM paths p JOIN edges e ON p.node = e.src
)
SELECT * FROM paths
```
This would declare that `node` is the key and `dist` should be aggregated with `MIN`
during iteration, eliminating the need for explicit filtering.
I discarded this approach for three reasons:
*1. Parser changes:* As far as I know, CTE column lists are used strictly for
column aliasing in SQL (please correct me if I'm wrong). Adding semantic
annotations like `KEY` or `MIN` would require modifying Spark's SQL parser, a
much larger and more invasive change.
*2. Non-standard SQL:* No database that I know of implements this syntax.
*3. Scope of changes:* The column list approach would require coordinated
changes across the parser, analyzer, and planner. In contrast, the
expression-based approach (`WHERE async_track_min(key, value)`) works entirely
within the existing Expression framework.
The expression approach is less elegant syntactically, but far less invasive
and can be shipped without architectural changes to Spark SQL.
*I came across DuckDB's `USING KEY` ([https://duckdb.org/2025/05/23/using-key])
while struggling with these queries in Spark SQL. As far as I understand,
`USING KEY` serves a different purpose: it provides last-write-wins semantics,
while what we need here is operator-specific convergence.*
*References*
- [https://github.com/lsmgeb89/asynch_bellman_ford]
- [https://arxiv.org/abs/1204.6078]
- [https://www.repositorio.ufal.br/bitstream/123456789/10217/1/Modelagem%20ass%C3%ADncrona%20do%20Page%20Rank.pdf]
- [https://duckdb.org/2025/05/23/using-key]
> async_track expressions for efficient convergent algorithms in recursive CTEs
> (avoids exponential blowup)
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55348
> URL: https://issues.apache.org/jira/browse/SPARK-55348
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Bruno Messias
> Priority: Major
> Labels: CTE, performance, recursive
> Attachments: REPORT.png, mermaid-diagram-2026-02-03-223513.png
>
> Original Estimate: 336h
> Remaining Estimate: 336h