[ 
https://issues.apache.org/jira/browse/SPARK-56395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gengliang Wang updated SPARK-56395:
-----------------------------------
    Fix Version/s: 4.x

> SPIP: NEAREST BY Top-K Ranking Join
> -----------------------------------
>
>                 Key: SPARK-56395
>                 URL: https://issues.apache.org/jira/browse/SPARK-56395
>             Project: Spark
>          Issue Type: Umbrella
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Zhidong Qu
>            Assignee: Dilip Biswal
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 5.0.0, 4.x
>
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> h2. Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.
> Add a new JOIN syntax to Spark SQL that lets users find the closest matches 
> between two tables. For each row in one table (the "query" side), the system 
> finds the top-K most similar or closest rows in another table (the "base" 
> side), ranked by a user-provided scoring expression.
> For example: given a table of user profiles and a table of products, find the 
> 10 most similar products for each user based on their embeddings. Or given a 
> table of points of interest and a table of areas, find the 5 geographically 
> nearest areas for each point.
> This is a building block for use cases like semantic search, recommendation 
> systems, retrieval-augmented generation (RAG), and geospatial 
> nearest-neighbor queries — all expressed in standard SQL rather than 
> requiring custom application code or external services.
> h2. Q2. What problem is this proposal NOT designed to solve?
> This proposal adds the SQL syntax and brute-force (exact) execution path for 
> top-K ranking joins. It does *NOT* cover:
>  * *Vector index creation or management.* Index DDL and indexed approximate 
> nearest neighbor (ANN) execution are planned as subsequent work. The syntax 
> is designed to accommodate indexes transparently — when indexes exist, 
> {{APPROX}} queries can leverage them without changing the query — but index 
> support itself is out of scope.
>  * *New distance or similarity functions.* The proposal relies on existing 
> scalar functions (e.g., {{vector_cosine_similarity}}, 
> {{vector_l2_distance}}). Adding new scoring functions is orthogonal.
>  * *Cross-table filter predicates.* Predicates like 
> {{ON t.price <= q.budget}} combined with NEAREST BY are a planned future 
> extension and are not part of this proposal.
>  * *Threshold-based retrieval.* Returning all candidates within a given 
> distance, rather than a fixed count, can be added as a future extension but 
> is not part of this proposal.
> h2. Q3. How is it done today, and what are the limits of current practice?
> Today, users simulate top-K ranking joins in Spark SQL using one of two 
> patterns:
> *Approach 1: CROSS JOIN + window function*
> {code:sql}
> SELECT *
> FROM (
>   SELECT q.*, t.*,
>     ROW_NUMBER() OVER (
>       PARTITION BY q.id
>       ORDER BY vector_cosine_similarity(q.embedding, t.embedding) DESC
>     ) AS rn
>   FROM query_table q
>   CROSS JOIN base_table t
> ) ranked
> WHERE rn <= 10;
> {code}
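The plain-Python sketch below shows what Approach 1 computes. It illustrates the semantics and is not Spark code. The row layout (dicts with {{id}} and {{embedding}} keys) and the {{cosine_similarity}} helper are assumptions. The helper stands in for {{vector_cosine_similarity}} and assumes non-zero vectors.

```python
from itertools import groupby
from math import sqrt


def cosine_similarity(a, b):
    # Assumed stand-in for vector_cosine_similarity; expects non-zero vectors.
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))


def cross_join_row_number(queries, base, k):
    """Emulates CROSS JOIN + ROW_NUMBER() ... + WHERE rn <= k; returns (q_id, t_id, rn)."""
    # CROSS JOIN: pair every query row with every base row and score the pair.
    joined = [
        (q["id"], t["id"], cosine_similarity(q["embedding"], t["embedding"]))
        for q in queries
        for t in base
    ]
    # PARTITION BY q.id: sorting by query id makes each partition contiguous.
    joined.sort(key=lambda row: row[0])
    result = []
    for q_id, rows in groupby(joined, key=lambda row: row[0]):
        # ORDER BY similarity DESC inside the partition.
        ranked = sorted(rows, key=lambda row: row[2], reverse=True)
        # ROW_NUMBER() numbers rows 1, 2, 3, ...; WHERE rn <= k keeps the first k.
        for rn, (_, t_id, _) in enumerate(ranked[:k], start=1):
            result.append((q_id, t_id, rn))
    return result


queries = [{"id": 1, "embedding": [1.0, 0.0]}, {"id": 2, "embedding": [0.0, 1.0]}]
base = [
    {"id": "a", "embedding": [1.0, 0.1]},
    {"id": "b", "embedding": [0.1, 1.0]},
    {"id": "c", "embedding": [1.0, 1.0]},
]
print(cross_join_row_number(queries, base, k=2))
# -> [(1, 'a', 1), (1, 'c', 2), (2, 'b', 1), (2, 'c', 2)]
```

Every (query, base) pair is scored before ranking. This is the cost that the proposal's brute-force path also pays.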
> *Approach 2: CROSS JOIN + {{max_by}} / {{min_by}} with K overload*
> {code:sql}
> SELECT
>   q.id,
>   max_by(t.id, vector_cosine_similarity(q.embedding, t.embedding), 10) AS top_ids
> FROM query_table q
> CROSS JOIN base_table t
> GROUP BY q.id;
> {code}
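A similar sketch of Approach 2 shows why its output needs post-processing. Each query id yields one row, and its matches are packed into an array of base-side ids only. {{max_by_k}} is a hypothetical Python model of the {{max_by}} K overload. It returns values best-first, which is an assumption made for this sketch rather than a statement about the order of Spark's result array.

```python
import heapq
from collections import defaultdict
from math import sqrt


def cosine_similarity(a, b):
    # Assumed stand-in for vector_cosine_similarity; expects non-zero vectors.
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))


def max_by_k(pairs, k):
    """Hypothetical model of max_by(value, ordering, k) over (value, ordering) pairs."""
    return [value for value, _ in heapq.nlargest(k, pairs, key=lambda p: p[1])]


def cross_join_max_by(queries, base, k):
    # GROUP BY q.id over the CROSS JOIN: collect (t.id, score) per query id.
    groups = defaultdict(list)
    for q in queries:
        for t in base:
            score = cosine_similarity(q["embedding"], t["embedding"])
            groups[q["id"]].append((t["id"], score))
    # One output row per query id, with all of its matches packed into one array.
    return {q_id: max_by_k(pairs, k) for q_id, pairs in groups.items()}


queries = [{"id": 1, "embedding": [1.0, 0.0]}, {"id": 2, "embedding": [0.0, 1.0]}]
base = [
    {"id": "a", "embedding": [1.0, 0.1]},
    {"id": "b", "embedding": [0.1, 1.0]},
    {"id": "c", "embedding": [1.0, 1.0]},
]
print(cross_join_max_by(queries, base, k=2))  # -> {1: ['a', 'c'], 2: ['b', 'c']}
```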
> *Limitations:*
>  * *Complexity.* Approach 1 requires nested subqueries, window functions, and 
> manual ranking logic — verbose, error-prone, and difficult for non-expert SQL 
> users. Approach 2 is more concise but packs results into arrays, requiring an 
> additional {{LATERAL}} explode step to recover individual rows with all 
> base-side columns. Neither pattern is self-explanatory.
>  * *No path to optimization.* Because the intent (top-K nearest neighbor) is 
> buried inside generic CROSS JOIN + ranking patterns, the optimizer has no 
> semantic signal to apply specialized execution strategies. There is no way to 
> transparently benefit from future index-based approximate search without 
> rewriting the query.
> h2. Q4. What is new in your approach and why do you think it will be successful?
> The approach extends standard SQL {{JOIN}} syntax with a {{NEAREST ... BY}} 
> clause rather than introducing a table-valued function (TVF). This is a 
> deliberate design choice:
>  * *Composability.* JOIN syntax naturally supports batch (multi-query) search 
> — each row on the left side drives an independent top-K search on the right 
> side. TVF-based approaches in other systems require workarounds like {{CROSS 
> APPLY}} for batch queries.
>  * *Pluggable ranking.* The {{BY DISTANCE <expr>}} / {{BY SIMILARITY <expr>}} 
> clause accepts any scalar expression, not just vector similarity. This makes 
> the same syntax usable for vector search, geospatial nearest-neighbor, BM25 
> text relevance, or composite scoring — without needing separate functions for 
> each modality.
>  * *Explicit search contract.* The {{APPROX}} / {{EXACT}} keywords make the 
> search algorithm contract explicit in the query. This ensures that index 
> creation or deletion never silently changes query results — only queries that 
> opt into {{APPROX}} are affected. This is a key differentiator from systems 
> where index presence implicitly changes behavior.
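The pluggable-ranking point can be shown with one top-K routine that takes an arbitrary score function and a ranking direction. This is a plain-Python sketch, not the proposed implementation. The Euclidean distance over planar (x, y) points is a simplified, assumed stand-in for a real geospatial distance.

```python
import heapq
from math import dist, sqrt


def cosine_similarity(a, b):
    # Assumed stand-in for vector_cosine_similarity; expects non-zero vectors.
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (sqrt(sum(x * x for x in a)) * sqrt(sum(y * y for y in b)))


def nearest(query, base, k, score, direction):
    """Returns the top-k base rows for one query row, ranked by score(query, row)."""
    if direction == "DISTANCE":
        # BY DISTANCE: smallest values rank first.
        return heapq.nsmallest(k, base, key=lambda t: score(query, t))
    if direction == "SIMILARITY":
        # BY SIMILARITY: largest values rank first.
        return heapq.nlargest(k, base, key=lambda t: score(query, t))
    raise ValueError(f"unknown ranking direction: {direction}")


def vector_score(q, t):
    return cosine_similarity(q["embedding"], t["embedding"])


def planar_score(q, t):
    return dist(q["xy"], t["xy"])


# Vector search: rank products by embedding similarity.
user = {"embedding": [1.0, 0.0]}
products = [
    {"id": "a", "embedding": [1.0, 0.1]},
    {"id": "b", "embedding": [0.1, 1.0]},
    {"id": "c", "embedding": [1.0, 1.0]},
]
# Nearest neighbor on planar points: rank areas by Euclidean distance.
poi = {"xy": (0.0, 0.0)}
areas = [
    {"id": "A", "xy": (1.0, 1.0)},
    {"id": "B", "xy": (3.0, 0.0)},
    {"id": "C", "xy": (0.5, 0.0)},
]
print([t["id"] for t in nearest(user, products, 2, vector_score, "SIMILARITY")])  # -> ['a', 'c']
print([t["id"] for t in nearest(poi, areas, 2, planar_score, "DISTANCE")])  # -> ['C', 'A']
```

Only the score function and the direction change between the two searches. This is the property the {{BY}} clause is designed to expose.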
> h2. Q5. Who cares? If you are successful, what difference will it make?
>  * *AI/ML practitioners* get a native SQL primitive for semantic search, RAG 
> pipelines, and recommendation systems without leaving SQL or depending on 
> external vector databases.
>  * *Data analysts* can express "find the top-K closest matches" as a single 
> JOIN clause instead of a complex CROSS JOIN + window function pattern.
>  * *The Spark ecosystem* closes a competitive gap with BigQuery, Snowflake, 
> SQL Server, and PostgreSQL (via the pgvector extension), all of which offer 
> dedicated vector/nearest-neighbor search capabilities.
>  * *Future work* (indexed ANN, threshold-based retrieval, ON-clause filters) 
> builds directly on the syntax and semantics established here.
> h2. Q6. What are the risks?
>  * *Syntax commitment.* Extending JOIN syntax is a language-level change that 
> is difficult to reverse. Mitigation: competitive analysis of existing systems 
> and careful SQL API review.
>  * *Performance expectations.* Without index support (out of scope for this 
> proposal), brute-force execution on large tables may disappoint users 
> expecting sub-linear performance. Mitigation: the {{APPROX}} / {{EXACT}} 
> keywords are part of the syntax from day one, establishing the contract for 
> future index-based optimization. Documentation will clearly state the initial 
> execution model.
> Risks are low overall. The feature enables strictly new workloads — it does 
> not modify existing JOIN semantics or execution paths. Existing queries are 
> unaffected.
> h2. Q7. How long will it take?
> Estimated 3–4 months for the scoped work:
> ||Phase||Duration||Description||
> |SQL parsing & analysis|1 week|ANTLR grammar extension, logical plan node, analysis rules|
> |Brute-force execution|1 week|Query resolution and optimization, LEFT OUTER / INNER semantics|
> |Spark Connect & DataFrame API|1 week|Proto definitions, DataFrame DSL, PySpark bindings|
> |Docs, testing & benchmarking|1 week|SQL reference docs, end-to-end tests, performance characterization|
> h2. Q8. What are the mid-term and final "exams" to check for success?
> *Mid-term:* The basic syntax parses, analyzes, and executes correctly for 
> single-vector and batch vector similarity search using brute-force 
> evaluation. The ad-hoc and batch examples from the function doc execute 
> end-to-end.
> *Final:* Full feature parity with the scoped proposal — INNER and LEFT OUTER 
> join modes, APPROX and EXACT keywords, DISTANCE and SIMILARITY ranking 
> directions, and SQL reference documentation merged. Performance on 
> representative benchmarks (single-query and batch top-K on tables with 1M+ 
> rows) is characterized and documented.
> h2. Appendix A. Proposed API Changes
> h3. Syntax
> {code:sql}
> FROM <query_table>
> [LEFT OUTER | INNER] JOIN <base_table>
> {APPROX | EXACT} NEAREST <num_results>
> BY {DISTANCE <distance_expression> | SIMILARITY <similarity_expression>}
> {code}
> h3. Arguments
> ||Argument||Description||
> |{{query_table}}|The driving (left) table. Each row generates a separate top-K search against the base table. Can be a table reference, subquery, or CTE.|
> |{{base_table}}|The target (right) table to search. Can be a table reference, subquery, or CTE.|
> |{{LEFT OUTER \| INNER}}|Optional. INNER (default) drops query rows with no matches. LEFT OUTER returns all query rows; base-side columns are NULL when no candidates exist.|
> |{{APPROX \| EXACT}}|Required (one of APPROX or EXACT). APPROX allows the optimizer to choose faster approximate strategies (e.g., indexed ANN in future). EXACT forces brute-force evaluation. EXACT fails for inherently nondeterministic expressions.|
> |{{NEAREST <num_results>}}|Compile-time constant integer expression giving the maximum number of results per query row. Capped at 100,000, matching the limit of the {{max_by}}/{{min_by}} K overload. Defaults to 1.|
> |{{BY DISTANCE <expr>}}|Scalar expression for distance ranking: smallest values rank first. Must return an orderable type.|
> |{{BY SIMILARITY <expr>}}|Scalar expression for similarity ranking: largest values rank first. Must return an orderable type.|
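Together, the arguments above define the brute-force (EXACT) result. The plain-Python reference model below is a sketch built on several assumptions, not Spark's implementation:
* Rows are dicts, and the BY expression is a Python callable {{score(q, t)}}.
* The base-side column names are passed explicitly, so LEFT OUTER can emit NULLs even when the base side is empty.
* Output columns are prefixed {{q.}} and {{t.}} to keep the two sides distinct.

```python
import heapq

MAX_NUM_RESULTS = 100_000  # cap stated in the Arguments table


def nearest_by_join(queries, base, base_columns, score, direction,
                    num_results=1, join_type="INNER"):
    """Brute-force model of <query> [LEFT OUTER | INNER] JOIN <base> ... NEAREST ... BY ..."""
    if not 1 <= num_results <= MAX_NUM_RESULTS:
        raise ValueError(f"num_results must be in [1, {MAX_NUM_RESULTS}]")
    if direction == "DISTANCE":
        pick = heapq.nsmallest  # smallest values rank first
    elif direction == "SIMILARITY":
        pick = heapq.nlargest  # largest values rank first
    else:
        raise ValueError(f"unknown ranking direction: {direction}")
    if join_type not in ("INNER", "LEFT OUTER"):
        raise ValueError(f"unsupported join type: {join_type}")

    out = []
    for q in queries:
        q_cols = {f"q.{c}": v for c, v in q.items()}
        matches = pick(num_results, base, key=lambda t: score(q, t))
        # Each output row carries all query-side and all base-side columns.
        for t in matches:
            out.append({**q_cols, **{f"t.{c}": v for c, v in t.items()}})
        if not matches and join_type == "LEFT OUTER":
            # No candidates: keep the query row with NULL (None) base-side columns.
            out.append({**q_cols, **{f"t.{c}": None for c in base_columns}})
        # INNER emits nothing for a query row without candidates.
    return out


def abs_diff(q, t):
    return abs(q["x"] - t["x"])


queries = [{"id": 1, "x": 0.0}, {"id": 2, "x": 10.0}]
base = [{"sku": "p1", "x": 1.0}, {"sku": "p2", "x": 4.0}, {"sku": "p3", "x": 9.0}]
rows = nearest_by_join(queries, base, ["sku", "x"], abs_diff, "DISTANCE", num_results=2)
print([(r["q.id"], r["t.sku"]) for r in rows])  # -> [(1, 'p1'), (1, 'p2'), (2, 'p3'), (2, 'p2')]
```

In this model, a query row has no candidates only when the base side is empty, for example after a pre-filter removes every row.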
> h3. Returns
> All columns from the query table and all columns from the base table.
> h3. Examples
> {code:sql}
> -- Batch recommendations
> SELECT q.user_id, t.*
> FROM users q
> INNER JOIN products t
>   APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
>
> -- Pre-filtered search
> SELECT q.user_id, t.*
> FROM users q
> INNER JOIN (SELECT * FROM products WHERE price >= 500 AND country = 'EU') AS t
>   APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
> {code}
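In the pre-filtered example, the filter sits inside the base-side subquery. Ranking therefore sees only qualifying rows, and each query row can still receive up to K results. Filtering after ranking can instead return fewer rows, or none. The plain-Python sketch below contrasts the two orders. The product data and the dot-product score, a stand-in for {{vector_cosine_similarity}}, are made up for illustration.

```python
import heapq


def dot(a, b):
    return sum(x * y for x, y in zip(a, b))


def top_k_by_similarity(query, base, k):
    # BY SIMILARITY: largest scores rank first.
    return heapq.nlargest(k, base, key=lambda t: dot(query["embedding"], t["embedding"]))


def qualifies(t):
    # Mirrors: WHERE price >= 500 AND country = 'EU'
    return t["price"] >= 500 and t["country"] == "EU"


def pre_filtered(query, base, k):
    # Filter inside the base-side subquery first, then rank.
    return top_k_by_similarity(query, [t for t in base if qualifies(t)], k)


def post_filtered(query, base, k):
    # Rank first, then filter: can return fewer than k rows.
    return [t for t in top_k_by_similarity(query, base, k) if qualifies(t)]


user = {"embedding": [1.0, 0.0]}
products = [
    {"id": "p1", "price": 100, "country": "EU", "embedding": [1.0, 0.0]},
    {"id": "p2", "price": 200, "country": "EU", "embedding": [0.9, 0.1]},
    {"id": "p3", "price": 800, "country": "EU", "embedding": [0.5, 0.5]},
    {"id": "p4", "price": 900, "country": "US", "embedding": [0.8, 0.2]},
    {"id": "p5", "price": 600, "country": "EU", "embedding": [0.3, 0.7]},
]
print([t["id"] for t in pre_filtered(user, products, 2)])   # -> ['p3', 'p5']
print([t["id"] for t in post_filtered(user, products, 2)])  # -> []
```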
> h3. Planned Future Extensions (out of scope)
>  * Threshold-based retrieval instead of a fixed count
>  * Cross-table filter predicates (e.g., {{ON t.price <= q.budget}})
>  * Index DDL for ANN acceleration
> h3. Backward Compatibility
> This proposal introduces new syntax only. No existing queries, plans, or 
> behavior are affected. The new {{NEAREST}} keyword is contextual within JOIN 
> clauses and does not conflict with existing reserved words.
> ----
> h2. Appendix B. Design Sketch
> h3. Logical Plan
> A new logical plan node NearestByJoin captures the join type (INNER/LEFT 
> OUTER), search mode (APPROX/EXACT), num_results, ranking direction 
> (DISTANCE/SIMILARITY), ranking expression, and child plans for the query and 
> base sides.
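To illustrate what the node carries, here is a hypothetical Python mirror of its fields. Spark's actual node would be a Scala class over Catalyst expressions and child plans. Here the ranking expression is kept as SQL text and the children are plain placeholders. The dataclass is frozen because Catalyst plan nodes are immutable.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class JoinType(Enum):
    INNER = "INNER"
    LEFT_OUTER = "LEFT OUTER"


class SearchMode(Enum):
    APPROX = "APPROX"
    EXACT = "EXACT"


class RankingDirection(Enum):
    DISTANCE = "DISTANCE"
    SIMILARITY = "SIMILARITY"


@dataclass(frozen=True)
class NearestByJoin:
    """Hypothetical mirror of the proposed logical node's fields."""
    join_type: JoinType
    search_mode: SearchMode
    num_results: int
    direction: RankingDirection
    ranking_expression: str  # SQL text here; a real plan holds an expression tree
    query_side: Any          # child plan for the query (left) side
    base_side: Any           # child plan for the base (right) side

    def children(self):
        return (self.query_side, self.base_side)


# The "Batch recommendations" example from Appendix A, expressed as a node.
node = NearestByJoin(
    join_type=JoinType.INNER,
    search_mode=SearchMode.APPROX,
    num_results=10,
    direction=RankingDirection.SIMILARITY,
    ranking_expression="vector_cosine_similarity(q.embedding, t.embedding)",
    query_side="users q",
    base_side="products t",
)
print(node.children())  # -> ('users q', 'products t')
```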
> h3. Physical Plan
> NearestByJoin will be rewritten into existing physical operators including 
> JOIN, [max_by / min_by overloaded by 
> K|https://github.com/apache/spark/pull/54134], and [vector similarity / 
> distance expressions|https://github.com/apache/spark/pull/53481]. No new 
> physical operators are introduced. In the longer term, a dedicated fused 
> physical operator that avoids materializing the full cartesian product could 
> improve performance, but that is out of scope for this proposal and the 
> initial implementation.
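The rewrite can be sketched in three stages:
# CROSS JOIN, evaluating the ranking expression for each pair.
# GROUP BY the query row with a K-bounded {{max_by}} (SIMILARITY) or {{min_by}} (DISTANCE).
# Explode each array back into rows.

This is a plain-Python sketch of one possible rewrite, not the planned code. It also surfaces a detail that LEFT OUTER has to handle. A query row with no candidates produces no CROSS JOIN rows, so it never forms a group and must be added back with a NULL base side.

```python
import heapq
from itertools import product


def rewritten_nearest_by(queries, base, k, score, direction, left_outer=False):
    """One possible rewrite: CROSS JOIN -> GROUP BY + top-K aggregate -> explode."""
    if direction not in ("DISTANCE", "SIMILARITY"):
        raise ValueError(f"unknown ranking direction: {direction}")
    # Stage 1: CROSS JOIN, evaluating the ranking expression for each pair.
    # Query rows are identified by position so they need not be hashable.
    scored = [(qi, t, score(q, t)) for (qi, q), t in product(enumerate(queries), base)]

    # Stage 2: GROUP BY query row and keep the k best candidates, like
    # min_by(t, expr, k) for DISTANCE or max_by(t, expr, k) for SIMILARITY.
    groups = {}
    for qi, t, s in scored:
        groups.setdefault(qi, []).append((t, s))
    pick = heapq.nsmallest if direction == "DISTANCE" else heapq.nlargest
    top = {qi: [t for t, _ in pick(k, cands, key=lambda c: c[1])]
           for qi, cands in groups.items()}

    # Stage 3: explode each per-query array into one output row per match.
    out = []
    for qi, q in enumerate(queries):
        matches = top.get(qi, [])
        out.extend((q, t) for t in matches)
        if not matches and left_outer:
            # No CROSS JOIN rows means no group: re-add the row with a NULL base side.
            out.append((q, None))
    return out


def abs_diff(q, t):
    return abs(q["x"] - t["x"])


queries = [{"x": 0.0}, {"x": 10.0}]
base = [{"x": 1.0}, {"x": 4.0}, {"x": 9.0}]
print([(q["x"], t["x"]) for q, t in rewritten_nearest_by(queries, base, 2, abs_diff, "DISTANCE")])
# -> [(0.0, 1.0), (0.0, 4.0), (10.0, 9.0), (10.0, 4.0)]
print(rewritten_nearest_by(queries, [], 2, abs_diff, "DISTANCE", left_outer=True))
# -> [({'x': 0.0}, None), ({'x': 10.0}, None)]
```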
> h3. Analysis
> The analyzer validates that num_results is a foldable positive integer, the 
> BY expression returns an orderable type, and EXACT mode is not used with 
> nondeterministic expressions. Standard column resolution applies to the BY 
> expression, which may reference columns from both sides.
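The three checks can be sketched as a single validation function. Everything here is hypothetical:
* {{Expr}} is a minimal stand-in for a resolved expression.
* Type names are plain strings.
* {{ORDERABLE_TYPES}} is an illustrative subset, not Spark's full definition of orderable types.

The 100,000 cap comes from the Arguments table.

```python
from dataclasses import dataclass

MAX_NUM_RESULTS = 100_000  # cap from the Arguments table

# Illustrative subset only; not Spark's complete notion of orderable types.
ORDERABLE_TYPES = {"int", "bigint", "float", "double", "decimal", "string", "date", "timestamp"}


class AnalysisError(Exception):
    """Raised when a NEAREST BY join fails analysis."""


@dataclass
class Expr:
    """Hypothetical stand-in for a resolved expression."""
    data_type: str
    foldable: bool = False
    deterministic: bool = True
    value: object = None  # constant value, meaningful only when foldable


def check_nearest_by(num_results: Expr, by_expr: Expr, mode: str) -> None:
    # num_results must be a compile-time constant ...
    if not num_results.foldable:
        raise AnalysisError("NEAREST <num_results> must be a constant expression")
    # ... of integer type, positive, and within the cap.
    if num_results.data_type not in ("int", "bigint"):
        raise AnalysisError("NEAREST <num_results> must be an integer")
    if not 1 <= num_results.value <= MAX_NUM_RESULTS:
        raise AnalysisError(f"NEAREST <num_results> must be in [1, {MAX_NUM_RESULTS}]")
    # The BY expression must produce values that can be sorted.
    if by_expr.data_type not in ORDERABLE_TYPES:
        raise AnalysisError(f"BY expression type {by_expr.data_type} is not orderable")
    # Per the proposal, EXACT rejects nondeterministic BY expressions.
    if mode == "EXACT" and not by_expr.deterministic:
        raise AnalysisError("EXACT cannot be used with a nondeterministic BY expression")


# Passes: constant 10, double-valued similarity, EXACT mode.
check_nearest_by(Expr("int", foldable=True, value=10), Expr("double"), "EXACT")
```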
> ----
> h2. Appendix C. Rejected Designs
> h3. Table-Valued Function (TVF)
> An earlier design used a {{VECTOR_SEARCH(base_table, query_table, ...)}} TVF. 
> This was rejected because:
>  * The function signature becomes unwieldy when supporting multiple distance 
> types, search modes, and filter options as named parameters.
>  * JOIN syntax is more natural for SQL users and composes cleanly with CTEs, 
> subqueries, and standard SQL clauses.
> h3. USING clause with distance expression
> An intermediate design used {{JOIN ... USING <distance_expression>}} with a 
> {{NEAREST}} clause. This was rejected because {{USING}} has established 
> semantics in SQL (equi-join on identically named columns) and overloading it 
> for distance expressions would be confusing.
> h3. ORDER BY + LIMIT pattern
> Relying on {{ORDER BY similarity_function(...) LIMIT K}} was considered but 
> rejected because it only supports single-query search naturally. Batch search 
> requires verbose CROSS JOIN + window function patterns, and the optimizer has 
> no semantic signal to apply specialized execution strategies.
> h3. LATERAL Subquery
> A LATERAL subquery approach, where each query row drives an ORDER BY 
> distance LIMIT K subquery against the base table, was considered. This 
> composes well for batch search and is the pattern used by pgvector. However, 
> LATERAL has well-defined semantics: the subquery executes deterministically 
> per row. This makes it inherently an exact search construct with no room for 
> approximate semantics — there is no natural way to express "the optimizer may 
> skip candidates or use an ANN index" within LATERAL's execution contract. 
> Since the ability to opt into approximate search (APPROX) is a core design 
> goal, LATERAL was rejected as the primary syntax.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
