This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 01ef59be refactor(rust/sedona-spatial-join): Allow SpatialIndexBuilder
and EvaluatedGeometryArray to be configurable (#737)
01ef59be is described below
commit 01ef59be5d76721ac658a00b02c30e3d8902bd09
Author: Dewey Dunnington <[email protected]>
AuthorDate: Wed Apr 1 16:46:10 2026 -0500
refactor(rust/sedona-spatial-join): Allow SpatialIndexBuilder and
EvaluatedGeometryArray to be configurable (#737)
---
.../external_evaluated_batch_stream.rs | 9 +-
.../bench/evaluated_batch/spill.rs | 10 +-
rust/sedona-spatial-join/src/evaluated_batch.rs | 16 -
.../evaluated_batch_stream/external.rs | 12 +-
.../src/evaluated_batch/spill.rs | 126 ++++-
rust/sedona-spatial-join/src/exec.rs | 6 +
.../src/index/build_side_collector.rs | 44 +-
.../src/index/default_spatial_index.rs | 66 +--
.../src/index/default_spatial_index_builder.rs | 84 ++-
rust/sedona-spatial-join/src/index/knn_adapter.rs | 2 +-
.../src/index/partitioned_index_provider.rs | 22 +-
.../sedona-spatial-join/src/index/spatial_index.rs | 20 -
.../src/index/spatial_index_builder.rs | 27 +-
rust/sedona-spatial-join/src/join_provider.rs | 108 ++++
rust/sedona-spatial-join/src/lib.rs | 1 +
rust/sedona-spatial-join/src/operand_evaluator.rs | 561 ++++++++++++++++++---
.../src/partitioning/stream_repartitioner.rs | 321 +-----------
rust/sedona-spatial-join/src/prepare.rs | 20 +-
rust/sedona-spatial-join/src/stream.rs | 4 +-
rust/sedona-spatial-join/src/utils/once_fut.rs | 2 +-
20 files changed, 838 insertions(+), 623 deletions(-)
diff --git
a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
index 950502be..5353f151 100644
---
a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
+++
b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs
@@ -57,10 +57,11 @@ fn make_evaluated_batch(num_rows: usize, sedona_type:
&SedonaType) -> EvaluatedB
let wkt_values = vec![Some("POINT (0 0)"); num_rows];
let geom_array = create_array_storage(&wkt_values, sedona_type);
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
sedona_type)
- .expect("failed to build geometry array for benchmark");
-
- geom_array.distance =
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+ .expect("failed to build geometry array for benchmark")
+ .with_distance(Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(
+ 10.0,
+ )))));
EvaluatedBatch { batch, geom_array }
}
diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
index b87a6b9c..cf0331f3 100644
--- a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
+++ b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs
@@ -57,11 +57,11 @@ fn make_evaluated_batch(num_rows: usize, sedona_type:
&SedonaType) -> EvaluatedB
let wkt_values = vec![Some("POINT (0 0)"); num_rows];
let geom_array = create_array_storage(&wkt_values, sedona_type);
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
sedona_type)
- .expect("failed to build geometry array for benchmark");
-
- // Use a scalar distance so the spilled dist column is constant.
- geom_array.distance =
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type)
+ .expect("failed to build geometry array for benchmark")
+ .with_distance(Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(
+ 10.0,
+ )))));
EvaluatedBatch { batch, geom_array }
}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs
b/rust/sedona-spatial-join/src/evaluated_batch.rs
index 7fa0cd79..f3b6237e 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -18,9 +18,6 @@
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::Result;
-use datafusion_expr::ColumnarValue;
-use geo::Rect;
-use wkb::reader::Wkb;
use crate::{
operand_evaluator::EvaluatedGeometryArray,
utils::arrow_utils::get_record_batch_memory_size,
@@ -55,19 +52,6 @@ impl EvaluatedBatch {
pub fn num_rows(&self) -> usize {
self.batch.num_rows()
}
-
- pub fn wkb(&self, idx: usize) -> Option<&Wkb<'_>> {
- let wkbs = self.geom_array.wkbs();
- wkbs[idx].as_ref()
- }
-
- pub fn rects(&self) -> &Vec<Option<Rect<f32>>> {
- &self.geom_array.rects
- }
-
- pub fn distance(&self) -> &Option<ColumnarValue> {
- &self.geom_array.distance
- }
}
pub mod evaluated_batch_stream;
diff --git
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
index 67d3538e..9d3fa10e 100644
---
a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
+++
b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs
@@ -378,11 +378,9 @@ mod tests {
fn create_test_evaluated_batch(start_id: i32) -> Result<EvaluatedBatch> {
let batch = create_test_record_batch(start_id)?;
let (geom_array, sedona_type) = create_test_geometry_array()?;
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
-
- // Add distance as a scalar value
- geom_array.distance =
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
-
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?.with_distance(
+ Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))),
+ );
Ok(EvaluatedBatch { batch, geom_array })
}
@@ -576,10 +574,10 @@ mod tests {
assert!(name_array.is_null(2));
// Verify geometry array
- assert_eq!(batch.geom_array.rects.len(), 3);
+ assert_eq!(batch.geom_array.rects().len(), 3);
// Verify distance
- match &batch.geom_array.distance {
+ match &batch.geom_array.distance() {
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
assert_eq!(*val, 10.0);
}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
index eddd0574..81e5d805 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs
@@ -18,13 +18,15 @@
use std::sync::Arc;
use arrow::array::Float64Array;
-use arrow_array::{Array, RecordBatch, StructArray};
+use arrow_array::{Array, ArrayRef, FixedSizeListArray, Float32Array,
RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::config::SpillCompression;
use datafusion_common::{Result, ScalarValue};
use datafusion_execution::{disk_manager::RefCountedTempFile,
runtime_env::RuntimeEnv};
use datafusion_expr::ColumnarValue;
use datafusion_physical_plan::metrics::SpillMetrics;
+use geo::{Coord, Rect};
+use geo_traits::CoordTrait;
use sedona_common::{sedona_internal_datafusion_err, sedona_internal_err};
use sedona_schema::datatypes::SedonaType;
@@ -52,6 +54,7 @@ pub struct EvaluatedBatchSpillWriter {
const SPILL_FIELD_DATA_INDEX: usize = 0;
const SPILL_FIELD_GEOM_INDEX: usize = 1;
const SPILL_FIELD_DIST_INDEX: usize = 2;
+const SPILL_FIELD_RECT_INDEX: usize = 3;
impl EvaluatedBatchSpillWriter {
/// Create a new SpillWriter
@@ -70,7 +73,12 @@ impl EvaluatedBatchSpillWriter {
Field::new("data", DataType::Struct(data_inner_fields.clone()),
false);
let geom_field = sedona_type.to_storage_field("geom", true)?;
let dist_field = Field::new("dist", DataType::Float64, true);
- let spill_schema = Schema::new(vec![data_struct_field, geom_field,
dist_field]);
+ let rect_field = Field::new(
+ "rect",
+ DataType::new_fixed_size_list(DataType::Float32, 4, true),
+ true,
+ );
+ let spill_schema = Schema::new(vec![data_struct_field, geom_field,
dist_field, rect_field]);
// Create spill file
let inner = RecordBatchSpillWriter::try_new(
@@ -115,7 +123,7 @@ impl EvaluatedBatchSpillWriter {
// Store dist into a Float64Array
let mut dist_builder =
arrow::array::Float64Builder::with_capacity(num_rows);
let geom_array = &evaluated_batch.geom_array;
- match &geom_array.distance {
+ match geom_array.distance() {
Some(ColumnarValue::Scalar(scalar)) => match scalar {
ScalarValue::Float64(dist_value) => {
for _ in 0..num_rows {
@@ -141,11 +149,31 @@ impl EvaluatedBatchSpillWriter {
}
let dist_array = dist_builder.finish();
+ // Store rect into a FixedSizeList array
+ let mut rect_builder =
+
arrow::array::FixedSizeListBuilder::new(arrow::array::Float32Builder::new(), 4);
+ for rect_opt in geom_array.rects() {
+ if let Some(rect) = rect_opt {
+ rect_builder.values().append_slice(&[
+ rect.min().x(),
+ rect.min().y(),
+ rect.max().x(),
+ rect.max().y(),
+ ]);
+ rect_builder.append(true);
+ } else {
+ rect_builder.values().append_nulls(4);
+ rect_builder.append(false);
+ }
+ }
+ let rect_array = rect_builder.finish();
+
// Assemble the final spilled RecordBatch
let columns = vec![
- Arc::new(data_struct_array) as Arc<dyn arrow::array::Array>,
- Arc::clone(&geom_array.geometry_array),
- Arc::new(dist_array) as Arc<dyn arrow::array::Array>,
+ Arc::new(data_struct_array) as ArrayRef,
+ Arc::clone(geom_array.geometry_array()),
+ Arc::new(dist_array) as ArrayRef,
+ Arc::new(rect_array) as ArrayRef,
];
let spilled_record_batch =
RecordBatch::try_new(Arc::new(self.spill_schema.clone()),
columns)?;
@@ -170,7 +198,6 @@ impl EvaluatedBatchSpillReader {
}
/// Read the next EvaluatedBatch from the spill file
- #[allow(unused)]
pub fn next_batch(&mut self) -> Option<Result<EvaluatedBatch>> {
self.next_raw_batch()
.map(|record_batch|
record_batch.and_then(spilled_batch_to_evaluated_batch))
@@ -250,9 +277,47 @@ pub(crate) fn spilled_batch_to_evaluated_batch(
None
};
+ // Extract the rect array
+ let rect_array = record_batch
+ .column(SPILL_FIELD_RECT_INDEX)
+ .as_any()
+ .downcast_ref::<FixedSizeListArray>()
+ .ok_or_else(|| {
+ sedona_internal_datafusion_err!("Expected rect column to be
FixedSizeListArray")
+ })?;
+ let rect_child_array = rect_array
+ .values()
+ .as_any()
+ .downcast_ref::<Float32Array>()
+ .ok_or_else(|| {
+ sedona_internal_datafusion_err!("Expected rect column child to be
Float32Array")
+ })?;
+ let rect_vec = (0..rect_array.len())
+ .map(|i| {
+ if rect_array.is_null(i) {
+ None
+ } else {
+ let child_i = i * 4;
+ unsafe {
+ Some(Rect::new(
+ Coord {
+ x: rect_child_array.value_unchecked(child_i),
+ y: rect_child_array.value_unchecked(child_i + 1),
+ },
+ Coord {
+ x: rect_child_array.value_unchecked(child_i + 2),
+ y: rect_child_array.value_unchecked(child_i + 3),
+ },
+ ))
+ }
+ }
+ })
+ .collect::<Vec<_>>();
+
// Create EvaluatedGeometryArray
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
- geom_array.distance = distance;
+ let geom_array =
+ EvaluatedGeometryArray::try_new_with_rects(geom_array, rect_vec,
&sedona_type)?
+ .with_distance(distance);
Ok(EvaluatedBatch { batch, geom_array })
}
@@ -328,10 +393,11 @@ mod tests {
fn create_test_evaluated_batch() -> Result<EvaluatedBatch> {
let batch = create_test_record_batch()?;
let (geom_array, sedona_type) = create_test_geometry_array()?;
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
- // Add distance as a scalar value
- geom_array.distance =
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0))));
+ // With distance as ScalarValue
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?.with_distance(
+ Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))),
+ );
Ok(EvaluatedBatch { batch, geom_array })
}
@@ -339,11 +405,11 @@ mod tests {
fn create_test_evaluated_batch_with_array_distance() ->
Result<EvaluatedBatch> {
let batch = create_test_record_batch()?;
let (geom_array, sedona_type) = create_test_geometry_array()?;
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
- // Add distance as an array value
+ // With distance as an array value
let dist_array = Arc::new(Float64Array::from(vec![Some(1.0),
Some(2.0), Some(3.0)]));
- geom_array.distance = Some(ColumnarValue::Array(dist_array));
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?
+ .with_distance(Some(ColumnarValue::Array(dist_array)));
Ok(EvaluatedBatch { batch, geom_array })
}
@@ -374,11 +440,10 @@ mod tests {
Some(point3_wkb.as_slice()),
]));
- let mut geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?;
-
- // Add distance with nulls
+ // With distance with nulls
let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), None,
Some(3.0)]));
- geom_array.distance = Some(ColumnarValue::Array(dist_array));
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array,
&sedona_type)?
+ .with_distance(Some(ColumnarValue::Array(dist_array)));
Ok(EvaluatedBatch { batch, geom_array })
}
@@ -402,7 +467,7 @@ mod tests {
)?;
// Verify the spill schema has the expected structure
- assert_eq!(writer.spill_schema.fields().len(), 3);
+ assert_eq!(writer.spill_schema.fields().len(), 4);
assert_eq!(
writer.spill_schema.field(SPILL_FIELD_DATA_INDEX).name(),
"data"
@@ -415,6 +480,10 @@ mod tests {
writer.spill_schema.field(SPILL_FIELD_DIST_INDEX).name(),
"dist"
);
+ assert_eq!(
+ writer.spill_schema.field(SPILL_FIELD_RECT_INDEX).name(),
+ "rect"
+ );
Ok(())
}
@@ -487,7 +556,7 @@ mod tests {
let read_batch = reader.next_batch().unwrap()?;
// Verify distance is read back as array
- match &read_batch.geom_array.distance {
+ match read_batch.geom_array.distance() {
Some(ColumnarValue::Array(array)) => {
let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
assert_eq!(float_array.len(), 3);
@@ -529,10 +598,10 @@ mod tests {
// Verify nulls are preserved
assert_eq!(read_batch.num_rows(), 3);
- assert!(read_batch.geom_array.rects[1].is_none()); // Null geometry
+ assert!(read_batch.geom_array.rects()[1].is_none()); // Null geometry
// Verify distance nulls
- match &read_batch.geom_array.distance {
+ match read_batch.geom_array.distance() {
Some(ColumnarValue::Array(array)) => {
let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
assert!(float_array.is_valid(0));
@@ -642,7 +711,7 @@ mod tests {
)?;
let evaluated_batch = create_test_evaluated_batch()?;
- let original_rects = evaluated_batch.rects().clone();
+ let original_rects = evaluated_batch.geom_array.rects();
writer.append(&evaluated_batch)?;
let temp_file = writer.finish()?;
@@ -651,8 +720,11 @@ mod tests {
let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
let read_batch = reader.next_batch().unwrap()?;
- assert_eq!(read_batch.rects().len(), original_rects.len());
- for (original, read) in
original_rects.iter().zip(read_batch.rects().iter()) {
+ assert_eq!(read_batch.geom_array.rects().len(), original_rects.len());
+ for (original, read) in original_rects
+ .iter()
+ .zip(read_batch.geom_array.rects().iter())
+ {
match (original, read) {
(Some(orig_rect), Some(read_rect)) => {
assert_eq!(orig_rect.min().x, read_rect.min().x);
@@ -694,7 +766,7 @@ mod tests {
let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?;
let read_batch = reader.next_batch().unwrap()?;
- match &read_batch.geom_array.distance {
+ match read_batch.geom_array.distance() {
Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => {
assert_eq!(*val, 10.0);
}
diff --git a/rust/sedona-spatial-join/src/exec.rs
b/rust/sedona-spatial-join/src/exec.rs
index 071f6538..42ca6ebb 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -33,6 +33,7 @@ use parking_lot::Mutex;
use sedona_common::{sedona_internal_err, SpatialJoinOptions};
use crate::{
+ join_provider::{DefaultSpatialJoinProvider, SpatialJoinProvider},
prepare::{SpatialJoinComponents, SpatialJoinComponentsBuilder},
spatial_predicate::{KNNPredicate, SpatialPredicate, SpatialPredicateTrait},
stream::SpatialJoinStream,
@@ -113,6 +114,8 @@ pub struct SpatialJoinExec {
once_async_spatial_join_components:
Arc<Mutex<Option<OnceAsync<SpatialJoinComponents>>>>,
/// A random seed for making random procedures in spatial join
deterministic
seed: u64,
+ /// Factories to create the index builder and evaluated batches
+ join_provider: Arc<dyn SpatialJoinProvider>,
}
impl SpatialJoinExec {
@@ -172,6 +175,7 @@ impl SpatialJoinExec {
cache,
once_async_spatial_join_components: Arc::new(Mutex::new(None)),
seed,
+ join_provider: Arc::new(DefaultSpatialJoinProvider),
})
}
@@ -472,6 +476,7 @@ impl ExecutionPlan for SpatialJoinExec {
self.join_type,
probe_thread_count,
self.metrics.clone(),
+ self.join_provider.clone(),
self.seed,
);
Ok(spatial_join_components_builder.build(build_streams))
@@ -503,6 +508,7 @@ impl ExecutionPlan for SpatialJoinExec {
session_config,
context.runtime_env(),
&self.metrics,
+ self.join_provider.clone(),
once_fut_spatial_join_components,
Arc::clone(&self.once_async_spatial_join_components),
)))
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 8d9ca579..6c51f681 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -32,7 +32,6 @@ use sedona_expr::statistics::GeoStatistics;
use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
use sedona_schema::datatypes::WKB_GEOMETRY;
-use crate::index::spatial_index_builder::SpatialIndexBuilderRef;
use crate::{
evaluated_batch::{
evaluated_batch_stream::{
@@ -42,7 +41,8 @@ use crate::{
spill::EvaluatedBatchSpillWriter,
EvaluatedBatch,
},
- operand_evaluator::{create_operand_evaluator, OperandEvaluator},
+ join_provider::SpatialJoinProvider,
+ operand_evaluator::create_operand_evaluator,
spatial_predicate::SpatialPredicate,
utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples},
};
@@ -81,10 +81,9 @@ pub(crate) struct BuildPartition {
pub(crate) struct BuildSideBatchesCollector {
spatial_predicate: SpatialPredicate,
spatial_join_options: SpatialJoinOptions,
- evaluator: Arc<dyn OperandEvaluator>,
runtime_env: Arc<RuntimeEnv>,
spill_compression: SpillCompression,
- spatial_index_builder: SpatialIndexBuilderRef,
+ join_provider: Arc<dyn SpatialJoinProvider>,
}
#[derive(Clone)]
@@ -128,16 +127,14 @@ impl BuildSideBatchesCollector {
spatial_join_options: SpatialJoinOptions,
runtime_env: Arc<RuntimeEnv>,
spill_compression: SpillCompression,
- spatial_index_builder: SpatialIndexBuilderRef,
+ join_provider: Arc<dyn SpatialJoinProvider>,
) -> Self {
- let evaluator = create_operand_evaluator(&spatial_predicate,
spatial_join_options.clone());
BuildSideBatchesCollector {
spatial_predicate,
spatial_join_options,
- evaluator,
runtime_env,
spill_compression,
- spatial_index_builder,
+ join_provider,
}
}
@@ -220,7 +217,7 @@ impl BuildSideBatchesCollector {
}
let geo_statistics = analyzer.finish();
- let extra_mem = self.spatial_index_builder.estimate_extra_memory_usage(
+ let extra_mem = self.join_provider.estimate_extra_memory_usage(
&geo_statistics,
&self.spatial_predicate,
&self.spatial_join_options,
@@ -328,7 +325,10 @@ impl BuildSideBatchesCollector {
.enumerate()
{
let collector = self.clone();
- let evaluator = Arc::clone(&self.evaluator);
+ let evaluator = create_operand_evaluator(
+ &self.spatial_predicate,
+ self.join_provider.evaluated_array_factory(),
+ );
let bbox_sampler = BoundingBoxSampler::try_new(
self.spatial_join_options.min_index_side_bbox_samples,
self.spatial_join_options.max_index_side_bbox_samples,
@@ -377,7 +377,10 @@ impl BuildSideBatchesCollector {
.zip(reservations)
.enumerate()
{
- let evaluator = Arc::clone(&self.evaluator);
+ let evaluator = create_operand_evaluator(
+ &self.spatial_predicate,
+ self.join_provider.evaluated_array_factory(),
+ );
let bbox_sampler = BoundingBoxSampler::try_new(
self.spatial_join_options.min_index_side_bbox_samples,
self.spatial_join_options.max_index_side_bbox_samples,
@@ -408,7 +411,7 @@ impl BuildSideBatchesCollector {
let build_side_batch = &in_mem_batches[0];
let schema = build_side_batch.schema();
- let sedona_type = &build_side_batch.geom_array.sedona_type;
+ let sedona_type = &build_side_batch.geom_array.sedona_type();
let mut spill_writer = EvaluatedBatchSpillWriter::try_new(
Arc::clone(&self.runtime_env),
schema,
@@ -442,15 +445,14 @@ impl BuildSideBatchesCollector {
#[cfg(test)]
mod tests {
use super::*;
- use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
- use crate::index::DefaultSpatialIndexBuilder;
+ use crate::join_provider::DefaultSpatialJoinProvider;
use crate::{
operand_evaluator::EvaluatedGeometryArray,
spatial_predicate::{RelationPredicate, SpatialRelationType},
};
use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
- use datafusion_common::{JoinType, ScalarValue};
+ use datafusion_common::ScalarValue;
use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer,
MemoryPool};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -488,22 +490,12 @@ mod tests {
SpatialRelationType::Intersects,
));
- let builder = DefaultSpatialIndexBuilder::new(
- test_schema(),
- predicate.clone(),
- SpatialJoinOptions::default(),
- JoinType::Inner,
- 1,
- SpatialJoinBuildMetrics::default(),
- )
- .unwrap();
-
BuildSideBatchesCollector::new(
predicate,
SpatialJoinOptions::default(),
Arc::new(RuntimeEnv::default()),
SpillCompression::Uncompressed,
- Arc::new(builder),
+ Arc::new(DefaultSpatialJoinProvider),
)
}
diff --git a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
index 6215c307..e1c85e77 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index.rs
@@ -35,7 +35,6 @@ use geo_index::rtree::{
};
use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder, RTreeIndex};
use geo_index::IndexableNum;
-use geo_types::Rect;
use parking_lot::Mutex;
use sedona_expr::statistics::GeoStatistics;
use sedona_geo::to_geo::item_to_geometry;
@@ -49,7 +48,7 @@ use crate::{
knn_adapter::{KnnComponents, SedonaKnnAdapter},
IndexQueryResult, QueryResultMetrics,
},
- operand_evaluator::{create_operand_evaluator, distance_value_at,
OperandEvaluator},
+ operand_evaluator::distance_value_at,
refine::{create_refiner, IndexQueryResultRefiner},
spatial_predicate::SpatialPredicate,
};
@@ -61,9 +60,6 @@ struct DefaultSpatialIndexInner {
pub(crate) schema: SchemaRef,
pub(crate) options: SpatialJoinOptions,
- /// The spatial predicate evaluator for the spatial predicate.
- pub(crate) evaluator: Arc<dyn OperandEvaluator>,
-
/// The refiner for refining the index query results.
pub(crate) refiner: Arc<dyn IndexQueryResultRefiner>,
@@ -111,7 +107,6 @@ impl DefaultSpatialIndex {
options: SpatialJoinOptions,
probe_threads_counter: AtomicUsize,
) -> Self {
- let evaluator = create_operand_evaluator(&spatial_predicate,
options.clone());
let refiner = create_refiner(
options.spatial_library,
&spatial_predicate,
@@ -126,7 +121,6 @@ impl DefaultSpatialIndex {
inner: Arc::new(DefaultSpatialIndexInner {
schema,
options,
- evaluator,
refiner,
rtree,
data_id_to_batch_pos: Vec::new(),
@@ -142,7 +136,6 @@ impl DefaultSpatialIndex {
pub fn new(
schema: SchemaRef,
options: SpatialJoinOptions,
- evaluator: Arc<dyn OperandEvaluator>,
refiner: Arc<dyn IndexQueryResultRefiner>,
rtree: RTree<f32>,
indexed_batches: Vec<EvaluatedBatch>,
@@ -156,7 +149,6 @@ impl DefaultSpatialIndex {
inner: Arc::new(DefaultSpatialIndexInner {
schema,
options,
- evaluator,
refiner,
rtree,
data_id_to_batch_pos,
@@ -194,7 +186,7 @@ impl DefaultSpatialIndex {
let chunk = chunk.to_vec();
let index_owned = self.clone();
join_set.spawn(async move {
- let Some(probe_wkb) = cloned_evaluated_batch.wkb(row_idx) else
{
+ let Some(probe_wkb) =
cloned_evaluated_batch.geom_array.wkb(row_idx) else {
return (
i,
sedona_internal_err!(
@@ -248,15 +240,16 @@ impl DefaultSpatialIndex {
let pos = self.inner.data_id_to_batch_pos[*data_idx as usize];
let (batch_idx, row_idx) = pos;
let indexed_batch = &self.inner.indexed_batches[batch_idx as
usize];
- let build_wkb = indexed_batch.wkb(row_idx as usize);
+ let build_wkb = indexed_batch.geom_array.wkb(row_idx as usize);
let Some(build_wkb) = build_wkb else {
continue;
};
- let distance = self.inner.evaluator.resolve_distance(
- indexed_batch.distance(),
- row_idx as usize,
- distance,
- )?;
+ let build_distance = indexed_batch.geom_array.distance_at(row_idx
as usize)?;
+ debug_assert!(
+ build_distance.is_none() || distance.is_none(),
+ "Distance should not be present on both build and probe sides"
+ );
+ let distance = build_distance.map(Some).unwrap_or(*distance);
let geom_idx = self.inner.geom_idx_vec[*data_idx as usize];
index_query_results.push(IndexQueryResult {
wkb: build_wkb,
@@ -299,37 +292,6 @@ impl SpatialIndex for DefaultSpatialIndex {
&self.inner.indexed_batches[batch_idx].batch
}
- /// This method implements [`SpatialIndex::query`], which is a two-phase
spatial join:
- /// 1. **Filter phase**: Uses the R-tree index with the probe geometry's
bounding rectangle
- /// to quickly identify candidate geometries that might satisfy the
spatial predicate
- /// 2. **Refinement phase**: Evaluates the exact spatial predicate on
candidates to determine
- /// actual matches
- fn query(
- &self,
- probe_wkb: &Wkb,
- probe_rect: &Rect<f32>,
- distance: &Option<f64>,
- build_batch_positions: &mut Vec<(i32, i32)>,
- ) -> Result<QueryResultMetrics> {
- let min = probe_rect.min();
- let max = probe_rect.max();
- let mut candidates = self.inner.rtree.search(min.x, min.y, max.x,
max.y);
- if candidates.is_empty() {
- return Ok(QueryResultMetrics {
- count: 0,
- candidate_count: 0,
- });
- }
-
- // Sort and dedup candidates to avoid duplicate results when we index
one geometry
- // using several boxes.
- candidates.sort_unstable();
- candidates.dedup();
-
- // Refine the candidates retrieved from the r-tree index by evaluating
the actual spatial predicate
- self.refine(probe_wkb, &candidates, distance, build_batch_positions)
- }
-
/// This method implements [`SpatialIndex::query_knn`] by:
/// 1. R-tree's built-in neighbors() method for efficient KNN search
/// 2. Distance refinement using actual geometry calculations
@@ -557,8 +519,8 @@ impl SpatialIndex for DefaultSpatialIndex {
));
}
- let rects = evaluated_batch.rects();
- let dist = evaluated_batch.distance();
+ let rects = evaluated_batch.geom_array.rects();
+ let dist = evaluated_batch.geom_array.distance();
let mut total_candidates_count = 0;
let mut total_count = 0;
let mut current_row_idx = range.start;
@@ -575,7 +537,7 @@ impl SpatialIndex for DefaultSpatialIndex {
continue;
}
- let Some(probe_wkb) = evaluated_batch.wkb(row_idx) else {
+ let Some(probe_wkb) = evaluated_batch.geom_array.wkb(row_idx) else
{
return sedona_internal_err!(
"Failed to get WKB for row {} in evaluated batch",
row_idx
@@ -752,7 +714,7 @@ mod tests {
SpatialRelationType::Intersects,
));
- let builder = DefaultSpatialIndexBuilder::new(
+ let mut builder = DefaultSpatialIndexBuilder::new(
schema.clone(),
spatial_predicate,
options,
@@ -1226,7 +1188,7 @@ mod tests {
JoinSide::Left,
));
- let builder = DefaultSpatialIndexBuilder::new(
+ let mut builder = DefaultSpatialIndexBuilder::new(
schema.clone(),
spatial_predicate,
options,
diff --git
a/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
b/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
index 307a916f..6dcb20d1 100644
--- a/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/default_spatial_index_builder.rs
@@ -26,7 +26,6 @@ use
crate::index::spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildM
use crate::{
evaluated_batch::{evaluated_batch_stream::SendableEvaluatedBatchStream,
EvaluatedBatch},
index::{default_spatial_index::DefaultSpatialIndex,
knn_adapter::KnnComponents},
- operand_evaluator::create_operand_evaluator,
refine::create_refiner,
spatial_predicate::SpatialPredicate,
utils::join_utils::need_produce_result_in_final,
@@ -95,6 +94,36 @@ impl DefaultSpatialIndexBuilder {
})
}
+ pub(crate) fn estimate_extra_memory_usage(
+ geo_stats: &GeoStatistics,
+ spatial_predicate: &SpatialPredicate,
+ options: &SpatialJoinOptions,
+ ) -> usize {
+ // Estimate the amount of memory needed by the refiner
+ let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+ let refiner = create_refiner(
+ options.spatial_library,
+ spatial_predicate,
+ options.clone(),
+ num_geoms,
+ geo_stats.clone(),
+ );
+ let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
+
+ let knn_components_mem_usage =
+ if matches!(spatial_predicate,
SpatialPredicate::KNearestNeighbors(_)) {
+ KnnComponents::estimate_max_memory_usage(geo_stats)
+ } else {
+ 0
+ };
+
+ // Estimate the amount of memory needed for the R-tree
+ let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT;
+
+ // The final estimation is the sum of all above
+ refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
+ }
+
/// Build the spatial R-tree index from collected geometry batches.
fn build_rtree(&mut self) -> Result<RTreeBuildResult> {
let build_timer = self.metrics.build_time.timer();
@@ -102,14 +131,14 @@ impl DefaultSpatialIndexBuilder {
let num_rects = self
.indexed_batches
.iter()
- .map(|batch| batch.rects().iter().flatten().count())
+ .map(|batch| batch.geom_array.rects().iter().flatten().count())
.sum::<usize>();
let mut rtree_builder = RTreeBuilder::<f32>::new(num_rects as u32);
let mut batch_pos_vec = vec![(-1, -1); num_rects];
for (batch_idx, batch) in self.indexed_batches.iter().enumerate() {
- let rects = batch.rects();
+ let rects = batch.geom_array.rects();
for (idx, rect_opt) in rects.iter().enumerate() {
let Some(rect) = rect_opt else {
continue;
@@ -201,48 +230,16 @@ impl DefaultSpatialIndexBuilder {
#[async_trait]
impl SpatialIndexBuilder for DefaultSpatialIndexBuilder {
- fn estimate_extra_memory_usage(
- &self,
- geo_stats: &GeoStatistics,
- spatial_predicate: &SpatialPredicate,
- options: &SpatialJoinOptions,
- ) -> usize {
- // Estimate the amount of memory needed by the refiner
- let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
- let refiner = create_refiner(
- options.spatial_library,
- spatial_predicate,
- options.clone(),
- num_geoms,
- geo_stats.clone(),
- );
- let refiner_mem_usage = refiner.estimate_max_memory_usage(geo_stats);
-
- let knn_components_mem_usage =
- if matches!(spatial_predicate,
SpatialPredicate::KNearestNeighbors(_)) {
- KnnComponents::estimate_max_memory_usage(geo_stats)
- } else {
- 0
- };
-
- // Estimate the amount of memory needed for the R-tree
- let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT;
-
- // The final estimation is the sum of all above
- refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
- }
-
- fn finish(mut self) -> Result<SpatialIndexRef> {
+ fn finish(&mut self) -> Result<SpatialIndexRef> {
if self.indexed_batches.is_empty() {
return Ok(Arc::new(DefaultSpatialIndex::empty(
- self.spatial_predicate,
- self.schema,
- self.options,
+ self.spatial_predicate.clone(),
+ self.schema.clone(),
+ self.options.clone(),
AtomicUsize::new(self.probe_threads_count),
)));
}
- let evaluator = create_operand_evaluator(&self.spatial_predicate,
self.options.clone());
let num_geoms = self
.indexed_batches
.iter()
@@ -282,12 +279,13 @@ impl SpatialIndexBuilder for DefaultSpatialIndexBuilder {
self.memory_used
);
Ok(Arc::new(DefaultSpatialIndex::new(
- self.schema,
- self.options,
- evaluator,
+ self.schema.clone(),
+ self.options.clone(),
refiner,
rtree,
- self.indexed_batches,
+ self.indexed_batches
+ .drain(0..self.indexed_batches.len())
+ .collect(),
batch_pos_vec,
geom_idx_vec,
visited_build_side,
diff --git a/rust/sedona-spatial-join/src/index/knn_adapter.rs
b/rust/sedona-spatial-join/src/index/knn_adapter.rs
index ec67fe15..81deb160 100644
--- a/rust/sedona-spatial-join/src/index/knn_adapter.rs
+++ b/rust/sedona-spatial-join/src/index/knn_adapter.rs
@@ -111,7 +111,7 @@ impl<'a> GeometryAccessor for SedonaKnnAdapter<'a> {
let (batch_idx, row_idx) = self.data_id_to_batch_pos[item_index];
let indexed_batch = &self.indexed_batches[batch_idx as usize];
- if let Some(wkb) = indexed_batch.wkb(row_idx as usize) {
+ if let Some(wkb) = indexed_batch.geom_array.wkb(row_idx as usize) {
if let Ok(geom) = item_to_geometry(wkb) {
// Try to store in cache - if another thread got there first,
we just use theirs
let _ = geometry_cache[item_index].set(geom);
diff --git a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
index c56e9930..bd8e08c2 100644
--- a/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
+++ b/rust/sedona-spatial-join/src/index/partitioned_index_provider.rs
@@ -21,8 +21,9 @@ use crate::evaluated_batch::evaluated_batch_stream::{
};
use crate::evaluated_batch::EvaluatedBatch;
use crate::index::spatial_index::SpatialIndexRef;
-use crate::index::spatial_index_builder::{SpatialIndexBuilder,
SpatialJoinBuildMetrics};
-use crate::index::{BuildPartition, DefaultSpatialIndexBuilder};
+use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
+use crate::index::BuildPartition;
+use crate::join_provider::SpatialJoinProvider;
use crate::partitioning::stream_repartitioner::{SpilledPartition,
SpilledPartitions};
use crate::utils::disposable_async_cell::DisposableAsyncCell;
use crate::{partitioning::SpatialPartition,
spatial_predicate::SpatialPredicate};
@@ -55,6 +56,8 @@ pub(crate) struct PartitionedIndexProvider {
/// Async cells for indexes, one per regular partition
index_cells: Vec<DisposableAsyncCell<SharedResult<SpatialIndexRef>>>,
+ join_provider: Arc<dyn SpatialJoinProvider>,
+
/// The memory reserved in the build side collection phase. We'll hold
them until
/// we don't need to build spatial indexes.
_reservations: Vec<MemoryReservation>,
@@ -75,6 +78,7 @@ impl PartitionedIndexProvider {
probe_threads_count: usize,
partitioned_spill_files: SpilledPartitions,
metrics: SpatialJoinBuildMetrics,
+ join_provider: Arc<dyn SpatialJoinProvider>,
reservations: Vec<MemoryReservation>,
) -> Self {
let num_partitions = partitioned_spill_files.num_regular_partitions();
@@ -91,6 +95,7 @@ impl PartitionedIndexProvider {
metrics,
data:
BuildSideData::MultiPartition(Mutex::new(partitioned_spill_files)),
index_cells,
+ join_provider,
_reservations: reservations,
}
}
@@ -104,6 +109,7 @@ impl PartitionedIndexProvider {
probe_threads_count: usize,
mut build_partitions: Vec<BuildPartition>,
metrics: SpatialJoinBuildMetrics,
+ join_provider: Arc<dyn SpatialJoinProvider>,
) -> Self {
let reservations = build_partitions
.iter_mut()
@@ -119,6 +125,7 @@ impl PartitionedIndexProvider {
metrics,
data:
BuildSideData::SinglePartition(Mutex::new(Some(build_partitions))),
index_cells,
+ join_provider,
_reservations: reservations,
}
}
@@ -130,6 +137,7 @@ impl PartitionedIndexProvider {
join_type: JoinType,
probe_threads_count: usize,
metrics: SpatialJoinBuildMetrics,
+ join_provider: Arc<dyn SpatialJoinProvider>,
) -> Self {
let build_partitions = Vec::new();
Self::new_single_partition(
@@ -140,6 +148,7 @@ impl PartitionedIndexProvider {
probe_threads_count,
build_partitions,
metrics,
+ join_provider,
)
}
@@ -274,7 +283,7 @@ impl PartitionedIndexProvider {
&self,
build_partitions: Vec<BuildPartition>,
) -> Result<SpatialIndexRef> {
- let mut builder = DefaultSpatialIndexBuilder::new(
+ let mut builder = self.join_provider.try_new_spatial_index_builder(
Arc::clone(&self.schema),
self.spatial_predicate.clone(),
self.options.clone(),
@@ -296,7 +305,7 @@ impl PartitionedIndexProvider {
&self,
spilled_partition: SpilledPartition,
) -> Result<SpatialIndexRef> {
- let mut builder = DefaultSpatialIndexBuilder::new(
+ let mut builder = self.join_provider.try_new_spatial_index_builder(
Arc::clone(&self.schema),
self.spatial_predicate.clone(),
self.options.clone(),
@@ -395,6 +404,7 @@ impl EvaluatedBatchStream for ReceiverBatchStream {
#[cfg(test)]
mod tests {
use super::*;
+ use crate::join_provider::DefaultSpatialJoinProvider;
use crate::operand_evaluator::EvaluatedGeometryArray;
use crate::partitioning::partition_slots::PartitionSlots;
use crate::utils::bbox_sampler::BoundingBoxSamples;
@@ -511,7 +521,7 @@ mod tests {
return Ok(SpilledPartition::empty());
}
let schema = batches[0].schema();
- let sedona_type = batches[0].geom_array.sedona_type.clone();
+ let sedona_type = batches[0].geom_array.sedona_type().clone();
let mut writer = EvaluatedBatchSpillWriter::try_new(
runtime_env,
schema,
@@ -572,6 +582,7 @@ mod tests {
1,
vec![build_partition],
SpatialJoinBuildMetrics::new(0, &metrics),
+ Arc::new(DefaultSpatialJoinProvider),
);
let first_index = provider
@@ -613,6 +624,7 @@ mod tests {
1,
spilled_partitions,
SpatialJoinBuildMetrics::new(0, &metrics),
+ Arc::new(DefaultSpatialJoinProvider),
vec![new_reservation(Arc::clone(&memory_pool))],
));
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index 8c9d4827..ccad19bd 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -22,7 +22,6 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_common::Result;
-use geo_types::Rect;
use parking_lot::Mutex;
use sedona_common::ExecutionMode;
use sedona_expr::statistics::GeoStatistics;
@@ -45,25 +44,6 @@ pub(crate) trait SpatialIndex {
fn num_indexed_batches(&self) -> usize;
/// Get the batch at the given index.
fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch;
- /// Query the spatial index with a probe geometry to find matching
build-side geometries.
- /// # Arguments
- /// * `probe_wkb` - The probe geometry in WKB format
- /// * `probe_rect` - The minimum bounding rectangle of the probe geometry
- /// * `distance` - Optional distance parameter for distance-based spatial
predicates
- /// * `build_batch_positions` - Output vector that will be populated with
(batch_idx, row_idx)
- /// pairs for each matching build-side geometry
- ///
- /// # Returns
- /// * `JoinResultMetrics` containing the number of actual matches
(`count`) and the number
- /// of candidates from the filter phase (`candidate_count`)
- #[allow(unused)]
- fn query(
- &self,
- probe_wkb: &Wkb,
- probe_rect: &Rect<f32>,
- distance: &Option<f64>,
- build_batch_positions: &mut Vec<(i32, i32)>,
- ) -> Result<QueryResultMetrics>;
/// Query the spatial index for k nearest neighbors of a given geometry.
/// # Arguments
///
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index 0db1e073..c51217ba 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -16,38 +16,25 @@
// under the License.
use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet,
MetricBuilder};
-use sedona_common::SpatialJoinOptions;
use sedona_expr::statistics::GeoStatistics;
-use std::sync::Arc;
+use
crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
use crate::index::spatial_index::SpatialIndexRef;
-use crate::{
- evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream,
- spatial_predicate::SpatialPredicate,
-};
use async_trait::async_trait;
use datafusion_common::Result;
/// Builder for constructing a SpatialIndex from geometry batches.
#[async_trait]
-pub(crate) trait SpatialIndexBuilder {
- /// Estimate the amount of memory required by the R-tree index and
evaluating spatial predicates.
- /// The estimated memory usage does not include the memory required for
holding the build side
- /// batches.
- fn estimate_extra_memory_usage(
- &self,
- geo_stats: &GeoStatistics,
- spatial_predicate: &SpatialPredicate,
- options: &SpatialJoinOptions,
- ) -> usize;
-
- /// Finish building and return the completed SpatialIndex.
- fn finish(self) -> Result<SpatialIndexRef>;
+pub(crate) trait SpatialIndexBuilder: Send + Sync {
+ /// Add a stream to this builder
async fn add_stream(
&mut self,
stream: SendableEvaluatedBatchStream,
geo_statistics: GeoStatistics,
) -> Result<()>;
+
+ /// Finish building and return the completed SpatialIndex.
+ fn finish(&mut self) -> Result<SpatialIndexRef>;
}
/// Metrics for the build phase of the spatial join.
@@ -67,5 +54,3 @@ impl SpatialJoinBuildMetrics {
}
}
}
-
-pub(crate) type SpatialIndexBuilderRef = Arc<dyn SpatialIndexBuilder + Send +
Sync>;
diff --git a/rust/sedona-spatial-join/src/join_provider.rs
b/rust/sedona-spatial-join/src/join_provider.rs
new file mode 100644
index 00000000..595a6d12
--- /dev/null
+++ b/rust/sedona-spatial-join/src/join_provider.rs
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use arrow_schema::SchemaRef;
+use datafusion_common::Result;
+use datafusion_expr::JoinType;
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+
+use crate::{
+ index::{
+ spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics},
+ DefaultSpatialIndexBuilder,
+ },
+ operand_evaluator::{DefaultGeometryArrayFactory,
EvaluatedGeometryArrayFactory},
+ SpatialPredicate,
+};
+
+/// Provider for join internals
+///
+/// This trait provides an extension point for overriding the evaluation
+/// details of a spatial join. In particular it allows plugging in a custom
+/// index for accelerated joins on specific hardware (e.g., GPU) and a custom
+/// bounder for specific data types (e.g., geography).
+pub(crate) trait SpatialJoinProvider: std::fmt::Debug + Send + Sync {
+ /// Create a new [SpatialIndexBuilder]
+ fn try_new_spatial_index_builder(
+ &self,
+ schema: SchemaRef,
+ spatial_predicate: SpatialPredicate,
+ options: SpatialJoinOptions,
+ join_type: JoinType,
+ probe_threads_count: usize,
+ metrics: SpatialJoinBuildMetrics,
+ ) -> Result<Box<dyn SpatialIndexBuilder>>;
+
+ /// Estimate the amount of memory required by the index and refiner
evaluating spatial predicates
+ ///
+ /// The estimated memory usage should not include the memory required for
holding the build side
+ /// batches.
+ fn estimate_extra_memory_usage(
+ &self,
+ geo_stats: &GeoStatistics,
+ spatial_predicate: &SpatialPredicate,
+ options: &SpatialJoinOptions,
+ ) -> usize;
+
+ fn evaluated_array_factory(&self) -> Arc<dyn
EvaluatedGeometryArrayFactory>;
+}
+
+/// Default implementation of the [SpatialJoinProvider]
+#[derive(Debug)]
+pub(crate) struct DefaultSpatialJoinProvider;
+
+impl SpatialJoinProvider for DefaultSpatialJoinProvider {
+ fn try_new_spatial_index_builder(
+ &self,
+ schema: SchemaRef,
+ spatial_predicate: SpatialPredicate,
+ options: SpatialJoinOptions,
+ join_type: JoinType,
+ probe_threads_count: usize,
+ metrics: SpatialJoinBuildMetrics,
+ ) -> Result<Box<dyn SpatialIndexBuilder>> {
+ let builder = DefaultSpatialIndexBuilder::new(
+ schema,
+ spatial_predicate,
+ options,
+ join_type,
+ probe_threads_count,
+ metrics,
+ )?;
+ Ok(Box::new(builder))
+ }
+
+ fn estimate_extra_memory_usage(
+ &self,
+ geo_stats: &GeoStatistics,
+ spatial_predicate: &SpatialPredicate,
+ options: &SpatialJoinOptions,
+ ) -> usize {
+ DefaultSpatialIndexBuilder::estimate_extra_memory_usage(
+ geo_stats,
+ spatial_predicate,
+ options,
+ )
+ }
+
+ fn evaluated_array_factory(&self) -> Arc<dyn
EvaluatedGeometryArrayFactory> {
+ Arc::new(DefaultGeometryArrayFactory)
+ }
+}
diff --git a/rust/sedona-spatial-join/src/lib.rs
b/rust/sedona-spatial-join/src/lib.rs
index d3e7a293..372efb7f 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -18,6 +18,7 @@
pub mod evaluated_batch;
pub mod exec;
mod index;
+mod join_provider;
pub mod operand_evaluator;
pub mod partitioning;
pub mod planner;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs
b/rust/sedona-spatial-join/src/operand_evaluator.rs
index efcf2a43..3c316b4c 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -17,6 +17,7 @@
use core::fmt;
use std::{mem::transmute, sync::Arc};
+use arrow::compute::interleave as arrow_interleave;
use arrow_array::{Array, ArrayRef, Float64Array, RecordBatch};
use arrow_schema::DataType;
use datafusion_common::{utils::proxy::VecAllocExt, JoinSide, Result,
ScalarValue};
@@ -30,7 +31,6 @@ use sedona_geo_generic_alg::BoundingRect;
use sedona_schema::datatypes::SedonaType;
use wkb::reader::Wkb;
-use sedona_common::option::SpatialJoinOptions;
use sedona_common::sedona_internal_err;
use crate::{
@@ -38,30 +38,46 @@ use crate::{
utils::arrow_utils::get_array_memory_size,
};
+/// Factory for [EvaluatedGeometryArray] instances given an evaluated geometry
column
+///
+/// Allows join providers to customize the eagerly evaluated representation of
geometry.
+/// This is currently limited to a concrete internal representation but may
expand to
+/// support non-Cartesian bounding or non-WKB backed geometry arrays. This
also may
+/// expand to support more compact or efficient serialization/deserialization
of the
+/// evaluated array when spilling.
+pub(crate) trait EvaluatedGeometryArrayFactory: fmt::Debug + Send + Sync {
+ /// Create a new [EvaluatedGeometryArray]
+ fn try_new_evaluated_array(
+ &self,
+ geometry_array: ArrayRef,
+ sedona_type: &SedonaType,
+ ) -> Result<EvaluatedGeometryArray>;
+}
+
+/// Default EvaluatedGeometryArray factory
+///
+/// This factory constructs arrays via [EvaluatedGeometryArray::try_new], which computes item bounds
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+ fn try_new_evaluated_array(
+ &self,
+ geometry_array: ArrayRef,
+ sedona_type: &SedonaType,
+ ) -> Result<EvaluatedGeometryArray> {
+ EvaluatedGeometryArray::try_new(geometry_array, sedona_type)
+ }
+}
+
/// Operand evaluator is for evaluating the operands of a spatial predicate.
It can be a distance
/// operand evaluator or a relation operand evaluator.
pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync {
/// Evaluate the spatial predicate operand on the build side.
- fn evaluate_build(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
- let geom_expr = self.build_side_expr()?;
- evaluate_with_rects(batch, &geom_expr)
- }
+ fn evaluate_build(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray>;
/// Evaluate the spatial predicate operand on the probe side.
- fn evaluate_probe(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
- let geom_expr = self.probe_side_expr()?;
- evaluate_with_rects(batch, &geom_expr)
- }
-
- /// Resolve the distance operand for a given row.
- fn resolve_distance(
- &self,
- _build_distance: &Option<ColumnarValue>,
- _build_row_idx: usize,
- _probe_distance: &Option<f64>,
- ) -> Result<Option<f64>> {
- Ok(None)
- }
+ fn evaluate_probe(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray>;
/// Get the expression for the build side.
fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>>;
@@ -73,32 +89,34 @@ pub(crate) trait OperandEvaluator: fmt::Debug + Send + Sync
{
/// Create a spatial predicate evaluator for the spatial predicate.
pub(crate) fn create_operand_evaluator(
predicate: &SpatialPredicate,
- options: SpatialJoinOptions,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
) -> Arc<dyn OperandEvaluator> {
match predicate {
- SpatialPredicate::Distance(predicate) => {
- Arc::new(DistanceOperandEvaluator::new(predicate.clone(), options))
- }
- SpatialPredicate::Relation(predicate) => {
- Arc::new(RelationOperandEvaluator::new(predicate.clone(), options))
- }
- SpatialPredicate::KNearestNeighbors(predicate) => {
- Arc::new(KNNOperandEvaluator::new(predicate.clone()))
- }
+ SpatialPredicate::Distance(predicate) =>
Arc::new(DistanceOperandEvaluator::new(
+ predicate.clone(),
+ evaluated_array_factory,
+ )),
+ SpatialPredicate::Relation(predicate) =>
Arc::new(RelationOperandEvaluator::new(
+ predicate.clone(),
+ evaluated_array_factory,
+ )),
+ SpatialPredicate::KNearestNeighbors(predicate) =>
Arc::new(KNNOperandEvaluator::new(
+ predicate.clone(),
+ evaluated_array_factory,
+ )),
}
}
/// Result of evaluating a geometry batch.
pub struct EvaluatedGeometryArray {
- /// Type of geometry_array
- pub sedona_type: SedonaType,
+ sedona_type: SedonaType,
/// The array of geometries produced by evaluating the geometry expression.
- pub geometry_array: ArrayRef,
+ geometry_array: ArrayRef,
/// The rects of the geometries in the geometry array. The length of this
array is equal to the number of geometries.
/// The rects will be None for empty or null geometries.
- pub rects: Vec<Option<Rect<f32>>>,
+ rects: Vec<Option<Rect<f32>>>,
/// The distance value produced by evaluating the distance expression.
- pub distance: Option<ColumnarValue>,
+ distance: Option<ColumnarValue>,
/// WKBs of the geometries in `geometry_array`. The wkb values reference
buffers inside the geometry array,
/// but we'll only allow accessing Wkb<'a> where 'a is the lifetime of the
GeometryBatchResult to make
/// the interfaces safe. The buffers in `geometry_array` are allocated on
the heap and won't be moved when
@@ -107,6 +125,7 @@ pub struct EvaluatedGeometryArray {
}
impl EvaluatedGeometryArray {
+ /// Create a new EvaluatedGeometryArray and compute item bounds
pub fn try_new(geometry_array: ArrayRef, sedona_type: &SedonaType) ->
Result<Self> {
let num_rows = geometry_array.len();
let mut rect_vec = Vec::with_capacity(num_rows);
@@ -126,19 +145,47 @@ impl EvaluatedGeometryArray {
} else {
None
};
+
rect_vec.push(rect_opt);
- wkbs.push(wkb_opt);
+
+ // Safety: The wkbs must reference buffers inside the
`geometry_array`. The `geometry_array` and
+ // `wkbs` are both owned by the `EvaluatedGeometryArray`, so they
have the same lifetime. We'll never
+ // have a situation where the `EvaluatedGeometryArray` is dropped
while the `wkbs` are still in use
+ // (guaranteed by the scope of the `wkbs` field and lifetime
signature of the `wkbs` method).
+ wkbs.push(wkb_opt.map(|wkb| unsafe { transmute(wkb) }));
Ok(())
})?;
+ Ok(Self {
+ sedona_type: sedona_type.clone(),
+ geometry_array,
+ rects: rect_vec,
+ distance: None,
+ wkbs,
+ })
+ }
+
+ /// Create a new EvaluatedGeometryArray with precomputed item bounds
+ pub fn try_new_with_rects(
+ geometry_array: ArrayRef,
+ rect_vec: Vec<Option<Rect<f32>>>,
+ sedona_type: &SedonaType,
+ ) -> Result<Self> {
// Safety: The wkbs must reference buffers inside the
`geometry_array`. Since the `geometry_array` and
// `wkbs` are both owned by the `EvaluatedGeometryArray`, so they have
the same lifetime. We'll never
// have a situation where the `EvaluatedGeometryArray` is dropped
while the `wkbs` are still in use
// (guaranteed by the scope of the `wkbs` field and lifetime signature
of the `wkbs` method).
- let wkbs = wkbs
- .into_iter()
- .map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
- .collect();
+ let num_rows = geometry_array.len();
+ let mut wkbs = Vec::with_capacity(num_rows);
+ geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+ wkbs.push(wkb_opt.map(|wkb| unsafe { transmute(wkb) }));
+ Ok(())
+ })?;
+
+ if num_rows != rect_vec.len() {
+ return sedona_internal_err!("Expected rect_vec to have same length
as geometry array");
+ }
+
Ok(Self {
sedona_type: sedona_type.clone(),
geometry_array,
@@ -148,6 +195,157 @@ impl EvaluatedGeometryArray {
})
}
+ /// Set evaluated distance
+ pub fn with_distance(mut self, distance: Option<ColumnarValue>) -> Self {
+ self.distance = distance;
+ self
+ }
+
+ /// Build a new `EvaluatedGeometryArray` by interleaving rows from the
provided
+ /// source arrays according to `indices`. Each `(batch_idx, row_idx)` pair
+ /// identifies a source array and row.
+ ///
+ /// The rectangles are gathered directly from the source arrays (no
+ /// recomputation), while the WKB references are recomputed from the
+ /// interleaved Arrow array since their lifetimes are tied to it.
+ pub fn interleave(
+ geom_arrays: &[&EvaluatedGeometryArray],
+ indices: &[(usize, usize)],
+ ) -> Result<Self> {
+ if geom_arrays.is_empty() {
+ return sedona_internal_err!("interleave requires at least one
geometry array");
+ }
+
+ let sedona_type = &geom_arrays[0].sedona_type;
+
+ // Interleave the Arrow geometry arrays.
+ let value_refs: Vec<&dyn Array> = geom_arrays
+ .iter()
+ .map(|g| g.geometry_array.as_ref())
+ .collect();
+ let geometry_array = arrow_interleave(&value_refs, indices)?;
+
+ // Gather rects by index — no recomputation.
+ let rects: Vec<Option<Rect<f32>>> = indices
+ .iter()
+ .map(|&(batch_idx, row_idx)| geom_arrays[batch_idx].rects[row_idx])
+ .collect();
+
+ let mut out = Self::try_new_with_rects(geometry_array, rects,
sedona_type)?;
+
+ // Interleave distance columns.
+ out.distance = Self::interleave_distance(geom_arrays, indices)?;
+
+ Ok(out)
+ }
+
+ /// Interleave the optional distance metadata across source geometry
arrays.
+ pub(crate) fn interleave_distance(
+ geom_arrays: &[&EvaluatedGeometryArray],
+ indices: &[(usize, usize)],
+ ) -> Result<Option<ColumnarValue>> {
+ let mut first_value: Option<&ColumnarValue> = None;
+ let mut needs_array = false;
+ let mut all_null = true;
+ let mut first_scalar: Option<&ScalarValue> = None;
+
+ for geom in geom_arrays {
+ match &geom.distance {
+ Some(value) => {
+ if first_value.is_none() {
+ first_value = Some(value);
+ }
+ match value {
+ ColumnarValue::Array(array) => {
+ needs_array = true;
+ if all_null && array.logical_null_count() !=
array.len() {
+ all_null = false;
+ }
+ }
+ ColumnarValue::Scalar(scalar) => {
+ if let Some(first) = first_scalar {
+ if first != scalar {
+ needs_array = true;
+ }
+ } else {
+ first_scalar = Some(scalar);
+ }
+ if !scalar.is_null() {
+ all_null = false;
+ }
+ }
+ }
+ }
+ None => {
+ if first_value.is_some() && !all_null {
+ return sedona_internal_err!(
+ "Inconsistent distance metadata across batches"
+ );
+ }
+ }
+ }
+ }
+
+ if all_null {
+ return Ok(None);
+ }
+
+ let Some(distance_value) = first_value else {
+ return Ok(None);
+ };
+
+ if !needs_array {
+ if let ColumnarValue::Scalar(value) = distance_value {
+ return Ok(Some(ColumnarValue::Scalar(value.clone())));
+ }
+ }
+
+ let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
+ for geom in geom_arrays {
+ match &geom.distance {
+ Some(ColumnarValue::Array(array)) =>
arrays.push(array.clone()),
+ Some(ColumnarValue::Scalar(value)) => {
+
arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
+ }
+ None => {
+ return sedona_internal_err!("Inconsistent distance
metadata across batches");
+ }
+ }
+ }
+
+ let array_refs: Vec<&dyn Array> = arrays.iter().map(|a|
a.as_ref()).collect();
+ let array = arrow_interleave(&array_refs, indices)?;
+ Ok(Some(ColumnarValue::Array(array)))
+ }
+
+ /// Type of geometry_array
+ pub fn sedona_type(&self) -> &SedonaType {
+ &self.sedona_type
+ }
+
+ /// Evaluated array of geometries
+ pub fn geometry_array(&self) -> &ArrayRef {
+ &self.geometry_array
+ }
+
+ /// Bounding rectangles of each element in geometry_array
+ pub fn rects(&self) -> &[Option<Rect<f32>>] {
+ &self.rects
+ }
+
+ /// Evaluated array of distances
+ pub fn distance(&self) -> &Option<ColumnarValue> {
+ &self.distance
+ }
+
+ /// Get the distance value for a specific row, if distances are present.
+ pub fn distance_at(&self, row_idx: usize) -> Result<Option<f64>> {
+ match &self.distance {
+ Some(cv) => distance_value_at(cv, row_idx),
+ None => Ok(None),
+ }
+ }
+
/// Get the WKBs of the geometries in the geometry array.
pub fn wkbs(&self) -> &Vec<Option<Wkb<'_>>> {
// The returned WKBs are guaranteed to be valid for the lifetime of
the GeometryBatchResult,
@@ -157,6 +355,11 @@ impl EvaluatedGeometryArray {
&self.wkbs
}
+ /// Get a single WKB
+ pub fn wkb(&self, idx: usize) -> Option<&Wkb<'_>> {
+ self.wkbs[idx].as_ref()
+ }
+
pub fn in_mem_size(&self) -> Result<usize> {
let geom_array_size = get_array_memory_size(&self.geometry_array)?;
@@ -177,14 +380,17 @@ impl EvaluatedGeometryArray {
#[derive(Debug)]
struct RelationOperandEvaluator {
inner: RelationPredicate,
- _options: SpatialJoinOptions,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
}
impl RelationOperandEvaluator {
- pub fn new(inner: RelationPredicate, options: SpatialJoinOptions) -> Self {
+ pub fn new(
+ inner: RelationPredicate,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+ ) -> Self {
Self {
inner,
- _options: options,
+ evaluated_array_factory,
}
}
}
@@ -193,14 +399,17 @@ impl RelationOperandEvaluator {
#[derive(Debug)]
struct DistanceOperandEvaluator {
inner: DistancePredicate,
- _options: SpatialJoinOptions,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
}
impl DistanceOperandEvaluator {
- pub fn new(inner: DistancePredicate, options: SpatialJoinOptions) -> Self {
+ pub fn new(
+ inner: DistancePredicate,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+ ) -> Self {
Self {
inner,
- _options: options,
+ evaluated_array_factory,
}
}
}
@@ -208,13 +417,14 @@ impl DistanceOperandEvaluator {
fn evaluate_with_rects(
batch: &RecordBatch,
geom_expr: &Arc<dyn PhysicalExpr>,
+ evaluated_array_factory: &dyn EvaluatedGeometryArrayFactory,
) -> Result<EvaluatedGeometryArray> {
let geometry_columnar_value = geom_expr.evaluate(batch)?;
let num_rows = batch.num_rows();
let geometry_array = geometry_columnar_value.to_array(num_rows)?;
let sedona_type =
SedonaType::from_storage_field(geom_expr.return_field(&batch.schema())?.as_ref())?;
- EvaluatedGeometryArray::try_new(geometry_array, &sedona_type)
+ evaluated_array_factory.try_new_evaluated_array(geometry_array,
&sedona_type)
}
impl DistanceOperandEvaluator {
@@ -224,7 +434,8 @@ impl DistanceOperandEvaluator {
geom_expr: &Arc<dyn PhysicalExpr>,
side: JoinSide,
) -> Result<EvaluatedGeometryArray> {
- let mut result = evaluate_with_rects(batch, geom_expr)?;
+ let mut result =
+ evaluate_with_rects(batch, geom_expr,
self.evaluated_array_factory.as_ref())?;
let should_expand = match side {
JoinSide::Left => self.inner.distance_side == JoinSide::Left,
@@ -333,26 +544,19 @@ impl OperandEvaluator for DistanceOperandEvaluator {
fn probe_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::clone(&self.inner.right))
}
-
- fn resolve_distance(
- &self,
- build_distance: &Option<ColumnarValue>,
- build_row_idx: usize,
- probe_distance: &Option<f64>,
- ) -> Result<Option<f64>> {
- match self.inner.distance_side {
- JoinSide::Left => {
- let Some(distance) = build_distance else {
- return Ok(None);
- };
- distance_value_at(distance, build_row_idx)
- }
- JoinSide::Right | JoinSide::None => Ok(*probe_distance),
- }
- }
}
impl OperandEvaluator for RelationOperandEvaluator {
+ fn evaluate_build(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
+ let geom_expr = self.build_side_expr()?;
+ evaluate_with_rects(batch, &geom_expr,
self.evaluated_array_factory.as_ref())
+ }
+
+ fn evaluate_probe(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
+ let geom_expr = self.probe_side_expr()?;
+ evaluate_with_rects(batch, &geom_expr,
self.evaluated_array_factory.as_ref())
+ }
+
fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::clone(&self.inner.left))
}
@@ -366,15 +570,32 @@ impl OperandEvaluator for RelationOperandEvaluator {
#[derive(Debug)]
struct KNNOperandEvaluator {
inner: KNNPredicate,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
}
impl KNNOperandEvaluator {
- fn new(inner: KNNPredicate) -> Self {
- Self { inner }
+ fn new(
+ inner: KNNPredicate,
+ evaluated_array_factory: Arc<dyn EvaluatedGeometryArrayFactory>,
+ ) -> Self {
+ Self {
+ inner,
+ evaluated_array_factory,
+ }
}
}
impl OperandEvaluator for KNNOperandEvaluator {
+ fn evaluate_build(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
+ let geom_expr = self.build_side_expr()?;
+ evaluate_with_rects(batch, &geom_expr,
self.evaluated_array_factory.as_ref())
+ }
+
+ fn evaluate_probe(&self, batch: &RecordBatch) ->
Result<EvaluatedGeometryArray> {
+ let geom_expr = self.probe_side_expr()?;
+ evaluate_with_rects(batch, &geom_expr,
self.evaluated_array_factory.as_ref())
+ }
+
fn build_side_expr(&self) -> Result<Arc<dyn PhysicalExpr>> {
// For KNN, the right side (objects/candidates) is the build side
Ok(Arc::clone(&self.inner.right))
@@ -384,15 +605,205 @@ impl OperandEvaluator for KNNOperandEvaluator {
// For KNN, the left side (queries) is the probe side
Ok(Arc::clone(&self.inner.left))
}
+}
- /// Resolve the k value for KNN operation
- fn resolve_distance(
- &self,
- _build_distance: &Option<ColumnarValue>,
- _build_row_idx: usize,
- _probe_distance: &Option<f64>,
- ) -> Result<Option<f64>> {
- // NOTE: We do not support distance-based refinement for KNN
predicates in the refiner phase.
- Ok(None)
+#[cfg(test)]
+mod test {
+ use arrow_array::BinaryArray;
+ use sedona_geometry::wkb_factory::wkb_point;
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+
+ use super::*;
+
+ fn make_geom_array_with_distance(
+ wkbs: Vec<Vec<u8>>,
+ distance: Option<ColumnarValue>,
+ ) -> Result<EvaluatedGeometryArray> {
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(
+ wkbs.iter()
+ .map(|wkb| Some(wkb.as_slice()))
+ .collect::<Vec<_>>(),
+ ));
+ let mut geom = EvaluatedGeometryArray::try_new(geom_array,
&WKB_GEOMETRY)?;
+ geom.distance = distance;
+ Ok(geom)
+ }
+
+ #[test]
+ fn interleave_distance_none() -> Result<()> {
+ let wkbs1 = vec![
+ wkb_point((10.0, 10.0)).unwrap(),
+ wkb_point((20.0, 20.0)).unwrap(),
+ ];
+ let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let geom1 = make_geom_array_with_distance(wkbs1, None)?;
+ let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments)?;
+ assert!(result.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_uniform_scalar() -> Result<()> {
+ let wkbs1 = vec![
+ wkb_point((10.0, 10.0)).unwrap(),
+ wkb_point((20.0, 20.0)).unwrap(),
+ ];
+ let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let geom1 =
+ make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar.clone())))?;
+ let geom2 =
+ make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(scalar.clone())))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
+ if let Some(ColumnarValue::Scalar(value)) = result {
+ assert_eq!(value, ScalarValue::Float64(Some(5.0)));
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_different_scalars() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![
+ wkb_point((10.0, 10.0)).unwrap(),
+ wkb_point((20.0, 20.0)).unwrap(),
+ ];
+ let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let scalar1 = ScalarValue::Float64(Some(5.0));
+ let scalar2 = ScalarValue::Float64(Some(10.0));
+ let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar1)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(scalar2)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 5.0);
+ assert_eq!(float_array.value(1), 10.0);
+ assert_eq!(float_array.value(2), 5.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_arrays() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![
+ wkb_point((10.0, 10.0)).unwrap(),
+ wkb_point((20.0, 20.0)).unwrap(),
+ ];
+ let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
+ let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
+ let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Array(array1)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Array(array2)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 1.0);
+ assert_eq!(float_array.value(1), 3.0);
+ assert_eq!(float_array.value(2), 2.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![
+ wkb_point((10.0, 10.0)).unwrap(),
+ wkb_point((20.0, 20.0)).unwrap(),
+ ];
+ let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
+ let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Array(array)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 5.0);
+ assert_eq!(float_array.value(1), 10.0);
+ assert_eq!(float_array.value(2), 5.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_inconsistent_metadata() -> Result<()> {
+ let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
+ let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&geom_arrays,
&assignments);
+ assert!(result.is_err());
+ if let Err(e) = result {
+ assert!(e.to_string().contains("Inconsistent distance metadata"));
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_mixed_none_and_null() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
+ let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
+ let wkbs3 = vec![wkb_point((30.0, 30.0)).unwrap()];
+
+ let null_array = Arc::new(Float64Array::new_null(1));
+ let ega1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Array(null_array)))?;
+
+ let null_scalar = ScalarValue::Float64(None);
+ let ega2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(null_scalar)))?;
+
+ let ega3 = make_geom_array_with_distance(wkbs3, None)?;
+
+ let vec_ega = vec![&ega1, &ega2, &ega3];
+ let assignments = vec![(0, 0), (1, 0), (2, 0)];
+
+ let result = EvaluatedGeometryArray::interleave_distance(&vec_ega,
&assignments)?;
+ assert!(result.is_none());
+ Ok(())
}
}
diff --git a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
index cc8e6d7e..8d86d40b 100644
--- a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
+++ b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
@@ -35,13 +35,11 @@ use crate::{
SpatialPartitioner,
},
};
-use arrow::compute::interleave as arrow_interleave;
use arrow::compute::interleave_record_batch;
-use arrow_array::{Array, ArrayRef, RecordBatch};
+use arrow_array::RecordBatch;
use datafusion::config::SpillCompression;
-use datafusion_common::{Result, ScalarValue};
+use datafusion_common::Result;
use datafusion_execution::{disk_manager::RefCountedTempFile,
runtime_env::RuntimeEnv};
-use datafusion_expr::ColumnarValue;
use datafusion_physical_plan::metrics::SpillMetrics;
use futures::StreamExt;
use sedona_common::sedona_internal_err;
@@ -479,7 +477,7 @@ impl StreamRepartitioner {
self.slots.num_regular_partitions()
);
};
- if let Some(wkb) = batch_ref.wkb(row_idx) {
+ if let Some(wkb) = batch_ref.geom_array.wkb(row_idx) {
self.geo_stats_accumulators[slot_idx].update_statistics(wkb)?;
}
self.slot_assignments[slot_idx].push((batch_idx, row_idx));
@@ -573,7 +571,7 @@ impl StreamRepartitioner {
self.spill_registry[slot_idx] =
Some(EvaluatedBatchSpillWriter::try_new(
Arc::clone(&self.runtime_env),
batch.schema(),
- &batch.geom_array.sedona_type,
+ batch.geom_array.sedona_type(),
"streaming repartitioner",
self.spill_compression,
self.spill_metrics.clone(),
@@ -597,13 +595,13 @@ pub(crate) fn assign_rows(
assignments: &mut Vec<SpatialPartition>,
) -> Result<()> {
assignments.clear();
- assignments.reserve(batch.rects().len());
+ assignments.reserve(batch.geom_array.rects().len());
match partitioned_side {
PartitionedSide::BuildSide => {
let mut cnt = 0;
let num_regular_partitions = partitioner.num_regular_partitions()
as u32;
- for rect_opt in batch.rects() {
+ for rect_opt in batch.geom_array.rects() {
let partition = match rect_opt {
Some(rect) =>
partitioner.partition_no_multi(&geo_rect_to_bbox(rect))?,
None => {
@@ -618,7 +616,7 @@ pub(crate) fn assign_rows(
}
}
PartitionedSide::ProbeSide => {
- for rect_opt in batch.rects() {
+ for rect_opt in batch.geom_array.rects() {
let partition = match rect_opt {
Some(rect) =>
partitioner.partition(&geo_rect_to_bbox(rect))?,
None => SpatialPartition::None,
@@ -645,115 +643,14 @@ pub(crate) fn interleave_evaluated_batch(
return sedona_internal_err!("interleave_evaluated_batch requires at
least one batch");
}
let batch = interleave_record_batch(record_batches, indices)?;
- let geom_array = interleave_geometry_array(geom_arrays, indices)?;
+ let geom_array = EvaluatedGeometryArray::interleave(geom_arrays, indices)?;
Ok(EvaluatedBatch { batch, geom_array })
}
-fn interleave_geometry_array(
- geom_arrays: &[&EvaluatedGeometryArray],
- indices: &[(usize, usize)],
-) -> Result<EvaluatedGeometryArray> {
- if geom_arrays.is_empty() {
- return sedona_internal_err!("interleave_geometry_array requires at
least one batch");
- }
- let sedona_type = &geom_arrays[0].sedona_type;
- let value_refs: Vec<&dyn Array> = geom_arrays
- .iter()
- .map(|geom| geom.geometry_array.as_ref())
- .collect();
- let geometry_array = arrow_interleave(&value_refs, indices)?;
-
- let distance = interleave_distance_columns(geom_arrays, indices)?;
-
- let mut result = EvaluatedGeometryArray::try_new(geometry_array,
sedona_type)?;
- result.distance = distance;
- Ok(result)
-}
-
-fn interleave_distance_columns(
- geom_arrays: &[&EvaluatedGeometryArray],
- assignments: &[(usize, usize)],
-) -> Result<Option<ColumnarValue>> {
- // Check consistency and determine if we need array conversion
- let mut first_value: Option<&ColumnarValue> = None;
- let mut needs_array = false;
- let mut all_null = true;
- let mut first_scalar: Option<&ScalarValue> = None;
-
- for geom in geom_arrays {
- match &geom.distance {
- Some(value) => {
- if first_value.is_none() {
- first_value = Some(value);
- }
-
- match value {
- ColumnarValue::Array(array) => {
- needs_array = true;
- if all_null && array.logical_null_count() !=
array.len() {
- all_null = false;
- }
- }
- ColumnarValue::Scalar(scalar) => {
- if let Some(first) = first_scalar {
- if first != scalar {
- needs_array = true;
- }
- } else {
- first_scalar = Some(scalar);
- }
- if !scalar.is_null() {
- all_null = false;
- }
- }
- }
- }
- None => {
- if first_value.is_some() && !all_null {
- return sedona_internal_err!("Inconsistent distance
metadata across batches");
- }
- }
- }
- }
-
- if all_null {
- return Ok(None);
- }
-
- let Some(distance_value) = first_value else {
- return Ok(None);
- };
-
- // If all scalars match, return scalar
- if !needs_array {
- if let ColumnarValue::Scalar(value) = distance_value {
- return Ok(Some(ColumnarValue::Scalar(value.clone())));
- }
- }
-
- // Convert to arrays and interleave
- let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
- for geom in geom_arrays {
- match &geom.distance {
- Some(ColumnarValue::Array(array)) => arrays.push(array.clone()),
- Some(ColumnarValue::Scalar(value)) => {
-
arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
- }
- None => {
- return sedona_internal_err!("Inconsistent distance metadata
across batches");
- }
- }
- }
-
- let array_refs: Vec<&dyn Array> = arrays.iter().map(|array|
array.as_ref()).collect();
- let array = arrow_interleave(&array_refs, assignments)?;
- Ok(Some(ColumnarValue::Array(array)))
-}
-
#[cfg(test)]
mod tests {
use super::*;
- use arrow_array::{ArrayRef, BinaryArray, Int32Array};
+ use arrow_array::{Array, ArrayRef, BinaryArray, Int32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use sedona_geometry::bounding_box::BoundingBox;
@@ -1096,154 +993,6 @@ mod tests {
Ok(())
}
- fn make_geom_array_with_distance(
- wkbs: Vec<Vec<u8>>,
- distance: Option<ColumnarValue>,
- ) -> Result<EvaluatedGeometryArray> {
- let geom_array: ArrayRef = Arc::new(BinaryArray::from(
- wkbs.iter()
- .map(|wkb| Some(wkb.as_slice()))
- .collect::<Vec<_>>(),
- ));
- let mut geom = EvaluatedGeometryArray::try_new(geom_array,
&WKB_GEOMETRY)?;
- geom.distance = distance;
- Ok(geom)
- }
-
- #[test]
- fn interleave_distance_none() -> Result<()> {
- let wkbs1 = vec![
- wkb_point((10.0, 10.0)).unwrap(),
- wkb_point((20.0, 20.0)).unwrap(),
- ];
- let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let geom1 = make_geom_array_with_distance(wkbs1, None)?;
- let geom2 = make_geom_array_with_distance(wkbs2, None)?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments)?;
- assert!(result.is_none());
- Ok(())
- }
-
- #[test]
- fn interleave_distance_uniform_scalar() -> Result<()> {
- let wkbs1 = vec![
- wkb_point((10.0, 10.0)).unwrap(),
- wkb_point((20.0, 20.0)).unwrap(),
- ];
- let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let scalar = ScalarValue::Float64(Some(5.0));
- let geom1 =
- make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar.clone())))?;
- let geom2 =
- make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(scalar.clone())))?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments)?;
- assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
- if let Some(ColumnarValue::Scalar(value)) = result {
- assert_eq!(value, ScalarValue::Float64(Some(5.0)));
- }
- Ok(())
- }
-
- #[test]
- fn interleave_distance_different_scalars() -> Result<()> {
- use arrow_array::Float64Array;
-
- let wkbs1 = vec![
- wkb_point((10.0, 10.0)).unwrap(),
- wkb_point((20.0, 20.0)).unwrap(),
- ];
- let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let scalar1 = ScalarValue::Float64(Some(5.0));
- let scalar2 = ScalarValue::Float64(Some(10.0));
- let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar1)))?;
- let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(scalar2)))?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments)?;
- assert!(matches!(result, Some(ColumnarValue::Array(_))));
- if let Some(ColumnarValue::Array(array)) = result {
- let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
- assert_eq!(float_array.len(), 3);
- assert_eq!(float_array.value(0), 5.0);
- assert_eq!(float_array.value(1), 10.0);
- assert_eq!(float_array.value(2), 5.0);
- }
- Ok(())
- }
-
- #[test]
- fn interleave_distance_arrays() -> Result<()> {
- use arrow_array::Float64Array;
-
- let wkbs1 = vec![
- wkb_point((10.0, 10.0)).unwrap(),
- wkb_point((20.0, 20.0)).unwrap(),
- ];
- let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
- let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
- let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Array(array1)))?;
- let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Array(array2)))?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments)?;
- assert!(matches!(result, Some(ColumnarValue::Array(_))));
- if let Some(ColumnarValue::Array(array)) = result {
- let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
- assert_eq!(float_array.len(), 3);
- assert_eq!(float_array.value(0), 1.0);
- assert_eq!(float_array.value(1), 3.0);
- assert_eq!(float_array.value(2), 2.0);
- }
- Ok(())
- }
-
- #[test]
- fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
- use arrow_array::Float64Array;
-
- let wkbs1 = vec![
- wkb_point((10.0, 10.0)).unwrap(),
- wkb_point((20.0, 20.0)).unwrap(),
- ];
- let wkbs2 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let scalar = ScalarValue::Float64(Some(5.0));
- let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
- let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar)))?;
- let geom2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Array(array)))?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0), (0, 1)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments)?;
- assert!(matches!(result, Some(ColumnarValue::Array(_))));
- if let Some(ColumnarValue::Array(array)) = result {
- let float_array =
array.as_any().downcast_ref::<Float64Array>().unwrap();
- assert_eq!(float_array.len(), 3);
- assert_eq!(float_array.value(0), 5.0);
- assert_eq!(float_array.value(1), 10.0);
- assert_eq!(float_array.value(2), 5.0);
- }
- Ok(())
- }
-
#[test]
fn interleave_evaluated_batch_empty_assignments() -> Result<()> {
let batch_a = sample_batch(&[0], vec![Some(wkb_point((10.0,
10.0)).unwrap())])?;
@@ -1253,29 +1002,9 @@ mod tests {
let result = interleave_evaluated_batch(&record_batches, &geom_arrays,
&[])?;
assert_eq!(result.batch.num_rows(), 0);
- assert_eq!(result.geom_array.geometry_array.len(), 0);
- assert!(result.geom_array.rects.is_empty());
- assert!(result.geom_array.distance.is_none());
- Ok(())
- }
-
- #[test]
- fn interleave_distance_inconsistent_metadata() -> Result<()> {
- let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
- let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
-
- let scalar = ScalarValue::Float64(Some(5.0));
- let geom1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Scalar(scalar)))?;
- let geom2 = make_geom_array_with_distance(wkbs2, None)?;
-
- let geom_arrays = vec![&geom1, &geom2];
- let assignments = vec![(0, 0), (1, 0)];
-
- let result = interleave_distance_columns(&geom_arrays, &assignments);
- assert!(result.is_err());
- if let Err(e) = result {
- assert!(e.to_string().contains("Inconsistent distance metadata"));
- }
+ assert_eq!(result.geom_array.geometry_array().len(), 0);
+ assert!(result.geom_array.rects().is_empty());
+ assert!(result.geom_array.distance().is_none());
Ok(())
}
@@ -1314,7 +1043,7 @@ mod tests {
let result = interleave_evaluated_batch(&record_batches, &geom_arrays,
&assignments)?;
// Check if the result geometry array is BinaryViewArray
- let geom_array = result.geom_array.geometry_array;
+ let geom_array = result.geom_array.geometry_array();
assert!(geom_array
.as_any()
.downcast_ref::<BinaryViewArray>()
@@ -1332,28 +1061,4 @@ mod tests {
Ok(())
}
-
- #[test]
- fn interleave_distance_mixed_none_and_null() -> Result<()> {
- use arrow_array::Float64Array;
-
- let wkbs1 = vec![wkb_point((10.0, 10.0)).unwrap()];
- let wkbs2 = vec![wkb_point((20.0, 20.0)).unwrap()];
- let wkbs3 = vec![wkb_point((30.0, 30.0)).unwrap()];
-
- let null_array = Arc::new(Float64Array::new_null(1));
- let ega1 = make_geom_array_with_distance(wkbs1,
Some(ColumnarValue::Array(null_array)))?;
-
- let null_scalar = ScalarValue::Float64(None);
- let ega2 = make_geom_array_with_distance(wkbs2,
Some(ColumnarValue::Scalar(null_scalar)))?;
-
- let ega3 = make_geom_array_with_distance(wkbs3, None)?;
-
- let vec_ega = vec![&ega1, &ega2, &ega3];
- let assignments = vec![(0, 0), (1, 0), (2, 0)];
-
- let result = interleave_distance_columns(&vec_ega, &assignments)?;
- assert!(result.is_none());
- Ok(())
- }
}
diff --git a/rust/sedona-spatial-join/src/prepare.rs
b/rust/sedona-spatial-join/src/prepare.rs
index b0ee38db..fae20615 100644
--- a/rust/sedona-spatial-join/src/prepare.rs
+++ b/rust/sedona-spatial-join/src/prepare.rs
@@ -33,7 +33,7 @@ use sedona_expr::statistics::GeoStatistics;
use sedona_geometry::bounding_box::BoundingBox;
use crate::index::spatial_index_builder::SpatialJoinBuildMetrics;
-use crate::index::DefaultSpatialIndexBuilder;
+use crate::join_provider::SpatialJoinProvider;
use crate::{
index::{
memory_plan::{compute_memory_plan, MemoryPlan, PartitionMemorySummary},
@@ -73,6 +73,7 @@ pub(crate) struct SpatialJoinComponentsBuilder {
join_type: JoinType,
probe_threads_count: usize,
metrics: ExecutionPlanMetricsSet,
+ join_provider: Arc<dyn SpatialJoinProvider>,
seed: u64,
sedona_options: SedonaOptions,
}
@@ -80,6 +81,7 @@ pub(crate) struct SpatialJoinComponentsBuilder {
impl SpatialJoinComponentsBuilder {
/// Create a new builder capturing the execution context and configuration
/// required to produce `SpatialJoinComponents` from build-side streams.
+ #[expect(clippy::too_many_arguments)]
pub fn new(
context: Arc<TaskContext>,
build_schema: SchemaRef,
@@ -87,6 +89,7 @@ impl SpatialJoinComponentsBuilder {
join_type: JoinType,
probe_threads_count: usize,
metrics: ExecutionPlanMetricsSet,
+ join_provider: Arc<dyn SpatialJoinProvider>,
seed: u64,
) -> Self {
let session_config = context.session_config();
@@ -103,6 +106,7 @@ impl SpatialJoinComponentsBuilder {
join_type,
probe_threads_count,
metrics,
+ join_provider,
seed,
sedona_options,
}
@@ -216,21 +220,12 @@ impl SpatialJoinComponentsBuilder {
reservations.push(reservation);
collect_metrics_vec.push(CollectBuildSideMetrics::new(k,
&self.metrics));
}
- let join_metrics = SpatialJoinBuildMetrics::new(0, &self.metrics);
- let builder = Arc::new(DefaultSpatialIndexBuilder::new(
- self.build_schema.clone(),
- self.spatial_predicate.clone(),
- self.sedona_options.spatial_join.clone(),
- self.join_type,
- self.probe_threads_count,
- join_metrics.clone(),
- )?);
let collector = BuildSideBatchesCollector::new(
self.spatial_predicate.clone(),
self.sedona_options.spatial_join.clone(),
Arc::clone(&runtime_env),
spill_compression,
- builder,
+ self.join_provider.clone(),
);
let build_partitions = collector
.collect_all(
@@ -407,6 +402,7 @@ impl SpatialJoinComponentsBuilder {
self.join_type,
self.probe_threads_count,
SpatialJoinBuildMetrics::new(0, &self.metrics),
+ self.join_provider.clone(),
);
let probe_stream_options = ProbeStreamOptions {
@@ -440,6 +436,7 @@ impl SpatialJoinComponentsBuilder {
self.probe_threads_count,
build_partitions,
SpatialJoinBuildMetrics::new(0, &self.metrics),
+ self.join_provider.clone(),
);
let probe_stream_options = ProbeStreamOptions {
@@ -477,6 +474,7 @@ impl SpatialJoinComponentsBuilder {
self.probe_threads_count,
merged_spilled_partitions,
SpatialJoinBuildMetrics::new(0, &self.metrics),
+ self.join_provider.clone(),
reservations,
);
diff --git a/rust/sedona-spatial-join/src/stream.rs
b/rust/sedona-spatial-join/src/stream.rs
index b40be025..288c59d7 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -44,6 +44,7 @@ use
crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream
use crate::evaluated_batch::EvaluatedBatch;
use crate::index::partitioned_index_provider::PartitionedIndexProvider;
use crate::index::spatial_index::SpatialIndexRef;
+use crate::join_provider::SpatialJoinProvider;
use crate::operand_evaluator::create_operand_evaluator;
use crate::partitioning::SpatialPartition;
use crate::prepare::SpatialJoinComponents;
@@ -144,6 +145,7 @@ impl SpatialJoinStream {
session_config: &SessionConfig,
runtime_env: Arc<RuntimeEnv>,
metrics: &ExecutionPlanMetricsSet,
+ join_provider: Arc<dyn SpatialJoinProvider>,
once_fut_spatial_join_components: OnceFut<SpatialJoinComponents>,
once_async_spatial_join_components:
Arc<Mutex<Option<OnceAsync<SpatialJoinComponents>>>>,
) -> Self {
@@ -156,7 +158,7 @@ impl SpatialJoinStream {
.cloned()
.unwrap_or_default();
- let evaluator = create_operand_evaluator(on,
sedona_options.spatial_join.clone());
+ let evaluator = create_operand_evaluator(on,
join_provider.evaluated_array_factory());
let join_metrics = SpatialJoinProbeMetrics::new(probe_partition_id,
metrics);
let probe_stream = create_evaluated_probe_stream(
probe_stream,
diff --git a/rust/sedona-spatial-join/src/utils/once_fut.rs
b/rust/sedona-spatial-join/src/utils/once_fut.rs
index 8e7f4d49..0e3043d6 100644
--- a/rust/sedona-spatial-join/src/utils/once_fut.rs
+++ b/rust/sedona-spatial-join/src/utils/once_fut.rs
@@ -131,7 +131,7 @@ impl<T: 'static> OnceFut<T> {
}
/// Get the result of the computation if it is ready, without consuming it
- #[allow(unused)]
+ #[cfg(test)]
pub(crate) fn get(&mut self, cx: &mut Context<'_>) -> Poll<Result<&T>> {
if let OnceFutState::Pending(fut) = &mut self.state {
let r = ready!(fut.poll_unpin(cx));