Kontinuation commented on code in PR #562:
URL: https://github.com/apache/sedona-db/pull/562#discussion_r2793679352


##########
rust/sedona-spatial-join/src/planner/optimizer.rs:
##########
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use std::sync::Arc;
+
+use crate::planner::logical_plan_node::SpatialJoinPlanNode;
+use crate::planner::spatial_expr_utils::collect_spatial_predicate_names;
+use crate::planner::spatial_expr_utils::is_spatial_predicate;
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::NullEquality;
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Extension;
+use datafusion_expr::{BinaryExpr, Expr, Operator};
+use datafusion_expr::{Filter, Join, JoinType, LogicalPlan};
+use sedona_common::option::SedonaOptions;
+
+/// Register only the logical spatial join optimizer rule.
+///
+/// This enables building `Join(filter=...)` from patterns like 
`Filter(CrossJoin)`.
+/// It intentionally does not register any physical plan rewrite rules.
+pub fn register_spatial_join_logical_optimizer(
+    session_state_builder: SessionStateBuilder,
+) -> SessionStateBuilder {
+    session_state_builder
+        .with_optimizer_rule(Arc::new(MergeSpatialProjectionIntoJoin))
+        .with_optimizer_rule(Arc::new(SpatialJoinLogicalRewrite))
+}
+/// Logical optimizer rule that enables spatial join planning.
+///
+/// This rule turns eligible `Join(filter=...)` nodes into a 
`SpatialJoinPlanNode` extension.
+#[derive(Default, Debug)]
+struct SpatialJoinLogicalRewrite;
+
+impl OptimizerRule for SpatialJoinLogicalRewrite {
+    fn name(&self) -> &str {
+        "spatial_join_logical_rewrite"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let options = config.options();
+        let Some(ext) = options.extensions.get::<SedonaOptions>() else {
+            return Ok(Transformed::no(plan));
+        };
+        if !ext.spatial_join.enable {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Join(join) = &plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // v1: only rewrite joins that already have a spatial predicate in 
`filter`.
+        let Some(filter) = join.filter.as_ref() else {
+            return Ok(Transformed::no(plan));
+        };
+
+        let spatial_predicate_names = collect_spatial_predicate_names(filter);
+        if spatial_predicate_names.is_empty() {
+            return Ok(Transformed::no(plan));
+        }
+
+        // Join with with equi-join condition and spatial join condition. Only 
handle it
+        // when the join condition contains ST_KNN. KNN join is not a regular 
join and
+        // ST_KNN is also not a regular predicate. It must be handled by our 
spatial join exec.
+        if !join.on.is_empty() && !spatial_predicate_names.contains("st_knn") {
+            return Ok(Transformed::no(plan));
+        }
+
+        // Build new filter expression including equi-join conditions
+        let filter = filter.clone();
+        let eq_op = if join.null_equality == NullEquality::NullEqualsNothing {
+            Operator::Eq
+        } else {
+            Operator::IsNotDistinctFrom
+        };
+        let filter = join.on.iter().fold(filter, |acc, (l, r)| {
+            let eq_expr = Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(l.clone()),
+                eq_op,
+                Box::new(r.clone()),
+            ));
+            Expr::and(acc, eq_expr)
+        });
+
+        let schema = Arc::clone(&join.schema);
+        let node = SpatialJoinPlanNode {
+            left: join.left.as_ref().clone(),
+            right: join.right.as_ref().clone(),
+            join_type: join.join_type,
+            filter,
+            schema,
+            join_constraint: join.join_constraint,
+            null_equality: join.null_equality,
+        };
+
+        Ok(Transformed::yes(LogicalPlan::Extension(Extension {
+            node: Arc::new(node),
+        })))
+    }
+}
+
+/// Logical optimizer rule that enables spatial join planning.
+///
+/// This rule turns eligible `Filter(Join(filter=...))` nodes into a 
`Join(filter=...)` node,
+/// so that the spatial join can be rewritten later by 
[SpatialJoinLogicalRewrite].
+#[derive(Debug, Default)]
+struct MergeSpatialProjectionIntoJoin;
+
+impl OptimizerRule for MergeSpatialProjectionIntoJoin {
+    fn name(&self) -> &str {
+        "spatial_join_optimizer"

Review Comment:
   Fixed



##########
rust/sedona-spatial-join/src/utils/join_utils.rs:
##########
@@ -715,11 +778,154 @@ pub(crate) fn boundedness_from_children<'a>(
     }
 }
 
+/// Computes the [`EmissionType`] of a spatial join from its children.
+///
+/// `probe_side` names the child that is probed; the other child is the build side.
+///
+/// - If the build side is unbounded, the join emits only at the end (`Final`).
+/// - If the probe side emits incrementally, `Inner` joins and joins whose `Left*`/`Right*`
+///   side is the probe side are `Incremental`; joins whose `Left*`/`Right*` side is the
+///   build side, and all `Full` joins, are `Both`.
+/// - Otherwise, the probe side's pipeline behavior is returned unchanged.
+pub(crate) fn compute_join_emission_type(
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+    join_type: JoinType,
+    probe_side: JoinSide,
+) -> EmissionType {
+    let (build, probe) = if probe_side == JoinSide::Left {
+        (right, left)
+    } else {
+        (left, right)
+    };
+
+    if build.boundedness().is_unbounded() {
+        return EmissionType::Final;
+    }
+
+    if probe.pipeline_behavior() == EmissionType::Incremental {
+        match join_type {
+            // If we only need to generate matched rows from the probe side,
+            // we can emit rows incrementally.
+            JoinType::Inner => EmissionType::Incremental,
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
+                if probe_side == JoinSide::Right {
+                    EmissionType::Incremental
+                } else {
+                    EmissionType::Both
+                }
+            }
+            // If we need to generate unmatched rows from the *build side*,
+            // we need to emit them at the end.
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {
+                if probe_side == JoinSide::Left {
+                    EmissionType::Incremental
+                } else {
+                    EmissionType::Both
+                }
+            }
+            JoinType::Full => EmissionType::Both,
+        }
+    } else {
+        probe.pipeline_behavior()
+    }
+}
+
+/// Data required to push down a projection through a spatial join.
+///
+/// This is mostly taken from
+/// <https://github.com/apache/datafusion/blob/51.0.0/datafusion/physical-plan/src/projection.rs>.
+pub(crate) struct JoinPushdownData {
+    pub projected_left_child: ProjectionExec,
+    pub projected_right_child: ProjectionExec,
+    pub join_filter: Option<JoinFilter>,
+    pub join_on: SpatialPredicate,
+}
+
+/// Push down the given `projection` through the spatial join.
+/// This code is adapted from 
https://github.com/apache/datafusion/blob/51.0.0/datafusion/physical-plan/src/projection.rs
+pub(crate) fn try_pushdown_through_join(
+    projection: &ProjectionExec,
+    join_left: &Arc<dyn ExecutionPlan>,
+    join_right: &Arc<dyn ExecutionPlan>,
+    join_schema: &SchemaRef,
+    join_type: JoinType,
+    join_filter: Option<&JoinFilter>,
+    join_on: &SpatialPredicate,
+) -> Result<Option<JoinPushdownData>> {
+    let Some(projection_as_columns) = 
physical_to_column_exprs(projection.expr()) else {
+        return Ok(None);
+    };
+
+    // Mark joins produce a synthetic column that does not belong to either 
child. This synthetic
+    // `mark` column will make `new_join_children` fail, so we skip pushdown 
for such joins.
+    // This limitation if inherited from DataFusion's builtin 
`try_pushdown_through_join`.

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to