Zhidong Qu created SPARK-56395:
----------------------------------
Summary: SPIP: NEAREST BY Top-K Ranking Join
Key: SPARK-56395
URL: https://issues.apache.org/jira/browse/SPARK-56395
Project: Spark
Issue Type: Umbrella
Components: SQL
Affects Versions: 4.2.0
Reporter: Zhidong Qu
h2. Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.
Add a new JOIN syntax to Spark SQL that lets users find the closest matches
between two tables. For each row in one table (the "query" side), the system
finds the top-K most similar or closest rows in another table (the "base"
side), ranked by a user-provided scoring expression.
For example: given a table of user profiles and a table of products, find the
10 most similar products for each user based on their embeddings. Or given a
table of points of interest and a table of areas, find the 5 geographically
nearest areas for each point.
This is a building block for use cases like semantic search, recommendation
systems, retrieval-augmented generation (RAG), and geospatial nearest-neighbor
queries — all expressed in standard SQL rather than requiring custom
application code or external services.
h2. Q2. What problem is this proposal NOT designed to solve?
This proposal adds the SQL syntax and brute-force (exact) execution path for
top-K ranking joins. It does *not* cover:
* *Vector index creation or management.* Index DDL and indexed approximate
nearest neighbor (ANN) execution are planned as subsequent work. The syntax is
designed to accommodate indexes transparently — when indexes exist, {{APPROX}}
queries can leverage them without changing the query — but index support itself
is out of scope.
* *New distance or similarity functions.* The proposal relies on existing
scalar functions (e.g., {{vector_cosine_similarity}},
{{vector_l2_distance}}). Adding new scoring functions is orthogonal.
* *Cross-table filter predicates in ON clauses.* Predicates like {{ON t.price
<= q.budget}} combined with NEAREST BY are a planned future extension but not
part of this proposal.
* *Threshold-based retrieval.* A {{WITHIN <threshold>}} clause (return all
candidates within a given distance rather than a fixed count) is a planned
future extension.
h2. Q3. How is it done today, and what are the limits of current practice?
Today, users simulate top-K ranking joins in Spark SQL using one of two
patterns:
*Approach 1: CROSS JOIN + window function*
```sql
SELECT *
FROM (
  SELECT q.*, t.*,
    ROW_NUMBER() OVER (
      PARTITION BY q.id
      ORDER BY vector_cosine_similarity(q.embedding, t.embedding) DESC
    ) AS rn
  FROM query_table q
  CROSS JOIN base_table t
) ranked
WHERE rn <= 10;
```
*Approach 2: CROSS JOIN + {{max_by}} / {{min_by}} with K overload*
```sql
SELECT
  q.id,
  max_by(t.id, vector_cosine_similarity(q.embedding, t.embedding), 10) AS top_ids
FROM query_table q
CROSS JOIN base_table t
GROUP BY q.id;
```
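For reference, the following plain-Python model computes what Approach 1 computes. It is a sketch, not Spark code: the function names and the dict-based rows are illustrative assumptions. It makes the cost visible, because every query-base pair is materialized and scored before any ranking happens.

```python
import math
from itertools import groupby, product


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def cross_join_window_topk(queries, base, k):
    """Model of Approach 1: CROSS JOIN, ROW_NUMBER() per query id, keep rn <= k.

    Returns the kept (query_id, base_id, rn) rows and the number of scored pairs.
    """
    # CROSS JOIN: every (query, base) pair is scored, len(queries) * len(base) rows.
    pairs = [
        (q["id"], t["id"], cosine_similarity(q["embedding"], t["embedding"]))
        for q, t in product(queries, base)
    ]
    # PARTITION BY q.id ORDER BY score DESC
    pairs.sort(key=lambda p: (p[0], -p[2]))
    kept = []
    for query_id, group in groupby(pairs, key=lambda p: p[0]):
        for rn, (_, base_id, _score) in enumerate(group, start=1):
            if rn > k:  # WHERE rn <= k
                break
            kept.append((query_id, base_id, rn))
    return kept, len(pairs)


queries = [{"id": 1, "embedding": [1.0, 0.0]}, {"id": 2, "embedding": [0.0, 1.0]}]
base = [
    {"id": "a", "embedding": [1.0, 0.1]},
    {"id": "b", "embedding": [0.1, 1.0]},
    {"id": "c", "embedding": [1.0, 1.0]},
]
rows, scored = cross_join_window_topk(queries, base, k=2)
print(rows)    # [(1, 'a', 1), (1, 'c', 2), (2, 'b', 1), (2, 'c', 2)]
print(scored)  # 6
```

The number of scored pairs grows as |query_table| × |base_table| regardless of K, even though only K rows per query survive.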
*Limitations:*
* *Complexity.* Approach 1 requires nested subqueries, window functions, and
manual ranking logic — verbose, error-prone, and difficult for non-expert SQL
users. Approach 2 is more concise but packs results into arrays, requiring an
additional {{LATERAL}} explode step to recover individual rows with all
base-side columns. Neither pattern is self-explanatory.
* *No path to optimization.* Because the intent (top-K nearest neighbor) is
buried inside generic CROSS JOIN + ranking patterns, the optimizer has no
semantic signal to apply specialized execution strategies. There is no way to
transparently benefit from future index-based approximate search without
rewriting the query.
h2. Q4. What is new in your approach and why do you think it will be successful?
The approach extends standard SQL {{JOIN}} syntax with a {{NEAREST ... BY}}
clause rather than introducing a table-valued function (TVF). This is a
deliberate design choice:
* *Composability.* JOIN syntax naturally supports batch (multi-query) search —
each row on the left side drives an independent top-K search on the right side.
TVF-based approaches in other systems require workarounds like {{CROSS APPLY}}
for batch queries.
* *Pluggable ranking.* The {{BY DISTANCE <expr>}} / {{BY SIMILARITY <expr>}}
clause accepts any scalar expression, not just vector similarity. This makes
the same syntax usable for vector search, geospatial nearest-neighbor, BM25
text relevance, or composite scoring — without needing separate functions for
each modality.
* *Explicit search contract.* The {{APPROX}} / {{EXACT}} keywords make the
search algorithm contract explicit in the query. This ensures that index
creation or deletion never silently changes query results — only queries that
opt into {{APPROX}} are affected. This is a key differentiator from systems
where index presence implicitly changes behavior.
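To illustrate the pluggable-ranking point, the sketch below uses one ranking routine for both a vector similarity and a geospatial distance. It is plain Python; {{top_k}} and {{haversine_km}} are hypothetical helpers, not Spark functions. Only the scoring function and the ranking direction change, mirroring {{BY SIMILARITY}} and {{BY DISTANCE}}.

```python
import heapq
import math


def cosine_similarity(a, b):
    """Vector similarity: larger means closer."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.hypot(*a) * math.hypot(*b))


def haversine_km(p, q):
    """Great-circle distance in km between (lat, lon) points given in degrees."""
    lat1, lon1, lat2, lon2 = map(math.radians, (*p, *q))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * 6371.0 * math.asin(math.sqrt(h))


def top_k(query, base, k, score, direction):
    """Rank base rows against one query row with any scoring function.

    direction="SIMILARITY" keeps the k largest scores, "DISTANCE" the k smallest.
    """
    pick = heapq.nlargest if direction == "SIMILARITY" else heapq.nsmallest
    return pick(k, base, key=lambda row: score(query, row))


# Vector search: similarity ranking.
print(top_k([1.0, 0.0], [[1.0, 0.1], [0.0, 1.0], [1.0, 1.0]], 2,
            cosine_similarity, "SIMILARITY"))
# [[1.0, 0.1], [1.0, 1.0]]

# Geospatial nearest neighbors: same routine, distance ranking.
paris = (48.8566, 2.3522)
cities = {"London": (51.5074, -0.1278), "Berlin": (52.52, 13.405),
          "Brussels": (50.8503, 4.3517)}
nearest = top_k(paris, list(cities.items()), 2,
                lambda q, item: haversine_km(q, item[1]), "DISTANCE")
print([name for name, _ in nearest])  # ['Brussels', 'London']
```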
h2. Q5. Who cares? If you are successful, what difference will it make?
* *AI/ML practitioners* get a native SQL primitive for semantic search, RAG
pipelines, and recommendation systems without leaving SQL or depending on
external vector databases.
* *Data analysts* can express "find the top-K closest matches" as a single
JOIN clause instead of a complex CROSS JOIN + window function pattern.
* *The Spark ecosystem* closes a competitive gap with BigQuery, Snowflake, SQL
Server, and PostgreSQL, all of which offer dedicated vector/nearest-neighbor
search capabilities.
* *Future work* (indexed ANN, threshold-based retrieval, ON-clause filters)
builds directly on the syntax and semantics established here.
h2. Q6. What are the risks?
* *Syntax commitment.* Extending JOIN syntax is a language-level change that
is difficult to reverse. Mitigation: a competitive analysis of existing systems
and careful review of the SQL API.
* *Performance expectations.* Without index support (out of scope for this
proposal), brute-force execution on large tables may disappoint users expecting
sub-linear performance. Mitigation: the {{APPROX}} / {{EXACT}} keywords are
part of the syntax from day one, establishing the contract for future
index-based optimization. Documentation will clearly state the initial
execution model.
Risks are low overall. The feature enables strictly new workloads — it does not
modify existing JOIN semantics or execution paths. Existing queries are
unaffected.
h2. Q7. How long will it take?
Estimated 3–4 months for the scoped work:
||Phase||Duration||Description||
|SQL parsing & analysis|1 week|ANTLR grammar extension, logical plan node, analysis rules|
|Brute-force execution|1 week|Query resolution and optimization, LEFT OUTER / INNER semantics|
|Spark Connect & DataFrame API|1 week|Proto definitions, DataFrame DSL, PySpark bindings|
|Docs, testing & benchmarking|1 week|SQL reference docs, end-to-end tests, performance characterization|
h2. Q8. What are the mid-term and final "exams" to check for success?
*Mid-term:* The basic syntax parses, analyzes, and executes correctly for
single-vector and batch vector similarity search using brute-force evaluation.
The ad-hoc and batch examples from the function doc execute end-to-end.
*Final:* Full feature parity with the scoped proposal — INNER and LEFT OUTER
join modes, APPROX and EXACT keywords, DISTANCE and SIMILARITY ranking
directions, Spark Connect and PySpark support, and SQL reference documentation
merged. Performance on representative benchmarks (single-query and batch top-K
on tables with 1M+ rows) is characterized and documented.
h2. Appendix A. Proposed API Changes
h3. Syntax
```sql
FROM <query_table>
[LEFT OUTER | INNER] JOIN <base_table>
[APPROX | EXACT] NEAREST <num_results>
BY {DISTANCE <distance_expression> | SIMILARITY <similarity_expression>}
```
h3. Arguments
||Argument||Description||
|{{query_table}}|The driving (left) table. Each row generates a separate top-K search against the base table. Can be a table reference, subquery, or CTE.|
|{{base_table}}|The target (right) table to search. Can be a table reference, subquery, or CTE.|
|{{LEFT OUTER \| INNER}}|Optional. INNER (default) drops query rows with no matches. LEFT OUTER returns all query rows; base-side columns are NULL when no candidates exist.|
|{{APPROX \| EXACT}}|Optional, defaults to APPROX. APPROX allows the optimizer to choose faster approximate strategies (e.g., indexed ANN in the future). EXACT forces brute-force evaluation and fails for inherently nondeterministic expressions.|
|{{NEAREST <num_results>}}|Compile-time constant integer expression giving the maximum number of results per query row. Capped at 100,000, the same limit as the K overload of {{max_by}}/{{min_by}}. Defaults to 1.|
|{{BY DISTANCE <expr>}}|Scalar expression for distance ranking; smallest values rank first. Must return an orderable type.|
|{{BY SIMILARITY <expr>}}|Scalar expression for similarity ranking; largest values rank first. Must return an orderable type.|
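The argument semantics in the table can be summarized as a small reference model. This is a plain-Python sketch, not the proposed implementation: {{nearest_by_join}} is a hypothetical name, rows are plain values, treating a NULL ({{None}}) score as "not a candidate" is an assumption the proposal does not spell out, and so is rejecting (rather than clamping) a {{num_results}} above the cap.

```python
import heapq

MAX_NUM_RESULTS = 100_000  # cap from the Arguments table


def nearest_by_join(query_rows, base_rows, by, direction,
                    num_results=1, join_type="INNER"):
    """Reference semantics of NEAREST BY: a list of (query_row, base_row) pairs.

    by(q, t) plays the role of the BY expression; direction is "DISTANCE"
    (smallest first) or "SIMILARITY" (largest first).
    """
    # Assumption: out-of-range values are rejected rather than clamped.
    if not isinstance(num_results, int) or not 1 <= num_results <= MAX_NUM_RESULTS:
        raise ValueError(f"num_results must be an integer in [1, {MAX_NUM_RESULTS}]")
    pick = heapq.nlargest if direction == "SIMILARITY" else heapq.nsmallest
    result = []
    for q in query_rows:
        candidates = []
        for t in base_rows:
            score = by(q, t)
            if score is not None:  # assumption: a NULL score is not a candidate
                candidates.append((score, t))
        best = pick(num_results, candidates, key=lambda c: c[0])
        if best:
            result.extend((q, t) for _, t in best)
        elif join_type == "LEFT OUTER":
            result.append((q, None))  # base-side columns are NULL
        # INNER: a query row with no candidates produces no output rows
    return result


def distance(q, t):
    return abs(q - t)


print(nearest_by_join([12, 29], [10, 20, 30], distance, "DISTANCE", num_results=2))
# [(12, 10), (12, 20), (29, 30), (29, 20)]
print(nearest_by_join([5], [], distance, "DISTANCE", join_type="LEFT OUTER"))
# [(5, None)]
```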
h3. Returns
All columns from the query table and all columns from the base table.
h3. Examples
```sql
-- Batch recommendations
SELECT q.user_id, t.*
FROM users q
INNER JOIN products t
  APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);

-- Pre-filtered search
SELECT q.user_id, t.*
FROM users q
INNER JOIN (SELECT * FROM products WHERE price >= 500 AND country = 'EU') AS t
  APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
```
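In the pre-filtered example, the filter lives in the base-side subquery, so it is applied before ranking and each user can still receive up to 10 matching products. The plain-Python sketch below shows why this differs from filtering after the top-K is taken. The helper names are hypothetical and the similarity scores are precomputed for a single user.

```python
import heapq

# Hypothetical products with a precomputed similarity to one user.
products = [
    {"id": 1, "price": 100, "similarity": 0.99},
    {"id": 2, "price": 900, "similarity": 0.95},
    {"id": 3, "price": 150, "similarity": 0.90},
    {"id": 4, "price": 700, "similarity": 0.80},
]


def top_k(rows, k):
    return heapq.nlargest(k, rows, key=lambda r: r["similarity"])


def pre_filtered(rows, k, keep):
    """Filter in the base-side subquery, then rank (what the example does)."""
    return top_k([r for r in rows if keep(r)], k)


def post_filtered(rows, k, keep):
    """Rank first, then filter the K results (can return fewer than K rows)."""
    return [r for r in top_k(rows, k) if keep(r)]


def expensive(r):
    return r["price"] >= 500


print([r["id"] for r in pre_filtered(products, 2, expensive)])   # [2, 4]
print([r["id"] for r in post_filtered(products, 2, expensive)])  # [2]
```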
h3. Planned Future Extensions (out of scope)
* {{WITHIN <threshold>}}: threshold-based retrieval instead of a fixed count
* {{ON <predicate>}}: cross-table filter predicates (e.g., {{ON t.price <= q.budget}})
* Index DDL for ANN acceleration
h3. Backward Compatibility
This proposal introduces new syntax only. No existing queries, plans, or
behavior are affected. The new {{NEAREST}} keyword is contextual within JOIN
clauses and does not conflict with existing reserved words.
----
h2. Appendix B. Design Sketch
h3. Logical Plan
A new logical plan node {{NearestByJoin}} captures the join type (INNER/LEFT
OUTER), search mode (APPROX/EXACT), num_results, ranking direction
(DISTANCE/SIMILARITY), ranking expression, and child plans for the query and
base sides.
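A hypothetical Python mirror of the fields listed above. Spark's logical plan nodes are Scala classes in Catalyst, so this is only a model; the class and enum names are illustrative. The defaults follow the Arguments table in Appendix A: INNER, APPROX, and one result.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any


class JoinType(Enum):
    INNER = "INNER"
    LEFT_OUTER = "LEFT OUTER"


class SearchMode(Enum):
    APPROX = "APPROX"
    EXACT = "EXACT"


class Direction(Enum):
    DISTANCE = "DISTANCE"      # smallest values rank first
    SIMILARITY = "SIMILARITY"  # largest values rank first


@dataclass(frozen=True)
class NearestByJoin:
    query_side: Any          # child plan producing the query rows
    base_side: Any           # child plan producing the base rows
    ranking_expression: Any  # the BY expression
    direction: Direction
    num_results: int = 1
    join_type: JoinType = JoinType.INNER
    search_mode: SearchMode = SearchMode.APPROX


node = NearestByJoin(
    query_side="users",
    base_side="products",
    ranking_expression="vector_cosine_similarity(q.embedding, t.embedding)",
    direction=Direction.SIMILARITY,
    num_results=10,
)
print(node.join_type, node.search_mode)  # JoinType.INNER SearchMode.APPROX
```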
h3. Analysis
The analyzer validates that {{num_results}} is a foldable positive integer, the
BY expression returns an orderable type, and EXACT mode is not used with
nondeterministic expressions. Standard column resolution applies to the BY
expression, which may reference columns from both sides.
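The analyzer checks above can be sketched as a function that collects error messages. This is a plain-Python illustration under stated assumptions: expression properties (foldability, data type, determinism) are passed in as already-computed values, the set of orderable type names is an illustrative subset rather than Spark's full rule, and enforcing the 100,000 cap here is an assumption.

```python
MAX_NUM_RESULTS = 100_000

# Illustrative subset of orderable Spark SQL type names (assumption, not exhaustive).
ORDERABLE_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double",
                   "decimal", "string", "date", "timestamp"}


def check_nearest_by(num_results, num_results_foldable, by_type,
                     by_deterministic, mode, orderable_types=ORDERABLE_TYPES):
    """Return a list of analysis errors; an empty list means the clause is valid."""
    errors = []
    if not num_results_foldable:
        errors.append("NEAREST <num_results> must be a constant expression")
    elif (isinstance(num_results, bool) or not isinstance(num_results, int)
          or num_results <= 0):
        errors.append("NEAREST <num_results> must be a positive integer")
    elif num_results > MAX_NUM_RESULTS:
        errors.append(f"NEAREST <num_results> must not exceed {MAX_NUM_RESULTS}")
    if by_type not in orderable_types:
        errors.append(f"BY expression type {by_type} is not orderable")
    if mode == "EXACT" and not by_deterministic:
        errors.append("EXACT cannot be used with a nondeterministic BY expression")
    return errors


print(check_nearest_by(10, True, "double", True, "EXACT"))  # []
print(check_nearest_by(0, True, "double", False, "EXACT"))
```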
h3. Execution (Brute-Force)
For each partition of the query side, the executor evaluates the BY expression
against every row of the base side and retains the top-K results using the
K overload of {{min_by}} (for DISTANCE) or {{max_by}} (for SIMILARITY).
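Conceptually, retaining the top-K with a K-bounded aggregate means each query row keeps only K candidates in memory while scanning the base side once. The sketch below models that in plain Python with a bounded heap. {{top_k_streaming}} and {{execute_partition}} are hypothetical names; the proposal reuses the {{min_by}}/{{max_by}} machinery rather than this code.

```python
import heapq
from itertools import count


def top_k_streaming(query, base_rows, k, by, direction):
    """Scan the base side once, keeping at most k candidates (O(k) memory)."""
    sign = 1 if direction == "SIMILARITY" else -1  # larger key always means better
    heap = []  # min-heap of (key, seq, row); heap[0] is the worst kept candidate
    seq = count()  # tiebreaker so rows themselves are never compared
    for row in base_rows:
        key = sign * by(query, row)
        item = (key, next(seq), row)
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif key > heap[0][0]:
            heapq.heapreplace(heap, item)
    # Best candidate first; earlier rows win ties.
    return [row for _, _, row in sorted(heap, key=lambda x: (-x[0], x[1]))]


def execute_partition(query_partition, base_rows, k, by, direction):
    """Brute force: each query row in the partition is scored against all base rows."""
    return [(q, top_k_streaming(q, base_rows, k, by, direction))
            for q in query_partition]


def distance(q, t):
    return abs(q - t)


print(execute_partition([12, 33], [10, 20, 30, 40], 2, distance, "DISTANCE"))
# [(12, [10, 20]), (33, [30, 40])]
```

Every query row still scores every base row, so a 1M-row query side against a 1M-row base side means 10^12 evaluations of the BY expression; only the memory per query row is bounded by K.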
The APPROX keyword has no effect on execution in this initial phase
(brute-force is used in all cases). When index support is added in a subsequent
phase, APPROX queries against indexed tables will transparently use ANN search
for indexed data files and brute-force for unindexed data files.
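The proposal does not specify how ANN results from indexed files and brute-force results from unindexed files would be combined. One natural approach, sketched here as an assumption with a hypothetical {{merge_top_k}} helper, is to take the top-K of the union of per-file top-K lists. If every per-file list were exact, the merged result would be the exact global top-K; because ANN lists are approximate, the merged result is approximate too, which is consistent with only {{APPROX}} queries taking this path.

```python
import heapq


def merge_top_k(per_file_results, k, direction):
    """Combine per-file (score, row) candidate lists into one global top-k list.

    Hypothetical helper: each inner list would come from ANN search on an
    indexed file or brute-force search on an unindexed one.
    """
    pick = heapq.nlargest if direction == "SIMILARITY" else heapq.nsmallest
    candidates = [c for file_result in per_file_results for c in file_result]
    return pick(k, candidates, key=lambda c: c[0])


ann_from_indexed_file = [(0.9, "a"), (0.7, "b")]
brute_force_from_unindexed_file = [(0.8, "c"), (0.6, "d")]
print(merge_top_k([ann_from_indexed_file, brute_force_from_unindexed_file],
                  2, "SIMILARITY"))
# [(0.9, 'a'), (0.8, 'c')]
```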
h3. Separability
The optimizer pattern-matches the BY expression to determine future index
eligibility. For index-eligible routing, the expression must be a recognized
function with separable arguments — one argument bound entirely to the query
side and one bound entirely to the base side. Non-separable expressions are
valid but will always use brute-force execution.
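A minimal sketch of the separability test, assuming a toy expression tree of tuples ({{("col", side, name)}}, {{("lit", value)}}, {{("call", name, args)}}) and a hypothetical set of recognized functions taken from the functions named in Q2. Composite expressions, such as a distance plus a price term, are valid but fail the check, so they would always use brute force.

```python
# Hypothetical set of functions eligible for index routing (from Q2's examples).
RECOGNIZED_FUNCTIONS = {"vector_cosine_similarity", "vector_l2_distance"}


def sides(expr):
    """Return the set of sides ("query"/"base") whose columns expr references."""
    kind = expr[0]
    if kind == "col":    # ("col", side, name)
        return {expr[1]}
    if kind == "lit":    # ("lit", value)
        return set()
    if kind == "call":   # ("call", function_name, [args])
        result = set()
        for arg in expr[2]:
            result |= sides(arg)
        return result
    raise ValueError(f"unknown expression kind: {kind!r}")


def is_index_eligible(expr):
    """True if expr is a recognized function with one query-only and one base-only argument."""
    if expr[0] != "call" or expr[1] not in RECOGNIZED_FUNCTIONS or len(expr[2]) != 2:
        return False
    left, right = (sides(arg) for arg in expr[2])
    return (left, right) in (({"query"}, {"base"}), ({"base"}, {"query"}))


q_emb = ("col", "query", "embedding")
t_emb = ("col", "base", "embedding")
print(is_index_eligible(("call", "vector_cosine_similarity", [q_emb, t_emb])))  # True
composite = ("call", "add", [("call", "vector_l2_distance", [q_emb, t_emb]),
                             ("col", "base", "price")])
print(is_index_eligible(composite))  # False: valid, but always brute force
```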
----
h2. Appendix C. Rejected Designs
h3. Table-Valued Function (TVF)
An earlier design used a {{VECTOR_SEARCH(base_table, query_table, ...)}} TVF.
This was rejected because:
* The function signature becomes unwieldy when supporting multiple distance
types, search modes, and filter options as named parameters.
* JOIN syntax is more natural for SQL users and composes cleanly with CTEs,
subqueries, and standard SQL clauses.
h3. USING clause with distance expression
An intermediate design used {{JOIN ... USING <distance_expression>}} with a
{{NEAREST}} clause. This was rejected because {{USING}} has established
semantics in SQL (equi-join on identically named columns) and overloading it
for distance expressions would be confusing.
h3. ORDER BY + LIMIT pattern
Relying on {{ORDER BY similarity_function(...) LIMIT K}} was considered but
rejected because it only supports single-query search naturally. Batch search
requires verbose CROSS JOIN + window function patterns, and the optimizer has
no semantic signal to apply specialized execution strategies.