This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new a3d3efa0 feat(rust/sedona-spatial-join): Add config to disable spatial join reordering (#733)
a3d3efa0 is described below
commit a3d3efa0eef37a0ce30c41ee41fef05fba421c2b
Author: Yongting You <[email protected]>
AuthorDate: Sat Mar 28 23:19:39 2026 +0800
feat(rust/sedona-spatial-join): Add config to disable spatial join reordering (#733)
---
python/sedonadb/tests/test_sjoin.py | 93 +++++++++++++++++++++-
rust/sedona-common/src/option.rs | 5 ++
.../src/planner/physical_planner.rs | 29 +++++--
.../tests/spatial_join_integration.rs | 33 +++++++-
4 files changed, 151 insertions(+), 9 deletions(-)
diff --git a/python/sedonadb/tests/test_sjoin.py b/python/sedonadb/tests/test_sjoin.py
index 346fe0eb..607ff93e 100644
--- a/python/sedonadb/tests/test_sjoin.py
+++ b/python/sedonadb/tests/test_sjoin.py
@@ -16,6 +16,7 @@
# under the License.
import json
+import re
import warnings
import geopandas as gpd
@@ -23,7 +24,7 @@ import numpy as np
import pandas as pd
import pytest
import sedonadb
-from sedonadb.testing import PostGIS, SedonaDB, random_geometry
+from sedonadb.testing import PostGIS, SedonaDB, random_geometry, skip_if_not_exists
from shapely.geometry import Point
@@ -68,6 +69,96 @@ def test_spatial_join(join_type, on):
eng_postgis.assert_query_result(sql, sedonadb_results)
+def _plan_text(df):
+ query_plan = df.to_pandas()
+ return "\n".join(query_plan.iloc[:, 1].astype(str).tolist())
+
+
+def _spatial_join_side_file_names(plan_text):
+ """Extract the left/right parquet file names used by `SpatialJoinExec`.
+
+ Example input:
+ SpatialJoinExec: join_type=Inner, on=ST_intersects(geo_right@0, geo_left@0)
+ ProjectionExec: expr=[geometry@0 as geo_right]
+ DataSourceExec: file_groups={1 group: [[.../natural-earth_countries_geo.parquet]]}, projection=[geometry], file_type=parquet
+ ProbeShuffleExec: partitioning=RoundRobinBatch(1)
+ ProjectionExec: expr=[geometry@0 as geo_left]
+ DataSourceExec: file_groups={1 group: [[.../natural-earth_cities_geo.parquet]]}, projection=[geometry], file_type=parquet
+
+ Example output:
+ ["natural-earth_countries_geo", "natural-earth_cities_geo"]
+ """
+ spatial_join_idx = plan_text.find("SpatialJoinExec:")
+ assert spatial_join_idx != -1, plan_text
+
+ file_names = re.findall(
+ r"DataSourceExec:.*?/([^/\]]+)\.parquet", plan_text[spatial_join_idx:]
+ )
+ assert len(file_names) >= 2, plan_text
+ return file_names[:2]
+
+
+def test_spatial_join_reordering_can_be_disabled_e2e(geoarrow_data):
+ path_left = (
+ geoarrow_data / "natural-earth" / "files" / "natural-earth_cities_geo.parquet"
+ )
+ path_right = (
+ geoarrow_data
+ / "natural-earth"
+ / "files"
+ / "natural-earth_countries_geo.parquet"
+ )
+ skip_if_not_exists(path_left)
+ skip_if_not_exists(path_right)
+
+ with SedonaDB.create_or_skip() as eng_sedonadb:
+ sql = f"""
+ SELECT t1.name
+ FROM '{path_left}' AS t1
+ JOIN '{path_right}' AS t2
+ ON ST_Intersects(t1.geometry, t2.geometry)
+ """
+
+ # Test 1: regular run swaps the join order
+ plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
+ print(f"Plan with reordering enabled:\n{plan_text}")
+ assert _spatial_join_side_file_names(plan_text) == [
+ "natural-earth_countries_geo",
+ "natural-earth_cities_geo",
+ ], plan_text
+
+ result_with_reordering = (
+ eng_sedonadb.execute_and_collect(sql)
+ .to_pandas()
+ .sort_values("name")
+ .reset_index(drop=True)
+ )
+ assert len(result_with_reordering) > 0
+
+ # Test 2: with config disabled, join won't reorder
+ eng_sedonadb.con.sql(
+ "SET sedona.spatial_join.spatial_join_reordering TO false"
+ ).execute()
+
+ plan_text = _plan_text(eng_sedonadb.con.sql(f"EXPLAIN {sql}"))
+ print(f"Plan with reordering disabled:\n{plan_text}")
+ assert _spatial_join_side_file_names(plan_text) == [
+ "natural-earth_cities_geo",
+ "natural-earth_countries_geo",
+ ], plan_text
+
+ result_without_reordering = (
+ eng_sedonadb.execute_and_collect(sql)
+ .to_pandas()
+ .sort_values("name")
+ .reset_index(drop=True)
+ )
+ pd.testing.assert_frame_equal(
+ result_without_reordering,
+ result_with_reordering,
+ )
+
+
@pytest.mark.parametrize(
"join_type",
[
diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index 21b228d4..157b160a 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -82,6 +82,11 @@ config_namespace! {
/// locality might cause imbalanced partitions when running out-of-core spatial join.
pub repartition_probe_side: bool, default = true
+ /// Reorder spatial join inputs to put the smaller input on the build side
+ /// when statistics are available. If set to `false`, spatial joins
+ /// preserve the original query order.
+ pub spatial_join_reordering: bool, default = true
+
/// Maximum number of sample bounding boxes collected from the index side for partitioning the
/// data when running out-of-core spatial join
pub max_index_side_bbox_samples: usize, default = 10000
diff --git a/rust/sedona-spatial-join/src/planner/physical_planner.rs b/rust/sedona-spatial-join/src/planner/physical_planner.rs
index b99fe4e8..0eb7d0d8 100644
--- a/rust/sedona-spatial-join/src/planner/physical_planner.rs
+++ b/rust/sedona-spatial-join/src/planner/physical_planner.rs
@@ -40,7 +40,7 @@ use crate::planner::logical_plan_node::SpatialJoinPlanNode;
use crate::planner::probe_shuffle_exec::ProbeShuffleExec;
use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported, transform_join_filter};
use crate::spatial_predicate::SpatialPredicate;
-use sedona_common::option::SedonaOptions;
+use sedona_common::option::{SedonaOptions, SpatialJoinOptions};
/// Registers a query planner that can produce [`SpatialJoinExec`] from a logical extension node.
pub(crate) fn register_spatial_join_planner(builder: SessionStateBuilder) -> SessionStateBuilder {
@@ -102,6 +102,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
else {
return sedona_internal_err!("SedonaOptions not found in session state extensions");
};
+ let spatial_join_options = &ext.spatial_join;
if !ext.spatial_join.enable {
return sedona_internal_err!("Spatial join is disabled in SedonaOptions");
@@ -151,14 +152,18 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
let should_swap = !matches!(spatial_predicate, SpatialPredicate::KNearestNeighbors(_))
&& join_type.supports_swap()
- && should_swap_join_order(physical_left.as_ref(), physical_right.as_ref())?;
+ && should_swap_join_order(
+ spatial_join_options,
+ physical_left.as_ref(),
+ physical_right.as_ref(),
+ )?;
// Repartition the probe side when enabled. This breaks spatial locality in sorted/skewed
// datasets, leading to more balanced workloads during out-of-core spatial join.
// We determine which pre-swap input will be the probe AFTER any potential swap, and
// repartition it here. swap_inputs() will then carry the RepartitionExec to the correct
// child position.
- let (physical_left, physical_right) = if ext.spatial_join.repartition_probe_side {
+ let (physical_left, physical_right) = if spatial_join_options.repartition_probe_side {
repartition_probe_side(
physical_left,
physical_right,
@@ -176,7 +181,7 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
remainder,
join_type,
None,
- &ext.spatial_join,
+ spatial_join_options,
)?;
if should_swap {
@@ -192,8 +197,20 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
/// produce a smaller and more efficient spatial index (R-tree).
/// 2. If row-count statistics are unavailable (for example, for CSV sources),
/// fall back to total input size as an estimate.
-/// 3. Do not swap the join order if no relevant statistics are available.
-fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> Result<bool> {
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+/// statistics are available.
+fn should_swap_join_order(
+ spatial_join_options: &SpatialJoinOptions,
+ left: &dyn ExecutionPlan,
+ right: &dyn ExecutionPlan,
+) -> Result<bool> {
+ if !spatial_join_options.spatial_join_reordering {
+ log::info!(
+ "spatial join swap heuristic disabled via sedona.spatial_join.spatial_join_reordering"
+ );
+ return Ok(false);
+ }
+
let left_stats = left.partition_statistics(None)?;
let right_stats = right.partition_statistics(None)?;
diff --git a/rust/sedona-spatial-join/tests/spatial_join_integration.rs b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
index da54f52e..60f0033b 100644
--- a/rust/sedona-spatial-join/tests/spatial_join_integration.rs
+++ b/rust/sedona-spatial-join/tests/spatial_join_integration.rs
@@ -283,6 +283,7 @@ fn single_row_table(schema: SchemaRef, id: i32, marker: &str) -> Result<Arc<dyn
// Keep the data fixed and vary only the advertised stats so the planner swap
// decision is explained entirely by the heuristic under test.
async fn assert_build_side_from_stats(
+ options: SpatialJoinOptions,
left_num_rows: Option<usize>,
right_num_rows: Option<usize>,
left_total_byte_size: Option<usize>,
@@ -309,7 +310,7 @@ async fn assert_build_side_from_stats(
stats_with(right_schema.as_ref(), right_num_rows, right_total_byte_size),
));
- let ctx = setup_context(Some(SpatialJoinOptions::default()), 10)?;
+ let ctx = setup_context(Some(options), 10)?;
ctx.register_table("L", left_provider)?;
ctx.register_table("R", right_provider)?;
@@ -692,6 +693,7 @@ async fn test_spatial_join_swap_inputs_produces_same_plan(
// smaller-row input on the build side even if it is larger by byte size.
async fn test_spatial_join_reordering_uses_row_count() -> Result<()> {
assert_build_side_from_stats(
+ SpatialJoinOptions::default(),
Some(100),
Some(10),
Some(100),
@@ -706,6 +708,7 @@ async fn test_spatial_join_reordering_uses_row_count() -> Result<()> {
// smaller-bytes input on the build side.
async fn test_spatial_join_reordering_uses_size_fallback() -> Result<()> {
assert_build_side_from_stats(
+ SpatialJoinOptions::default(),
None,
None,
Some(10_000),
@@ -719,7 +722,33 @@ async fn test_spatial_join_reordering_uses_size_fallback() -> Result<()> {
// When both row count and size are absent, the planner preserves the original
// join order.
async fn test_spatial_join_reordering_preserves_order_without_stats() -> Result<()> {
- assert_build_side_from_stats(None, None, None, None, OriginalInputSide::Left).await
+ assert_build_side_from_stats(
+ SpatialJoinOptions::default(),
+ None,
+ None,
+ None,
+ None,
+ OriginalInputSide::Left,
+ )
+ .await
+}
+
+#[tokio::test]
+// When join reordering is disabled, the planner preserves the original join
+// order even if statistics would normally trigger a swap.
+async fn test_spatial_join_reordering_can_be_disabled() -> Result<()> {
+ assert_build_side_from_stats(
+ SpatialJoinOptions {
+ spatial_join_reordering: false,
+ ..Default::default()
+ },
+ Some(100),
+ Some(10),
+ Some(100),
+ Some(10_000),
+ OriginalInputSide::Left,
+ )
+ .await
}
#[tokio::test]