[
https://issues.apache.org/jira/browse/SPARK-55348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bruno Messias updated SPARK-55348:
----------------------------------
Attachment: mermaid-diagram-2026-02-03-223513.png
> async_track expressions for efficient convergent algorithms in recursive CTEs
> (avoids exponential blowup)
> ---------------------------------------------------------------------------------------------------------
>
> Key: SPARK-55348
> URL: https://issues.apache.org/jira/browse/SPARK-55348
> Project: Spark
> Issue Type: New Feature
> Components: SQL
> Affects Versions: 4.1.1
> Reporter: Bruno Messias
> Priority: Major
> Labels: CTE, performance, recursive
> Attachments: mermaid-diagram-2026-02-03-223513.png
>
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> This proposal introduces a *family of stateful expressions*
> (`async_track_min`, `async_track_max`, `async_track_...`) that prevent
> exponential row explosion in recursive CTEs, for example in graph algorithms.
> These expressions filter redundant rows during iteration by tracking the
> "best" value seen for each key.
> {*}Why it works{*}: For operators such as `min`, `max`, and `first-seen`,
> the result is the same regardless of processing order. Spark's distributed
> task execution is actually an advantage here: zero coordination overhead, with
> correctness guaranteed by the mathematical properties of these operators.
> {*}Complexity improvement{*}: Without filtering, recursive CTEs enumerate all
> paths (exponential in graph density). With `async_track`, each key is
> accepted at most once per iteration, reducing worst-case complexity
> h3. Motivation: Need for Efficient Recursive CTEs
> The introduction of recursive CTEs in Spark was a significant step forward.
> I work in financial services where we are using recursive CTEs.
> However, there's a hard ceiling: {*}row explosion{*}. Once graphs exceed a
> certain density or depth, queries become impractical due to exponential
> intermediate row growth. This forces teams to either:
> - Limit query scope to small subgraphs
> - Accept long runtimes and high compute costs
> - Move data to external graph systems, breaking the SQL-native workflow and
> leaving Databricks
> {*}Impact{*}:
> * Data analysts can write graph queries without learning new frameworks
> * No data movement to external graph databases
> * Cost reduction and energy efficiency
> I believe Databricks alone must be executing millions of SQL queries per hour
> across its warehouses. Even a small percentage of those hitting row explosion
> in recursive CTEs represents significant wasted compute and cost.
> *The Problem today:*
> Today, to implement single-source shortest paths (SSSP) in pure SQL, you must
> generate all candidate paths and {*}apply {{MIN(dist)}} per node at the end{*}.
> Maybe I'm missing something obvious, but I haven't found a pure SQL solution
> that avoids this explosion. If one exists, I'd love to learn about it.
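The gap described above can be illustrated with a small simulation. This is a minimal sketch in plain Python, not Spark code: the layered test graph and the helper names (`layered_graph`, `naive_rows`, `tracked_rows`) are assumptions made for illustration. It counts the rows a recursive expansion produces when every path is kept and `MIN` is applied at the end, versus when a row is kept only if it improves the stored minimum for its node.

```python
def layered_graph(width, depth):
    """Hypothetical test graph: source 0, then `depth` layers of `width`
    nodes, with every node linked to every node of the next layer (weight 1)."""
    edges = {}
    prev = [0]
    next_id = 1
    for _ in range(depth):
        layer = list(range(next_id, next_id + width))
        next_id += width
        for u in prev:
            edges[u] = [(v, 1) for v in layer]
        prev = layer
    return edges


def naive_rows(edges, source=0):
    """Emulates UNION ALL recursion: every path is a row, MIN applied at the end."""
    frontier = [(source, 0)]
    total = 0
    best = {}
    while frontier:
        total += len(frontier)
        for node, dist in frontier:
            best[node] = min(dist, best.get(node, dist))
        # Extend every surviving row along every outgoing edge.
        frontier = [(v, dist + w)
                    for node, dist in frontier
                    for v, w in edges.get(node, [])]
    return total, best


def tracked_rows(edges, source=0):
    """Same recursion, but a row survives only if it improves the stored minimum."""
    best = {}

    def track_min(key, value):
        if key not in best or value < best[key]:
            best[key] = value
            return True
        return False

    frontier = [row for row in [(source, 0)] if track_min(*row)]
    total = 0
    while frontier:
        total += len(frontier)
        candidates = [(v, dist + w)
                      for node, dist in frontier
                      for v, w in edges.get(node, [])]
        frontier = [row for row in candidates if track_min(*row)]
    return total, best


if __name__ == "__main__":
    g = layered_graph(width=3, depth=4)
    print(naive_rows(g)[0], tracked_rows(g)[0])
```

On this toy graph (width 3, depth 4) the naive expansion materializes 121 rows and the filtered one 13, while both end with the same per-node minimum distances.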
> *Proposed Solution: Key-Value Memoization with Monotonic Operators*
> Introduce expressions that implement key-value memoization with provable
> convergence for monotonic operators.
> {code:java}
> async_track_min(key ANY, value NUMERIC) → BOOLEAN   -- keep row if value < stored min or key new
> async_track_max(key ANY, value NUMERIC) → BOOLEAN   -- keep row if value > stored max or key new
> async_track_first(key ANY) → BOOLEAN                -- keep row only the first time the key is seen
> {code}
> * KEY identifies the entity (e.g., graph node)
> * VALUE is the metric to optimize (e.g., distance)
> * OPERATOR determines convergence behavior (min, max, first, blend, ...)
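The intended semantics of the three signatures can be sketched with an in-memory state object. This is an illustrative single-process model in Python, not the proposed Spark implementation; the class `AsyncTrackState` and the helper `final_mins` are hypothetical names. The sketch also checks the order-independence argument: the set of rows that pass the filter may vary with processing order, but the stored minimum per key does not.

```python
import random


class AsyncTrackState:
    """Hypothetical in-memory model of the per-key state behind async_track_*."""

    def __init__(self):
        self.mins = {}
        self.maxs = {}
        self.seen = set()

    def track_min(self, key, value):
        # Keep the row if the key is new or the value beats the stored minimum.
        if key not in self.mins or value < self.mins[key]:
            self.mins[key] = value
            return True
        return False

    def track_max(self, key, value):
        # Keep the row if the key is new or the value beats the stored maximum.
        if key not in self.maxs or value > self.maxs[key]:
            self.maxs[key] = value
            return True
        return False

    def track_first(self, key):
        # Keep the row only the first time the key is seen.
        if key in self.seen:
            return False
        self.seen.add(key)
        return True


def final_mins(rows, seed):
    """Process rows in a seed-dependent order; return stored minima and kept rows."""
    shuffled = rows[:]
    random.Random(seed).shuffle(shuffled)
    state = AsyncTrackState()
    kept = [row for row in shuffled if state.track_min(*row)]
    return state.mins, kept


if __name__ == "__main__":
    rows = [("a", 5), ("a", 3), ("b", 7), ("a", 4), ("b", 2)]
    for seed in range(3):
        mins, kept = final_mins(rows, seed)
        print(seed, mins, kept)
```

Whatever the shuffle order, the row carrying the true minimum for a key always passes the filter, which is why the final per-key result does not depend on scheduling.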
>
> *Proof of Concept with Redis and a UDF*
> I implemented a proof-of-concept using Redis as an external state store to
> validate the feasibility of this approach and demonstrate that row explosion
> in recursive CTEs can be effectively avoided.
> * *Repository:*
> [https://github.com/devmessias/spark/tree/poc_async_track_op_expr]
> * *Approach:* Redis-backed key-value state with Lua scripts for atomic
> check-and-update
> * *Goal:* Validate correctness and asymptotic behavior, not provide a
> production-ready solution
> The PoC intentionally relied on an external system to keep the implementation
> minimal and focused on semantics. Despite the per-row network overhead, it
> successfully demonstrated that key-based, monotonic filtering during
> iteration prevents exponential row growth.
> *Working proposal*
> In the diagram below, the approach I implemented and tested is the global
> option: an AsyncTrackCoordinator on the Driver that holds a
> ConcurrentHashMap<Key, Value> and exposes an atomic compute()-style
> check-and-update. It's extremely simple to implement and reason about. The
> trade-offs are straightforward: per-update RPC latency and Driver memory/heap
> limits. Still, it was the fastest way to validate semantics and prove we can
> eliminate row explosion.
> !image-2026-02-03-22-25-23-874.png|width=663,height=442!
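The coordinator idea can be modeled in a few lines. The following is a rough single-process Python sketch, not the actual AsyncTrackCoordinator: a lock-protected dict stands in for the ConcurrentHashMap with its compute()-style update, threads stand in for executor tasks, and there is no RPC layer. The names `MinCoordinator`, `check_and_update`, and `run_tasks` are assumptions for illustration only.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MinCoordinator:
    """Hypothetical stand-in for a driver-side coordinator tracking per-key minima."""

    def __init__(self):
        self._lock = threading.Lock()
        self._best = {}

    def check_and_update(self, key, value):
        # The comparison and the write happen under one lock, so two tasks
        # can never both accept a value based on the same stale minimum.
        with self._lock:
            current = self._best.get(key)
            if current is None or value < current:
                self._best[key] = value
                return True
            return False

    def snapshot(self):
        with self._lock:
            return dict(self._best)


def run_tasks(rows, workers=4):
    """Split rows across worker threads that all filter through one coordinator."""
    coordinator = MinCoordinator()

    def task(chunk):
        return [row for row in chunk if coordinator.check_and_update(*row)]

    chunks = [rows[i::workers] for i in range(workers)]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        kept = [row for part in pool.map(task, chunks) for row in part]
    return coordinator.snapshot(), kept


if __name__ == "__main__":
    rows = [(node, dist) for node in range(5) for dist in range(20, 0, -1)]
    best, kept = run_tasks(rows)
    print(best, len(kept), "of", len(rows), "rows kept")
```

The number of rows kept depends on thread scheduling, but the final map does not. In a real deployment every `check_and_update` call would be a round trip to the Driver, which is the per-update latency trade-off mentioned above.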
> I also considered a local RocksDB-based backend, but did not investigate it
> deeply. At this stage, the focus is on validating the proposal and its
> semantics with the community; backend optimizations can be explored later if
> there is agreement on the feature.
> *References*
> - [https://github.com/lsmgeb89/asynch_bellman_ford]
> - [https://arxiv.org/abs/1204.6078]
> - [https://www.repositorio.ufal.br/bitstream/123456789/10217/1/Modelagem%20ass%C3%ADncrona%20do%20Page%20Rank.pdf]