[
https://issues.apache.org/jira/browse/SPARK-56395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Gengliang Wang updated SPARK-56395:
-----------------------------------
Fix Version/s: 4.x
> SPIP: NEAREST BY Top-K Ranking Join
> -----------------------------------
>
> Key: SPARK-56395
> URL: https://issues.apache.org/jira/browse/SPARK-56395
> Project: Spark
> Issue Type: Umbrella
> Components: SQL
> Affects Versions: 4.2.0
> Reporter: Zhidong Qu
> Assignee: Dilip Biswal
> Priority: Major
> Labels: pull-request-available
> Fix For: 5.0.0, 4.x
>
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> h2. Q1. What are you trying to do? Articulate your objectives using
> absolutely no jargon.
> Add a new JOIN syntax to Spark SQL that lets users find the closest matches
> between two tables. For each row in one table (the "query" side), the system
> finds the top-K most similar or closest rows in another table (the "base"
> side), ranked by a user-provided scoring expression.
> For example: given a table of user profiles and a table of products, find the
> 10 most similar products for each user based on their embeddings. Or given a
> table of points of interest and a table of areas, find the 5 geographically
> nearest areas for each point.
> This is a building block for use cases like semantic search, recommendation
> systems, retrieval-augmented generation (RAG), and geospatial
> nearest-neighbor queries — all expressed in standard SQL rather than
> requiring custom application code or external services.
> h2. Q2. What problem is this proposal NOT designed to solve?
> This proposal adds the SQL syntax and brute-force (exact) execution path for
> top-K ranking joins. It does *NOT* cover:
> * *Vector index creation or management.* Index DDL and indexed approximate
> nearest neighbor (ANN) execution are planned as subsequent work. The syntax
> is designed to accommodate indexes transparently — when indexes exist,
> {{APPROX}} queries can leverage them without changing the query — but index
> support itself is out of scope.
> * *New distance or similarity functions.* The proposal relies on existing
> scalar functions (e.g., {{vector_cosine_similarity}},
> {{vector_l2_distance}}). Adding new scoring functions is orthogonal.
> * *Cross-table filter predicates.* Predicates like {{ON t.price <=
> q.budget}} combined with NEAREST BY are a planned future extension but are
> not part of this proposal.
> * *Threshold-based retrieval.* Returning all candidates within a given
> distance, rather than a fixed count, can be added as a future extension but
> is not part of this proposal.
> h2. Q3. How is it done today, and what are the limits of current practice?
> Today, users simulate top-K ranking joins in Spark SQL using one of two
> patterns:
> *Approach 1: CROSS JOIN + window function*
> {code:sql}
> SELECT *
> FROM (
>   SELECT q.*, t.*,
>     ROW_NUMBER() OVER (
>       PARTITION BY q.id
>       ORDER BY vector_cosine_similarity(q.embedding, t.embedding) DESC
>     ) AS rn
>   FROM query_table q
>   CROSS JOIN base_table t
> ) ranked
> WHERE rn <= 10;
> {code}
> *Approach 2: CROSS JOIN + {{max_by}} / {{min_by}} with K overload*
> {code:sql}
> SELECT
>   q.id,
>   max_by(t.id, vector_cosine_similarity(q.embedding, t.embedding), 10) AS top_ids
> FROM query_table q
> CROSS JOIN base_table t
> GROUP BY q.id;
> {code}
> *Limitations:*
> * *Complexity.* Approach 1 requires nested subqueries, window functions, and
> manual ranking logic — verbose, error-prone, and difficult for non-expert SQL
> users. Approach 2 is more concise but packs results into arrays, requiring an
> additional {{LATERAL}} explode step to recover individual rows with all
> base-side columns. Neither pattern is self-explanatory.
> * *No path to optimization.* Because the intent (top-K nearest neighbor) is
> buried inside generic CROSS JOIN + ranking patterns, the optimizer has no
> semantic signal to apply specialized execution strategies. There is no way to
> transparently benefit from future index-based approximate search without
> rewriting the query.
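> To illustrate the cost described in the limitations above, here is a minimal
> Python sketch (not Spark code) of what Approach 1 computes. The helper names
> {{cosine_similarity}} and {{cross_join_top_k}} and the sample rows are
> hypothetical; {{cosine_similarity}} is only a stand-in for
> {{vector_cosine_similarity}}. Every query/base pair is scored before any row
> is discarded.

```python
import math
from itertools import product


def cosine_similarity(a, b):
    """Plain-Python stand-in for vector_cosine_similarity (hypothetical helper)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def cross_join_top_k(query_rows, base_rows, k):
    """Mimic Approach 1: build every pair, rank per query id, keep rn <= k."""
    # CROSS JOIN: len(query_rows) * len(base_rows) scored pairs are materialized.
    pairs = [
        (q["id"], t["id"], cosine_similarity(q["embedding"], t["embedding"]))
        for q, t in product(query_rows, base_rows)
    ]
    ranked = {}
    # ROW_NUMBER() OVER (PARTITION BY q.id ORDER BY score DESC)
    for q_id, t_id, _score in sorted(pairs, key=lambda p: (p[0], -p[2])):
        partition = ranked.setdefault(q_id, [])
        if len(partition) < k:  # WHERE rn <= k
            partition.append(t_id)
    return ranked


# Illustrative sample data, not taken from the proposal.
queries = [
    {"id": "u1", "embedding": [1.0, 0.0]},
    {"id": "u2", "embedding": [0.0, 1.0]},
]
base = [
    {"id": "p1", "embedding": [0.9, 0.1]},
    {"id": "p2", "embedding": [0.1, 0.9]},
    {"id": "p3", "embedding": [0.7, 0.7]},
]
top2 = cross_join_top_k(queries, base, k=2)
print(top2)
```

> With 2 query rows and 3 base rows the sketch scores all 6 pairs in order to
> keep 4 of them, so the work grows with the product of the two table sizes.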
> h2. Q4. What is new in your approach and why do you think it will be
> successful?
> The approach extends standard SQL {{JOIN}} syntax with a {{NEAREST ... BY}}
> clause rather than introducing a table-valued function (TVF). This is a
> deliberate design choice:
> * *Composability.* JOIN syntax naturally supports batch (multi-query) search
> — each row on the left side drives an independent top-K search on the right
> side. TVF-based approaches in other systems require workarounds like {{CROSS
> APPLY}} for batch queries.
> * *Pluggable ranking.* The {{BY DISTANCE <expr>}} / {{BY SIMILARITY <expr>}}
> clause accepts any scalar expression, not just vector similarity. This makes
> the same syntax usable for vector search, geospatial nearest-neighbor, BM25
> text relevance, or composite scoring — without needing separate functions for
> each modality.
> * *Explicit search contract.* The {{APPROX}} / {{EXACT}} keywords make the
> search algorithm contract explicit in the query. This ensures that index
> creation or deletion never silently changes query results — only queries that
> opt into {{APPROX}} are affected. This is a key differentiator from systems
> where index presence implicitly changes behavior.
> h2. Q5. Who cares? If you are successful, what difference will it make?
> * *AI/ML practitioners* get a native SQL primitive for semantic search, RAG
> pipelines, and recommendation systems without leaving SQL or depending on
> external vector databases.
> * *Data analysts* can express "find the top-K closest matches" as a single
> JOIN clause instead of a complex CROSS JOIN + window function pattern.
> * *The Spark ecosystem* closes a competitive gap with BigQuery, Snowflake,
> SQL Server, and PostgreSQL, all of which offer dedicated
> vector/nearest-neighbor search capabilities.
> * *Future work* (indexed ANN, threshold-based retrieval, ON-clause filters)
> builds directly on the syntax and semantics established here.
> h2. Q6. What are the risks?
> * *Syntax commitment.* Extending JOIN syntax is a language-level change that
> is difficult to reverse. Mitigation: competitive analysis and a rigorous SQL
> API review.
> * *Performance expectations.* Without index support (out of scope for this
> proposal), brute-force execution on large tables may disappoint users
> expecting sub-linear performance. Mitigation: the {{APPROX}} / {{EXACT}}
> keywords are part of the syntax from day one, establishing the contract for
> future index-based optimization. Documentation will clearly state the initial
> execution model.
> Risks are low overall. The feature enables strictly new workloads — it does
> not modify existing JOIN semantics or execution paths. Existing queries are
> unaffected.
> h2. Q7. How long will it take?
> Estimated 4 weeks for the scoped work, one week per phase:
> ||Phase||Duration||Description||
> |SQL parsing & analysis|1 week|ANTLR grammar extension, logical plan node,
> analysis rules|
> |Brute-force execution|1 week|Query resolution and optimization, LEFT OUTER /
> INNER semantics|
> |Spark Connect & DataFrame API|1 week|Proto definitions, DataFrame DSL,
> PySpark bindings|
> |Docs, testing & benchmarking|1 week|SQL reference docs, end-to-end tests,
> performance characterization|
> h2. Q8. What are the mid-term and final "exams" to check for success?
> *Mid-term:* The basic syntax parses, analyzes, and executes correctly for
> single-vector and batch vector similarity search using brute-force
> evaluation. The ad-hoc and batch examples from the function doc execute
> end-to-end.
> *Final:* Full feature parity with the scoped proposal — INNER and LEFT OUTER
> join modes, APPROX and EXACT keywords, DISTANCE and SIMILARITY ranking
> directions, and SQL reference documentation merged. Performance on
> representative benchmarks (single-query and batch top-K on tables with 1M+
> rows) is characterized and documented.
> h2. Appendix A. Proposed API Changes
> h3. Syntax
> {code:sql}
> FROM <query_table>
> [LEFT OUTER | INNER] JOIN <base_table>
>   {APPROX | EXACT} NEAREST <num_results>
>   BY {DISTANCE <distance_expression> | SIMILARITY <similarity_expression>}
> {code}
> h3. Arguments
> ||Argument||Description||
> |{{query_table}}|The driving (left) table. Each row generates a separate
> top-K search against the base table. Can be a table reference, subquery, or
> CTE.|
> |{{base_table}}|The target (right) table to search. Can be a table reference,
> subquery, or CTE.|
> |{{LEFT OUTER \| INNER}}|Optional. INNER (default) drops query rows with no
> matches. LEFT OUTER returns all query rows; base-side columns are NULL when
> no candidates exist.|
> |{{APPROX \| EXACT}}|Required (one of APPROX or EXACT). APPROX allows the
> optimizer to choose faster approximate strategies (e.g., indexed ANN in the
> future). EXACT forces brute-force evaluation. EXACT fails for inherently
> nondeterministic expressions.|
> |{{NEAREST <num_results>}}|Compile-time constant integer expression. Maximum
> results per query row. Capped at 100,000, matching the limit of the
> {{max_by}}/{{min_by}} K overload. Defaults to 1.|
> |{{BY DISTANCE <expr>}}|Scalar expression for distance ranking — smallest
> values rank first. Must return an orderable type.|
> |{{BY SIMILARITY <expr>}}|Scalar expression for similarity ranking — largest
> values rank first. Must return an orderable type.|
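> The argument semantics in the table above can be sketched in plain Python.
> This is a reading of the table, not Spark's implementation: the function
> {{nearest_by_join}}, its parameters, and the sample rows are hypothetical,
> and the handling of NULL scores is not modeled because the table does not
> specify it. The example uses a geospatial distance to show that the ranking
> expression need not be a vector function.

```python
import heapq
import math


def nearest_by_join(query_rows, base_rows, k, score,
                    direction="DISTANCE", how="INNER"):
    """Brute-force reference: for each query row keep the k best base rows."""
    if direction not in ("DISTANCE", "SIMILARITY"):
        raise ValueError("direction must be DISTANCE or SIMILARITY")
    if how not in ("INNER", "LEFT OUTER"):
        raise ValueError("how must be INNER or LEFT OUTER")
    # DISTANCE ranks smallest values first, SIMILARITY ranks largest first.
    pick = heapq.nsmallest if direction == "DISTANCE" else heapq.nlargest
    out = []
    for q in query_rows:
        scored = [(score(q, t), t) for t in base_rows]
        best = pick(k, scored, key=lambda pair: pair[0])
        if not best and how == "LEFT OUTER":
            # No candidates: keep the query row, base side is NULL (None here).
            out.append((q, None))
        out.extend((q, t) for _, t in best)
    return out


def euclidean(q, t):
    """Hypothetical distance expression over (x, y) coordinates."""
    return math.dist(q["xy"], t["xy"])


# Illustrative sample data, not taken from the proposal.
points = [{"poi": "cafe", "xy": (0.0, 0.0)}]
areas = [
    {"area": "north", "xy": (0.0, 5.0)},
    {"area": "east", "xy": (1.0, 0.0)},
    {"area": "south", "xy": (0.0, -2.0)},
]

nearest = nearest_by_join(points, areas, 2, euclidean, direction="DISTANCE")
inner_empty = nearest_by_join(points, [], 2, euclidean, how="INNER")
outer_empty = nearest_by_join(points, [], 2, euclidean, how="LEFT OUTER")
print([t["area"] for _, t in nearest])
```

> In this sketch an empty base side yields no rows under INNER and one row
> with a {{None}} base side under LEFT OUTER, mirroring the join-type row of
> the table.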
> h3. Returns
> All columns from the query table and all columns from the base table.
> h3. Examples
> {code:sql}
> -- Batch recommendations
> SELECT q.user_id, t.*
> FROM users q
> INNER JOIN products t
>   APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
>
> -- Pre-filtered search
> SELECT q.user_id, t.*
> FROM users q
> INNER JOIN (SELECT * FROM products WHERE price >= 500 AND country = 'EU') AS t
>   APPROX NEAREST 10 BY SIMILARITY vector_cosine_similarity(q.embedding, t.embedding);
> {code}
> h3. Planned Future Extensions (out of scope)
> * Threshold-based retrieval instead of a fixed count
> * Cross-table filter predicates (e.g., {{ON t.price <= q.budget}})
> * Index DDL for ANN acceleration
> h3. Backward Compatibility
> This proposal introduces new syntax only. No existing queries, plans, or
> behavior are affected. The new {{NEAREST}} keyword is contextual within JOIN
> clauses and does not conflict with existing reserved words.
> ----
> h2. Appendix B. Design Sketch
> h3. Logical Plan
> A new logical plan node NearestByJoin captures the join type (INNER/LEFT
> OUTER), search mode (APPROX/EXACT), num_results, ranking direction
> (DISTANCE/SIMILARITY), ranking expression, and child plans for the query and
> base sides.
> h3. Physical Plan
> NearestByJoin will be rewritten into existing physical operators, including
> JOIN, [max_by / min_by overloaded by
> K|https://github.com/apache/spark/pull/54134], and [vector similarity /
> distance expressions|https://github.com/apache/spark/pull/53481]. No new
> physical operators are introduced. In the longer term, a dedicated fused
> physical operator that avoids materializing the full Cartesian product could
> improve performance, but that is out of scope for this proposal and the
> initial implementation.
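> One way to picture how a fused operator could avoid materializing the
> Cartesian product is a bounded min-heap per query row. The Python sketch
> below is an assumption about one possible strategy, not the design in this
> proposal; {{top_k_streaming}}, {{dot}}, and the sample rows are hypothetical
> names. It consumes base rows one at a time and never holds more than K
> candidates.

```python
import heapq
from itertools import count


def top_k_streaming(query_row, base_rows, k, similarity):
    """Keep only the k highest-scoring base rows while streaming over them."""
    if k <= 0:
        return []
    heap = []        # min-heap; heap[0] is the weakest candidate kept so far
    order = count()  # unique tiebreaker so row dicts are never compared
    for t in base_rows:
        entry = (similarity(query_row, t), next(order), t)
        if len(heap) < k:
            heapq.heappush(heap, entry)
        elif entry[0] > heap[0][0]:
            # New row beats the weakest kept candidate: swap it in.
            heapq.heapreplace(heap, entry)
    # Best first; earlier rows win ties.
    return [row for _, _, row in sorted(heap, key=lambda e: (-e[0], e[1]))]


def dot(q, t):
    """Hypothetical similarity expression: dot product of two vectors."""
    return sum(a * b for a, b in zip(q["v"], t["v"]))


# Illustrative data; the base side is a generator, so it is read only once.
query = {"id": "q1", "v": (1, 0)}
base_rows = ({"id": i, "v": (i, 10 - i)} for i in range(6))
best = top_k_streaming(query, base_rows, k=2, similarity=dot)
print([row["id"] for row in best])
```

> Memory per query row is bounded by K rather than by the size of the base
> table, although every base row is still scored once.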
> h3. Analysis
> The analyzer validates that num_results is a foldable positive integer, the
> BY expression returns an orderable type, and EXACT mode is not used with
> nondeterministic expressions. Standard column resolution applies to the BY
> expression, which may reference columns from both sides.
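> The checks listed above can be sketched as a small validation function.
> This is an illustrative Python model, not Spark's analyzer: the function
> {{analysis_errors}} and its boolean inputs are hypothetical, and the 100,000
> cap is taken from the Arguments table in Appendix A.

```python
MAX_NUM_RESULTS = 100_000  # cap quoted in the Arguments table


def analysis_errors(num_results, search_mode, by_is_orderable, by_is_deterministic):
    """Return one message per failed check (empty list means the clause is valid)."""
    errors = []
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(num_results, bool) or not isinstance(num_results, int):
        errors.append("num_results must be an integer constant")
    elif num_results <= 0:
        errors.append("num_results must be positive")
    elif num_results > MAX_NUM_RESULTS:
        errors.append(f"num_results must not exceed {MAX_NUM_RESULTS}")
    if search_mode not in ("APPROX", "EXACT"):
        errors.append("search mode must be APPROX or EXACT")
    if not by_is_orderable:
        errors.append("BY expression must return an orderable type")
    if search_mode == "EXACT" and not by_is_deterministic:
        errors.append("EXACT cannot be used with a nondeterministic BY expression")
    return errors


ok = analysis_errors(10, "APPROX", by_is_orderable=True, by_is_deterministic=True)
bad_k = analysis_errors(0, "EXACT", by_is_orderable=True, by_is_deterministic=True)
bad_exact = analysis_errors(5, "EXACT", by_is_orderable=True, by_is_deterministic=False)
print(ok, bad_k, bad_exact)
```

> In this model a nondeterministic BY expression is rejected only under EXACT;
> the same expression with APPROX passes the check.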
> ----
> h2. Appendix C. Rejected Designs
> h3. Table-Valued Function (TVF)
> An earlier design used a {{VECTOR_SEARCH(base_table, query_table, ...)}} TVF.
> This was rejected because:
> * The function signature becomes unwieldy when supporting multiple distance
> types, search modes, and filter options as named parameters.
> * JOIN syntax is more natural for SQL users and composes cleanly with CTEs,
> subqueries, and standard SQL clauses.
> h3. USING clause with distance expression
> An intermediate design used {{JOIN ... USING <distance_expression>}} with a
> {{NEAREST}} clause. This was rejected because {{USING}} has established
> semantics in SQL (equi-join on identically named columns) and overloading it
> for distance expressions would be confusing.
> h3. ORDER BY + LIMIT pattern
> Relying on {{ORDER BY similarity_function(...) LIMIT K}} was considered but
> rejected because it only supports single-query search naturally. Batch search
> requires verbose CROSS JOIN + window function patterns, and the optimizer has
> no semantic signal to apply specialized execution strategies.
> h3. LATERAL Subquery
> A LATERAL subquery approach, in which each query row drives an ORDER BY
> distance LIMIT K subquery against the base table, was considered. This
> composes well for batch search and is the pattern used by pgvector. However,
> LATERAL has well-defined semantics: the subquery executes deterministically
> per row. This makes it inherently an exact search construct with no room for
> approximate semantics. There is no natural way to express "the optimizer may
> skip candidates or use an ANN index" within LATERAL's execution contract.
> Since the ability to opt into approximate search (APPROX) is a core design
> goal, LATERAL was rejected as the primary syntax.