dtenedor commented on code in PR #55629:
URL: https://github.com/apache/spark/pull/55629#discussion_r3183366692
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
],
"sqlState" : "0A000"
},
+ "NEAREST_BY_JOIN" : {
+ "message" : [
+ "Invalid nearest-by join."
+ ],
+ "subClass" : {
+ "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+ "message" : [
+ "EXACT nearest-by join is incompatible with the nondeterministic
ranking expression <expression>. Use APPROX, or replace the expression with a
deterministic one."
+ ]
+ },
+ "NON_ORDERABLE_RANKING_EXPRESSION" : {
+ "message" : [
+ "The ranking expression <expression> of type <type> is not
orderable."
+ ]
+ },
+ "NUM_RESULTS_OUT_OF_RANGE" : {
+ "message" : [
+ "The number of results <numResults> must be between <min> and <max>."
Review Comment:
I imagine this error would surface after the query has executed for a while.
What should the user do in this case? Is there a specific way they should amend
their query to make it work the next time? Can we mention this?
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
],
"sqlState" : "0A000"
},
+ "NEAREST_BY_JOIN" : {
+ "message" : [
+ "Invalid nearest-by join."
+ ],
+ "subClass" : {
+ "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+ "message" : [
+ "EXACT nearest-by join is incompatible with the nondeterministic
ranking expression <expression>. Use APPROX, or replace the expression with a
deterministic one."
+ ]
+ },
+ "NON_ORDERABLE_RANKING_EXPRESSION" : {
+ "message" : [
+ "The ranking expression <expression> of type <type> is not
orderable."
Review Comment:
Can we suggest one or two alternatives that would satisfy this constraint,
to help the user move forward quickly?
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -7837,6 +7880,11 @@
"Referencing a lateral column alias <lca> in window expression
<windowExpr>."
]
},
+ "LATERAL_JOIN_NEAREST_BY" : {
+ "message" : [
+ "LATERAL correlation with NEAREST BY clause."
Review Comment:
This covers explicit LATERAL JOIN. Do we care about lateral column alias
usage for queries over the results of the nearest-neighbor join as well, or is
that orthogonal?
##########
docs/sql-ref-syntax-qry-select-join.md:
##########
@@ -53,6 +53,30 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [
join_criteria ] | NATURAL j
Specifies an expression with a return type of boolean.
+* **nearest_by_clause**
+
+ Specifies a nearest-by top-K ranking join. For each row on the left (query
side), returns up to `num_results` rows from the right (base side), ranked by
`ranking_expression`. Only `INNER` (the default) and `LEFT OUTER` join types
are supported with this clause.
+
+ **Syntax:** `{ APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE |
SIMILARITY } ranking_expression`
+
+ `APPROX | EXACT`
+
+ Controls the search algorithm contract. `APPROX` allows the optimizer to
use faster approximate strategies (such as indexed nearest-neighbor search when
available). `EXACT` forces brute-force evaluation and requires
`ranking_expression` to be deterministic.
+
+ `num_results`
+
+ A positive integer literal between 1 and 100000 that limits the number of
matches per left row. Defaults to 1 when omitted.
Review Comment:
Why this limit? Is it controlled by a config?
##########
sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala:
##########
@@ -203,6 +203,31 @@ private[sql] object QueryParsingErrors extends
DataTypeErrorsBase {
ctx)
}
+ def nearestByJoinWithLateralUnsupportedError(ctx: ParserRuleContext):
Throwable = {
+ new ParseException(
+ errorClass = "UNSUPPORTED_FEATURE.LATERAL_JOIN_NEAREST_BY",
+ messageParameters = Map.empty,
+ ctx)
+ }
+
+ def unsupportedNearestByJoinTypeError(ctx: ParserRuleContext, joinType:
String): Throwable = {
+ new ParseException(
+ errorClass = "NEAREST_BY_JOIN.UNSUPPORTED_JOIN_TYPE",
+ messageParameters =
+ Map("joinType" -> toSQLStmt(joinType), "supported" -> "'INNER', 'LEFT
OUTER'"),
+ ctx)
Review Comment:
Can you move `NearestByJoinType` to sql/api?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2362,39 +2362,68 @@ class AstBuilder extends DataTypeAstBuilder
}
}
- // Resolve the join type and join condition
- val (joinType, condition) = Option(ctx.joinCriteria) match {
- case Some(c) if c.USING != null =>
- if (ctx.LATERAL != null) {
- throw
QueryParsingErrors.lateralJoinWithUsingJoinUnsupportedError(ctx)
+ if (ctx.nearestByClause != null) {
Review Comment:
This part of the parser is already non-trivially complex. Can we move the
NEAREST BY join part into a separate helper method to help with code health?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -2362,39 +2362,68 @@ class AstBuilder extends DataTypeAstBuilder
}
}
- // Resolve the join type and join condition
- val (joinType, condition) = Option(ctx.joinCriteria) match {
- case Some(c) if c.USING != null =>
- if (ctx.LATERAL != null) {
- throw
QueryParsingErrors.lateralJoinWithUsingJoinUnsupportedError(ctx)
+ if (ctx.nearestByClause != null) {
+ if (ctx.LATERAL != null) {
+ throw
QueryParsingErrors.nearestByJoinWithLateralUnsupportedError(ctx)
+ }
+ if (!Seq(Inner, LeftOuter).contains(baseJoinType)) {
+ throw QueryParsingErrors.unsupportedNearestByJoinTypeError(
+ ctx, baseJoinType.sql, NearestByJoinType.supportedDisplay)
+ }
+ val clause = ctx.nearestByClause
+ val approx = clause.APPROX != null
+ val numResults = Option(clause.num).map { n =>
+ // Guard against literals that overflow Long.
+ val value = try n.getText.toLong catch {
Review Comment:
Could this `.toLong` throw any other kinds of exceptions?
##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -5313,6 +5313,49 @@
],
"sqlState" : "0A000"
},
+ "NEAREST_BY_JOIN" : {
+ "message" : [
+ "Invalid nearest-by join."
+ ],
+ "subClass" : {
+ "EXACT_WITH_NONDETERMINISTIC_EXPRESSION" : {
+ "message" : [
+ "EXACT nearest-by join is incompatible with the nondeterministic
ranking expression <expression>. Use APPROX, or replace the expression with a
deterministic one."
+ ]
+ },
+ "NON_ORDERABLE_RANKING_EXPRESSION" : {
+ "message" : [
+ "The ranking expression <expression> of type <type> is not
orderable."
+ ]
+ },
+ "NUM_RESULTS_OUT_OF_RANGE" : {
+ "message" : [
+ "The number of results <numResults> must be between <min> and <max>."
+ ]
+ },
+ "STREAMING_NOT_SUPPORTED" : {
+ "message" : [
+ "nearest-by join is not supported with streaming
DataFrames/Datasets."
Review Comment:
```suggestion
"Nearest-by join is not supported with streaming
DataFrames/Datasets."
```
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
}
}
}
+
+object NearestByJoin {
+ /** Upper bound on `numResults`. Mirrors the K-overload limit of
`MaxMinByK`. */
+ val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the
left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ * - `NearestByDistance`: smallest values of `rankingExpression` first.
+ * - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP.
Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan
so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx
= true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ joinType: JoinType,
+ approx: Boolean,
+ numResults: Int,
Review Comment:
Can we have `@param` items explaining what each of these arguments
represents?
##########
docs/sql-ref-syntax-qry-select-join.md:
##########
@@ -53,6 +53,30 @@ relation { [ join_type ] JOIN [ LATERAL ] relation [
join_criteria ] | NATURAL j
Specifies an expression with a return type of boolean.
+* **nearest_by_clause**
+
+ Specifies a nearest-by top-K ranking join. For each row on the left (query
side), returns up to `num_results` rows from the right (base side), ranked by
`ranking_expression`. Only `INNER` (the default) and `LEFT OUTER` join types
are supported with this clause.
+
+ **Syntax:** `{ APPROX | EXACT } NEAREST [ num_results ] BY { DISTANCE |
SIMILARITY } ranking_expression`
+
+ `APPROX | EXACT`
+
+ Controls the search algorithm contract. `APPROX` allows the optimizer to
use faster approximate strategies (such as indexed nearest-neighbor search when
available). `EXACT` forces brute-force evaluation and requires
`ranking_expression` to be deterministic.
+
+ `num_results`
+
+ A positive integer literal between 1 and 100000 that limits the number of
matches per left row. Defaults to 1 when omitted.
+
+ `DISTANCE | SIMILARITY`
+
+ `DISTANCE` ranks rows by smallest value of `ranking_expression` first.
`SIMILARITY` ranks rows by largest value first.
+
+ `ranking_expression`
+
+ A scalar expression that returns an orderable type.
Review Comment:
Should we mention any requirements about the determinism of this expression,
or the presence of possible side-effects (e.g. UDFs)?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -2541,14 +2542,36 @@ object CheckCartesianProducts extends Rule[LogicalPlan]
with PredicateHelper {
}
}
- def apply(plan: LogicalPlan): LogicalPlan =
+ def apply(plan: LogicalPlan): LogicalPlan = {
if (conf.crossJoinEnabled) {
- plan
- } else plan.transformWithPruning(_.containsAnyPattern(INNER_LIKE_JOIN,
OUTER_JOIN)) {
+ return plan
+ }
+
+ // Joins synthesized by `RewriteNearestByJoin` are an intentional, bounded
cross-product
+ // wrapped by a `MaxMinByK` aggregate. Identify them by their unambiguous
post-rewrite
+ // signature -- `Aggregate(_, exprs, Join(_, _, LeftOuter, None, _))`
where `exprs`
+ // contains a `MaxMinByK` -- and skip them so user queries written as
`NEAREST BY` are not
+ // rejected when `spark.sql.crossJoin.enabled = false`. We use structural
detection rather
+ // than a `TreeNodeTag` because a tag set on the `Join` would be silently
dropped by any
+ // intervening optimizer rule that constructs a fresh `Join` via the
case-class
+ // constructor without calling `copyTagsFrom`.
+ val nearestByJoins: java.util.IdentityHashMap[Join, Unit] = {
+ val acc = new java.util.IdentityHashMap[Join, Unit]()
+ plan.foreach {
Review Comment:
This descends arbitrarily deep into the rest of the query plan, including
through other joins and union operators, but does not descend into subquery
expression plans. Do we really want to collect this deep? Would the presence of
a `MaxMinByK` below more levels of joins really change anything here, for
example?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
}
}
}
+
+object NearestByJoin {
+ /** Upper bound on `numResults`. Mirrors the K-overload limit of
`MaxMinByK`. */
+ val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the
left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ * - `NearestByDistance`: smallest values of `rankingExpression` first.
+ * - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP.
Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan
so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx
= true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(
Review Comment:
This operator is not basic IMO; can we move this to a separate file for
better code health?
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -2420,3 +2420,58 @@ object AsOfJoin {
}
}
}
+
+object NearestByJoin {
+ /** Upper bound on `numResults`. Mirrors the K-overload limit of
`MaxMinByK`. */
+ val MaxNumResults: Int = 100000
+}
+
+/**
+ * A logical plan for a nearest-by top-K ranking join. For each row on the
left side it returns
+ * up to `numResults` rows from the right side ordered by `rankingExpression`:
+ * - `NearestByDistance`: smallest values of `rankingExpression` first.
+ * - `NearestBySimilarity`: largest values of `rankingExpression` first.
+ *
+ * The `approx` field records the user's APPROX/EXACT choice from the SPIP.
Today both modes
+ * use the same brute-force rewrite. The flag is preserved on the logical plan
so future
+ * indexed approximate-nearest-neighbor strategies can fire only when `approx
= true`,
+ * leaving EXACT queries unaffected. See the SPIP linked from SPARK-56395.
+ */
+case class NearestByJoin(
+ left: LogicalPlan,
+ right: LogicalPlan,
+ joinType: JoinType,
+ approx: Boolean,
+ numResults: Int,
+ rankingExpression: Expression,
+ direction: NearestByDirection)
+ extends BinaryNode with SupportsNonDeterministicExpression {
+
+ require(Seq(Inner, LeftOuter).contains(joinType),
+ s"Unsupported nearest-by join type $joinType")
+
+ // APPROX permits a nondeterministic ranking expression (per the SPIP); the
rewrite
Review Comment:
This comment is incomplete, and we shouldn't mention the SPIP here. Let's
make this comment self-explanatory.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]