dilipbiswal opened a new pull request, #55629:
URL: https://github.com/apache/spark/pull/55629

### What changes were proposed in this pull request?

This is the first of two PRs implementing https://issues.apache.org/jira/browse/SPARK-56395. It introduces the SQL grammar, logical plan, analyzer checks, and optimizer rewrite. The DataFrame / PySpark / Spark Connect API surface is split into a follow-up PR.

**SQL syntax**

```
left_relation [ INNER | LEFT [ OUTER ] ] JOIN right_relation nearest_by_clause

nearest_by_clause:
  { APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE | SIMILARITY } ranking_expression
```

Only the INNER (default) and LEFT OUTER join types are supported. `num_results` is a positive integer in [1, 100000] and defaults to 1. DISTANCE ranks the smallest values first; SIMILARITY ranks the largest values first.

**Example**

```
CREATE TEMP VIEW users(user_id, score) AS VALUES (1, 10.0), (2, 20.0), (3, 30.0);
CREATE TEMP VIEW products(product, pscore) AS VALUES ('A', 11.0), ('B', 22.0), ('C', 5.0);

SELECT u.user_id, p.product
FROM users u JOIN products p
  APPROX NEAREST 2 BY DISTANCE abs(u.score - p.pscore);
```

**Parsed Plan**

```
'Project ['u.user_id, 'p.product]
+- 'NearestByJoin Inner, approx=true, k=2, direction=NearestByDistance, rank='abs('u.score - 'p.pscore)
   :- 'SubqueryAlias u
   :  +- 'UnresolvedRelation [users]
   +- 'SubqueryAlias p
      +- 'UnresolvedRelation [products]
```

**Optimized Plan**

```
Project [user_id#1, product#3]
+- Generate inline(__nearest_matches__#7), [product#3, pscore#4], outer=false
   +- Aggregate [__qid#5], [first(user_id#1) AS user_id#1, first(score#2) AS score#2, min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2) AS __nearest_matches__#7]
      +- Join LeftOuter
         :- Project [user_id#1, score#2, monotonically_increasing_id() AS __qid#5]
         :  +- LocalRelation [user_id#1, score#2]
         +- LocalRelation [product#3, pscore#4]
```

**Physical Plan**

```
*(3) Project [user_id#1, product#3]
+- *(3) Generate inline(__nearest_matches__#7), [user_id#1, score#2], false, [product#3, pscore#4]
   +- ObjectHashAggregate(keys=[__qid#5], functions=[first(user_id#1), first(score#2), min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
      +- Exchange hashpartitioning(__qid#5, 200)
         +- ObjectHashAggregate(keys=[__qid#5], functions=[partial_first(user_id#1), partial_first(score#2), partial_min_by(struct(product#3, pscore#4), abs(score#2 - pscore#4), 2)])
            +- BroadcastNestedLoopJoin BuildRight, LeftOuter
               :- *(1) Project [user_id#1, score#2, monotonically_increasing_id() AS __qid#5]
               :  +- LocalTableScan [user_id#1, score#2]
               +- BroadcastExchange IdentityBroadcastMode
                  +- LocalTableScan [product#3, pscore#4]
```

### Why are the changes needed?

For the design and rationale, see the SPIP linked from https://issues.apache.org/jira/browse/SPARK-56395.

### Does this PR introduce _any_ user-facing change?

Yes. This PR adds new SQL syntax (the NEAREST BY clause) and five new non-reserved keywords (APPROX, EXACT, NEAREST, DISTANCE, SIMILARITY) to the grammar. Existing queries are unaffected because the new keywords are non-reserved. It also adds a new error class, NEAREST_BY_JOIN.

### How was this patch tested?

PlanParserSuite, RewriteNearestByJoinSuite, SQLQueryTestSuite, SparkConnectDatabaseMetaDataSuite, ThriftServerWithSparkContextSuite.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.7), human-reviewed and tested
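The `nearest_by_clause` grammar corresponds to the fields printed on the parsed `'NearestByJoin` node: `approx`, `k`, `direction`, and `rank`. The Python sketch below illustrates that mapping with a regex. It is for illustration only:

- Spark's real parser is ANTLR-based and handles full expressions; this sketch keeps `ranking_expression` as raw text.
- `parse_nearest_by` and `NearestByClause` are hypothetical names, not Spark APIs.
- The [1, 100000] range check on `num_results` is deliberately left out, on the assumption that it belongs to the analyzer checks rather than the grammar.

```python
import re
from dataclasses import dataclass

# Hypothetical regex for the nearest_by_clause; keywords match case-insensitively.
_NEAREST_BY = re.compile(
    r"(?P<mode>APPROX|EXACT)\s+NEAREST\s+(?:(?P<k>\d+)\s+)?BY\s+"
    r"(?P<direction>DISTANCE|SIMILARITY)\s+(?P<rank>\S.*)",
    re.IGNORECASE | re.DOTALL,
)


@dataclass(frozen=True)
class NearestByClause:
    approx: bool     # APPROX -> True, EXACT -> False
    k: int           # num_results, defaulting to 1 when omitted
    direction: str   # "DISTANCE" (smallest first) or "SIMILARITY" (largest first)
    rank: str        # ranking_expression, kept as unparsed text


def parse_nearest_by(clause: str) -> NearestByClause:
    """Split a NEAREST BY clause into the parts the parsed plan node records."""
    m = _NEAREST_BY.fullmatch(clause.strip())
    if m is None:
        raise ValueError(f"not a NEAREST BY clause: {clause!r}")
    return NearestByClause(
        approx=m["mode"].upper() == "APPROX",
        k=int(m["k"]) if m["k"] is not None else 1,
        direction=m["direction"].upper(),
        rank=m["rank"].strip(),
    )


print(parse_nearest_by("APPROX NEAREST 2 BY DISTANCE abs(u.score - p.pscore)"))
```

Leaving out `num_results` falls back to 1, matching the stated default. For example, `EXACT NEAREST BY SIMILARITY ...` yields `k=1`.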

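The Python sketch below is a small reference model of the query semantics that mirrors the shape of the optimized plan in this PR. It proceeds in four steps:

1. Tag each left row with a query id.
2. Pair that row with every right row.
3. Keep the `k` best-ranked right rows per id: smallest first for DISTANCE, largest first for SIMILARITY.
4. Flatten the kept matches into output rows.

It is not Spark code, and it rests on these assumptions:

- `nearest_by_join` and its parameters are hypothetical, and rows are plain dicts.
- `rank` returns non-null numbers.
- Ties keep right-side input order.
- The APPROX/EXACT distinction is ignored; an exact top-k is always computed.
- For LEFT OUTER, a left row with no matches is kept with NULL right columns. This follows ordinary left-join semantics and is not taken from this PR.

```python
import heapq
from typing import Callable, Sequence

MAX_NUM_RESULTS = 100_000  # upper bound on num_results stated in this PR

Row = dict


def nearest_by_join(
    left: Sequence[Row],
    right: Sequence[Row],
    rank: Callable[[Row, Row], float],
    right_columns: Sequence[str],
    k: int = 1,
    by: str = "DISTANCE",
    join_type: str = "INNER",
) -> list[Row]:
    """Hypothetical reference model of `left JOIN right NEAREST k BY <by> rank`."""
    # Checks mirroring the restrictions described in this PR.
    if join_type not in ("INNER", "LEFT OUTER"):
        raise ValueError(f"unsupported join type for NEAREST BY: {join_type}")
    if not 1 <= k <= MAX_NUM_RESULTS:
        raise ValueError(f"num_results must be in [1, {MAX_NUM_RESULTS}], got {k}")
    if by not in ("DISTANCE", "SIMILARITY"):
        raise ValueError(f"expected DISTANCE or SIMILARITY, got {by}")

    # DISTANCE keeps the smallest ranks, SIMILARITY the largest.
    pick = heapq.nsmallest if by == "DISTANCE" else heapq.nlargest

    result: list[Row] = []
    # enumerate() stands in for monotonically_increasing_id() AS __qid:
    # each left row forms its own group.
    for _qid, lrow in enumerate(left):
        # Rank lrow against every right row (the unconditioned LeftOuter join)
        # and keep the k best, like the per-__qid min_by(..., k) aggregate.
        matches = pick(k, right, key=lambda rrow: rank(lrow, rrow))
        if matches:
            # Flatten the kept matches into output rows, like Generate inline(...).
            # Right-side values win if a column name appears on both sides.
            result.extend({**lrow, **rrow} for rrow in matches)
        elif join_type == "LEFT OUTER":
            # Assumed behavior: keep the left row with NULL right columns.
            result.append({**lrow, **dict.fromkeys(right_columns)})
    return result


users = [{"user_id": 1, "score": 10.0}, {"user_id": 2, "score": 20.0}, {"user_id": 3, "score": 30.0}]
products = [{"product": "A", "pscore": 11.0}, {"product": "B", "pscore": 22.0}, {"product": "C", "pscore": 5.0}]
rows = nearest_by_join(users, products, lambda u, p: abs(u["score"] - p["pscore"]), ["product", "pscore"], k=2)
print([(r["user_id"], r["product"]) for r in rows])
# [(1, 'A'), (1, 'C'), (2, 'B'), (2, 'A'), (3, 'B'), (3, 'A')]
```

On the `users` and `products` views from the example, the model returns the two products with the smallest `abs(u.score - p.pscore)` for each user. With INNER semantics, a left row with no matches (for example, when `products` is empty) produces no output.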