Anupam Yadav created SPARK-57091:
------------------------------------
Summary: [SQL] Add BroadcastNearestByJoinExec to avoid
cross-product materialization for NearestByJoin
Key: SPARK-57091
URL: https://issues.apache.org/jira/browse/SPARK-57091
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 4.0.0
Reporter: Anupam Yadav
h3. Problem
The current NearestByJoin implementation (RewriteNearestByJoin, added in
SPARK-56395) rewrites the join into a cross join followed by an aggregate and
a generate. The cross join materializes all N*M row pairs before the aggregate
can bound them. At moderate scale (30Kx30K, k=5), this takes ~400s and 1.7GB.
At 200Kx200K the current approach is infeasible (projected 5+ hours).
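To make the N*M growth concrete, here is a small illustrative calculation (not part of the PR). The function names are made up for this note, and k=5 is assumed for every scale, although the text above only states it for the 30Kx30K case.

```python
def cross_product_pairs(n_left: int, n_right: int) -> int:
    """Row pairs a cross join produces before any aggregate runs."""
    return n_left * n_right


def max_output_rows(n_left: int, k: int) -> int:
    """Upper bound on rows kept once each left row is limited to k matches."""
    return n_left * k


if __name__ == "__main__":
    k = 5  # assumed for all scales; the issue states k=5 only for 30Kx30K
    for n in (10_000, 30_000, 50_000, 200_000):
        pairs = cross_product_pairs(n, n)
        kept = max_output_rows(n, k)
        # pairs / kept simplifies to n / k: the fraction of work that is discarded
        print(f"{n:>7} x {n:<7} pairs={pairs:>14,}  kept<={kept:>9,}  ratio={pairs // kept:,}")
```

The intermediate pair count grows quadratically while the rows that survive grow linearly, which is why bounding per left row before materialization matters.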
h3. Proposal
Add {{BroadcastNearestByJoinExec}}, a dedicated physical operator that
broadcasts the right side and iterates per left row with a bounded priority
queue of size k. This avoids materializing the full cross product entirely.
The operator fires only when:
* {{spark.sql.join.nearestBy.broadcast.enabled}} is true (default false)
* The right side fits within {{autoBroadcastJoinThreshold}}
Otherwise the existing rewrite is used as fallback.
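The selection rule above could be sketched roughly as follows. This is a Python illustration of the decision only, not the planner code in the PR. {{Conf}} and {{choose_nearest_by_strategy}} are hypothetical names, and treating a negative threshold as "broadcast disabled" is an assumption carried over from how {{autoBroadcastJoinThreshold}} behaves for ordinary broadcast joins.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Conf:
    # Mirrors spark.sql.join.nearestBy.broadcast.enabled (default false per the proposal).
    nearest_by_broadcast_enabled: bool = False
    # Mirrors spark.sql.autoBroadcastJoinThreshold (Spark's documented default is 10 MB).
    auto_broadcast_join_threshold: int = 10 * 1024 * 1024


def choose_nearest_by_strategy(conf: Conf, right_size_bytes: int) -> str:
    """Return "broadcast" for the dedicated operator, "rewrite" for the existing fallback."""
    if not conf.nearest_by_broadcast_enabled:
        return "rewrite"
    if conf.auto_broadcast_join_threshold < 0:
        # Assumption: a negative threshold disables broadcasting, as for regular joins.
        return "rewrite"
    if right_size_bytes > conf.auto_broadcast_join_threshold:
        return "rewrite"
    return "broadcast"


if __name__ == "__main__":
    enabled = Conf(nearest_by_broadcast_enabled=True)
    print(choose_nearest_by_strategy(Conf(), 1_000))          # flag off by default
    print(choose_nearest_by_strategy(enabled, 1_000))         # small right side
    print(choose_nearest_by_strategy(enabled, 100 * 1024**2)) # right side too large
```

Because the flag defaults to false, existing plans would be unchanged unless a user opts in.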
h3. Benchmark Results
||Scale||Current (cross-product)||BroadcastNearestByJoin||Speedup||Memory||
|10Kx10K|4.2s|0.38s|11x|7x less|
|30Kx30K|404s|31s|13x|8.3x less|
|50Kx50K|1,158s|96s|12x|~8x less|
|200Kx200K|~5h (extrapolated)|23min|~13x|-|
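For anyone who wants to sanity-check the table, the short script below recomputes the speedup column from the measured times. It also shows one way the 200Kx200K figure could have been extrapolated: scaling runtime with the number of row pairs. That quadratic model is an assumption made for this note, not a statement of how the ~5h number was actually derived.

```python
# (scale label, rows per side, current seconds, broadcast seconds), copied from the table.
MEASURED = [
    ("10Kx10K", 10_000, 4.2, 0.38),
    ("30Kx30K", 30_000, 404.0, 31.0),
    ("50Kx50K", 50_000, 1158.0, 96.0),
]


def speedup(current_s: float, broadcast_s: float) -> float:
    """How many times faster the broadcast operator is than the current rewrite."""
    return current_s / broadcast_s


def extrapolate_quadratic(base_rows: int, base_seconds: float, target_rows: int) -> float:
    """Assumption: runtime is proportional to the number of row pairs (rows squared)."""
    return base_seconds * (target_rows / base_rows) ** 2


if __name__ == "__main__":
    for label, _, current_s, broadcast_s in MEASURED:
        print(f"{label}: {speedup(current_s, broadcast_s):.1f}x")
    projected_s = extrapolate_quadratic(30_000, 404.0, 200_000)
    print(f"200Kx200K projected: {projected_s / 3600:.2f} h "
          f"({speedup(projected_s, 23 * 60):.1f}x vs. 23 min)")
```

Under that assumption the projection from the 30Kx30K run lands close to five hours, in line with the extrapolated entry in the table.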
h3. Design Notes
This follows the same pattern as SPARK-56887 (SortMergeAsOfJoinExec for AS-OF
join by @sarutak) -- a dedicated physical operator to replace an expensive
rewrite for a specialized join type. Key design decisions:
* Candidate pairs whose ranking value is null or NaN are excluded (not treated
as 0.0)
* INNER join preserves original nullability; LEFT OUTER makes right columns
nullable
* The heap is allocated once outside the per-row loop and cleared on each
iteration (reduces GC pressure)
* The heap stores indices into the broadcast array, not row copies
* Fallback to the existing rewrite is guaranteed when the right side exceeds
the broadcast threshold
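A minimal Python sketch of how these decisions fit together in the per-row loop. It is an illustration, not the Scala code in the PR. Several things here are assumptions: {{rank}} is a hypothetical callback where a smaller value means "nearer", ties are broken in favor of the lower right-side index, and a left row with no valid candidate yields a single null-padded row under LEFT OUTER.

```python
import heapq
import math
from typing import Callable, List, Optional, Sequence, Tuple

Row = dict


def broadcast_nearest_by_join(
    left: Sequence[Row],
    right: Sequence[Row],  # stands in for the broadcast right side
    rank: Callable[[Row, Row], Optional[float]],  # hypothetical; smaller = nearer
    k: int,
    left_outer: bool = False,
) -> List[Tuple[Row, Optional[Row]]]:
    if k < 1:
        raise ValueError("k must be at least 1")
    out: List[Tuple[Row, Optional[Row]]] = []
    # Allocated once and reused for every left row.
    heap: List[Tuple[float, int]] = []
    for l_row in left:
        heap.clear()
        for idx, r_row in enumerate(right):
            score = rank(l_row, r_row)
            if score is None or math.isnan(score):
                continue  # null/NaN candidates are skipped, not ranked as 0.0
            # heapq is a min-heap, so negate the score: heap[0] is then the
            # worst match currently kept. Only the index is stored, not the row.
            if len(heap) < k:
                heapq.heappush(heap, (-score, -idx))
            elif score < -heap[0][0]:
                heapq.heapreplace(heap, (-score, -idx))
        if heap:
            for _, neg_idx in sorted(heap, reverse=True):  # nearest first
                out.append((l_row, right[-neg_idx]))
        elif left_outer:
            out.append((l_row, None))  # right side null-padded
    return out


if __name__ == "__main__":
    left = [{"id": 1, "x": 0.0}, {"id": 2, "x": 10.0}, {"id": 3, "x": float("nan")}]
    right = [{"y": 1.0}, {"y": 2.0}, {"y": None}, {"y": 9.0}]
    dist = lambda l, r: None if r["y"] is None else abs(l["x"] - r["y"])
    for l_row, r_row in broadcast_nearest_by_join(left, right, dist, k=2, left_outer=True):
        print(l_row["id"], r_row)
```

Per left row the heap never holds more than k entries, so memory beyond the broadcast data is O(k) rather than O(M).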
h3. Draft Implementation
[PR #56101|https://github.com/apache/spark/pull/56101] (draft, 11 unit tests
passing)
h3. Seeking Feedback
I would appreciate thoughts from the NearestByJoin authors on:
* Does this approach align with the planned evolution of the feature?
* Are there any concerns about adding a dedicated physical operator vs.
optimizing the existing rewrite?

Happy to collaborate and adjust the approach based on feedback.
cc @dilipbiswal @cloud-fan @sarutak