[ 
https://issues.apache.org/jira/browse/SPARK-55348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Messias updated SPARK-55348:
----------------------------------
    Attachment: REPORT.png

> async_track expressions for efficient convergent algorithms in recursive CTEs 
> (avoids exponential blowup)
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-55348
>                 URL: https://issues.apache.org/jira/browse/SPARK-55348
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 4.1.1
>            Reporter: Bruno Messias
>            Priority: Major
>              Labels: CTE, performance, recursive
>         Attachments: REPORT.png, mermaid-diagram-2026-02-03-223513.png
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This proposal introduces a *family of stateful expressions* 
> (`async_track_min`, `async_track_max`, `async_track_...`) that prevent 
> exponential row explosion in recursive CTEs for graph algorithms for example. 
> These expressions filter redundant rows during iteration by tracking the 
> "best" value seen for each key.
> {*}Why it works{*}: For operators like `min`, `max`, and `first-seen` etc, 
> the result is the same regardless of processing order. Spark's distributed 
> task  is actually an advantage here: zero coordination overhead, with 
> correctness guaranteed by the mathematical properties of these operators.
> {*}Complexity improvement{*}: Without filtering, recursive CTEs enumerate all 
> paths (exponential in graph density). With `async_track`, each key is 
> accepted at most once per iteration, reducing worst-case complexity 
> h3. Motivation: Need for Efficient Recursive CTEs
> The introduction of recursive CTEs in Spark  was a significant step forward. 
> I work in financial services where we are usubg recursive CTEs 
> However, there's a hard ceiling: {*}row explosion{*}. Once graphs exceed a 
> certain density or depth, queries become impractical due to exponential 
> intermediate row growth. This forces teams to either:
>  - Limit query scope to small subgraphs
>  - Accept long runtimes and high compute costs
>  - Move data to external graph systems (breaking the SQL-native workflow) and 
> going outside from databrick
> {*}impact{*}:
>  * Data analysts can write graph queries without learning new frameworks
>  * No data movement to external graph databases
>  * Cost reduction and energy efficieny
> I believe Databricks alone must be executing millions of SQL queries per hour 
> across its warehouses. Even a small percentage of those hitting row explosion 
> in recursive CTEs represents significant wasted compute and cost
> *The Problem today:*
> Today, to implement SSSP in pure SQL you must generate all candidate paths 
> and {*}apply {{MIN(dist)}} per node at the end{*}. 
> Maybe I'm missing something obvious, but I haven't found a pure SQL solution 
> that avoids this explosion. If one exists, I'd love to learn about it.
> *Proposed Solution: Key-Value Memoization with Monotonic Operators*
> Introduce expressions that implement key-value memoization with provable 
> convergence for monotonic operators.
> {code:java}
> async_track_min(key ANY, value NUMERIC)   → BOOLEAN  -- keep row if value < 
> stored min or key new
> async_track_max(key ANY, value NUMERIC)   → BOOLEAN  -- keep row if value > 
> stored max or key new
> async_track_first(key ANY)                → BOOLEAN  -- keep row only the 
> first time the key is seen
>  {code}
> KEY identifies the entity (e.g., graph node) 
> VALUE is the metric to optimize (e.g., distance) OPERATOR determines 
> convergence behavior (min, max, first, blend, ...)
>  
> *Proof of Concept with REDIS and UDF*
> I implemented a proof-of-concept using Redis as an external state store to 
> validate the feasibility of this approach and demonstrate that row explosion 
> in recursive CTEs can be effectively avoided.
>  * *Repository:* 
> [https://github.com/devmessias/spark/tree/poc_async_track_op_expr]
>  * *Approach:* Redis-backed key-value state with Lua scripts for atomic 
> check-and-update
>  * *Goal:* Validate correctness and asymptotic behavior, not provide a 
> production-ready solution
> The PoC intentionally relied on an external system to keep the implementation 
> minimal and focused on semantics. Despite the per-row network overhead, it 
> successfully demonstrated that key-based, monotonic filtering during 
> iteration prevents exponential row growth
> *Working proposal*
> In the diagram bellow, the approach I implemented and tested is the global 
> option: an AsyncTrackCoordinator on the Driver that holds a 
> ConcurrentHashMap<Key, Value> and exposes an atomic compute()-style 
> check-and-update/. It’s extremely simple to implement and reason abou. The 
> trade-offs are straightforward  per-update RPC latency and Driver memory/heap 
> limits  but it was the fastest way to validate semantics and prove we can 
> eliminate row explosion.
> !mermaid-diagram-2026-02-03-223513.png|width=570,height=380!
> I also considered a local RocksDB-based backend, but did not investigate it 
> deeply. At this stage, the focus is on validating the proposal and its 
> semantics with the community; backend optimizations can be explored later if 
> there is agreement on the feature.
> *References*
>  - [https://github.com/lsmgeb89/asynch_bellman_ford]
>  - [https://arxiv.org/abs/1204.6078]
>  - 
> [https://www.repositorio.ufal.br/bitstream/123456789/10217/1/Modelagem%20ass%C3%ADncrona%20do%20Page%20Rank.pdf]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to