Zhidong Qu created SPARK-56395:
----------------------------------

             Summary: SPIP: NEAREST BY Top-K Ranking Join
                 Key: SPARK-56395
                 URL: https://issues.apache.org/jira/browse/SPARK-56395
             Project: Spark
          Issue Type: Umbrella
          Components: SQL
    Affects Versions: 4.2.0
            Reporter: Zhidong Qu


h2. Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.

Add a new JOIN syntax to Spark SQL that lets users find the closest matches 
between two tables. For each row in one table (the "query" side), the system 
finds the top-K most similar or closest rows in another table (the "base" 
side), ranked by a user-provided scoring expression.

For example: given a table of user profiles and a table of products, find the 
10 most similar products for each user based on their embeddings. Or given a 
table of points of interest and a table of areas, find the 5 geographically 
nearest areas for each point.

This is a building block for use cases like semantic search, recommendation 
systems, retrieval-augmented generation (RAG), and geospatial nearest-neighbor 
queries — all expressed in standard SQL rather than requiring custom 
application code or external services.
h2. Q2. What problem is this proposal NOT designed to solve?

This proposal adds the SQL syntax and brute-force (exact) execution path for 
top-K ranking joins. It does *not* cover:
 * *Vector index creation or management.* Index DDL and indexed approximate 
nearest neighbor (ANN) execution are planned as subsequent work. The syntax is 
designed to accommodate indexes transparently — when indexes exist, {{APPROX}} 
queries can leverage them without changing the query — but index support itself 
is out of scope.
 * *New distance or similarity functions.* The proposal relies on existing 
scalar functions (e.g., {{vector_cosine_similarity}}, 
{{vector_l2_distance}}). Adding new scoring functions is orthogonal.
 * *Cross-table filter predicates in ON clauses.* Predicates like 
{{ON t.price <= q.budget}} combined with NEAREST BY are a planned future 
extension but not part of this proposal.
 * *Threshold-based retrieval.* A {{WITHIN <threshold>}} clause (return all 
candidates within a given distance rather than a fixed count) is a planned 
future extension.

h2. Q3. How is it done today, and what are the limits of current practice?

Today, users simulate top-K ranking joins in Spark SQL using one of two 
patterns:

*Approach 1: CROSS JOIN + window function*
{code:sql}
SELECT *
FROM (
  SELECT q.*, t.*,
    ROW_NUMBER() OVER (
      PARTITION BY q.id
      ORDER BY vector_cosine_similarity(q.embedding, t.embedding) DESC
    ) AS rn
  FROM query_table q
  CROSS JOIN base_table t
) ranked
WHERE rn <= 10;
{code}
*Approach 2: CROSS JOIN + {{max_by}} / {{min_by}} with K overload*
{code:sql}
SELECT
  q.id,
  max_by(t.id, vector_cosine_similarity(q.embedding, t.embedding), 10) AS top_ids
FROM query_table q
CROSS JOIN base_table t
GROUP BY q.id;
{code}
*Limitations:*
 * *Complexity.* Approach 1 requires nested subqueries, window functions, and 
manual ranking logic — verbose, error-prone, and difficult for non-expert SQL 
users. Approach 2 is more concise but packs results into arrays, requiring an 
additional {{LATERAL}} explode step to recover individual rows with all 
base-side columns. Neither pattern is self-explanatory.
 * *No path to optimization.* Because the intent (top-K nearest neighbor) is 
buried inside generic CROSS JOIN + ranking patterns, the optimizer has no 
semantic signal to apply specialized execution strategies. There is no way to 
transparently benefit from future index-based approximate search without 
rewriting the query.

h2. Q4. What is new in your approach and why do you think it will be successful?

The approach extends standard SQL {{JOIN}} syntax with a {{NEAREST ... BY}} 
clause rather than introducing a table-valued function (TVF). This is a 
deliberate design choice:
 * *Composability.* JOIN syntax naturally supports batch (multi-query) search — 
each row on the left side drives an independent top-K search on the right side. 
TVF-based approaches in other systems require workarounds like {{CROSS APPLY}} 
for batch queries.
 * *Pluggable ranking.* The {{BY DISTANCE <expr>}} / {{BY SIMILARITY <expr>}} 
clause accepts any scalar expression, not just vector similarity. This makes 
the same syntax usable for vector search, geospatial nearest-neighbor, BM25 
text relevance, or composite scoring — without needing separate functions for 
each modality.
 * *Explicit search contract.* The {{APPROX}} / {{EXACT}} keywords make the 
search algorithm contract explicit in the query. This ensures that index 
creation or deletion never silently changes query results — only queries that 
opt into {{APPROX}} are affected. This is a key differentiator from systems 
where index presence implicitly changes behavior.
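
To make the "pluggable ranking" point concrete, here is a minimal Python reference model of the intended semantics. It is a sketch, not Spark code: {{nearest_by}}, the row layout, and both scoring helpers are hypothetical names introduced only for illustration. The same driver serves a vector-similarity search and a geospatial distance search by swapping the scoring callable and the ranking direction.

```python
import heapq
import math

def nearest_by(query_rows, base_rows, k, score, direction):
    """Return (query, base) pairs: the top-k base rows for each query row.

    direction="DISTANCE" keeps the smallest scores,
    direction="SIMILARITY" keeps the largest.
    """
    pick = heapq.nsmallest if direction == "DISTANCE" else heapq.nlargest
    out = []
    for q in query_rows:
        # Each query row drives its own independent top-k search.
        best = pick(k, base_rows, key=lambda t: score(q, t))
        out.extend((q, t) for t in best)
    return out

def cosine_similarity(q, t):
    # Hypothetical stand-in for a vector similarity function.
    dot = sum(a * b for a, b in zip(q["vec"], t["vec"]))
    return dot / (math.hypot(*q["vec"]) * math.hypot(*t["vec"]))

def euclidean_distance(q, t):
    # Hypothetical stand-in for a geospatial distance function.
    return math.dist(q["pos"], t["pos"])

users = [{"id": "u1", "vec": (1.0, 0.0)}]
products = [
    {"id": "p1", "vec": (0.9, 0.1)},
    {"id": "p2", "vec": (0.0, 1.0)},
    {"id": "p3", "vec": (0.7, 0.7)},
]
points = [{"id": "poi1", "pos": (0.0, 0.0)}]
areas = [
    {"id": "a1", "pos": (3.0, 4.0)},
    {"id": "a2", "pos": (1.0, 1.0)},
    {"id": "a3", "pos": (10.0, 0.0)},
]

similar = nearest_by(users, products, 2, cosine_similarity, "SIMILARITY")
closest = nearest_by(points, areas, 2, euclidean_distance, "DISTANCE")
print([t["id"] for _, t in similar])  # ['p1', 'p3']
print([t["id"] for _, t in closest])  # ['a2', 'a1']
```

Only the scoring callable and the direction change between the two calls, which is the property the {{BY DISTANCE}} / {{BY SIMILARITY}} clause is meant to expose in SQL.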

h2. Q5. Who cares? If you are successful, what difference will it make?
 * *AI/ML practitioners* get a native SQL primitive for semantic search, RAG 
pipelines, and recommendation systems without leaving SQL or depending on 
external vector databases.
 * *Data analysts* can express "find the top-K closest matches" as a single 
JOIN clause instead of a complex CROSS JOIN + window function pattern.
 * *The Spark ecosystem* closes a competitive gap with BigQuery, Snowflake, SQL 
Server, and PostgreSQL, all of which offer dedicated vector/nearest-neighbor 
search capabilities.
 * *Future work* (indexed ANN, threshold-based retrieval, ON-clause filters) 
builds directly on the syntax and semantics established here.

h2. Q6. What are the risks?
 * *Syntax commitment.* Extending JOIN syntax is a language-level change that 
is difficult to reverse. Mitigation: competitive analysis and a rigorous SQL 
API review before the syntax is finalized.
 * *Performance expectations.* Without index support (out of scope for this 
proposal), brute-force execution on large tables may disappoint users expecting 
sub-linear performance. Mitigation: the {{APPROX}} / {{EXACT}} keywords are 
part of the syntax from day one, establishing the contract for future 
index-based optimization. Documentation will clearly state the initial 
execution model.

Risks are low overall. The feature enables strictly new workloads — it does not 
modify existing JOIN semantics or execution paths. Existing queries are 
unaffected.
h2. Q7. How long will it take?

Estimated 3–4 months for the scoped work:
||Phase||Duration||Description||
|SQL parsing & analysis|1 week|ANTLR grammar extension, logical plan node, analysis rules|
|Brute-force execution|1 week|Query resolution and optimization, LEFT OUTER / INNER semantics|
|Spark Connect & DataFrame API|1 week|Proto definitions, DataFrame DSL, PySpark bindings|
|Docs, testing & benchmarking|1 week|SQL reference docs, end-to-end tests, performance characterization|
h2. Q8. What are the mid-term and final "exams" to check for success?

*Mid-term:* The basic syntax parses, analyzes, and executes correctly for 
single-vector and batch vector similarity search using brute-force evaluation. 
The ad-hoc and batch examples from the function doc execute end-to-end.

*Final:* Full feature parity with the scoped proposal — INNER and LEFT OUTER 
join modes, APPROX and EXACT keywords, DISTANCE and SIMILARITY ranking 
directions, Spark Connect and PySpark support, and SQL reference documentation 
merged. Performance on representative benchmarks (single-query and batch top-K 
on tables with 1M+ rows) is characterized and documented.
h2. Appendix A. Proposed API Changes
h3. Syntax
{code:sql}
FROM <query_table>
[LEFT OUTER | INNER] JOIN <base_table>
[APPROX | EXACT] NEAREST <num_results>
BY {DISTANCE <distance_expression> | SIMILARITY <similarity_expression>}
{code}
h3. Arguments
||Argument||Description||
|{{query_table}}|The driving (left) table. Each row generates a separate top-K search against the base table. Can be a table reference, subquery, or CTE.|
|{{base_table}}|The target (right) table to search. Can be a table reference, subquery, or CTE.|
|{{LEFT OUTER \| INNER}}|Optional. INNER (default) drops query rows with no matches. LEFT OUTER returns all query rows; base-side columns are NULL when no candidates exist.|
|{{APPROX \| EXACT}}|Optional, defaults to APPROX. APPROX allows the optimizer to choose faster approximate strategies (e.g., indexed ANN in the future). EXACT forces brute-force evaluation. EXACT fails for inherently nondeterministic expressions.|
|{{NEAREST <num_results>}}|Compile-time constant integer expression giving the maximum number of results per query row. Capped at 100,000, the same limit as the K overload of {{max_by}}/{{min_by}}. Defaults to 1.|
|{{BY DISTANCE <expr>}}|Scalar expression for distance ranking; smallest values rank first. Must return an orderable type.|
|{{BY SIMILARITY <expr>}}|Scalar expression for similarity ranking; largest values rank first. Must return an orderable type.|
h3. Returns

All columns from the query table and all columns from the base table.
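
The interaction between the join type and the returned columns can be illustrated with a small Python reference model. This is a sketch under stated assumptions, not the proposed implementation: {{nearest_join}}, its parameters, and the dict-based row layout are hypothetical. It shows that each output row carries all query-side and all base-side columns, and that LEFT OUTER pads the base-side columns with NULL ({{None}} here) when no candidates exist.

```python
import heapq

def nearest_join(query_rows, base_rows, base_columns, k, distance,
                 join_type="INNER"):
    """Reference semantics for NEAREST k BY DISTANCE with INNER / LEFT OUTER."""
    out = []
    for q in query_rows:
        matches = heapq.nsmallest(k, base_rows, key=lambda t: distance(q, t))
        if matches:
            # One output row per match: query columns plus base columns.
            out.extend({**q, **t} for t in matches)
        elif join_type == "LEFT OUTER":
            # No candidates: keep the query row, NULL-pad the base side.
            out.append({**q, **{c: None for c in base_columns}})
        # INNER: a query row with no candidates is dropped.
    return out

def gap(q, t):
    return abs(q["x"] - t["y"])

queries = [{"q_id": 1, "x": 0.0}]
base = [{"t_id": "a", "y": 3.0}, {"t_id": "b", "y": 1.0}]

print(nearest_join(queries, base, ["t_id", "y"], 1, gap))
print(nearest_join(queries, [], ["t_id", "y"], 1, gap, "INNER"))
print(nearest_join(queries, [], ["t_id", "y"], 1, gap, "LEFT OUTER"))
```

If the base side has fewer than K rows, the model simply returns every available candidate, consistent with {{num_results}} being a maximum rather than an exact count.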
h3. Examples
{code:sql}
-- Batch recommendations
SELECT q.user_id, t.*
FROM users q
INNER JOIN products t
APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);

-- Pre-filtered search
SELECT q.user_id, t.*
FROM users q
INNER JOIN (SELECT * FROM products WHERE price >= 500 AND country = 'EU') AS t
APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
{code}
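
In the pre-filtered example the predicate sits inside the base-side subquery, so filtering happens before ranking. The Python sketch below illustrates why that placement matters, using made-up data and a precomputed {{score}} column that stands in for the similarity to a single query row; all names are hypothetical. Filtering after the top-K step can return fewer than K rows, or none at all.

```python
import heapq

# Hypothetical base rows; "score" stands in for similarity to one query row.
products = [
    {"id": "p1", "price": 100, "score": 0.99},
    {"id": "p2", "price": 900, "score": 0.80},
    {"id": "p3", "price": 200, "score": 0.95},
    {"id": "p4", "price": 700, "score": 0.60},
]

def top_k(rows, k):
    return heapq.nlargest(k, rows, key=lambda r: r["score"])

def is_expensive(row):
    return row["price"] >= 500

# Pre-filter: restrict the base side first, then rank (as in the subquery form).
pre_filtered = top_k([r for r in products if is_expensive(r)], 2)

# Post-filter: rank everything first, then discard rows failing the predicate.
post_filtered = [r for r in top_k(products, 2) if is_expensive(r)]

print([r["id"] for r in pre_filtered])   # ['p2', 'p4']
print([r["id"] for r in post_filtered])  # []
```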
h3. Planned Future Extensions (out of scope)
 * {{WITHIN <threshold>}}: threshold-based retrieval instead of a fixed count
 * {{ON <predicate>}}: cross-table filter predicates (e.g., {{ON t.price <= q.budget}})
 * Index DDL for ANN acceleration

h3. Backward Compatibility

This proposal introduces new syntax only. No existing queries, plans, or 
behavior are affected. The new {{NEAREST}} keyword is contextual within JOIN 
clauses and does not conflict with existing reserved words.
----
h2. Appendix B. Design Sketch
h3. Logical Plan

A new logical plan node {{NearestByJoin}} captures the join type (INNER/LEFT 
OUTER), search mode (APPROX/EXACT), num_results, ranking direction 
(DISTANCE/SIMILARITY), ranking expression, and child plans for the query and 
base sides.
h3. Analysis

The analyzer validates that {{num_results}} is a foldable positive integer, the 
BY expression returns an orderable type, and EXACT mode is not used with 
nondeterministic expressions. Standard column resolution applies to the BY 
expression, which may reference columns from both sides.
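
The analysis checks listed above can be sketched as a small validation routine. This is illustrative Python, not analyzer code: {{NearestBySpec}}, its fields, and the error messages are hypothetical, and the two boolean flags stand in for properties that the real analyzer would derive from the resolved BY expression. The 100,000 cap is the one stated in the Arguments table.

```python
from dataclasses import dataclass

MAX_NUM_RESULTS = 100_000  # cap stated in the Arguments table

@dataclass(frozen=True)
class NearestBySpec:
    num_results: int
    search_mode: str = "APPROX"       # documented default
    by_is_deterministic: bool = True  # assumed to come from expression analysis
    by_type_orderable: bool = True    # assumed to come from type checking

def validate(spec):
    """Return a list of analysis errors; an empty list means the spec is valid."""
    errors = []
    n = spec.num_results
    if isinstance(n, bool) or not isinstance(n, int) or n < 1:
        errors.append("num_results must be a positive integer constant")
    elif n > MAX_NUM_RESULTS:
        errors.append(f"num_results must not exceed {MAX_NUM_RESULTS}")
    if not spec.by_type_orderable:
        errors.append("BY expression must return an orderable type")
    if spec.search_mode == "EXACT" and not spec.by_is_deterministic:
        errors.append("EXACT cannot be used with a nondeterministic BY expression")
    return errors

print(validate(NearestBySpec(num_results=10)))
print(validate(NearestBySpec(num_results=5, search_mode="EXACT",
                             by_is_deterministic=False)))
```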
h3. Execution (Brute-Force)

For each partition of the query side, the executor evaluates the BY expression 
against all rows of the base side and retains the top-K results per query row, 
using the K overload of the {{min_by}} / {{max_by}} aggregates.

The APPROX keyword has no effect on execution in this initial phase 
(brute-force is used in all cases). When index support is added in a subsequent 
phase, APPROX queries against indexed tables will transparently use ANN search 
for indexed data files and brute-force for unindexed data files.
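
The brute-force path described above can be modeled as a bounded-heap scan. The Python sketch below is a stand-in for the behavior, not the {{min_by}} / {{max_by}} implementation in Spark; {{top_k_for_partition}} and its parameters are hypothetical. It keeps at most K candidates per query row, so memory per query row stays proportional to K while every base row is still scored.

```python
import heapq
from itertools import count

def top_k_for_partition(query_partition, base_rows, k, score, smallest_first):
    """Score every base row against every query row, keeping the best k."""
    results = []
    for q in query_partition:
        heap = []           # min-heap on rank; the root is the worst kept candidate
        tiebreak = count()  # unique counter so rows themselves are never compared
        for t in base_rows:
            s = score(q, t)
            rank = -s if smallest_first else s  # larger rank means better
            entry = (rank, next(tiebreak), t)
            if len(heap) < k:
                heapq.heappush(heap, entry)
            elif rank > heap[0][0]:
                # Better than the current worst: evict it.
                heapq.heapreplace(heap, entry)
        # Emit the kept candidates best-first.
        for _, _, t in sorted(heap, key=lambda e: e[0], reverse=True):
            results.append((q, t))
    return results

def abs_diff(q, t):
    return abs(q - t)

pairs = top_k_for_partition([1.0, 5.0], [0.5, 2.0, 4.0, 9.0], 2, abs_diff, True)
print(pairs)  # [(1.0, 0.5), (1.0, 2.0), (5.0, 4.0), (5.0, 2.0)]
```

The total work is still proportional to the number of query rows times the number of base rows, which is the cost that future index support is intended to reduce for {{APPROX}} queries.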
h3. Separability

The optimizer pattern-matches the BY expression to determine future index 
eligibility. For index-eligible routing, the expression must be a recognized 
function with separable arguments — one argument bound entirely to the query 
side and one bound entirely to the base side. Non-separable expressions are 
valid but will always use brute-force execution.
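
A minimal sketch of the separability test, assuming a toy expression tree rather than Catalyst classes: {{Column}}, {{Call}}, {{RECOGNIZED}}, and {{is_index_eligible}} are hypothetical names, and the recognized set contains only the two functions named earlier in this proposal. An expression qualifies when it is a recognized two-argument function whose arguments reference the query side and the base side respectively, in either order.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Column:
    side: str  # "query" or "base"
    name: str

@dataclass(frozen=True)
class Call:
    fn: str
    args: tuple

# Assumption: only the functions named in this proposal are recognized.
RECOGNIZED = frozenset({"vector_cosine_similarity", "vector_l2_distance"})

def sides(expr):
    """Collect the set of join sides referenced anywhere inside expr."""
    if isinstance(expr, Column):
        return {expr.side}
    if isinstance(expr, Call):
        found = set()
        for arg in expr.args:
            found |= sides(arg)
        return found
    return set()  # literals reference neither side

def is_index_eligible(expr):
    if not isinstance(expr, Call) or expr.fn not in RECOGNIZED:
        return False
    if len(expr.args) != 2:
        return False
    left, right = sides(expr.args[0]), sides(expr.args[1])
    # One argument bound entirely to each side, in either order.
    return (left, right) in (({"query"}, {"base"}), ({"base"}, {"query"}))

q_emb = Column("query", "embedding")
t_emb = Column("base", "embedding")

separable = Call("vector_cosine_similarity", (q_emb, t_emb))
mixed = Call("vector_cosine_similarity", (Call("add", (q_emb, t_emb)), t_emb))

print(is_index_eligible(separable))  # True
print(is_index_eligible(mixed))      # False
```

In this model a non-eligible expression is not an error; it only means the query would stay on the brute-force path.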
----
h2. Appendix C. Rejected Designs
h3. Table-Valued Function (TVF)

An earlier design used a {{VECTOR_SEARCH(base_table, query_table, ...)}} TVF. 
This was rejected because:
 * The function signature becomes unwieldy when supporting multiple distance 
types, search modes, and filter options as named parameters.
 * JOIN syntax is more natural for SQL users and composes cleanly with CTEs, 
subqueries, and standard SQL clauses.

h3. USING clause with distance expression

An intermediate design used {{JOIN ... USING <distance_expression>}} with a 
{{NEAREST}} clause. This was rejected because {{USING}} has established 
semantics in SQL (equi-join on identically named columns) and overloading it 
for distance expressions would be confusing.
h3. ORDER BY + LIMIT pattern

Relying on {{ORDER BY similarity_function(...) LIMIT K}} was considered but 
rejected because it only supports single-query search naturally. Batch search 
requires verbose CROSS JOIN + window function patterns, and the optimizer has 
no semantic signal to apply specialized execution strategies.


